From d39b9921cf64718075408a1fd5a10f74f8ea991d Mon Sep 17 00:00:00 2001 From: NoahLan <6995syu@163.com> Date: Wed, 19 Jul 2023 09:14:02 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E5=8F=96=E6=B6=88pipeline/lifetime?= =?UTF-8?q?/scheduler=EF=BC=8C=E6=B7=BB=E5=8A=A0=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E5=A4=84=E7=90=86=E6=9C=BA=E5=88=B6=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client_tcp.go | 8 +- client_ws.go | 64 +++++ config/client_ws.go | 8 +- config/config.go | 6 + config/server_ws.go | 15 +- config/ws.go | 14 +- conn/conn.go | 147 +++++++++++ conn/errors.go | 20 ++ {connection => conn}/group.go | 2 +- {connection => conn}/manager.go | 24 +- {connection => conn}/serial.go | 2 +- {connection => conn}/serial_addr.go | 2 +- conn/vars.go | 27 +++ {connection => conn}/ws.go | 2 +- connection/connection.go | 337 -------------------------- connection/pipeline.go | 83 ------- engine.go | 275 +++++++++++++++++---- event/event.go | 292 ++++++++++++++++++++++ event/events.go | 39 --- go.mod | 1 + go.sum | 2 + lifetime/lifetime.go | 54 ----- middleware/heartbeat.go | 15 +- options.go | 43 +--- packet/entry.go | 14 -- packet/pakcer.go | 2 +- packet/ws.go | 37 +++ protocol/nnet/client_event_nnet.go | 70 ++++++ protocol/nnet/client_pipeline_nnet.go | 65 ----- protocol/nnet/event_nnet.go | 85 +++++++ protocol/nnet/nnet.go | 8 +- protocol/nnet/pipeline_nnet.go | 78 ------ protocol/nnet/router_nnet.go | 4 +- protocol/plain/event_plain.go | 22 ++ protocol/plain/pipeline_plain.go | 22 -- protocol/plain/plain.go | 2 +- protocol/plain/router_plain.go | 13 +- router/router.go | 14 +- server_serial.go | 4 +- server_tcp.go | 6 +- server_ws.go | 49 +--- test/test_nnet.go | 18 +- test/test_nnet_test.go | 4 +- test/test_websocket.go | 69 ++++++ test/test_websocket_test.go | 24 ++ ws.go | 61 +++++ 46 files changed, 1254 insertions(+), 899 deletions(-) create mode 100644 conn/conn.go create mode 100644 conn/errors.go rename {connection => conn}/group.go (99%) rename {connection => conn}/manager.go (72%) rename {connection => conn}/serial.go (98%) rename {connection => conn}/serial_addr.go (97%) create mode 100644 conn/vars.go rename {connection => conn}/ws.go (98%) delete mode 100644 connection/connection.go delete mode 100644 connection/pipeline.go create mode 100644 event/event.go delete mode 100644 event/events.go delete mode 100644 lifetime/lifetime.go delete mode 100644 packet/entry.go create mode 100644 packet/ws.go create mode 100644 protocol/nnet/client_event_nnet.go delete mode 100644 protocol/nnet/client_pipeline_nnet.go create mode 100644 protocol/nnet/event_nnet.go delete mode 100644 protocol/nnet/pipeline_nnet.go create mode 100644 protocol/plain/event_plain.go delete mode 100644 protocol/plain/pipeline_plain.go create mode 100644 test/test_websocket.go create mode 100644 test/test_websocket_test.go create mode 100644 ws.go diff --git a/client_tcp.go b/client_tcp.go index cc4a8ac..5b4521e 100644 --- a/client_tcp.go +++ b/client_tcp.go @@ -1,23 +1,23 @@ package nnet import ( - "git.noahlan.cn/noahlan/nnet/connection" + "git.noahlan.cn/noahlan/nnet/conn" "git.noahlan.cn/noahlan/ntool/nlog" "net" ) // DialTCP 连接服务器 -func (ngin *Engine) DialTCP(addr string) (*connection.Connection, error) { +func (ngin *Engine) DialTCP(addr string) (*conn.Connection, error) { err := ngin.setup() if err != nil { nlog.Errorf("%s failed to setup server, err:%v", ngin.LogPrefix(), err) return nil, err } - conn, err := net.Dial("tcp", addr) + rc, err := net.Dial("tcp", addr) nlog.Must(err) nlog.Infof("%s now connect to %s...", ngin.LogPrefix(), addr) - return ngin.handle(conn), nil + return ngin.handle(rc), nil } diff --git a/client_ws.go b/client_ws.go index 93328a5..0c0c8fe 100644 --- a/client_ws.go +++ b/client_ws.go @@ -1 +1,65 @@ package nnet + +import ( + "crypto/tls" + "git.noahlan.cn/noahlan/nnet/config" + "git.noahlan.cn/noahlan/nnet/conn" + "git.noahlan.cn/noahlan/ntool/nlog" + "github.com/gorilla/websocket" + "github.com/jpillora/backoff" + "net/http" + "time" +) + +// DialWebsocket websocket方式 连接服务器 +func (ngin *Engine) DialWebsocket(url string, conf config.WSClientFullConf, evtOpts ...WsEventOption) (*conn.Connection, error) { + for _, opt := range evtOpts { + opt(conf.WSEvent) + } + + ngin.ReadDeadline = conf.ReadDeadline + ngin.WriteDeadline = conf.WriteDeadline + + err := ngin.setup() + if err != nil { + nlog.Errorf("%s failed to setup server, err:%v", ngin.LogPrefix(), err) + return nil, err + } + + dialer := websocket.Dialer{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + HandshakeTimeout: conf.HandshakeTimeout, + ReadBufferSize: conf.ReadBufferSize, + WriteBufferSize: conf.WriteBufferSize, + EnableCompression: conf.Compression, + } + + // 连接重试 + b := &backoff.Backoff{ + Factor: conf.RecFactor, + Jitter: true, + Min: conf.MinRecTime, + Max: conf.MaxRecTime, + } + + var wsConn *websocket.Conn + for { + nextRec := b.Duration() + wsConn, _, err = dialer.Dial(url, http.Header{}) + if err != nil { + ngin.evtMgr.OnConnectError(err) + time.Sleep(nextRec) + continue + } + if conf.ReadLimit != 0 { + wsConn.SetReadLimit(conf.ReadLimit) + } + break + } + + nlog.Infof("%s now connect to %s...", ngin.LogPrefix(), url) + + return ngin.handleWS(wsConn, conf.WSEvent), nil +} diff --git a/config/client_ws.go b/config/client_ws.go index 2f1443f..8e9c4a8 100644 --- a/config/client_ws.go +++ b/config/client_ws.go @@ -4,12 +4,6 @@ import "time" type ( WSClientConf struct { - // Url 连接地址 - Url string `json:",default=0.0.0.0:9876,env=WS_URL"` - // ReadBufferSize 读缓冲区大小 - ReadBufferSize int `json:",default=2048"` - // WriteBufferSize 写缓冲区大小 - WriteBufferSize int `json:",default=1024"` // ReadLimit 单条消息支持的最大消息长度,默认 8MB ReadLimit int64 `json:",default=8192"` // WriteDeadline 写超时,默认5s @@ -30,7 +24,9 @@ type ( // WSClientFullConf 完整的客户端配置 WSClientFullConf struct { + WSConf WSClientConf BackoffConf + WSEvent `json:"-"` } ) diff --git a/config/config.go b/config/config.go index 8788f1b..f28dfa6 100644 --- a/config/config.go +++ b/config/config.go @@ -22,6 +22,12 @@ type ( Mode string `json:",default=dev,options=[dev,test,prod]"` // Name 引擎名称 Name string `json:",default=NL,env=ENGINE_NAME"` + // ReadDeadline 读数据超时时长,0为不超时 + ReadDeadline time.Duration `json:",default=0s"` + // WriteDeadline 写数据超时时长,0为不超时 + WriteDeadline time.Duration `json:",default=0s"` + // Deadline 读+写数据超时时长,0为不超时 + Deadline time.Duration `json:",default=0s"` } ) diff --git a/config/server_ws.go b/config/server_ws.go index 967026f..32de9cc 100644 --- a/config/server_ws.go +++ b/config/server_ws.go @@ -6,7 +6,7 @@ import ( ) type ( - WSServerConf struct { + WSConf struct { // Addr 服务地址 // 地址可直接使用hostname,但强烈不建议这样做,可能会同时监听多个本地IP // 如果端口号不填或端口号为0,例如:"127.0.0.1:" 或 ":0",服务端将选择随机可用端口 @@ -27,18 +27,13 @@ type ( TLSKey string `json:",optional"` } WSServerFullConf struct { - WSServerConf + WSConf + WSEvent `json:"-"` // check origin - CheckOrigin func(*http.Request) bool - // PingHandler Ping - PingHandler func(appData string) - // PongHandler Pong - PongHandler func(appData string) - // CloseHandler Close - CloseHandler func(closeCode int, closeText string) error + CheckOrigin func(*http.Request) bool `json:"-"` } ) -func (c WSServerConf) IsTLS() bool { +func (c WSConf) IsTLS() bool { return len(c.TLSCertificate) > 0 && len(c.TLSKey) > 0 } diff --git a/config/ws.go b/config/ws.go index 85d9284..c47c89e 100644 --- a/config/ws.go +++ b/config/ws.go @@ -2,13 +2,11 @@ package config type ( WSEvent struct { - // 连接成功回调 - OnConnected func() - // 连接异常回调,在准备进行连接的过程中发生异常时触发 - OnConnectError func(err error) - // 连接断开回调,网络异常,服务端掉线等情况时触发 - OnDisconnected func(err error) - // 连接关闭回调,服务端发起关闭信号或客户端主动关闭时触发 - OnClose func(code int, text string) + // PingHandler Ping + PingHandler func(appData string) + // PongHandler Pong + PongHandler func(appData string) + // CloseHandler Close + CloseHandler func(closeCode int, closeText string) error } ) diff --git a/conn/conn.go b/conn/conn.go new file mode 100644 index 0000000..e4c7aba --- /dev/null +++ b/conn/conn.go @@ -0,0 +1,147 @@ +package conn + +import ( + "git.noahlan.cn/noahlan/nnet/session" + "net" + "sync/atomic" +) + +type ( + Connection struct { + session *session.Session // Session + + status int32 // 连接状态 + conn net.Conn // low-level conn fd + typ ConnType // 连接类型 + + lastMid uint64 // 最近一次消息ID + + chDie chan struct{} // 停止通道 + chSend chan PendingMessage // 消息发送通道(结构化消息) + chWrite chan []byte // 消息发送通道(二进制消息) + } +) + +func NewConnection(id int64, rawC net.Conn) *Connection { + r := &Connection{ + session: session.NewSession(id), + + status: StatusStart, + conn: rawC, + typ: ConnTypeTCP, + + lastMid: 0, + + chDie: make(chan struct{}), + chSend: make(chan PendingMessage, 128), + chWrite: make(chan []byte, 128), + } + if _, ok := rawC.(*WSConn); ok { + r.typ = ConnTypeWS + return r + } + if _, ok := rawC.(*SerialConn); ok { + r.typ = ConnTypeSerial + return r + } + return r +} + +func (r *Connection) Send(header, payload any) (err error) { + defer func() { + if e := recover(); e != nil { + err = ErrBrokenPipe + } + }() + r.chSend <- PendingMessage{ + Header: header, + Payload: payload, + } + return err +} + +func (r *Connection) SendBytes(data []byte) (err error) { + defer func() { + if e := recover(); e != nil { + err = ErrBrokenPipe + } + }() + r.chWrite <- data + return err +} + +func (r *Connection) Status() int32 { + return atomic.LoadInt32(&r.status) +} + +func (r *Connection) SetStatus(s int32) { + atomic.StoreInt32(&r.status, s) +} + +func (r *Connection) Type() ConnType { + return r.typ +} + +func (r *Connection) Conn() net.Conn { + return r.conn +} + +func (r *Connection) WsConn() *WSConn { + if r.typ == ConnTypeWS { + return r.conn.(*WSConn) + } + return nil +} + +func (r *Connection) SerialConn() *SerialConn { + if r.typ == ConnTypeSerial { + return r.conn.(*SerialConn) + } + return nil +} + +func (r *Connection) ID() int64 { + return r.session.ID() +} + +func (r *Connection) Session() *session.Session { + return r.session +} + +func (r *Connection) LastMID() uint64 { + return r.lastMid +} + +func (r *Connection) SetLastMID(mid uint64) { + atomic.StoreUint64(&r.lastMid, mid) +} + +func (r *Connection) ChDie() chan struct{} { + return r.chDie +} + +func (r *Connection) ChSend() chan PendingMessage { + return r.chSend +} + +func (r *Connection) ChWrite() chan []byte { + return r.chWrite +} + +func (r *Connection) Close() error { + if r.Status() == StatusClosed { + return ErrCloseClosedSession + } + r.SetStatus(StatusClosed) + + select { + case <-r.chDie: + close(r.chSend) + close(r.chWrite) + default: + close(r.chDie) + } + + r.session.Close() + return r.conn.Close() +} diff --git a/conn/errors.go b/conn/errors.go new file mode 100644 index 0000000..92ea0a2 --- /dev/null +++ b/conn/errors.go @@ -0,0 +1,20 @@ +package conn + +import "errors" + +var ( + ErrCloseClosedSession = errors.New("close closed session") + // ErrBrokenPipe represents the low-level connection has broken. + ErrBrokenPipe = errors.New("broken low-level pipe") + + ErrSendPayload = errors.New("serializer is nil, but payload type not []byte") + ErrSendMarshal = errors.New("message body marshal err") + ErrSend = errors.New("send err") + ErrSendWSType = errors.New("websocket message type err") + + ErrPack = errors.New("pack err") + ErrUnpack = errors.New("unPacker err") + ErrNoPacker = errors.New("no packer") + + ErrReceiveZero = errors.New("receive zero") +) diff --git a/connection/group.go b/conn/group.go similarity index 99% rename from connection/group.go rename to conn/group.go index e7c2b28..841db30 100644 --- a/connection/group.go +++ b/conn/group.go @@ -1,4 +1,4 @@ -package connection +package conn import ( "errors" diff --git a/connection/manager.go b/conn/manager.go similarity index 72% rename from connection/manager.go rename to conn/manager.go index 9aebbef..478872a 100644 --- a/connection/manager.go +++ b/conn/manager.go @@ -1,10 +1,10 @@ -package connection +package conn import ( "sync" ) -type Manager struct { +type ConnManager struct { sync.RWMutex // 分组 @@ -13,8 +13,8 @@ type Manager struct { conns map[int64]*Connection } -func NewManager() *Manager { - return &Manager{ +func NewConnManager() *ConnManager { + return &ConnManager{ RWMutex: sync.RWMutex{}, groups: make(map[string]*Group), conns: make(map[int64]*Connection), @@ -22,7 +22,7 @@ func NewManager() *Manager { } // Store 保存连接,同时加入到指定分组,若给定分组名为空,则不进行分组操作 -func (m *Manager) Store(groupName string, c *Connection) error { +func (m *ConnManager) Store(groupName string, c *Connection) error { m.Lock() m.conns[c.Session().ID()] = c m.Unlock() @@ -34,7 +34,7 @@ func (m *Manager) Store(groupName string, c *Connection) error { return group.Add(c) } -func (m *Manager) Remove(c *Connection) error { +func (m *ConnManager) Remove(c *Connection) error { m.Lock() defer m.Unlock() delete(m.conns, c.Session().ID()) @@ -49,7 +49,7 @@ func (m *Manager) Remove(c *Connection) error { return nil } -func (m *Manager) RemoveFromGroup(groupName string, c *Connection) error { +func (m *ConnManager) RemoveFromGroup(groupName string, c *Connection) error { m.Lock() delete(m.conns, c.Session().ID()) m.Unlock() @@ -63,7 +63,7 @@ func (m *Manager) RemoveFromGroup(groupName string, c *Connection) error { } // NewGroup 新增分组,若分组已存在,则返回现有分组 -func (m *Manager) NewGroup(name string) *Group { +func (m *ConnManager) NewGroup(name string) *Group { m.Lock() defer m.Unlock() @@ -79,7 +79,7 @@ func (m *Manager) NewGroup(name string) *Group { } // FindGroup 查找分组 -func (m *Manager) FindGroup(name string) (*Group, bool) { +func (m *ConnManager) FindGroup(name string) (*Group, bool) { m.RLock() defer m.RUnlock() @@ -88,7 +88,7 @@ func (m *Manager) FindGroup(name string) (*Group, bool) { } // FindConn 根据连接ID找到连接 -func (m *Manager) FindConn(id int64) (*Connection, bool) { +func (m *ConnManager) FindConn(id int64) (*Connection, bool) { m.RLock() defer m.RUnlock() @@ -97,7 +97,7 @@ func (m *Manager) FindConn(id int64) (*Connection, bool) { } // FindConnByUID 根据连接绑定的UID找到连接 -func (m *Manager) FindConnByUID(uid string) (*Connection, bool) { +func (m *ConnManager) FindConnByUID(uid string) (*Connection, bool) { m.RLock() defer m.RUnlock() @@ -111,7 +111,7 @@ func (m *Manager) FindConnByUID(uid string) (*Connection, bool) { // PeekConn 循环所有连接 // fn 返回true跳过循环,反之一直循环 -func (m *Manager) PeekConn(fn func(id int64, c *Connection) bool) { +func (m *ConnManager) PeekConn(fn func(id int64, c *Connection) bool) { m.RLock() defer m.RUnlock() diff --git a/connection/serial.go b/conn/serial.go similarity index 98% rename from connection/serial.go rename to conn/serial.go index 63daf84..b204af7 100644 --- a/connection/serial.go +++ b/conn/serial.go @@ -1,4 +1,4 @@ -package connection +package conn import ( "git.noahlan.cn/noahlan/ntool/nlog" diff --git a/connection/serial_addr.go b/conn/serial_addr.go similarity index 97% rename from connection/serial_addr.go rename to conn/serial_addr.go index 9443af3..db10640 100644 --- a/connection/serial_addr.go +++ b/conn/serial_addr.go @@ -1,4 +1,4 @@ -package connection +package conn import ( "fmt" diff --git a/conn/vars.go b/conn/vars.go new file mode 100644 index 0000000..309d5e7 --- /dev/null +++ b/conn/vars.go @@ -0,0 +1,27 @@ +package conn + +const ( + // StatusStart 开始阶段 + StatusStart int32 = iota + 1 + // StatusPrepare 准备阶段 + StatusPrepare + // StatusPending 等待工作阶段 + StatusPending + // StatusWorking 工作阶段 + StatusWorking + // StatusClosed 连接关闭 + StatusClosed +) + +type ConnType int + +const ( + ConnTypeTCP ConnType = iota // TCP connection + ConnTypeWS // Websocket connection + ConnTypeSerial // Websocket connection +) + +type PendingMessage struct { + Header any + Payload any +} diff --git a/connection/ws.go b/conn/ws.go similarity index 98% rename from connection/ws.go rename to conn/ws.go index 3d2a514..3d5281f 100644 --- a/connection/ws.go +++ b/conn/ws.go @@ -1,4 +1,4 @@ -package connection +package conn import ( "github.com/gorilla/websocket" diff --git a/connection/connection.go b/connection/connection.go deleted file mode 100644 index a32be1b..0000000 --- a/connection/connection.go +++ /dev/null @@ -1,337 +0,0 @@ -package connection - -import ( - "errors" - "fmt" - "git.noahlan.cn/noahlan/nnet/packet" - "git.noahlan.cn/noahlan/nnet/session" - "git.noahlan.cn/noahlan/ntool/ndef" - "git.noahlan.cn/noahlan/ntool/nlog" - "github.com/panjf2000/ants/v2" - "net" - "sync/atomic" -) - -var ( - ErrCloseClosedSession = errors.New("close closed session") - // ErrBrokenPipe represents the low-level connection has broken. - ErrBrokenPipe = errors.New("broken low-level pipe") -) - -const ( - // StatusStart 开始阶段 - StatusStart int32 = iota + 1 - // StatusPrepare 准备阶段 - StatusPrepare - // StatusPending 等待工作阶段 - StatusPending - // StatusWorking 工作阶段 - StatusWorking - // StatusClosed 连接关闭 - StatusClosed -) - -type ConnType int - -const ( - ConnTypeTCP ConnType = iota // TCP connection - ConnTypeWS // Websocket connection - ConnTypeSerial // Websocket connection -) - -type ( - Connection struct { - conf Config // 配置 - session *session.Session // Session - pool *ants.Pool // 连接池 - - status int32 // 连接状态 - conn net.Conn // low-level conn fd - typ ConnType // 连接类型 - - packer packet.Packer // 封包、拆包器 - serializer ndef.Serializer // 消息序列化/反序列化器 - pipeline Pipeline // 连接生命周期管理 - handleFn func(conn *Connection, pkg packet.IPacket) // 消息处理方法 - - lastMid uint64 // 最近一次消息ID - - chDie chan struct{} // 停止通道 - chSend chan PendingMessage // 消息发送通道(结构化消息) - chWrite chan []byte // 消息发送通道(二进制消息) - } - - packetFn func(conn *Connection, pkg packet.IPacket) - - Config struct { - LogDebug bool - LogPrefix string - } - - PendingMessage struct { - header any - payload any - } -) - -func NewConnection( - id int64, - conn net.Conn, - pool *ants.Pool, - conf Config, - packerBuilder packet.PackerBuilder, - serializer ndef.Serializer, - pipeline Pipeline, - handleFn packetFn) *Connection { - r := &Connection{ - conf: conf, - session: session.NewSession(id), - pool: pool, - - status: StatusStart, - conn: conn, - typ: ConnTypeTCP, - - packer: packerBuilder(), - serializer: serializer, - pipeline: pipeline, - handleFn: handleFn, - - lastMid: 0, - - chDie: make(chan struct{}), - chSend: make(chan PendingMessage, 128), - chWrite: make(chan []byte, 128), - } - if _, ok := conn.(*WSConn); ok { - r.typ = ConnTypeWS - return r - } - if _, ok := conn.(*SerialConn); ok { - r.typ = ConnTypeSerial - return r - } - return r -} - -func (r *Connection) Send(header, payload any) (err error) { - defer func() { - if e := recover(); e != nil { - err = ErrBrokenPipe - } - }() - r.chSend <- PendingMessage{ - header: header, - payload: payload, - } - return err -} - -func (r *Connection) SendBytes(data []byte) (err error) { - defer func() { - if e := recover(); e != nil { - err = ErrBrokenPipe - } - }() - r.chWrite <- data - return err -} - -func (r *Connection) Status() int32 { - return atomic.LoadInt32(&r.status) -} - -func (r *Connection) SetStatus(s int32) { - atomic.StoreInt32(&r.status, s) -} - -func (r *Connection) Conn() (net.Conn, ConnType) { - return r.conn, r.typ -} - -func (r *Connection) ID() int64 { - return r.session.ID() -} - -func (r *Connection) Session() *session.Session { - return r.session -} - -func (r *Connection) LastMID() uint64 { - return r.lastMid -} - -func (r *Connection) SetLastMID(mid uint64) { - atomic.StoreUint64(&r.lastMid, mid) -} - -func (r *Connection) Serve() { - _ = r.pool.Submit(func() { - r.write() - }) - - _ = r.pool.Submit(func() { - r.read() - }) -} - -func (r *Connection) write() { - defer func() { - close(r.chSend) - close(r.chWrite) - _ = r.Close() - - if r.conf.LogDebug { - nlog.Debugf("%s [writeLoop] connection write goroutine exit, ConnID=%d, SessionUID=%s", - r.conf.LogPrefix, r.ID(), r.session.UID()) - } - }() - - for { - select { - case data := <-r.chSend: - // marshal packet body (data) - if r.serializer == nil { - if _, ok := data.payload.([]byte); !ok { - nlog.Errorf("%s serializer is nil, but payload type not []byte", r.conf.LogPrefix) - break - } - } else { - payload, err := r.serializer.Marshal(data.payload) - if err != nil { - nlog.Errorf("%s message body marshal err: %v", r.conf.LogPrefix, err) - break - } - data.payload = payload - } - - // invoke pipeline - if pipe := r.pipeline; pipe != nil { - err := pipe.Outbound().Process(r, data) - if err != nil { - nlog.Errorf("%s pipeline err: %s", r.conf.LogPrefix, err.Error()) - } - } - - // packet pack data - p, err := r.packer.Pack(data.header, data.payload.([]byte)) - if err != nil { - nlog.Errorf("%s pack err: %s", r.conf.LogPrefix, err.Error()) - break - } - r.chWrite <- p - case data := <-r.chWrite: - // 回写数据 - if _, err := r.conn.Write(data); err != nil { - nlog.Errorf("%s write data err: %s", r.conf.LogPrefix, err.Error()) - break - } - //nlog.Debugf("write data %v", data) - case <-r.chDie: // connection close signal - return - // TODO - //case <-r.ngin.dieChan: // application quit signal - // return - } - } -} - -func (r *Connection) read() { - defer func() { - _ = r.Close() - }() - buf := make([]byte, 4096) - - var wsConn *WSConn - if r.typ == ConnTypeWS { - wsConn = r.conn.(*WSConn) - } - - for { - var ( - err error - n int - msgTyp int - ) - if r.typ == ConnTypeWS { - var bb []byte - if msgTyp, bb, err = wsConn.ReadMessage(); err == nil { - copy(buf, bb) - n = len(bb) - } - } else { - n, err = r.conn.Read(buf) - } - if err != nil { - nlog.Errorf("%s [readLoop] Read message error: %s, session will be closed immediately", - r.conf.LogPrefix, err.Error()) - return - } - - if n == 0 { - nlog.Errorf("%s [readLoop] Read empty message, session will be closed immediately", - r.conf.LogPrefix) - return - } - - if r.packer == nil { - nlog.Errorf("%s [readLoop] unexpected error: packer is nil", r.conf.LogPrefix) - return - } - - //nlog.Debugf("receive data %v", buf[:n]) - // warning: 为性能考虑,复用slice处理数据,buf传入后必须要copy再处理 - packets, err := r.packer.Unpack(msgTyp, buf[:n]) - if err != nil { - nlog.Errorf("%s unpack err: %s", r.conf.LogPrefix, err.Error()) - } - // packets 处理 - for _, p := range packets { - if err := r.processPacket(p); err != nil { - nlog.Errorf("%s process packet err: %s", r.conf.LogPrefix, err.Error()) - continue - } - } - } -} - -func (r *Connection) processPacket(packet packet.IPacket) error { - if pipe := r.pipeline; pipe != nil { - err := pipe.Inbound().Process(r, packet) - if err != nil { - return errors.New(fmt.Sprintf("pipeline process failed: %v", err.Error())) - } - } - - if r.Status() == StatusWorking { - // 处理包消息 - _ = r.pool.Submit(func() { - r.handleFn(r, packet) - }) - } - return nil -} - -func (r *Connection) DieChan() chan struct{} { - return r.chDie -} - -func (r *Connection) Close() error { - if r.Status() == StatusClosed { - return ErrCloseClosedSession - } - r.SetStatus(StatusClosed) - - if r.conf.LogDebug { - nlog.Debugf("%s close connection, ID: %d", r.conf.LogPrefix, r.ID()) - } - - select { - case <-r.chDie: - default: - close(r.chDie) - } - - r.session.Close() - return r.conn.Close() -} diff --git a/connection/pipeline.go b/connection/pipeline.go deleted file mode 100644 index ab55bf7..0000000 --- a/connection/pipeline.go +++ /dev/null @@ -1,83 +0,0 @@ -package connection - -import ( - "sync" -) - -type ( - Func func(c *Connection, v any) error - - // Pipeline 消息管道 - Pipeline interface { - Outbound() Channel - Inbound() Channel - } - - pipeline struct { - outbound, inbound *pipelineChannel - } - - Channel interface { - PushFront(h Func) - PushBack(h Func) - Process(c *Connection, v any) error - } - - pipelineChannel struct { - mu sync.RWMutex - handlers []Func - } -) - -func NewPipeline() Pipeline { - return &pipeline{ - outbound: &pipelineChannel{}, - inbound: &pipelineChannel{}, - } -} - -func (p *pipeline) Outbound() Channel { - return p.outbound -} - -func (p *pipeline) Inbound() Channel { - return p.inbound -} - -// PushFront 将func压入slice首位 -func (p *pipelineChannel) PushFront(h Func) { - p.mu.Lock() - defer p.mu.Unlock() - - handlers := make([]Func, len(p.handlers)+1) - handlers[0] = h - copy(handlers[1:], p.handlers) - - p.handlers = handlers -} - -// PushBack 将func压入slice末位 -func (p *pipelineChannel) PushBack(h Func) { - p.mu.Lock() - defer p.mu.Unlock() - - p.handlers = append(p.handlers, h) -} - -// Process 处理所有的pipeline方法 -func (p *pipelineChannel) Process(c *Connection, v any) error { - if len(p.handlers) < 1 { - return nil - } - - p.mu.RLock() - defer p.mu.RUnlock() - - for _, handler := range p.handlers { - err := handler(c, v) - if err != nil { - return err - } - } - return nil -} diff --git a/engine.go b/engine.go index 3a1787a..6a71bb8 100644 --- a/engine.go +++ b/engine.go @@ -2,17 +2,17 @@ package nnet import ( "git.noahlan.cn/noahlan/nnet/config" - "git.noahlan.cn/noahlan/nnet/connection" - "git.noahlan.cn/noahlan/nnet/lifetime" + "git.noahlan.cn/noahlan/nnet/conn" + "git.noahlan.cn/noahlan/nnet/event" "git.noahlan.cn/noahlan/nnet/packet" rt "git.noahlan.cn/noahlan/nnet/router" - "git.noahlan.cn/noahlan/nnet/scheduler" "git.noahlan.cn/noahlan/nnet/session" "git.noahlan.cn/noahlan/ntool/ndef" "git.noahlan.cn/noahlan/ntool/nlog" "github.com/panjf2000/ants/v2" "math" "net" + "time" ) // Engine 引擎 @@ -22,37 +22,35 @@ type Engine struct { routes []rt.Route // 路由 router rt.Router // 消息处理器 dieChan chan struct{} // 应用程序退出信号 - pipeline connection.Pipeline // 消息管道 packerBuilder packet.PackerBuilder // 封包、拆包器 serializer ndef.Serializer // 消息 序列化/反序列化 - goPool *ants.Pool // goroutine池 - connManager *connection.Manager // 连接管理器 - lifetime *lifetime.Mgr // 生命周期 + pool *ants.Pool // goroutine池 + connMgr *conn.ConnManager // 连接管理器 + evtMgr event.EventManager // 事件管理器 sessIdMgr *session.IDMgr // SessionId管理器 } func NewEngine(conf config.EngineConf, opts ...RunOption) *Engine { ngin := &Engine{ - EngineConf: conf, - middlewares: make([]rt.Middleware, 0), - routes: make([]rt.Route, 0), - router: rt.NewDefaultRouter(), - packerBuilder: nil, - serializer: nil, - dieChan: make(chan struct{}), - pipeline: connection.NewPipeline(), - connManager: connection.NewManager(), - lifetime: lifetime.NewLifetime(), - sessIdMgr: session.NewSessionIDMgr(), - goPool: nil, + EngineConf: conf, + middlewares: make([]rt.Middleware, 0), + routes: make([]rt.Route, 0), + router: rt.NewDefaultRouter(), + dieChan: make(chan struct{}), + connMgr: conn.NewConnManager(), + evtMgr: event.NewEventManager(), + sessIdMgr: session.NewSessionIDMgr(), + packerBuilder: func() packet.Packer { + return nil + }, } for _, opt := range opts { opt(ngin) } - if ngin.goPool == nil { - ngin.goPool, _ = ants.NewPool(math.MaxInt32) + if ngin.pool == nil { + ngin.pool, _ = ants.NewPool(math.MaxInt32) } return ngin @@ -91,46 +89,229 @@ func (ngin *Engine) setup() error { if err := ngin.bindRoutes(); err != nil { return err } - if err := ngin.goPool.Submit(func() { - scheduler.Schedule(ngin.TaskTimerPrecision) - }); err != nil { - return err - } return nil } func (ngin *Engine) Stop() { - nlog.Infof("%s is stopping...", ngin.LogPrefix()) + nlog.Infof("%s server is stopping...", ngin.LogPrefix()) + + ngin.connMgr.PeekConn(func(_ int64, c *conn.Connection) bool { + _ = c.Close() + return false + }) close(ngin.dieChan) - scheduler.Close() } -func (ngin *Engine) handle(conn net.Conn) *connection.Connection { - nc := connection.NewConnection( - ngin.sessIdMgr.SessionID(), - conn, - ngin.goPool, - connection.Config{LogDebug: ngin.ShallLogDebug(), LogPrefix: ngin.LogPrefix()}, - ngin.packerBuilder, ngin.serializer, ngin.pipeline, - ngin.router.Handle, - ) +func (ngin *Engine) handle(rawC net.Conn) *conn.Connection { + nc := conn.NewConnection(ngin.sessIdMgr.SessionID(), rawC) + + ngin.evtMgr.OnConnected(nc) - nc.Serve() + ngin.serveConn(nc, ngin.packerBuilder()) - err := ngin.connManager.Store(connection.DefaultGroupName, nc) + err := ngin.connMgr.Store(conn.DefaultGroupName, nc) nlog.Must(err) + return nc +} - // dieChan - go func() { - // lifetime - ngin.lifetime.Open(nc) +func (ngin *Engine) serveConn(nc *conn.Connection, packer packet.Packer) { + _ = ngin.pool.Submit(func() { + ngin.readLoop(nc, packer) + }) + _ = ngin.pool.Submit(func() { + ngin.writeLoop(nc, packer) + }) + + _ = ngin.pool.Submit(func() { select { - case <-nc.DieChan(): - scheduler.PushTask(func() { ngin.lifetime.Close(nc) }) - _ = ngin.connManager.Remove(nc) + case <-nc.ChDie(): + if ngin.ShallLogDebug() { + nlog.Debugf("%s Close connection, ID=%d, Remote=%s", ngin.LogPrefix(), nc.ID(), nc.Conn().RemoteAddr().String()) + } + _ = ngin.connMgr.Remove(nc) + ngin.evtMgr.OnClose(nc) } + }) +} + +func (ngin *Engine) readLoop(nc *conn.Connection, packer packet.Packer) { + defer func() { + _ = nc.Close() + + //if ngin.ShallLogDebug() { + // nlog.Debugf("%s [readLoop] connection read goroutine exit, ID=%d, UID=%s, Remote=%s", + // ngin.LogPrefix(), nc.ID(), nc.Session().UID(), nc.Conn().RemoteAddr()) + //} }() - return nc + buf := make([]byte, 4096) + for { + select { + case <-nc.ChDie(): // connection close signal + return + default: + if ngin.Deadline != 0 { + _ = nc.Conn().SetDeadline(time.Now().Add(ngin.Deadline)) + } + if ngin.ReadDeadline != 0 { + _ = nc.Conn().SetReadDeadline(time.Now().Add(ngin.ReadDeadline)) + } + var ( + err error + n int + msgTyp int + ) + // 兼容websocket + if nc.Type() == conn.ConnTypeWS { + var bb []byte + if msgTyp, bb, err = nc.WsConn().ReadMessage(); err == nil { + copy(buf, bb) + n = len(bb) + } + } else { + n, err = nc.Conn().Read(buf) + } + if err != nil { + ngin.evtMgr.OnDisconnected(nc, err) + // TODO 断线重连 (仅限客户端) + nlog.Errorf("%s [readLoop] Read message error: %s, session will be closed immediately", + ngin.LogPrefix(), err.Error()) + return + } + + if n == 0 { + ngin.evtMgr.OnReceiveError(nc, conn.ErrReceiveZero) + nlog.Errorf("%s [readLoop] Read empty message, session will be closed immediately", + ngin.LogPrefix()) + return + } + + // 兼容websocket + if nc.Type() == conn.ConnTypeWS { + ngin.processPacket(nc, packet.NewWSPacket(msgTyp, buf[:n])) + } else { + if packer == nil { + ngin.evtMgr.OnReceiveError(nc, conn.ErrNoPacker) + nlog.Errorf("%s [readLoop] unexpected error: packer is nil", ngin.LogPrefix()) + return + } + + //nlog.Debugf("receive data %v", buf[:n]) + // warning: 为性能考虑,复用slice处理数据,buf传入后必须要copy再处理 + packets, err := packer.Unpack(buf[:n]) + if err != nil { + ngin.evtMgr.OnReceiveError(nc, conn.ErrUnpack) + nlog.Errorf("%s unpack err: %s", ngin.LogPrefix(), err.Error()) + } + // packets 处理 + for _, p := range packets { + ngin.processPacket(nc, p) + } + } + } + } +} + +func (ngin *Engine) writeLoop(nc *conn.Connection, packer packet.Packer) { + defer func() { + _ = nc.Close() + + //if ngin.ShallLogDebug() { + // nlog.Debugf("%s [writeLoop] connection write goroutine exit, ID=%d, UID=%s, Remote=%s", + // ngin.LogPrefix(), nc.ID(), nc.Session().UID(), nc.Conn().RemoteAddr()) + //} + }() + + for { + select { + case data := <-nc.ChSend(): + // marshal packet body (data) + if ngin.serializer == nil { + if _, ok := data.Payload.([]byte); !ok { + ngin.evtMgr.OnSendError(nc, data, conn.ErrSendPayload) + nlog.Errorf("%s [writeLoop] serializer is nil, but payload type not []byte", ngin.LogPrefix()) + break + } + } else { + payload, err := ngin.serializer.Marshal(data.Payload) + if err != nil { + ngin.evtMgr.OnSendError(nc, data, conn.ErrSendMarshal) + nlog.Errorf("%s [writeLoop] message body marshal err: %v", ngin.LogPrefix(), err) + break + } + data.Payload = payload + } + + // 对websocket的兼容 + if nc.Type() == conn.ConnTypeWS { + messageTyp, ok := data.Header.(int) + if !ok { + ngin.evtMgr.OnSendError(nc, data, conn.ErrSendWSType) + nlog.Errorf("%s [writeLoop] websocket message type not found", ngin.LogPrefix()) + break + } + // deadline + if ngin.Deadline != 0 { + _ = nc.Conn().SetDeadline(time.Now().Add(ngin.Deadline)) + } + if ngin.WriteDeadline != 0 { + _ = nc.Conn().SetWriteDeadline(time.Now().Add(ngin.WriteDeadline)) + } + err := nc.WsConn().WriteMessage(messageTyp, data.Payload.([]byte)) + if err != nil { + ngin.evtMgr.OnSendError(nc, data, conn.ErrSend) + nlog.Errorf("%s [writeLoop] write data err: %v", ngin.LogPrefix(), err) + break + } + // event + ngin.evtMgr.OnSend(nc, data) + } else { + // packet pack data + if packer == nil { + ngin.evtMgr.OnSendError(nc, data, conn.ErrNoPacker) + nlog.Errorf("%s [writeLoop] unexpected error: packer is nil", ngin.LogPrefix()) + break + } + p, err := packer.Pack(data.Header, data.Payload.([]byte)) + if err != nil { + ngin.evtMgr.OnSendError(nc, data, conn.ErrPack) + nlog.Errorf("%s [writeLoop] pack err: %v", ngin.LogPrefix(), err) + break + } + nc.ChWrite() <- p + } + case data := <-nc.ChWrite(): + // 回写数据 + if ngin.Deadline != 0 { + _ = nc.Conn().SetDeadline(time.Now().Add(ngin.Deadline)) + } + if ngin.WriteDeadline != 0 { + _ = nc.Conn().SetWriteDeadline(time.Now().Add(ngin.WriteDeadline)) + } + if _, err := nc.Conn().Write(data); err != nil { + ngin.evtMgr.OnSendError(nc, data, conn.ErrSend) + nlog.Errorf("%s [writeLoop] write data err: %v", ngin.LogPrefix(), err) + break + } + // event + ngin.evtMgr.OnSend(nc, data) + + //nlog.Debugf("write data %v", data) + case <-nc.ChDie(): // connection close signal + return + } + } +} + +func (ngin *Engine) processPacket(nc *conn.Connection, p packet.IPacket) { + // event + ngin.evtMgr.OnReceive(nc, p) + + if nc.Status() == conn.StatusWorking { + // 处理包消息 + _ = ngin.pool.Submit(func() { + ngin.router.Handle(nc, p) + }) + } } diff --git a/event/event.go b/event/event.go new file mode 100644 index 0000000..882c207 --- /dev/null +++ b/event/event.go @@ -0,0 +1,292 @@ +package event + +import ( + "errors" + "git.noahlan.cn/noahlan/nnet/conn" + "git.noahlan.cn/noahlan/nnet/packet" + "git.noahlan.cn/noahlan/ntool/nlog" +) + +var ErrEventTypeIllegal = errors.New("EventType illegal") + +type EvtType string + +const ( + EvtOnConnected = "OnConnected" + EvtOnConnectError = "OnConnectError" + EvtOnDisconnected = "OnDisconnected" + EvtOnClose = "OnClose" + + EvtOnSend = "OnSend" + EvtOnSendError = "OnSendError" + EvtOnReceive = "OnReceive" + EvtOnReceiveError = "OnReceiveError" +) + +type ( + OnConnectedFn func(nc *conn.Connection) + OnConnectErrorFn func(err error) + OnDisconnectedFn func(nc *conn.Connection, err error) + OnCloseFn func(conn *conn.Connection) + + OnSendFn func(nc *conn.Connection, v any) + OnSendErrorFn func(nc *conn.Connection, v any, err error) + OnReceiveFn func(nc *conn.Connection, p packet.IPacket) + OnReceiveErrorFn func(nc *conn.Connection, err error) + + Event interface { + // OnConnected 连接成功回调 + OnConnected(nc *conn.Connection) + // OnConnectError 连接异常回调, 在准备进行连接的过程中发生异常时触发 + OnConnectError(err error) + // OnDisconnected 连接断开回调,网络异常,服务端掉线等情况时触发 + OnDisconnected(nc *conn.Connection, err error) + // OnClose 连接关闭回调,服务端发起关闭信号或客户端主动关闭时触发 + OnClose(nc *conn.Connection) + + // OnSend 消息发送回调,消息序列化后的回调 + OnSend(nc *conn.Connection, v any) + // OnSendError 发送消息异常回调 + OnSendError(nc *conn.Connection, v any, err error) + // OnReceive 消息接收回调,消息解包后的回调 + OnReceive(nc *conn.Connection, p packet.IPacket) + // OnReceiveError 接收消息异常回调 + OnReceiveError(nc *conn.Connection, err error) + } + + EventManager interface { + Event + + // RegisterEventFront 向头部注册事件处理器 + RegisterEventFront(evtType EvtType, fn any) + // RegisterEvent 注册事件处理器 + RegisterEvent(evtType EvtType, fn any) + } + + eventManager struct { + onConnected []OnConnectedFn + onConnectError []OnConnectErrorFn + onDisconnected []OnDisconnectedFn + onClose []OnCloseFn + + onSend []OnSendFn + onSendError []OnSendErrorFn + onReceive []OnReceiveFn + onReceiveError []OnReceiveErrorFn + } +) + +///////////////// type-align +var _ Event = (*eventManager)(nil) + +func NewEventManager() EventManager { + return &eventManager{ + onConnected: make([]OnConnectedFn, 0), + onConnectError: make([]OnConnectErrorFn, 0), + onDisconnected: make([]OnDisconnectedFn, 0), + onClose: make([]OnCloseFn, 0), + + onSend: make([]OnSendFn, 0), + onSendError: make([]OnSendErrorFn, 0), + onReceive: make([]OnReceiveFn, 0), + onReceiveError: make([]OnReceiveErrorFn, 0), + } +} + +func (m *eventManager) registerEvent(evtType EvtType, fn any, front bool) { + switch evtType { + case EvtOnConnected: + if f, ok := fn.(OnConnectedFn); ok { + if front { + fns := make([]OnConnectedFn, len(m.onConnected)+1) + fns[0] = f + copy(fns[1:], m.onConnected) + m.onConnected = fns + } else { + m.onConnected = append(m.onConnected, f) + } + } else { + nlog.Error(ErrEventTypeIllegal) + return + } + case EvtOnConnectError: + if f, ok := fn.(OnConnectErrorFn); ok { + if front { + fns := make([]OnConnectErrorFn, len(m.onConnectError)+1) + fns[0] = f + copy(fns[1:], m.onConnectError) + m.onConnectError = fns + } else { + m.onConnectError = append(m.onConnectError, f) + } + } else { + nlog.Error(ErrEventTypeIllegal) + return + } + case EvtOnDisconnected: + if f, ok := fn.(OnDisconnectedFn); ok { + if front { + fns := make([]OnDisconnectedFn, len(m.onDisconnected)+1) + fns[0] = f + copy(fns[1:], m.onDisconnected) + m.onDisconnected = fns + } else { + m.onDisconnected = append(m.onDisconnected, f) + } + } else { + nlog.Error(ErrEventTypeIllegal) + return + } + case EvtOnClose: + if f, ok := fn.(OnCloseFn); ok { + if front { + fns := make([]OnCloseFn, len(m.onClose)+1) + fns[0] = f + copy(fns[1:], m.onClose) + m.onClose = fns + } else { + m.onClose = append(m.onClose, f) + } + } else { + nlog.Error(ErrEventTypeIllegal) + return + } + case EvtOnSend: + if f, ok := fn.(OnSendFn); ok { + if front { + fns := make([]OnSendFn, len(m.onSend)+1) + fns[0] = f + copy(fns[1:], m.onSend) + m.onSend = fns + } else { + m.onSend = append(m.onSend, f) + } + } else { + nlog.Error(ErrEventTypeIllegal) + return + } + case EvtOnSendError: + if f, ok := fn.(OnSendErrorFn); ok { + if front { + fns := make([]OnSendErrorFn, len(m.onSendError)+1) + fns[0] = f + copy(fns[1:], m.onSendError) + m.onSendError = fns + } else { + m.onSendError = append(m.onSendError, f) + } + } else { + nlog.Error(ErrEventTypeIllegal) + return + } + case EvtOnReceive: + if f, ok := fn.(OnReceiveFn); ok { + if front { + fns := make([]OnReceiveFn, len(m.onReceive)+1) + fns[0] = f + copy(fns[1:], m.onReceive) + m.onReceive = fns + } else { + m.onReceive = append(m.onReceive, f) + } + } else { + nlog.Error(ErrEventTypeIllegal) + return + } + case EvtOnReceiveError: + if f, ok := fn.(OnReceiveErrorFn); ok { + if front { + fns := make([]OnReceiveErrorFn, len(m.onReceiveError)+1) + fns[0] = f + copy(fns[1:], m.onReceiveError) + m.onReceiveError = fns + } else { + m.onReceiveError = append(m.onReceiveError, f) + } + } else { + nlog.Error(ErrEventTypeIllegal) + return + } + } + nlog.Infof("Register event [EvtType: %s] successfully", evtType) +} + +func (m *eventManager) RegisterEventFront(evtType EvtType, fn any) { + m.registerEvent(evtType, fn, true) +} + +func (m *eventManager) RegisterEvent(evtType EvtType, fn any) { + m.registerEvent(evtType, fn, false) +} + +func (m *eventManager) OnConnected(nc *conn.Connection) { + if len(m.onConnected) == 0 { + return + } + for _, fn := range m.onConnected { + fn(nc) + } +} + +func (m *eventManager) OnConnectError(err error) { + if len(m.onConnectError) == 0 { + return + } + for _, fn := range m.onConnectError { + fn(err) + } +} + +func (m *eventManager) OnDisconnected(nc *conn.Connection, err error) { + if len(m.onDisconnected) == 0 { + return + } + for _, fn := range m.onDisconnected { + fn(nc, err) + } +} + +func (m *eventManager) OnClose(nc *conn.Connection) { + if len(m.onClose) == 0 { + return + } + for _, fn := range m.onClose { + fn(nc) + } +} + +func (m *eventManager) OnSend(nc *conn.Connection, v any) { + if len(m.onSend) == 0 { + return + } + for _, fn := range m.onSend { + fn(nc, v) + } +} + +func (m *eventManager) OnSendError(nc *conn.Connection, v any, err error) { + if len(m.onSendError) == 0 { + return + } + for _, fn := range m.onSendError { + fn(nc, v, err) + } +} + +func (m *eventManager) OnReceive(nc *conn.Connection, p packet.IPacket) { + if len(m.onReceive) == 0 { + return + } + for _, fn := range m.onReceive { + fn(nc, p) + } +} + +func (m *eventManager) OnReceiveError(nc *conn.Connection, err error) { + if len(m.onReceiveError) == 0 { + return + } + for _, fn := range m.onReceiveError { + fn(nc, err) + } +} diff --git a/event/events.go b/event/events.go deleted file mode 100644 index 41463db..0000000 --- a/event/events.go +++ /dev/null @@ -1,39 +0,0 @@ -package event - -import "git.noahlan.cn/noahlan/nnet/connection" - -type ( - ConnFn func(conn *connection.Connection) - ErrFn func(err error) - - // ConnEvents 连接事件 - ConnEvents interface { - // OnConnected 连接成功回调 - OnConnected(h ConnFn) - // OnConnectError 连接异常回调, 在准备进行连接的过程中发生异常时触发 - OnConnectError(err error) - // OnDisconnected 连接断开回调,网络异常,服务端掉线等情况时触发 - OnDisconnected(conn *connection.Connection, err error) - // OnClose 连接关闭回调,服务端发起关闭信号或客户端主动关闭时触发 - OnClose(details any, err error) - } - - // MessageEvents 消息事件 - MessageEvents interface { - // OnSentError 发送消息异常回调 - OnSentError(details any, err error) - // OnReceiveError 接收消息异常回调 - OnReceiveError(details any, err error) - } -) - -type Manager struct { - ConnEvents - MessageEvents - - onConnected []OnConnectedFn -} - -func NewEventManager() *Manager { - return &Manager{} -} diff --git a/go.mod b/go.mod index f38bca0..e3e46f1 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( require ( github.com/gofrs/uuid/v5 v5.0.0 // indirect github.com/gookit/color v1.5.3 // indirect + github.com/jpillora/backoff v1.0.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect diff --git a/go.sum b/go.sum index 0b38f63..ee50311 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,8 @@ github.com/gookit/color v1.5.3 h1:twfIhZs4QLCtimkP7MOxlF3A0U/5cDPseRT9M/+2SCE= github.com/gookit/color v1.5.3/go.mod h1:NUzwzeehUfl7GIb36pqId+UGmRfQcU/WiiyTTeNjHtE= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= diff --git a/lifetime/lifetime.go b/lifetime/lifetime.go deleted file mode 100644 index e47b654..0000000 --- a/lifetime/lifetime.go +++ /dev/null @@ -1,54 +0,0 @@ -package lifetime - -import ( - "git.noahlan.cn/noahlan/nnet/connection" -) - -type ( - Handler func(conn *connection.Connection) - - Lifetime interface { - OnClosed(h Handler) - OnOpen(h Handler) - } - - Mgr struct { - onOpen []Handler - onClosed []Handler - } -) - -func NewLifetime() *Mgr { - return &Mgr{ - onOpen: make([]Handler, 0), - onClosed: make([]Handler, 0), - } -} - -func (lt *Mgr) OnClosed(h Handler) { - lt.onClosed = append(lt.onClosed, h) -} - -func (lt *Mgr) OnOpen(h Handler) { - lt.onOpen = append(lt.onOpen, h) -} - -func (lt *Mgr) Open(conn *connection.Connection) { - if len(lt.onOpen) <= 0 { - return - } - - for _, handler := range lt.onOpen { - handler(conn) - } -} - -func (lt *Mgr) Close(conn *connection.Connection) { - if len(lt.onClosed) <= 0 { - return - } - - for _, handler := range lt.onClosed { - handler(conn) - } -} diff --git a/middleware/heartbeat.go b/middleware/heartbeat.go index 00a1ea9..0a8a65d 100644 --- a/middleware/heartbeat.go +++ b/middleware/heartbeat.go @@ -2,7 +2,8 @@ package middleware import ( "git.noahlan.cn/noahlan/nnet" - "git.noahlan.cn/noahlan/nnet/connection" + "git.noahlan.cn/noahlan/nnet/conn" + "git.noahlan.cn/noahlan/nnet/event" "git.noahlan.cn/noahlan/nnet/packet" rt "git.noahlan.cn/noahlan/nnet/router" "git.noahlan.cn/noahlan/ntool/nlog" @@ -13,10 +14,10 @@ import ( type HeartbeatMiddleware struct { lastAt int64 interval time.Duration - hbdFn func(conn *connection.Connection) []byte + hbdFn func(conn *conn.Connection) []byte } -func WithHeartbeat(interval time.Duration, hbdFn func(conn *connection.Connection) []byte) nnet.RunOption { +func WithHeartbeat(interval time.Duration, hbdFn func(conn *conn.Connection) []byte) nnet.RunOption { m := &HeartbeatMiddleware{ lastAt: time.Now().Unix(), interval: interval, @@ -28,10 +29,10 @@ func WithHeartbeat(interval time.Duration, hbdFn func(conn *connection.Connectio } return func(ngin *nnet.Engine) { - ngin.Lifetime().OnOpen(m.start) + ngin.EventManager().RegisterEvent(event.EvtOnConnected, m.start) ngin.Use(func(next rt.HandlerFunc) rt.HandlerFunc { - return func(conn *connection.Connection, pkg packet.IPacket) { + return func(conn *conn.Connection, pkg packet.IPacket) { m.handle(conn, pkg) next(conn, pkg) @@ -40,7 +41,7 @@ func WithHeartbeat(interval time.Duration, hbdFn func(conn *connection.Connectio } } -func (m *HeartbeatMiddleware) start(conn *connection.Connection) { +func (m *HeartbeatMiddleware) start(conn *conn.Connection) { ticker := time.NewTicker(m.interval) defer func() { @@ -64,6 +65,6 @@ func (m *HeartbeatMiddleware) start(conn *connection.Connection) { } } -func (m *HeartbeatMiddleware) handle(_ *connection.Connection, _ packet.IPacket) { +func (m *HeartbeatMiddleware) handle(_ *conn.Connection, _ packet.IPacket) { atomic.StoreInt64(&m.lastAt, time.Now().Unix()) } diff --git a/options.go b/options.go index e8ae6d6..b625bac 100644 --- a/options.go +++ b/options.go @@ -1,8 +1,8 @@ package nnet import ( - "git.noahlan.cn/noahlan/nnet/connection" - "git.noahlan.cn/noahlan/nnet/lifetime" + "git.noahlan.cn/noahlan/nnet/conn" + "git.noahlan.cn/noahlan/nnet/event" "git.noahlan.cn/noahlan/nnet/packet" rt "git.noahlan.cn/noahlan/nnet/router" "git.noahlan.cn/noahlan/ntool/ndef" @@ -16,19 +16,14 @@ type ( RunOption func(ngin *Engine) ) -// Pipeline returns inner pipeline -func (ngin *Engine) Pipeline() connection.Pipeline { - return ngin.pipeline -} - -// Lifetime returns lifetime interface. -func (ngin *Engine) Lifetime() lifetime.Lifetime { - return ngin.lifetime +// EventManager returns EventManager. +func (ngin *Engine) EventManager() event.EventManager { + return ngin.evtMgr } // ConnManager returns connection manager -func (ngin *Engine) ConnManager() *connection.Manager { - return ngin.connManager +func (ngin *Engine) ConnManager() *conn.ConnManager { + return ngin.connMgr } //////////////////////// Options @@ -82,33 +77,13 @@ func WithSerializer(s ndef.Serializer) RunOption { // WithPool 设置使用自定义的工作池 func WithPool(pl *ants.Pool) RunOption { return func(ngin *Engine) { - ngin.goPool = pl + ngin.pool = pl } } // WithPoolCfg 设置工作池配置 func WithPoolCfg(cfg npool.Config) RunOption { return func(ngin *Engine) { - ngin.goPool, _ = ants.NewPool(cfg.PoolSize, ants.WithOptions(cfg.Options())) - } -} - -//////////////////// Pipeline - -// WithPipeline 使用自定义 pipeline -func WithPipeline(pipeline connection.Pipeline) RunOption { - return func(ngin *Engine) { - ngin.pipeline = pipeline - } -} - -type PipelineOption func(opts connection.Pipeline) - -// WithPipelineOpt 使用默认Pipeline并设置其配置 -func WithPipelineOpt(opts ...func(connection.Pipeline)) RunOption { - return func(ngin *Engine) { - for _, opt := range opts { - opt(ngin.pipeline) - } + ngin.pool, _ = ants.NewPool(cfg.PoolSize, ants.WithOptions(cfg.Options())) } } diff --git a/packet/entry.go b/packet/entry.go deleted file mode 100644 index 8d02546..0000000 --- a/packet/entry.go +++ /dev/null @@ -1,14 +0,0 @@ -package packet - -// Entry 入口原始数据 -type Entry struct { - Header any - Raw []byte -} - -func NewEntry(header any, raw []byte) *Entry { - return &Entry{ - Header: header, - Raw: raw, - } -} diff --git a/packet/pakcer.go b/packet/pakcer.go index 4dcbe08..4af868f 100644 --- a/packet/pakcer.go +++ b/packet/pakcer.go @@ -7,7 +7,7 @@ type ( Pack(header any, data []byte) ([]byte, error) // Unpack 解包 - Unpack(header any, data []byte) ([]IPacket, error) + Unpack(data []byte) ([]IPacket, error) } // PackerBuilder Packer构建器 diff --git a/packet/ws.go b/packet/ws.go new file mode 100644 index 0000000..45582b4 --- /dev/null +++ b/packet/ws.go @@ -0,0 +1,37 @@ +package packet + +import "fmt" + +type WSPacket struct { + MessageType int + Len uint64 + Raw []byte +} + +func NewWSPacket(typ int, data []byte) IPacket { + l := len(data) + raw := make([]byte, l) + copy(raw, data) + + return &WSPacket{ + MessageType: typ, + Len: uint64(l), + Raw: raw, + } +} + +func (p *WSPacket) GetHeader() any { + return p.MessageType +} + +func (p *WSPacket) GetLen() uint64 { + return p.Len +} + +func (p *WSPacket) GetBody() []byte { + return p.Raw +} + +func (p *WSPacket) String() string { + return fmt.Sprintf("MessageType=%d, Len=%d, RawStr=%s", p.MessageType, p.Len, string(p.Raw)) +} diff --git a/protocol/nnet/client_event_nnet.go b/protocol/nnet/client_event_nnet.go new file mode 100644 index 0000000..e98c17a --- /dev/null +++ b/protocol/nnet/client_event_nnet.go @@ -0,0 +1,70 @@ +package nnet + +import ( + "encoding/json" + "fmt" + "git.noahlan.cn/noahlan/nnet" + "git.noahlan.cn/noahlan/nnet/conn" + "git.noahlan.cn/noahlan/nnet/event" + "git.noahlan.cn/noahlan/nnet/packet" + "git.noahlan.cn/noahlan/ntool/nlog" +) + +type OnReadyFunc func() + +func WithNNetClientEvents(onReady OnReadyFunc, packer packet.Packer) nnet.RunOption { + return func(ngin *nnet.Engine) { + ngin.EventManager().RegisterEventFront(event.EvtOnReceive, onReceiveEvent(ngin, onReady, packer)) + } +} + +func onReceiveEvent(ngin *nnet.Engine, onReady OnReadyFunc, packer packet.Packer) event.OnReceiveFn { + return func(nc *conn.Connection, p packet.IPacket) { + pkg, ok := p.(*Packet) + if !ok { + nlog.Error(packet.ErrWrongPacketType) + return + } + // Server to client + switch pkg.PacketType { + case Handshake: + var handshakeData HandshakeResp + err := json.Unmarshal(pkg.Data, &handshakeData) + nlog.Must(err) + + hrd, _ := packer.Pack(Header{ + PacketType: HandshakeAck, + MessageHeader: MessageHeader{}, + }, nil) + if err := nc.SendBytes(hrd); err != nil { + return + } + nc.SetStatus(conn.StatusWorking) + // onReady + if onReady != nil { + onReady() + } + if ngin.ShallLogDebug() { + nlog.Debugf("connection handshake Id=%d, Remote=%s", nc.Session().ID(), nc.Conn().RemoteAddr()) + } + case Kick: + _ = nc.Close() + case Data: + status := nc.Status() + if status != conn.StatusWorking { + nlog.Errorf(fmt.Sprintf("receive data on socket which not yet ACK, session will be closed immediately, remote=%s", + nc.Conn().RemoteAddr())) + return + } + + var lastMid uint64 + switch pkg.MsgType { + case Response: + lastMid = pkg.ID + case Notify: + lastMid = 0 + } + nc.SetLastMID(lastMid) + } + } +} diff --git a/protocol/nnet/client_pipeline_nnet.go b/protocol/nnet/client_pipeline_nnet.go deleted file mode 100644 index 8d4054b..0000000 --- a/protocol/nnet/client_pipeline_nnet.go +++ /dev/null @@ -1,65 +0,0 @@ -package nnet - -import ( - "encoding/json" - "errors" - "fmt" - "git.noahlan.cn/noahlan/nnet" - "git.noahlan.cn/noahlan/nnet/connection" - "git.noahlan.cn/noahlan/nnet/packet" - "git.noahlan.cn/noahlan/ntool/nlog" -) - -type OnReadyFunc func() - -func WithNNetClientPipeline(onReady OnReadyFunc, packer packet.Packer) nnet.RunOption { - return func(ngin *nnet.Engine) { - ngin.Pipeline().Inbound().PushFront(func(conn *connection.Connection, v any) error { - pkg, ok := v.(*Packet) - if !ok { - return packet.ErrWrongPacketType - } - nc, _ := conn.Conn() - - // Server to client - switch pkg.PacketType { - case Handshake: - var handshakeData HandshakeResp - err := json.Unmarshal(pkg.Data, &handshakeData) - nlog.Must(err) - - hrd, _ := packer.Pack(Header{ - PacketType: HandshakeAck, - MessageHeader: MessageHeader{}, - }, nil) - if err := conn.SendBytes(hrd); err != nil { - return err - } - conn.SetStatus(connection.StatusWorking) - // onReady - if onReady != nil { - onReady() - } - nlog.Debugf("connection handshake Id=%d, Remote=%s", conn.Session().ID(), nc.RemoteAddr()) - case Kick: - _ = conn.Close() - case Data: - status := conn.Status() - if status != connection.StatusWorking { - return errors.New(fmt.Sprintf("receive data on socket which not yet ACK, session will be closed immediately, remote=%s", - nc.RemoteAddr())) - } - - var lastMid uint64 - switch pkg.MsgType { - case Response: - lastMid = pkg.ID - case Notify: - lastMid = 0 - } - conn.SetLastMID(lastMid) - } - return nil - }) - } -} diff --git a/protocol/nnet/event_nnet.go b/protocol/nnet/event_nnet.go new file mode 100644 index 0000000..d850293 --- /dev/null +++ b/protocol/nnet/event_nnet.go @@ -0,0 +1,85 @@ +package nnet + +import ( + "encoding/json" + "git.noahlan.cn/noahlan/nnet" + "git.noahlan.cn/noahlan/nnet/conn" + "git.noahlan.cn/noahlan/nnet/event" + "git.noahlan.cn/noahlan/nnet/packet" + "git.noahlan.cn/noahlan/ntool/nlog" +) + +type ( + HandshakeValidatorFunc func(*HandshakeReq) error + HandshakeAckPayloadFunc func() any +) + +func withNNetEvents( + handshakeResp *HandshakeResp, + validator HandshakeValidatorFunc, + packer packet.Packer, +) nnet.RunOption { + return func(ngin *nnet.Engine) { + ngin.EventManager().RegisterEventFront(event.EvtOnReceive, onServerReceiveEvent(handshakeResp, validator, packer)) + } +} + +func onServerReceiveEvent( + handshakeResp *HandshakeResp, + validator HandshakeValidatorFunc, + packer packet.Packer, ) event.OnReceiveFn { + return func(nc *conn.Connection, p packet.IPacket) { + pkg, ok := p.(*Packet) + if !ok { + nlog.Error(packet.ErrWrongPacketType) + return + } + switch pkg.PacketType { + case Handshake: + var handshakeData HandshakeReq + err := json.Unmarshal(pkg.Data, &handshakeData) + nlog.Must(err) + + if err := validator(&handshakeData); err != nil { + nlog.Error(err) + return + } + handshakeResp.Payload = handshakeData.Payload + + data, err := json.Marshal(handshakeResp) + nlog.Must(err) + + hrd, _ := packer.Pack(Header{ + PacketType: Handshake, + MessageHeader: MessageHeader{}, + }, data) + if err := nc.SendBytes(hrd); err != nil { + nlog.Error(err) + return + } + nc.SetStatus(conn.StatusPrepare) + nlog.Debugf("connection handshake Id=%d, Remote=%s", nc.Session().ID(), nc.Conn().RemoteAddr()) + case HandshakeAck: + nc.SetStatus(conn.StatusPending) + nlog.Debugf("receive handshake ACK Id=%d, Remote=%s", nc.Session().ID(), nc.Conn().RemoteAddr()) + case Data: + if nc.Status() < conn.StatusPending { + nlog.Errorf("receive data on socket which not yet ACK, session will be closed immediately, remote=%s", + nc.Conn().RemoteAddr()) + return + } + nc.SetStatus(conn.StatusWorking) + + var lastMid uint64 + switch pkg.MsgType { + case Request: + lastMid = pkg.ID + case Notify: + lastMid = 0 + default: + nlog.Errorf("Invalid message type: %s ", pkg.MsgType.String()) + } + nc.SetLastMID(lastMid) + } + } +} diff --git a/protocol/nnet/nnet.go b/protocol/nnet/nnet.go index fe9f72d..22adef5 100644 --- a/protocol/nnet/nnet.go +++ b/protocol/nnet/nnet.go @@ -2,7 +2,7 @@ package nnet import ( "git.noahlan.cn/noahlan/nnet" - "git.noahlan.cn/noahlan/nnet/connection" + "git.noahlan.cn/noahlan/nnet/conn" "git.noahlan.cn/noahlan/nnet/middleware" "git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/ntool/nlog" @@ -40,7 +40,7 @@ func WithNNetClientProtocol(onReady OnReadyFunc) []nnet.RunOption { router := NewRouter().(*nRouter) packer := NewPacker(router.routeMap) opts := []nnet.RunOption{ - WithNNetClientPipeline(onReady, packer), + WithNNetClientEvents(onReady, packer), nnet.WithRouter(router), nnet.WithPackerBuilder(func() packet.Packer { return NewPacker(router.routeMap) }), } @@ -62,7 +62,7 @@ func WithNNetProtocol(config Config) []nnet.RunOption { packer := NewPacker(router.routeMap) opts := []nnet.RunOption{ - withNNetPipeline(handshakeAckData, config.HandshakeValidator, packer), + withNNetEvents(handshakeAckData, config.HandshakeValidator, packer), nnet.WithRouter(router), nnet.WithPackerBuilder(func() packet.Packer { return NewPacker(router.routeMap) }), } @@ -71,7 +71,7 @@ func WithNNetProtocol(config Config) []nnet.RunOption { hbd, err := packer.Pack(Heartbeat, nil) nlog.Must(err) - opts = append(opts, middleware.WithHeartbeat(config.HeartbeatInterval, func(_ *connection.Connection) []byte { + opts = append(opts, middleware.WithHeartbeat(config.HeartbeatInterval, func(_ *conn.Connection) []byte { return hbd })) } diff --git a/protocol/nnet/pipeline_nnet.go b/protocol/nnet/pipeline_nnet.go deleted file mode 100644 index 5b10b4a..0000000 --- a/protocol/nnet/pipeline_nnet.go +++ /dev/null @@ -1,78 +0,0 @@ -package nnet - -import ( - "encoding/json" - "errors" - "fmt" - "git.noahlan.cn/noahlan/nnet" - "git.noahlan.cn/noahlan/nnet/connection" - "git.noahlan.cn/noahlan/nnet/packet" - "git.noahlan.cn/noahlan/ntool/nlog" -) - -type ( - HandshakeValidatorFunc func(*HandshakeReq) error - HandshakeAckPayloadFunc func() any -) - -func withNNetPipeline( - handshakeResp *HandshakeResp, - validator HandshakeValidatorFunc, - packer packet.Packer, -) nnet.RunOption { - return func(ngin *nnet.Engine) { - ngin.Pipeline().Inbound().PushFront(func(conn *connection.Connection, v any) error { - pkg, ok := v.(*Packet) - if !ok { - return packet.ErrWrongPacketType - } - nc, _ := conn.Conn() - - switch pkg.PacketType { - case Handshake: - var handshakeData HandshakeReq - err := json.Unmarshal(pkg.Data, &handshakeData) - nlog.Must(err) - - if err := validator(&handshakeData); err != nil { - return err - } - handshakeResp.Payload = handshakeData.Payload - - data, err := json.Marshal(handshakeResp) - nlog.Must(err) - - hrd, _ := packer.Pack(Header{ - PacketType: Handshake, - MessageHeader: MessageHeader{}, - }, data) - if err := conn.SendBytes(hrd); err != nil { - return err - } - conn.SetStatus(connection.StatusPrepare) - nlog.Debugf("connection handshake Id=%d, Remote=%s", conn.Session().ID(), nc.RemoteAddr()) - case HandshakeAck: - conn.SetStatus(connection.StatusPending) - nlog.Debugf("receive handshake ACK Id=%d, Remote=%s", conn.Session().ID(), nc.RemoteAddr()) - case Data: - if conn.Status() < connection.StatusPending { - return errors.New(fmt.Sprintf("receive data on socket which not yet ACK, session will be closed immediately, remote=%s", - nc.RemoteAddr())) - } - conn.SetStatus(connection.StatusWorking) - - var lastMid uint64 - switch pkg.MsgType { - case Request: - lastMid = pkg.ID - case Notify: - lastMid = 0 - default: - return fmt.Errorf("Invalid message type: %s ", pkg.MsgType.String()) - } - conn.SetLastMID(lastMid) - } - return nil - }) - } -} diff --git a/protocol/nnet/router_nnet.go b/protocol/nnet/router_nnet.go index 850b17a..fac6a0d 100644 --- a/protocol/nnet/router_nnet.go +++ b/protocol/nnet/router_nnet.go @@ -3,7 +3,7 @@ package nnet import ( "errors" "fmt" - "git.noahlan.cn/noahlan/nnet/connection" + "git.noahlan.cn/noahlan/nnet/conn" "git.noahlan.cn/noahlan/nnet/packet" rt "git.noahlan.cn/noahlan/nnet/router" "git.noahlan.cn/noahlan/ntool/nlog" @@ -42,7 +42,7 @@ func NewRouter() rt.Router { } } -func (r *nRouter) Handle(conn *connection.Connection, p packet.IPacket) { +func (r *nRouter) Handle(conn *conn.Connection, p packet.IPacket) { pkg, ok := p.(*Packet) if !ok { nlog.Error(packet.ErrWrongPacketType) diff --git a/protocol/plain/event_plain.go b/protocol/plain/event_plain.go new file mode 100644 index 0000000..03dadae --- /dev/null +++ b/protocol/plain/event_plain.go @@ -0,0 +1,22 @@ +package plain + +import ( + "git.noahlan.cn/noahlan/nnet" + "git.noahlan.cn/noahlan/nnet/conn" + "git.noahlan.cn/noahlan/nnet/event" + "git.noahlan.cn/noahlan/nnet/packet" +) + +func withEvents() nnet.RunOption { + return func(ngin *nnet.Engine) { + ngin.EventManager().RegisterEventFront(event.EvtOnReceive, onReceiveEvent()) + } +} + +func onReceiveEvent() event.OnReceiveFn { + return func(nc *conn.Connection, _ packet.IPacket) { + if nc.Status() != conn.StatusWorking { + nc.SetStatus(conn.StatusWorking) + } + } +} diff --git a/protocol/plain/pipeline_plain.go b/protocol/plain/pipeline_plain.go deleted file mode 100644 index 3c20cb1..0000000 --- a/protocol/plain/pipeline_plain.go +++ /dev/null @@ -1,22 +0,0 @@ -package plain - -import ( - "git.noahlan.cn/noahlan/nnet" - "git.noahlan.cn/noahlan/nnet/connection" - "git.noahlan.cn/noahlan/nnet/packet" -) - -func withPipeline() nnet.RunOption { - return func(ngin *nnet.Engine) { - ngin.Pipeline().Inbound().PushFront(func(conn *connection.Connection, v any) error { - _, ok := v.(*Packet) - if !ok { - return packet.ErrWrongPacketType - } - if conn.Status() != connection.StatusWorking { - conn.SetStatus(connection.StatusWorking) - } - return nil - }) - } -} diff --git a/protocol/plain/plain.go b/protocol/plain/plain.go index b238bbf..dc51fcc 100644 --- a/protocol/plain/plain.go +++ b/protocol/plain/plain.go @@ -7,7 +7,7 @@ import ( func WithPlainProtocol() []nnet.RunOption { opts := []nnet.RunOption{ - withPipeline(), + withEvents(), nnet.WithRouter(NewRouter()), nnet.WithPackerBuilder(func() packet.Packer { return NewPacker() }), } diff --git a/protocol/plain/router_plain.go b/protocol/plain/router_plain.go index ac8954e..60bbada 100644 --- a/protocol/plain/router_plain.go +++ b/protocol/plain/router_plain.go @@ -1,7 +1,7 @@ package plain import ( - "git.noahlan.cn/noahlan/nnet/connection" + "git.noahlan.cn/noahlan/nnet/conn" "git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/nnet/router" "git.noahlan.cn/noahlan/ntool/nlog" @@ -16,21 +16,16 @@ func NewRouter() router.Router { return &Router{} } -func (r *Router) Handle(conn *connection.Connection, pkg packet.IPacket) { - p, ok := pkg.(*Packet) - if !ok { - nlog.Error(packet.ErrWrongPacketType) - return - } +func (r *Router) Handle(nc *conn.Connection, pkg packet.IPacket) { if r.plainHandler == nil { if r.notFound == nil { nlog.Error("message handler not found") return } - r.notFound.Handle(conn, p) + r.notFound.Handle(nc, pkg) return } - r.plainHandler.Handle(conn, p) + r.plainHandler.Handle(nc, pkg) } func (r *Router) Register(_ any, handler router.Handler) error { diff --git a/router/router.go b/router/router.go index ffe91d9..3aa996e 100644 --- a/router/router.go +++ b/router/router.go @@ -1,17 +1,17 @@ package router import ( - "git.noahlan.cn/noahlan/nnet/connection" + "git.noahlan.cn/noahlan/nnet/conn" "git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/ntool/nlog" ) type ( Handler interface { - Handle(c *connection.Connection, pkg packet.IPacket) + Handle(c *conn.Connection, pkg packet.IPacket) } // HandlerFunc 消息处理方法 - HandlerFunc func(conn *connection.Connection, pkg packet.IPacket) + HandlerFunc func(conn *conn.Connection, pkg packet.IPacket) Middleware func(next HandlerFunc) HandlerFunc @@ -29,13 +29,13 @@ type ( Constructor func(Handler) Handler ) -func notFound(conn *connection.Connection, _ packet.IPacket) { +func notFound(conn *conn.Connection, _ packet.IPacket) { nlog.Error("handler not found") _ = conn.SendBytes([]byte("404")) } func NotFoundHandler(next Handler) Handler { - return HandlerFunc(func(c *connection.Connection, packet packet.IPacket) { + return HandlerFunc(func(c *conn.Connection, packet packet.IPacket) { h := next if next == nil { h = HandlerFunc(notFound) @@ -45,7 +45,7 @@ func NotFoundHandler(next Handler) Handler { }) } -func (f HandlerFunc) Handle(c *connection.Connection, pkg packet.IPacket) { +func (f HandlerFunc) Handle(c *conn.Connection, pkg packet.IPacket) { f(c, pkg) } @@ -94,7 +94,7 @@ func NewDefaultRouter() Router { return &plainRouter{} } -func (p *plainRouter) Handle(c *connection.Connection, pkg packet.IPacket) { +func (p *plainRouter) Handle(c *conn.Connection, pkg packet.IPacket) { if p.handler == nil { return } diff --git a/server_serial.go b/server_serial.go index a470aee..6026c93 100644 --- a/server_serial.go +++ b/server_serial.go @@ -1,7 +1,7 @@ package nnet import ( - "git.noahlan.cn/noahlan/nnet/connection" + "git.noahlan.cn/noahlan/nnet/conn" "git.noahlan.cn/noahlan/ntool/nlog" "github.com/goburrow/serial" "sync" @@ -30,7 +30,7 @@ func (ngin *Engine) ListenSerial(conf serial.Config) error { var wg sync.WaitGroup wg.Add(1) - ngin.handle(connection.NewSerialConn(port, &conf)) + ngin.handle(conn.NewSerialConn(port, &conf)) go func() { for { diff --git a/server_tcp.go b/server_tcp.go index 073e5dd..cbc8441 100644 --- a/server_tcp.go +++ b/server_tcp.go @@ -25,7 +25,7 @@ func (ngin *Engine) ListenTCP(conf config.TCPServerConf) error { ngin.Stop() }() for { - conn, err := listener.Accept() + rc, err := listener.Accept() if err != nil { if errors.Is(err, net.ErrClosed) { nlog.Errorf("%s connection closed, err:%v", ngin.LogPrefix(), err) @@ -35,8 +35,8 @@ func (ngin *Engine) ListenTCP(conf config.TCPServerConf) error { continue } - err = ngin.goPool.Submit(func() { - ngin.handle(conn) + err = ngin.pool.Submit(func() { + ngin.handle(rc) }) if err != nil { nlog.Errorf("%s submit conn pool err: %ng", ngin.LogPrefix(), err.Error()) diff --git a/server_ws.go b/server_ws.go index f4595a1..f9f506c 100644 --- a/server_ws.go +++ b/server_ws.go @@ -3,7 +3,6 @@ package nnet import ( "fmt" "git.noahlan.cn/noahlan/nnet/config" - "git.noahlan.cn/noahlan/nnet/connection" "git.noahlan.cn/noahlan/ntool/nlog" "github.com/gorilla/websocket" "net/http" @@ -11,19 +10,22 @@ import ( "strings" ) -type WsConfOption func(conf config.WSServerFullConf) +type WsServerOption func(conf config.WSServerFullConf) -func WithWSCheckOrigin(fn func(*http.Request) bool) WsConfOption { +func WithWSCheckOrigin(fn func(*http.Request) bool) WsServerOption { return func(conf config.WSServerFullConf) { conf.CheckOrigin = fn } } // ListenWebsocket 开始监听Websocket -func (ngin *Engine) ListenWebsocket(conf config.WSServerFullConf, opts ...WsConfOption) error { - for _, opt := range opts { +func (ngin *Engine) ListenWebsocket(conf config.WSServerFullConf, serverOpts []WsServerOption, evtOpts ...WsEventOption) error { + for _, opt := range serverOpts { opt(conf) } + for _, opt := range evtOpts { + opt(conf.WSEvent) + } err := ngin.setup() if err != nil { @@ -46,37 +48,6 @@ func (ngin *Engine) ListenWebsocket(conf config.WSServerFullConf, opts ...WsConf return nil } -func (ngin *Engine) handleWS(conn *websocket.Conn, conf config.WSServerFullConf) { - wsConn := connection.NewWSConn(conn) - - //defaultCloseHandler := conn.CloseHandler() - //conn.SetCloseHandler(func(code int, text string) error { - // result := defaultCloseHandler(code, text) - // //wsConn.Close() - // return result - //}) - - // ping - defaultPingHandler := wsConn.PingHandler() - wsConn.SetPingHandler(func(appData string) error { - if conf.PingHandler != nil { - conf.PingHandler(appData) - } - return defaultPingHandler(appData) - }) - - // pong - defaultPongHandler := wsConn.PongHandler() - wsConn.SetPongHandler(func(appData string) error { - if conf.PongHandler != nil { - conf.PongHandler(appData) - } - return defaultPongHandler(appData) - }) - - ngin.handle(wsConn) -} - func (ngin *Engine) upgradeWebsocket(conf config.WSServerFullConf) { upgrade := websocket.Upgrader{ HandshakeTimeout: conf.HandshakeTimeout, @@ -88,13 +59,13 @@ func (ngin *Engine) upgradeWebsocket(conf config.WSServerFullConf) { path := fmt.Sprintf("/%s", strings.TrimPrefix(conf.Path, "/")) http.HandleFunc(path, func(writer http.ResponseWriter, request *http.Request) { - conn, err := upgrade.Upgrade(writer, request, nil) + wc, err := upgrade.Upgrade(writer, request, nil) if err != nil { nlog.Errorf("%s Upgrade failure, URI=%ng, Error=%ng", ngin.LogPrefix(), request.RequestURI, err.Error()) return } - err = ngin.goPool.Submit(func() { - ngin.handleWS(conn, conf) + err = ngin.pool.Submit(func() { + _ = ngin.handleWS(wc, conf.WSEvent) }) if err != nil { nlog.Errorf("%s submit conn pool err: %v", ngin.LogPrefix(), err.Error()) diff --git a/test/test_nnet.go b/test/test_nnet.go index 3d5e533..90fdd0b 100644 --- a/test/test_nnet.go +++ b/test/test_nnet.go @@ -4,7 +4,8 @@ import ( "encoding/json" "git.noahlan.cn/noahlan/nnet" "git.noahlan.cn/noahlan/nnet/config" - "git.noahlan.cn/noahlan/nnet/connection" + "git.noahlan.cn/noahlan/nnet/conn" + "git.noahlan.cn/noahlan/nnet/event" "git.noahlan.cn/noahlan/nnet/packet" protocol_nnet "git.noahlan.cn/noahlan/nnet/protocol/nnet" rt "git.noahlan.cn/noahlan/nnet/router" @@ -14,6 +15,10 @@ import ( "time" ) +var ttt event.OnDisconnectedFn = func(nc *conn.Connection, err error) { + nlog.Debugf("ttt %v", err) +} + func runServer(addr string) { nginOpts := make([]nnet.RunOption, 0) nginOpts = append(nginOpts, nnet.WithPoolCfg(npool.Config{ @@ -28,6 +33,9 @@ func runServer(addr string) { HeartbeatInterval: 0, HandshakeValidator: nil, })...) + nginOpts = append(nginOpts, func(ngin *nnet.Engine) { + ngin.EventManager().RegisterEvent(event.EvtOnDisconnected, ttt) + }) ngin := nnet.NewEngine(config.EngineConf{ TaskTimerPrecision: 0, Mode: "dev", @@ -38,7 +46,7 @@ func runServer(addr string) { Route: "ping", Code: 1, }, - Handler: func(conn *connection.Connection, pkg packet.IPacket) { + Handler: func(conn *conn.Connection, pkg packet.IPacket) { nlog.Info("client ping, server pong -> ") err := conn.Send(protocol_nnet.Header{ PacketType: protocol_nnet.Data, @@ -64,7 +72,7 @@ func runServer(addr string) { } -func runClient(addr string) (*nnet.Engine, *connection.Connection) { +func runClient(addr string) (*nnet.Engine, *conn.Connection) { chReady := make(chan struct{}) nginOpts := make([]nnet.RunOption, 0) @@ -90,11 +98,11 @@ func runClient(addr string) (*nnet.Engine, *connection.Connection) { Route: "test.client", Code: 1, }, - Handler: func(conn *connection.Connection, pkg packet.IPacket) { + Handler: func(conn *conn.Connection, pkg packet.IPacket) { nlog.Info("client hahaha") }, }) - conn, err := ngin.Dial(addr) + conn, err := ngin.DialTCP(addr) nlog.Must(err) handshake, err := json.Marshal(&protocol_nnet.HandshakeReq{ diff --git a/test/test_nnet_test.go b/test/test_nnet_test.go index 543a131..f0fb4e0 100644 --- a/test/test_nnet_test.go +++ b/test/test_nnet_test.go @@ -1,7 +1,7 @@ package main import ( - "git.noahlan.cn/noahlan/nnet/connection" + "git.noahlan.cn/noahlan/nnet/conn" "git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/nnet/protocol/nnet" rt "git.noahlan.cn/noahlan/nnet/router" @@ -21,7 +21,7 @@ func TestClient(t *testing.T) { Route: "pong", Code: 2, }, - Handler: func(conn *connection.Connection, pkg packet.IPacket) { + Handler: func(conn *conn.Connection, pkg packet.IPacket) { nlog.Info("server pong, client ping ->") _ = et.Send(nnet.Header{ PacketType: nnet.Data, diff --git a/test/test_websocket.go b/test/test_websocket.go new file mode 100644 index 0000000..9eefa7f --- /dev/null +++ b/test/test_websocket.go @@ -0,0 +1,69 @@ +package main + +import ( + "git.noahlan.cn/noahlan/nnet" + "git.noahlan.cn/noahlan/nnet/config" + "git.noahlan.cn/noahlan/nnet/conn" + "git.noahlan.cn/noahlan/nnet/packet" + "git.noahlan.cn/noahlan/nnet/protocol/plain" + rt "git.noahlan.cn/noahlan/nnet/router" + "git.noahlan.cn/noahlan/ntool/nlog" + "time" +) + +func runWSServer(addr, path string) { + nginOpts := make([]nnet.RunOption, 0) + nginOpts = append(nginOpts, plain.WithPlainProtocol()...) + + ngin := nnet.NewEngine(config.EngineConf{ + Mode: config.DevMode, + Name: "DevNL", + }, nginOpts...) + + ngin.AddRoutes(rt.Route{ + Matches: nil, + Handler: func(conn *conn.Connection, pkg packet.IPacket) { + nlog.Debugf("route fn: %v", pkg) + }, + }) + + defer ngin.Stop() + + err := ngin.ListenWebsocket(config.WSServerFullConf{ + WSConf: config.WSConf{ + Addr: addr, + Path: path, + HandshakeTimeout: time.Second * 5, + }, + WSEvent: config.WSEvent{}, + }, nil) + + if err != nil { + return + } +} + +func runWSClient(addr string) (*nnet.Engine, *conn.Connection) { + //chReady := make(chan struct{}) + + nginOpts := make([]nnet.RunOption, 0) + nginOpts = append(nginOpts, plain.WithPlainProtocol()...) + + //var onReadyFn event.OnConnectedFn = func(nc *conn.Connection) { + // chReady <- struct{}{} + //} + + //nginOpts = append(nginOpts, func(ngin *nnet.Engine) { + // ngin.EventManager().RegisterEvent(event.EvtOnConnected, onReadyFn) + //}) + + ngin := nnet.NewEngine(config.EngineConf{ + Mode: config.DevMode, + Name: "DevNL-Client", + }, nginOpts...) + + nc, err := ngin.DialWebsocket(addr, config.WSClientFullConf{}) + nlog.Must(err) + + return ngin, nc +} diff --git a/test/test_websocket_test.go b/test/test_websocket_test.go new file mode 100644 index 0000000..ef21f96 --- /dev/null +++ b/test/test_websocket_test.go @@ -0,0 +1,24 @@ +package main + +import ( + "github.com/gorilla/websocket" + "sync" + "testing" +) + +func TestWSServer(t *testing.T) { + runWSServer("0.0.0.0:14725", "/ws") +} + +func TestWSClient(t *testing.T) { + ngin, nc := runWSClient("ws://127.0.0.1:14725/ws") + + _ = nc.Send(websocket.TextMessage, []byte("hello world!")) + + ngin.LogPrefix() + + var wg sync.WaitGroup + wg.Add(1) + + wg.Wait() +} diff --git a/ws.go b/ws.go new file mode 100644 index 0000000..7268c60 --- /dev/null +++ b/ws.go @@ -0,0 +1,61 @@ +package nnet + +import ( + "git.noahlan.cn/noahlan/nnet/config" + "git.noahlan.cn/noahlan/nnet/conn" + "github.com/gorilla/websocket" +) + +type WsEventOption func(conf config.WSEvent) + +func WithPingHandler(fn func(appData string)) WsEventOption { + return func(conf config.WSEvent) { + conf.PingHandler = fn + } +} + +func WithPongHandler(fn func(appData string)) WsEventOption { + return func(conf config.WSEvent) { + conf.PongHandler = fn + } +} + +func WithCloseHandler(fn func(closeCode int, closeText string) error) WsEventOption { + return func(conf config.WSEvent) { + conf.CloseHandler = fn + } +} + +func (ngin *Engine) handleWS(wc *websocket.Conn, conf config.WSEvent) *conn.Connection { + wsConn := conn.NewWSConn(wc) + + nc := ngin.handle(wsConn) + + defaultCloseHandler := wsConn.CloseHandler() + wsConn.SetCloseHandler(func(code int, text string) error { + result := defaultCloseHandler(code, text) + _ = wsConn.Close() + ngin.evtMgr.OnClose(nc) + return result + }) + + // ping + defaultPingHandler := wsConn.PingHandler() + wsConn.SetPingHandler(func(appData string) error { + if conf.PingHandler != nil { + conf.PingHandler(appData) + } + return defaultPingHandler(appData) + }) + + // pong + defaultPongHandler := wsConn.PongHandler() + wsConn.SetPongHandler(func(appData string) error { + if conf.PongHandler != nil { + conf.PongHandler(appData) + } + return defaultPongHandler(appData) + }) + + return nc +}