package event import ( "errors" "git.noahlan.cn/noahlan/nnet/conn" "git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/ntool/nlog" "github.com/panjf2000/ants/v2" ) var ErrEventTypeIllegal = errors.New("EventType illegal") type EvtType string const ( EvtOnConnected = "OnConnected" EvtOnConnectError = "OnConnectError" EvtOnDisconnected = "OnDisconnected" EvtOnClose = "OnClose" EvtOnSend = "OnSend" EvtOnSendError = "OnSendError" EvtOnReceive = "OnReceive" EvtOnReceiveError = "OnReceiveError" ) type ( OnConnectedFn func(nc *conn.Connection) OnConnectErrorFn func(err error) OnDisconnectedFn func(nc *conn.Connection, err error) OnCloseFn func(conn *conn.Connection) OnSendFn func(nc *conn.Connection, v any) OnSendErrorFn func(nc *conn.Connection, v any, err error) OnReceiveFn func(nc *conn.Connection, p packet.IPacket) OnReceiveErrorFn func(nc *conn.Connection, err error) Event interface { // OnConnected 连接成功回调 OnConnected(nc *conn.Connection) // OnConnectError 连接异常回调, 在准备进行连接的过程中发生异常时触发 OnConnectError(err error) // OnDisconnected 连接断开回调,网络异常,服务端掉线等情况时触发 OnDisconnected(nc *conn.Connection, err error) // OnClose 连接关闭回调,服务端发起关闭信号或客户端主动关闭时触发 OnClose(nc *conn.Connection) // OnSend 消息发送回调,消息序列化后的回调 OnSend(nc *conn.Connection, v any) // OnSendError 发送消息异常回调 OnSendError(nc *conn.Connection, v any, err error) // OnReceive 消息接收回调,消息解包后的回调 OnReceive(nc *conn.Connection, p packet.IPacket) // OnReceiveError 接收消息异常回调 OnReceiveError(nc *conn.Connection, err error) } EventManager interface { Event // RegisterEventFront 向头部注册事件处理器 RegisterEventFront(evtType EvtType, fn any) // RegisterEvent 注册事件处理器 RegisterEvent(evtType EvtType, fn any) } eventManager struct { pool *ants.Pool onConnected []OnConnectedFn onConnectError []OnConnectErrorFn onDisconnected []OnDisconnectedFn onClose []OnCloseFn onSend []OnSendFn onSendError []OnSendErrorFn onReceive []OnReceiveFn onReceiveError []OnReceiveErrorFn } ) ///////////////// type-align var _ Event = (*eventManager)(nil) func NewEventManager(pool *ants.Pool) EventManager { return &eventManager{ pool: pool, onConnected: make([]OnConnectedFn, 0), onConnectError: make([]OnConnectErrorFn, 0), onDisconnected: make([]OnDisconnectedFn, 0), onClose: make([]OnCloseFn, 0), onSend: make([]OnSendFn, 0), onSendError: make([]OnSendErrorFn, 0), onReceive: make([]OnReceiveFn, 0), onReceiveError: make([]OnReceiveErrorFn, 0), } } func (m *eventManager) registerEvent(evtType EvtType, fn any, front bool) { switch evtType { case EvtOnConnected: if f, ok := fn.(OnConnectedFn); ok { if front { fns := make([]OnConnectedFn, len(m.onConnected)+1) fns[0] = f copy(fns[1:], m.onConnected) m.onConnected = fns } else { m.onConnected = append(m.onConnected, f) } } else { nlog.Error(ErrEventTypeIllegal) return } case EvtOnConnectError: if f, ok := fn.(OnConnectErrorFn); ok { if front { fns := make([]OnConnectErrorFn, len(m.onConnectError)+1) fns[0] = f copy(fns[1:], m.onConnectError) m.onConnectError = fns } else { m.onConnectError = append(m.onConnectError, f) } } else { nlog.Error(ErrEventTypeIllegal) return } case EvtOnDisconnected: if f, ok := fn.(OnDisconnectedFn); ok { if front { fns := make([]OnDisconnectedFn, len(m.onDisconnected)+1) fns[0] = f copy(fns[1:], m.onDisconnected) m.onDisconnected = fns } else { m.onDisconnected = append(m.onDisconnected, f) } } else { nlog.Error(ErrEventTypeIllegal) return } case EvtOnClose: if f, ok := fn.(OnCloseFn); ok { if front { fns := make([]OnCloseFn, len(m.onClose)+1) fns[0] = f copy(fns[1:], m.onClose) m.onClose = fns } else { m.onClose = append(m.onClose, f) } } else { nlog.Error(ErrEventTypeIllegal) return } case EvtOnSend: if f, ok := fn.(OnSendFn); ok { if front { fns := make([]OnSendFn, len(m.onSend)+1) fns[0] = f copy(fns[1:], m.onSend) m.onSend = fns } else { m.onSend = append(m.onSend, f) } } else { nlog.Error(ErrEventTypeIllegal) return } case EvtOnSendError: if f, ok := fn.(OnSendErrorFn); ok { if front { fns := make([]OnSendErrorFn, len(m.onSendError)+1) fns[0] = f copy(fns[1:], m.onSendError) m.onSendError = fns } else { m.onSendError = append(m.onSendError, f) } } else { nlog.Error(ErrEventTypeIllegal) return } case EvtOnReceive: if f, ok := fn.(OnReceiveFn); ok { if front { fns := make([]OnReceiveFn, len(m.onReceive)+1) fns[0] = f copy(fns[1:], m.onReceive) m.onReceive = fns } else { m.onReceive = append(m.onReceive, f) } } else { nlog.Error(ErrEventTypeIllegal) return } case EvtOnReceiveError: if f, ok := fn.(OnReceiveErrorFn); ok { if front { fns := make([]OnReceiveErrorFn, len(m.onReceiveError)+1) fns[0] = f copy(fns[1:], m.onReceiveError) m.onReceiveError = fns } else { m.onReceiveError = append(m.onReceiveError, f) } } else { nlog.Error(ErrEventTypeIllegal) return } } nlog.Infof("Register event [EvtType: %s] successfully", evtType) } func (m *eventManager) RegisterEventFront(evtType EvtType, fn any) { m.registerEvent(evtType, fn, true) } func (m *eventManager) RegisterEvent(evtType EvtType, fn any) { m.registerEvent(evtType, fn, false) } func (m *eventManager) OnConnected(nc *conn.Connection) { if len(m.onConnected) == 0 { return } for _, fn := range m.onConnected { _ = m.pool.Submit(func() { fn(nc) }) } } func (m *eventManager) OnConnectError(err error) { if len(m.onConnectError) == 0 { return } for _, fn := range m.onConnectError { _ = m.pool.Submit(func() { fn(err) }) } } func (m *eventManager) OnDisconnected(nc *conn.Connection, err error) { if len(m.onDisconnected) == 0 { return } for _, fn := range m.onDisconnected { _ = m.pool.Submit(func() { fn(nc, err) }) } } func (m *eventManager) OnClose(nc *conn.Connection) { if len(m.onClose) == 0 { return } for _, fn := range m.onClose { _ = m.pool.Submit(func() { fn(nc) }) } } func (m *eventManager) OnSend(nc *conn.Connection, v any) { if len(m.onSend) == 0 { return } for _, fn := range m.onSend { _ = m.pool.Submit(func() { fn(nc, v) }) } } func (m *eventManager) OnSendError(nc *conn.Connection, v any, err error) { if len(m.onSendError) == 0 { return } for _, fn := range m.onSendError { _ = m.pool.Submit(func() { fn(nc, v, err) }) } } func (m *eventManager) OnReceive(nc *conn.Connection, p packet.IPacket) { if len(m.onReceive) == 0 { return } for _, fn := range m.onReceive { _ = m.pool.Submit(func() { fn(nc, p) }) } } func (m *eventManager) OnReceiveError(nc *conn.Connection, err error) { if len(m.onReceiveError) == 0 { return } for _, fn := range m.onReceiveError { _ = m.pool.Submit(func() { fn(nc, err) }) } }