diff --git a/engine.go b/engine.go index 6a71bb8..caa38eb 100644 --- a/engine.go +++ b/engine.go @@ -38,7 +38,6 @@ func NewEngine(conf config.EngineConf, opts ...RunOption) *Engine { router: rt.NewDefaultRouter(), dieChan: make(chan struct{}), connMgr: conn.NewConnManager(), - evtMgr: event.NewEventManager(), sessIdMgr: session.NewSessionIDMgr(), packerBuilder: func() packet.Packer { return nil @@ -53,9 +52,15 @@ func NewEngine(conf config.EngineConf, opts ...RunOption) *Engine { ngin.pool, _ = ants.NewPool(math.MaxInt32) } + ngin.evtMgr = event.NewEventManager(ngin.pool) + return ngin } +func (ngin *Engine) Pool() *ants.Pool { + return ngin.pool +} + func (ngin *Engine) Use(middleware ...rt.Middleware) { ngin.middlewares = append(ngin.middlewares, middleware...) } diff --git a/event/event.go b/event/event.go index 882c207..f9cd987 100644 --- a/event/event.go +++ b/event/event.go @@ -5,6 +5,7 @@ import ( "git.noahlan.cn/noahlan/nnet/conn" "git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/ntool/nlog" + "github.com/panjf2000/ants/v2" ) var ErrEventTypeIllegal = errors.New("EventType illegal") @@ -64,6 +65,8 @@ type ( } eventManager struct { + pool *ants.Pool + onConnected []OnConnectedFn onConnectError []OnConnectErrorFn onDisconnected []OnDisconnectedFn @@ -79,8 +82,9 @@ type ( ///////////////// type-align var _ Event = (*eventManager)(nil) -func NewEventManager() EventManager { +func NewEventManager(pool *ants.Pool) EventManager { return &eventManager{ + pool: pool, onConnected: make([]OnConnectedFn, 0), onConnectError: make([]OnConnectErrorFn, 0), onDisconnected: make([]OnDisconnectedFn, 0), @@ -224,7 +228,9 @@ func (m *eventManager) OnConnected(nc *conn.Connection) { return } for _, fn := range m.onConnected { - fn(nc) + _ = m.pool.Submit(func() { + fn(nc) + }) } } @@ -233,7 +239,9 @@ func (m *eventManager) OnConnectError(err error) { return } for _, fn := range m.onConnectError { - fn(err) + _ = m.pool.Submit(func() { + fn(err) + }) } } @@ -242,7 +250,9 @@ func (m *eventManager) OnDisconnected(nc *conn.Connection, err error) { return } for _, fn := range m.onDisconnected { - fn(nc, err) + _ = m.pool.Submit(func() { + fn(nc, err) + }) } } @@ -251,7 +261,9 @@ func (m *eventManager) OnClose(nc *conn.Connection) { return } for _, fn := range m.onClose { - fn(nc) + _ = m.pool.Submit(func() { + fn(nc) + }) } } @@ -260,7 +272,9 @@ func (m *eventManager) OnSend(nc *conn.Connection, v any) { return } for _, fn := range m.onSend { - fn(nc, v) + _ = m.pool.Submit(func() { + fn(nc, v) + }) } } @@ -269,7 +283,9 @@ func (m *eventManager) OnSendError(nc *conn.Connection, v any, err error) { return } for _, fn := range m.onSendError { - fn(nc, v, err) + _ = m.pool.Submit(func() { + fn(nc, v, err) + }) } } @@ -278,7 +294,9 @@ func (m *eventManager) OnReceive(nc *conn.Connection, p packet.IPacket) { return } for _, fn := range m.onReceive { - fn(nc, p) + _ = m.pool.Submit(func() { + fn(nc, p) + }) } } @@ -287,6 +305,8 @@ func (m *eventManager) OnReceiveError(nc *conn.Connection, err error) { return } for _, fn := range m.onReceiveError { - fn(nc, err) + _ = m.pool.Submit(func() { + fn(nc, err) + }) } }