package nnet

import (
	"math"
	"net"
	"time"

	"git.noahlan.cn/noahlan/nnet/config"
	"git.noahlan.cn/noahlan/nnet/conn"
	"git.noahlan.cn/noahlan/nnet/event"
	"git.noahlan.cn/noahlan/nnet/packet"
	rt "git.noahlan.cn/noahlan/nnet/router"
	"git.noahlan.cn/noahlan/nnet/session"
	"git.noahlan.cn/noahlan/ntool/ndef"
	"git.noahlan.cn/noahlan/ntool/nlog"
	"github.com/panjf2000/ants/v2"
)

// Engine is the network engine. It owns the routing table, the goroutine
// pool and the connection/event/session-ID managers, and drives the read and
// write loops of every connection passed to handle.
type Engine struct {
	config.EngineConf                      // engine configuration
	middlewares       []rt.Middleware      // registered middlewares
	routes            []rt.Route           // registered routes
	router            rt.Router            // message handler (router)
	dieChan           chan struct{}        // application exit signal, closed by Stop
	packerBuilder     packet.PackerBuilder // builds the packer used to pack/unpack packets
	serializer        ndef.Serializer      // message serializer/deserializer
	pool              *ants.Pool           // goroutine pool
	connMgr           *conn.ConnManager    // connection manager
	evtMgr            event.EventManager   // event manager
	sessIdMgr         *session.IDMgr       // session ID manager
}

// NewEngine creates an Engine from conf and applies opts in order.
//
// After the options have run, a zero SendChannelSize or WriteChannelSize is
// replaced by the corresponding config default, and a goroutine pool with a
// capacity of math.MaxInt32 is created when no option supplied one. The
// default packerBuilder returns a nil packer.
func NewEngine(conf config.EngineConf, opts ...RunOption) *Engine {
	ngin := &Engine{
		EngineConf:    conf,
		middlewares:   make([]rt.Middleware, 0),
		routes:        make([]rt.Route, 0),
		router:        rt.NewDefaultRouter(),
		dieChan:       make(chan struct{}),
		connMgr:       conn.NewConnManager(),
		evtMgr:        event.NewEventManager(),
		sessIdMgr:     session.NewSessionIDMgr(),
		packerBuilder: func() packet.Packer { return nil },
	}

	for _, opt := range opts {
		opt(ngin)
	}

	if ngin.EngineConf.SendChannelSize == 0 {
		ngin.EngineConf.SendChannelSize = config.DefaultSendChannelSize
	}
	if ngin.EngineConf.WriteChannelSize == 0 {
		ngin.EngineConf.WriteChannelSize = config.DefaultWriteChannelSize
	}
	if ngin.pool == nil {
		// NOTE(review): the error from ants.NewPool is discarded here; confirm
		// it cannot be non-nil for this argument set.
		ngin.pool, _ = ants.NewPool(math.MaxInt32)
	}

	return ngin
}

// Pool returns the goroutine pool used by the engine.
func (ngin *Engine) Pool() *ants.Pool {
	return ngin.pool
}

// Use appends the given middlewares to the engine's middleware list.
func (ngin *Engine) Use(middleware ...rt.Middleware) {
	ngin.middlewares = append(ngin.middlewares, middleware...)
}

// AddRoutes appends rs to the engine's routes and then binds every route
// accumulated so far. A binding error is passed to nlog.Must.
//
// NOTE(review): bindRoutes walks the whole route list, so routes added by an
// earlier AddRoutes call are registered with the router again — confirm that
// the router tolerates repeated registration.
func (ngin *Engine) AddRoutes(rs ...rt.Route) {
	ngin.routes = append(ngin.routes, rs...)
	err := ngin.bindRoutes()
	nlog.Must(err)
}

// bindRoutes binds every route in ngin.routes, stopping at and returning the
// first error.
func (ngin *Engine) bindRoutes() error {
	for _, fr := range ngin.routes {
		if err := ngin.bindRoute(fr); err != nil {
			return err
		}
	}
	return nil
}

// bindRoute registers route's matcher and handler with the router.
//
// NOTE(review): a middleware chain is built below but never applied to the
// handler, so middlewares added through Use currently have no effect on the
// registered route.
func (ngin *Engine) bindRoute(route rt.Route) error {
	// TODO default middleware
	chain := rt.NewChain()

	// build chain
	for _, middleware := range ngin.middlewares {
		chain.Append(rt.ConvertMiddleware(middleware))
	}
	return ngin.router.Register(route.Matches, route.Handler)
}

// setup binds all registered routes and returns the first binding error.
func (ngin *Engine) setup() error {
	if err := ngin.bindRoutes(); err != nil {
		return err
	}
	return nil
}

// Stop logs the shutdown, closes the connections visited through
// connMgr.PeekConn and finally closes dieChan.
//
// dieChan is closed unconditionally, so calling Stop twice panics.
func (ngin *Engine) Stop() {
	nlog.Infof("%s server is stopping...", ngin.LogPrefix())

	ngin.connMgr.PeekConn(func(_ int64, c *conn.Connection) bool {
		_ = c.Close()
		return false
	})

	close(ngin.dieChan)
}

