diff --git a/engine.go b/engine.go index 8f20957..6be393d 100644 --- a/engine.go +++ b/engine.go @@ -38,22 +38,21 @@ func NewEngine(conf config.EngineConf, opts ...RunOption) *Engine { router: rt.NewDefaultRouter(), dieChan: make(chan struct{}), connMgr: conn.NewConnManager(), + evtMgr: event.NewEventManager(), sessIdMgr: session.NewSessionIDMgr(), packerBuilder: func() packet.Packer { return nil }, } - if ngin.pool == nil { - ngin.pool, _ = ants.NewPool(math.MaxInt32) - } - - ngin.evtMgr = event.NewEventManager(ngin.pool) - for _, opt := range opts { opt(ngin) } + if ngin.pool == nil { + ngin.pool, _ = ants.NewPool(math.MaxInt32) + } + return ngin } diff --git a/event/event.go b/event/event.go index f9cd987..882c207 100644 --- a/event/event.go +++ b/event/event.go @@ -5,7 +5,6 @@ import ( "git.noahlan.cn/noahlan/nnet/conn" "git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/ntool/nlog" - "github.com/panjf2000/ants/v2" ) var ErrEventTypeIllegal = errors.New("EventType illegal") @@ -65,8 +64,6 @@ type ( } eventManager struct { - pool *ants.Pool - onConnected []OnConnectedFn onConnectError []OnConnectErrorFn onDisconnected []OnDisconnectedFn @@ -82,9 +79,8 @@ type ( ///////////////// type-align var _ Event = (*eventManager)(nil) -func NewEventManager(pool *ants.Pool) EventManager { +func NewEventManager() EventManager { return &eventManager{ - pool: pool, onConnected: make([]OnConnectedFn, 0), onConnectError: make([]OnConnectErrorFn, 0), onDisconnected: make([]OnDisconnectedFn, 0), @@ -228,9 +224,7 @@ func (m *eventManager) OnConnected(nc *conn.Connection) { return } for _, fn := range m.onConnected { - _ = m.pool.Submit(func() { - fn(nc) - }) + fn(nc) } } @@ -239,9 +233,7 @@ func (m *eventManager) OnConnectError(err error) { return } for _, fn := range m.onConnectError { - _ = m.pool.Submit(func() { - fn(err) - }) + fn(err) } } @@ -250,9 +242,7 @@ func (m *eventManager) OnDisconnected(nc *conn.Connection, err error) { return } for _, fn := range m.onDisconnected { - _ = m.pool.Submit(func() { - fn(nc, err) - }) + fn(nc, err) } } @@ -261,9 +251,7 @@ func (m *eventManager) OnClose(nc *conn.Connection) { return } for _, fn := range m.onClose { - _ = m.pool.Submit(func() { - fn(nc) - }) + fn(nc) } } @@ -272,9 +260,7 @@ func (m *eventManager) OnSend(nc *conn.Connection, v any) { return } for _, fn := range m.onSend { - _ = m.pool.Submit(func() { - fn(nc, v) - }) + fn(nc, v) } } @@ -283,9 +269,7 @@ func (m *eventManager) OnSendError(nc *conn.Connection, v any, err error) { return } for _, fn := range m.onSendError { - _ = m.pool.Submit(func() { - fn(nc, v, err) - }) + fn(nc, v, err) } } @@ -294,9 +278,7 @@ func (m *eventManager) OnReceive(nc *conn.Connection, p packet.IPacket) { return } for _, fn := range m.onReceive { - _ = m.pool.Submit(func() { - fn(nc, p) - }) + fn(nc, p) } } @@ -305,8 +287,6 @@ func (m *eventManager) OnReceiveError(nc *conn.Connection, err error) { return } for _, fn := range m.onReceiveError { - _ = m.pool.Submit(func() { - fn(nc, err) - }) + fn(nc, err) } } diff --git a/middleware/heartbeat.go b/middleware/heartbeat.go index 596c000..1c13703 100644 --- a/middleware/heartbeat.go +++ b/middleware/heartbeat.go @@ -29,27 +29,29 @@ func WithHeartbeat(interval time.Duration, hbdFn func(conn *conn.Connection) []b } var startFn event.OnConnectedFn = func(nc *conn.Connection) { - ticker := time.NewTicker(m.interval) + go func() { + ticker := time.NewTicker(m.interval) - defer func() { - ticker.Stop() - }() + defer func() { + ticker.Stop() + }() - for { - select { - case <-ticker.C: - deadline := time.Now().Add(-2 * m.interval).Unix() - if atomic.LoadInt64(&m.lastAt) < deadline { - nlog.Errorf("Heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&m.lastAt), deadline) - break - } - err := nc.SendBytes(m.hbdFn(nc)) - if err != nil { - nlog.Errorf("Heartbeat err: %v", err) - return + for { + select { + case <-ticker.C: + deadline := time.Now().Add(-2 * m.interval).Unix() + if atomic.LoadInt64(&m.lastAt) < deadline { + nlog.Errorf("Heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&m.lastAt), deadline) + break + } + err := nc.SendBytes(m.hbdFn(nc)) + if err != nil { + nlog.Errorf("Heartbeat err: %v", err) + return + } } } - } + }() } return func(ngin *nnet.Engine) { diff --git a/middleware/heartbeat_ws.go b/middleware/heartbeat_ws.go index fba8490..b7dcec6 100644 --- a/middleware/heartbeat_ws.go +++ b/middleware/heartbeat_ws.go @@ -32,32 +32,33 @@ func WithHeartbeatWS(interval time.Duration, hbdFn WsHeartbeatFn) nnet.RunOption } var startFn event.OnConnectedFn = func(nc *conn.Connection) { - ticker := time.NewTicker(m.interval) + go func() { + ticker := time.NewTicker(m.interval) - defer func() { - ticker.Stop() - }() - - for { - select { - case <-ticker.C: - if nc.Type() != conn.ConnTypeWS { - break - } + defer func() { + ticker.Stop() + }() + for { + select { + case <-ticker.C: + if nc.Type() != conn.ConnTypeWS { + break + } - //deadline := time.Now().Add(-2 * m.interval).Unix() - //if atomic.LoadInt64(&m.lastAt) < deadline { - // nlog.Errorf("Heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&m.lastAt), deadline) - // break - //} + //deadline := time.Now().Add(-2 * m.interval).Unix() + //if atomic.LoadInt64(&m.lastAt) < deadline { + // nlog.Errorf("Heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&m.lastAt), deadline) + // break + //} - err := m.hbdFn(nc.WsConn()) - if err != nil { - nlog.Errorf("Heartbeat err: %v", err) - return + err := m.hbdFn(nc.WsConn()) + if err != nil { + nlog.Errorf("Heartbeat err: %v", err) + return + } } } - } + }() } return func(ngin *nnet.Engine) {