package core import ( "errors" "fmt" "git.noahlan.cn/noahlan/nnet/config" conn2 "git.noahlan.cn/noahlan/nnet/conn" "git.noahlan.cn/noahlan/nnet/entity" "git.noahlan.cn/noahlan/nnet/lifetime" "git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/nnet/pipeline" "git.noahlan.cn/noahlan/nnet/scheduler" "git.noahlan.cn/noahlan/nnet/serialize" "git.noahlan.cn/noahlan/ntools-go/core/nlog" "git.noahlan.cn/noahlan/ntools-go/core/pool" "github.com/gorilla/websocket" "log" "net" "net/http" "strings" "time" ) func NotFound(conn entity.NetworkEntity, _ packet.IPacket) { nlog.Error("handler not found") _ = conn.SendBytes([]byte("handler not found")) } func NotFoundHandler() Handler { return HandlerFunc(NotFound) } type ( // engine TCP-engine engine struct { conf config.EngineConf // conf 配置 taskTimerPrecision time.Duration middlewares []Middleware // 中间件 routes []Route // 路由 // handler 消息处理器 handler Handler // dieChan 应用程序退出信号 dieChan chan struct{} pipeline pipeline.Pipeline // 消息管道 lifetime *lifetime.Mgr // 连接的生命周期管理器 packerFn packet.NewPackerFunc // 封包、拆包器 serializer serialize.Serializer // 消息 序列化/反序列化 wsOpt wsOptions // websocket connManager *conn2.Manager sessIdMgr *sessionIDMgr } wsOptions struct { IsWebsocket bool // 是否为websocket服务端 WebsocketPath string // ws地址(ws://127.0.0.1/WebsocketPath) TLSCertificate string // TLS 证书地址 (websocket) TLSKey string // TLS 证书key地址 (websocket) CheckOrigin func(*http.Request) bool // check origin } ) func newEngine(conf config.EngineConf) *engine { s := &engine{ conf: conf, dieChan: make(chan struct{}), pipeline: pipeline.New(), middlewares: make([]Middleware, 0), routes: make([]Route, 0), taskTimerPrecision: conf.TaskTimerPrecision, connManager: conn2.NewManager(), sessIdMgr: newSessionIDMgr(), lifetime: lifetime.NewLifetime(), } pool.InitPool(conf.Pool) return s } func (ng *engine) shallLogDebug() bool { return config.ShallLogDebug(ng.conf.Mode) } func (ng *engine) logPrefix() string { return fmt.Sprintf("[NNet-%s]", ng.conf.Name) } func (ng *engine) use(middleware ...Middleware) { ng.middlewares = append(ng.middlewares, middleware...) } func (ng *engine) addRoutes(route ...Route) { ng.routes = append(ng.routes, route...) } func (ng *engine) bindRoutes(router Router) error { for _, fr := range ng.routes { if err := ng.bindRoute(router, fr); err != nil { return err } } return nil } func (ng *engine) bindRoute(router Router, route Route) error { // TODO default middleware chain := newChain() // build chain for _, middleware := range ng.middlewares { chain.Append(convertMiddleware(middleware)) } return router.Register(route.Matches, route.Handler) } func convertMiddleware(ware Middleware) func(Handler) Handler { return func(next Handler) Handler { return ware(next.Handle) } } func (ng *engine) dial(addr string, router Router) (entity.NetworkEntity, error) { ng.handler = router if err := ng.bindRoutes(router); err != nil { return nil, err } go scheduler.Schedule(ng.taskTimerPrecision) // connection conn, err := net.Dial("tcp", addr) nlog.Must(err) c := newConnection(ng, conn) c.serve() // hook ng.lifetime.Open(c) // connection manager err = ng.connManager.Store(conn2.DefaultGroupName, c) nlog.Must(err) // 连接成功,客户端已启动 if ng.shallLogDebug() { nlog.Debugf("now connect to %s.", addr) } return c, nil } func (ng *engine) serve(router Router) error { ng.handler = router if err := ng.bindRoutes(router); err != nil { return err } go scheduler.Schedule(ng.taskTimerPrecision) defer func() { nlog.Infof("%s is stopping...", ng.logPrefix()) ng.shutdown() scheduler.Close() }() if ng.wsOpt.IsWebsocket { if len(ng.wsOpt.TLSCertificate) != 0 && len(ng.wsOpt.TLSKey) != 0 { ng.listenAndServeWSTLS() } else { ng.listenAndServeWS() } } else { ng.listenAndServe() } return nil } func (ng *engine) close() { close(ng.dieChan) } func (ng *engine) shutdown() { } func (ng *engine) listenAndServe() { listener, err := net.Listen(ng.conf.Protocol, ng.conf.Addr) nlog.Must(err) // 监听成功,服务已启动 if ng.shallLogDebug() { nlog.Debugf("%s now listening %s at %s.", ng.logPrefix(), ng.conf.Protocol, ng.conf.Addr) } defer func() { _ = listener.Close() ng.close() }() for { conn, err := listener.Accept() if err != nil { if errors.Is(err, net.ErrClosed) { nlog.Errorf("%s 服务器网络错误 %+v", ng.logPrefix(), err) return } nlog.Errorf("%s 监听错误 %v", ng.logPrefix(), err) continue } err = pool.Submit(func() { ng.handle(conn) }) if err != nil { nlog.Errorf("%s submit conn pool err: %ng", ng.logPrefix(), err.Error()) continue } } } func (ng *engine) listenAndServeWS() { ng.setupWS() if ng.shallLogDebug() { nlog.Debugf("%s now listening websocket at %s.", ng.logPrefix(), ng.conf.Addr) } if err := http.ListenAndServe(ng.conf.Addr, nil); err != nil { log.Fatal(err.Error()) } } func (ng *engine) listenAndServeWSTLS() { ng.setupWS() if ng.shallLogDebug() { nlog.Debugf("%s now listening websocket with tls at %s.", ng.logPrefix(), ng.conf.Addr) } if err := http.ListenAndServeTLS(ng.conf.Addr, ng.wsOpt.TLSCertificate, ng.wsOpt.TLSKey, nil); err != nil { log.Fatal(err.Error()) } } func (ng *engine) setupWS() { upgrade := websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: ng.wsOpt.CheckOrigin, } http.HandleFunc("/"+strings.TrimPrefix(ng.wsOpt.WebsocketPath, "/"), func(w http.ResponseWriter, r *http.Request) { conn, err := upgrade.Upgrade(w, r, nil) if err != nil { nlog.Errorf("%s Upgrade failure, URI=%ng, Error=%ng", ng.logPrefix(), r.RequestURI, err.Error()) return } err = pool.Submit(func() { ng.handleWS(conn) }) if err != nil { log.Fatalf("%s submit conn pool err: %v", ng.logPrefix(), err.Error()) } }) } func (ng *engine) handleWS(conn *websocket.Conn) { c := newWSConn(conn) ng.handle(c) } func (ng *engine) handle(conn net.Conn) { c := newConnection(ng, conn) err := ng.connManager.Store(conn2.DefaultGroupName, c) nlog.Must(err) c.serve() // hook ng.lifetime.Open(c) } func (ng *engine) notFoundHandler(next Handler) Handler { return HandlerFunc(func(entity entity.NetworkEntity, packet packet.IPacket) { h := next if next == nil { h = NotFoundHandler() } // TODO write to client h.Handle(entity, packet) }) }