// handle wraps rawC in a new Connection with a fresh session ID, fires
// OnConnected, stores the connection in the default group and starts its
// read/write loops. A store error is passed to nlog.Must.
func (ngin *Engine) handle(rawC net.Conn) *conn.Connection {
	nc := conn.NewConnection(ngin.sessIdMgr.SessionID(), rawC, ngin.SendChannelSize, ngin.WriteChannelSize)

	ngin.evtMgr.OnConnected(nc)

	// Store before starting the loops: the close watcher started by serveConn
	// removes the connection from the manager, and if the connection died
	// right away that removal could otherwise run before the Store and leave
	// a dead connection registered.
	err := ngin.connMgr.Store(conn.DefaultGroupName, nc)
	nlog.Must(err)

	ngin.serveConn(nc, ngin.packerBuilder())

	return nc
}

// serveConn submits three tasks for nc to the goroutine pool: the read loop,
// the write loop and a watcher that waits for the connection's die signal,
// then removes the connection from the manager and fires OnClose.
//
// Submit errors are ignored.
func (ngin *Engine) serveConn(nc *conn.Connection, packer packet.Packer) {
	_ = ngin.pool.Submit(func() {
		ngin.readLoop(nc, packer)
	})

	_ = ngin.pool.Submit(func() {
		ngin.writeLoop(nc, packer)
	})

	_ = ngin.pool.Submit(func() {
		<-nc.ChDie()

		if ngin.ShallLogDebug() {
			nlog.Debugf("%s Close connection, ID=%d, Remote=%s",
				ngin.LogPrefix(), nc.ID(), nc.Conn().RemoteAddr().String())
		}
		_ = ngin.connMgr.Remove(nc)
		ngin.evtMgr.OnClose(nc)
	})
}

// readLoop reads from nc until the connection dies or a read fails, and hands
// every received packet to processPacket. The connection is closed when the
// loop exits.
//
// WebSocket connections are read message by message; other connections are
// read into a reusable buffer and split into packets by packer.
func (ngin *Engine) readLoop(nc *conn.Connection, packer packet.Packer) {
	defer func() {
		_ = nc.Close()
		//if ngin.ShallLogDebug() {
		//	nlog.Debugf("%s [readLoop] connection read goroutine exit, ID=%d, UID=%s, Remote=%s",
		//		ngin.LogPrefix(), nc.ID(), nc.Session().UID(), nc.Conn().RemoteAddr())
		//}
	}()

	buf := make([]byte, 4096)

	for {
		// Non-blocking check of the connection close signal.
		select {
		case <-nc.ChDie():
			return
		default:
		}

		if ngin.Deadline != 0 {
			_ = nc.Conn().SetDeadline(time.Now().Add(ngin.Deadline))
		}
		if ngin.ReadDeadline != 0 {
			_ = nc.Conn().SetReadDeadline(time.Now().Add(ngin.ReadDeadline))
		}

		var (
			err    error
			msgTyp int
			data   []byte
		)

		// WebSocket compatibility.
		isWS := nc.Type() == conn.ConnTypeWS
		if isWS {
			// Use the slice returned by ReadMessage directly. Copying it into
			// buf and re-slicing buf to the message length panics for any
			// message larger than buf, and would hand the reused buffer to a
			// packet that is processed asynchronously.
			msgTyp, data, err = nc.WsConn().ReadMessage()
		} else {
			var n int
			n, err = nc.Conn().Read(buf)
			data = buf[:n]
		}

		if err != nil {
			ngin.evtMgr.OnDisconnected(nc, err)
			// TODO reconnect after disconnection (client side only)
			nlog.Errorf("%s [readLoop] Read message error: %s, session will be closed immediately",
				ngin.LogPrefix(), err.Error())
			return
		}
		if len(data) == 0 {
			ngin.evtMgr.OnReceiveError(nc, conn.ErrReceiveZero)
			nlog.Errorf("%s [readLoop] Read empty message, session will be closed immediately", ngin.LogPrefix())
			return
		}

		// WebSocket compatibility.
		if isWS {
			ngin.processPacket(nc, packet.NewWSPacket(msgTyp, data))
			continue
		}

		if packer == nil {
			ngin.evtMgr.OnReceiveError(nc, conn.ErrNoPacker)
			nlog.Errorf("%s [readLoop] unexpected error: packer is nil", ngin.LogPrefix())
			return
		}

		//nlog.Debugf("receive data %v", data)

		// warning: buf is reused across reads for performance, so the data
		// passed in must be copied before it is processed further.
		packets, unpackErr := packer.Unpack(data)
		if unpackErr != nil {
			// An unpack error is reported but does not end the loop.
			ngin.evtMgr.OnReceiveError(nc, conn.ErrUnpack)
			nlog.Errorf("%s unpack err: %s", ngin.LogPrefix(), unpackErr.Error())
		}
		// Process the unpacked packets.
		for _, p := range packets {
			ngin.processPacket(nc, p)
		}
	}
}

