package nnet

import (
	"math"
	"net"

	"git.noahlan.cn/noahlan/nnet/config"
	"git.noahlan.cn/noahlan/nnet/connection"
	"git.noahlan.cn/noahlan/nnet/lifetime"
	"git.noahlan.cn/noahlan/nnet/packet"
	rt "git.noahlan.cn/noahlan/nnet/router"
	"git.noahlan.cn/noahlan/nnet/scheduler"
	"git.noahlan.cn/noahlan/nnet/serialize"
	"git.noahlan.cn/noahlan/nnet/session"
	"git.noahlan.cn/noahlan/ntool/nlog"
	"github.com/panjf2000/ants/v2"
)

// Engine is the network engine. It bundles the configuration, routing table,
// packet/serialization components and the per-connection bookkeeping that
// every accepted connection is wired up with in handle.
type Engine struct {
	config.EngineConf // engine configuration

	middlewares   []rt.Middleware      // registered middlewares
	routes        []rt.Route           // registered routes
	router        rt.Router            // message handler / dispatcher
	dieChan       chan struct{}        // application exit signal, closed by Stop
	pipeline      connection.Pipeline  // message pipeline
	packerBuilder packet.PackerBuilder // builder for the packet packer/unpacker
	serializer    serialize.Serializer // message serialization / deserialization

	goPool      *ants.Pool          // goroutine pool
	connManager *connection.Manager // connection manager
	lifetime    *lifetime.Mgr       // connection lifetime hooks
	sessIdMgr   *session.IDMgr      // session ID manager
}

// NewEngine builds an Engine from conf, applies opts in order, and then
// creates a default goroutine pool (capacity math.MaxInt32) if no option
// supplied one.
//
// packerBuilder and serializer start out nil; nothing in this function sets
// them, so they are expected to be provided through opts.
func NewEngine(conf config.EngineConf, opts ...RunOption) *Engine {
	ngin := &Engine{
		EngineConf:    conf,
		middlewares:   make([]rt.Middleware, 0),
		routes:        make([]rt.Route, 0),
		router:        rt.NewDefaultRouter(),
		packerBuilder: nil,
		serializer:    nil,
		dieChan:       make(chan struct{}),
		pipeline:      connection.NewPipeline(),
		connManager:   connection.NewManager(),
		lifetime:      lifetime.NewLifetime(),
		sessIdMgr:     session.NewSessionIDMgr(),
		goPool:        nil,
	}

	for _, opt := range opts {
		opt(ngin)
	}

	if ngin.goPool == nil {
		// Do not discard the construction error: a nil pool would only
		// surface later as a nil-pointer dereference in setup/handle.
		// NOTE(review): nlog.Must is assumed to abort on a non-nil error,
		// as it is used elsewhere in this file — confirm.
		pool, err := ants.NewPool(math.MaxInt32)
		nlog.Must(err)
		ngin.goPool = pool
	}

	return ngin
}

// Use appends the given middlewares to the engine's middleware list.
func (ngin *Engine) Use(middleware ...rt.Middleware) {
	ngin.middlewares = append(ngin.middlewares, middleware...)
}

// AddRoutes appends rs to the engine's route list and immediately binds the
// routes to the router; a binding error is handed to nlog.Must.
//
// NOTE(review): bindRoutes walks every route accumulated so far, not just rs,
// and setup calls bindRoutes again, so a route can be registered with the
// router more than once. Whether Register tolerates that is not visible here.
func (ngin *Engine) AddRoutes(rs ...rt.Route) {
	ngin.routes = append(ngin.routes, rs...)

	err := ngin.bindRoutes()
	nlog.Must(err)
}

// bindRoutes registers every stored route with the router, stopping at and
// returning the first error.
func (ngin *Engine) bindRoutes() error {
	for _, fr := range ngin.routes {
		if err := ngin.bindRoute(fr); err != nil {
			return err
		}
	}
	return nil
}

// bindRoute registers a single route's matcher and handler with the router.
func (ngin *Engine) bindRoute(route rt.Route) error {
	// TODO default middleware
	chain := rt.NewChain()
	// Build the middleware chain.
	// NOTE(review): the chain is populated but never passed to Register
	// below, so as far as this file shows the middlewares added via Use are
	// not applied to route.Handler.
	for _, middleware := range ngin.middlewares {
		chain.Append(rt.ConvertMiddleware(middleware))
	}
	return ngin.router.Register(route.Matches, route.Handler)
}

// setup binds all routes and submits the scheduler loop to the goroutine
// pool. It returns the first error from either step.
func (ngin *Engine) setup() error {
	if err := ngin.bindRoutes(); err != nil {
		return err
	}

	return ngin.goPool.Submit(func() {
		scheduler.Schedule(ngin.TaskTimerPrecision)
	})
}

// Stop logs the shutdown, closes dieChan to signal exit, and closes the
// scheduler.
//
// Stop must be called at most once: closing an already-closed channel panics.
func (ngin *Engine) Stop() {
	nlog.Infof("%s is stopping...", ngin.LogPrefix())

	close(ngin.dieChan)
	scheduler.Close()
}

// handle wraps conn in a connection.Connection configured with the engine's
// components, starts serving it, stores it in the connection manager under
// connection.DefaultGroupName, and returns it.
//
// A background goroutine invokes the lifetime Open hook, waits for the
// connection's die channel, then schedules the lifetime Close hook and
// removes the connection from the manager.
func (ngin *Engine) handle(conn net.Conn) *connection.Connection {
	nc := connection.NewConnection(
		ngin.sessIdMgr.SessionID(),
		conn,
		ngin.goPool,
		connection.Config{LogDebug: ngin.ShallLogDebug(), LogPrefix: ngin.LogPrefix()},
		ngin.packerBuilder, ngin.serializer, ngin.pipeline,
		ngin.router.Handle,
	)

	nc.Serve()

	err := ngin.connManager.Store(connection.DefaultGroupName, nc)
	nlog.Must(err)

	go func() {
		// Lifetime: connection opened.
		ngin.lifetime.Open(nc)

		// Block until the connection signals that it has died.
		<-nc.DieChan()

		scheduler.PushTask(func() { ngin.lifetime.Close(nc) })
		// Removal is best-effort during teardown; the error is deliberately
		// ignored, as in the original code.
		_ = ngin.connManager.Remove(nc)
	}()

	return nc
}