package core import ( "errors" "git.noahlan.cn/noahlan/nnet/config" conn2 "git.noahlan.cn/noahlan/nnet/conn" "git.noahlan.cn/noahlan/nnet/entity" "git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/nnet/pipeline" "git.noahlan.cn/noahlan/nnet/scheduler" "git.noahlan.cn/noahlan/nnet/serialize" "git.noahlan.cn/noahlan/ntools-go/core/nlog" "git.noahlan.cn/noahlan/ntools-go/core/pool" "github.com/gorilla/websocket" "log" "net" "net/http" "os" "os/signal" "strings" "syscall" "time" ) func NotFound(conn entity.NetworkEntity, _ packet.IPacket) { nlog.Error("handler not found") _ = conn.SendBytes([]byte("handler not found")) } func NotFoundHandler() Handler { return HandlerFunc(NotFound) } type ( // engine TCP-engine engine struct { conf config.EngineConf // conf 配置 middlewares []Middleware // 中间件 routes []Route // 路由 // handler 消息处理器 handler Handler // dieChan 应用程序退出信号 dieChan chan struct{} pipeline pipeline.Pipeline // 消息管道 packerFn packet.NewPackerFunc // 封包、拆包器 serializer serialize.Serializer // 消息 序列化/反序列化 retryInterval time.Duration // 消息重试间隔时长 wsOpt wsOptions // websocket } wsOptions struct { IsWebsocket bool // 是否为websocket服务端 WebsocketPath string // ws地址(ws://127.0.0.1/WebsocketPath) TLSCertificate string // TLS 证书地址 (websocket) TLSKey string // TLS 证书key地址 (websocket) CheckOrigin func(*http.Request) bool // check origin } ) func newEngine(conf config.EngineConf) *engine { s := &engine{ conf: conf, dieChan: make(chan struct{}), pipeline: pipeline.New(), middlewares: make([]Middleware, 0), routes: make([]Route, 0), } pool.InitPool(conf.Pool) return s } func (ng *engine) use(middleware ...Middleware) { ng.middlewares = append(ng.middlewares, middleware...) } func (ng *engine) addRoutes(route ...Route) { ng.routes = append(ng.routes, route...) } func (ng *engine) bindRoutes(router Router) error { for _, fr := range ng.routes { if err := ng.bindRoute(router, fr); err != nil { return err } } return nil } func (ng *engine) bindRoute(router Router, route Route) error { // TODO default middleware chain := newChain() // build chain for _, middleware := range ng.middlewares { chain.Append(convertMiddleware(middleware)) } return router.Register(route.Matches, route.Handler) } func convertMiddleware(ware Middleware) func(Handler) Handler { return func(next Handler) Handler { return ware(next.Handle) } } func (ng *engine) serve(router Router) error { ng.handler = router if err := ng.bindRoutes(router); err != nil { return err } go scheduler.Schedule() defer func() { nlog.Info("Server is stopping...") ng.shutdown() scheduler.Close() }() if ng.wsOpt.IsWebsocket { if len(ng.wsOpt.TLSCertificate) != 0 && len(ng.wsOpt.TLSKey) != 0 { ng.listenAndServeWSTLS() } else { ng.listenAndServeWS() } } else { ng.listenAndServe() } sg := make(chan os.Signal) signal.Notify(sg, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGTERM) select { case <-ng.dieChan: nlog.Info("Server will shutdown in a few seconds") case s := <-sg: nlog.Infof("Server got signal: %s", s) } return nil } func (ng *engine) close() { close(ng.dieChan) } func (ng *engine) shutdown() { } func (ng *engine) listenAndServe() { listener, err := net.Listen(ng.conf.Protocol, ng.conf.Addr) if err != nil { panic(err) } // 监听成功,服务已启动 nlog.Infof("now listening %s on %s.", ng.conf.Protocol, ng.conf.Addr) defer func() { _ = listener.Close() ng.shutdown() ng.close() }() for { conn, err := listener.Accept() if err != nil { if errors.Is(err, net.ErrClosed) { nlog.Errorf("服务器网络错误 %+v", err) return } nlog.Errorf("监听错误 %v", err) continue } err = pool.Submit(func() { ng.handle(conn) }) if err != nil { nlog.Errorf("submit conn pool err: %ng", err.Error()) continue } } } func (ng *engine) listenAndServeWS() { ng.setupWS() if err := http.ListenAndServe(ng.conf.Addr, nil); err != nil { log.Fatal(err.Error()) } } func (ng *engine) listenAndServeWSTLS() { ng.setupWS() if err := http.ListenAndServeTLS(ng.conf.Addr, ng.wsOpt.TLSCertificate, ng.wsOpt.TLSKey, nil); err != nil { log.Fatal(err.Error()) } } func (ng *engine) setupWS() { upgrade := websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: ng.wsOpt.CheckOrigin, } http.HandleFunc("/"+strings.TrimPrefix(ng.wsOpt.WebsocketPath, "/"), func(w http.ResponseWriter, r *http.Request) { conn, err := upgrade.Upgrade(w, r, nil) if err != nil { nlog.Errorf("Upgrade failure, URI=%ng, Error=%ng", r.RequestURI, err.Error()) return } err = pool.Submit(func() { ng.handleWS(conn) }) if err != nil { log.Fatalf("submit conn pool err: %ng", err.Error()) } }) } func (ng *engine) handleWS(conn *websocket.Conn) { c := newWSConn(conn) ng.handle(c) } func (ng *engine) handle(conn net.Conn) { c := newConnection(ng, conn) conn2.ConnManager.Store(conn2.DefaultGroupName, c) c.serve() // hook Lifetime.Open(c) } func (ng *engine) notFoundHandler(next Handler) Handler { return HandlerFunc(func(entity entity.NetworkEntity, packet packet.IPacket) { h := next if next == nil { h = NotFoundHandler() } // TODO write to client h.Handle(entity, packet) }) }