fix: 修复event为goroutine

main v1.2.3
NoahLan 1 year ago
parent 570946dcea
commit 70f6cadbb6

@ -38,7 +38,6 @@ func NewEngine(conf config.EngineConf, opts ...RunOption) *Engine {
router: rt.NewDefaultRouter(), router: rt.NewDefaultRouter(),
dieChan: make(chan struct{}), dieChan: make(chan struct{}),
connMgr: conn.NewConnManager(), connMgr: conn.NewConnManager(),
evtMgr: event.NewEventManager(),
sessIdMgr: session.NewSessionIDMgr(), sessIdMgr: session.NewSessionIDMgr(),
packerBuilder: func() packet.Packer { packerBuilder: func() packet.Packer {
return nil return nil
@ -53,9 +52,15 @@ func NewEngine(conf config.EngineConf, opts ...RunOption) *Engine {
ngin.pool, _ = ants.NewPool(math.MaxInt32) ngin.pool, _ = ants.NewPool(math.MaxInt32)
} }
ngin.evtMgr = event.NewEventManager(ngin.pool)
return ngin return ngin
} }
func (ngin *Engine) Pool() *ants.Pool {
return ngin.pool
}
func (ngin *Engine) Use(middleware ...rt.Middleware) { func (ngin *Engine) Use(middleware ...rt.Middleware) {
ngin.middlewares = append(ngin.middlewares, middleware...) ngin.middlewares = append(ngin.middlewares, middleware...)
} }

@ -5,6 +5,7 @@ import (
"git.noahlan.cn/noahlan/nnet/conn" "git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/ntool/nlog" "git.noahlan.cn/noahlan/ntool/nlog"
"github.com/panjf2000/ants/v2"
) )
var ErrEventTypeIllegal = errors.New("EventType illegal") var ErrEventTypeIllegal = errors.New("EventType illegal")
@ -64,6 +65,8 @@ type (
} }
eventManager struct { eventManager struct {
pool *ants.Pool
onConnected []OnConnectedFn onConnected []OnConnectedFn
onConnectError []OnConnectErrorFn onConnectError []OnConnectErrorFn
onDisconnected []OnDisconnectedFn onDisconnected []OnDisconnectedFn
@ -79,8 +82,9 @@ type (
///////////////// type-align ///////////////// type-align
var _ Event = (*eventManager)(nil) var _ Event = (*eventManager)(nil)
func NewEventManager() EventManager { func NewEventManager(pool *ants.Pool) EventManager {
return &eventManager{ return &eventManager{
pool: pool,
onConnected: make([]OnConnectedFn, 0), onConnected: make([]OnConnectedFn, 0),
onConnectError: make([]OnConnectErrorFn, 0), onConnectError: make([]OnConnectErrorFn, 0),
onDisconnected: make([]OnDisconnectedFn, 0), onDisconnected: make([]OnDisconnectedFn, 0),
@ -224,7 +228,9 @@ func (m *eventManager) OnConnected(nc *conn.Connection) {
return return
} }
for _, fn := range m.onConnected { for _, fn := range m.onConnected {
_ = m.pool.Submit(func() {
fn(nc) fn(nc)
})
} }
} }
@ -233,7 +239,9 @@ func (m *eventManager) OnConnectError(err error) {
return return
} }
for _, fn := range m.onConnectError { for _, fn := range m.onConnectError {
_ = m.pool.Submit(func() {
fn(err) fn(err)
})
} }
} }
@ -242,7 +250,9 @@ func (m *eventManager) OnDisconnected(nc *conn.Connection, err error) {
return return
} }
for _, fn := range m.onDisconnected { for _, fn := range m.onDisconnected {
_ = m.pool.Submit(func() {
fn(nc, err) fn(nc, err)
})
} }
} }
@ -251,7 +261,9 @@ func (m *eventManager) OnClose(nc *conn.Connection) {
return return
} }
for _, fn := range m.onClose { for _, fn := range m.onClose {
_ = m.pool.Submit(func() {
fn(nc) fn(nc)
})
} }
} }
@ -260,7 +272,9 @@ func (m *eventManager) OnSend(nc *conn.Connection, v any) {
return return
} }
for _, fn := range m.onSend { for _, fn := range m.onSend {
_ = m.pool.Submit(func() {
fn(nc, v) fn(nc, v)
})
} }
} }
@ -269,7 +283,9 @@ func (m *eventManager) OnSendError(nc *conn.Connection, v any, err error) {
return return
} }
for _, fn := range m.onSendError { for _, fn := range m.onSendError {
_ = m.pool.Submit(func() {
fn(nc, v, err) fn(nc, v, err)
})
} }
} }
@ -278,7 +294,9 @@ func (m *eventManager) OnReceive(nc *conn.Connection, p packet.IPacket) {
return return
} }
for _, fn := range m.onReceive { for _, fn := range m.onReceive {
_ = m.pool.Submit(func() {
fn(nc, p) fn(nc, p)
})
} }
} }
@ -287,6 +305,8 @@ func (m *eventManager) OnReceiveError(nc *conn.Connection, err error) {
return return
} }
for _, fn := range m.onReceiveError { for _, fn := range m.onReceiveError {
_ = m.pool.Submit(func() {
fn(nc, err) fn(nc, err)
})
} }
} }

Loading…
Cancel
Save