// writeLoop serves nc's send and write channels until the connection dies.
// The connection is closed when the loop exits.
//
// Items from ChSend are serialized (when a serializer is configured), then
// either written as a WebSocket message or packed by packer and written to
// the raw connection. Items from ChWrite are written to the raw connection
// as-is. A failed item is reported through OnSendError and logged, and the
// loop moves on to the next item.
func (ngin *Engine) writeLoop(nc *conn.Connection, packer packet.Packer) {
	defer func() {
		_ = nc.Close()
		//if ngin.ShallLogDebug() {
		//	nlog.Debugf("%s [writeLoop] connection write goroutine exit, ID=%d, UID=%s, Remote=%s",
		//		ngin.LogPrefix(), nc.ID(), nc.Session().UID(), nc.Conn().RemoteAddr())
		//}
	}()

	// setWriteDeadlines refreshes the configured deadlines before a write.
	setWriteDeadlines := func() {
		if ngin.Deadline != 0 {
			_ = nc.Conn().SetDeadline(time.Now().Add(ngin.Deadline))
		}
		if ngin.WriteDeadline != 0 {
			_ = nc.Conn().SetWriteDeadline(time.Now().Add(ngin.WriteDeadline))
		}
	}

	// writeRaw writes already-packed bytes to the raw connection and fires
	// OnSend on success or OnSendError on failure.
	writeRaw := func(raw []byte) {
		setWriteDeadlines()

		if _, err := nc.Conn().Write(raw); err != nil {
			ngin.evtMgr.OnSendError(nc, raw, conn.ErrSend)
			nlog.Errorf("%s [writeLoop] write data err: %v", ngin.LogPrefix(), err)
			return
		}
		// event
		ngin.evtMgr.OnSend(nc, raw)
		//nlog.Debugf("write data %v", raw)
	}

	for {
		select {
		case data := <-nc.ChSend():
			// Marshal the packet body (data.Payload).
			if ngin.serializer == nil {
				if _, ok := data.Payload.([]byte); !ok {
					ngin.evtMgr.OnSendError(nc, data, conn.ErrSendPayload)
					nlog.Errorf("%s [writeLoop] serializer is nil, but payload type not []byte", ngin.LogPrefix())
					continue
				}
			} else {
				payload, err := ngin.serializer.Marshal(data.Payload)
				if err != nil {
					ngin.evtMgr.OnSendError(nc, data, conn.ErrSendMarshal)
					nlog.Errorf("%s [writeLoop] message body marshal err: %v", ngin.LogPrefix(), err)
					continue
				}
				data.Payload = payload
			}

			// WebSocket compatibility: the header carries the message type.
			if nc.Type() == conn.ConnTypeWS {
				messageTyp, ok := data.Header.(int)
				if !ok {
					ngin.evtMgr.OnSendError(nc, data, conn.ErrSendWSType)
					nlog.Errorf("%s [writeLoop] websocket message type not found", ngin.LogPrefix())
					continue
				}

				setWriteDeadlines()

				err := nc.WsConn().WriteMessage(messageTyp, data.Payload.([]byte))
				if err != nil {
					ngin.evtMgr.OnSendError(nc, data, conn.ErrSend)
					nlog.Errorf("%s [writeLoop] write data err: %v", ngin.LogPrefix(), err)
					continue
				}
				// event
				ngin.evtMgr.OnSend(nc, data)
				continue
			}

			// Pack the data with the packer.
			if packer == nil {
				ngin.evtMgr.OnSendError(nc, data, conn.ErrNoPacker)
				nlog.Errorf("%s [writeLoop] unexpected error: packer is nil", ngin.LogPrefix())
				continue
			}
			p, err := packer.Pack(data.Header, data.Payload.([]byte))
			if err != nil {
				ngin.evtMgr.OnSendError(nc, data, conn.ErrPack)
				nlog.Errorf("%s [writeLoop] pack err: %v", ngin.LogPrefix(), err)
				continue
			}
			// Write the packed bytes directly. This loop is the one draining
			// ChWrite, so queueing p there from this goroutine would block
			// forever once that channel's buffer is full.
			writeRaw(p)

		case raw := <-nc.ChWrite():
			// Write raw data back to the connection.
			writeRaw(raw)

		case <-nc.ChDie():
			// connection close signal
			return
		}
	}
}

// processPacket fires OnReceive for p and, when the connection status is
// conn.StatusWorking, submits the router's handling of p to the goroutine
// pool. The Submit error is ignored.
func (ngin *Engine) processPacket(nc *conn.Connection, p packet.IPacket) {
	// event
	ngin.evtMgr.OnReceive(nc, p)

	if nc.Status() == conn.StatusWorking {
		// Handle the packet message.
		_ = ngin.pool.Submit(func() {
			ngin.router.Handle(nc, p)
		})
	}
}