From e9bb90ba8c80d627ea4bc00e7b4612d899d6f8df Mon Sep 17 00:00:00 2001 From: NorthLan <6995syu@163.com> Date: Sun, 16 Jul 2023 23:22:36 +0800 Subject: [PATCH] =?UTF-8?q?wip:=20=E6=9E=84=E6=80=9D=E6=96=B0=E7=9A=84even?= =?UTF-8?q?t=E7=B3=BB=E7=BB=9F=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client_tcp.go | 4 ++-- client_ws.go | 1 + config/client_ws.go | 36 ++++++++++++++++++++++++++++++++++++ config/server_ws.go | 8 +++++++- config/ws.go | 14 ++++++++++++++ connection/connection.go | 25 ++++++++++++++++++++++--- event/events.go | 39 +++++++++++++++++++++++++++++++++++++++ go.mod | 2 +- go.sum | 2 ++ packet/entry.go | 14 ++++++++++++++ packet/packet.go | 12 ------------ packet/pakcer.go | 15 +++++++++++++++ server_ws.go | 30 ++++++++++++++++++++++++++++-- 13 files changed, 181 insertions(+), 21 deletions(-) create mode 100644 client_ws.go create mode 100644 config/client_ws.go create mode 100644 config/ws.go create mode 100644 event/events.go create mode 100644 packet/entry.go create mode 100644 packet/pakcer.go diff --git a/client_tcp.go b/client_tcp.go index b850de3..cc4a8ac 100644 --- a/client_tcp.go +++ b/client_tcp.go @@ -6,8 +6,8 @@ import ( "net" ) -// Dial 连接服务器 -func (ngin *Engine) Dial(addr string) (*connection.Connection, error) { +// DialTCP 连接服务器 +func (ngin *Engine) DialTCP(addr string) (*connection.Connection, error) { err := ngin.setup() if err != nil { nlog.Errorf("%s failed to setup server, err:%v", ngin.LogPrefix(), err) diff --git a/client_ws.go b/client_ws.go new file mode 100644 index 0000000..93328a5 --- /dev/null +++ b/client_ws.go @@ -0,0 +1 @@ +package nnet diff --git a/config/client_ws.go b/config/client_ws.go new file mode 100644 index 0000000..2f1443f --- /dev/null +++ b/config/client_ws.go @@ -0,0 +1,36 @@ +package config + +import "time" + +type ( + WSClientConf struct { + // Url 连接地址 + Url string `json:",default=0.0.0.0:9876,env=WS_URL"` + // ReadBufferSize 读缓冲区大小 + ReadBufferSize int `json:",default=2048"` + // WriteBufferSize 写缓冲区大小 + WriteBufferSize int `json:",default=1024"` + // ReadLimit 单条消息支持的最大消息长度,默认 8MB + ReadLimit int64 `json:",default=8192"` + // WriteDeadline 写超时,默认5s + WriteDeadline time.Duration `json:",default=5s"` + // ReadDeadline 读超时,控制断线检测 默认60s + ReadDeadline time.Duration `json:",default=60s"` + } + + // BackoffConf 自动重连配置 + BackoffConf struct { + // MinRecTime 最小重连时间间隔 默认2s + MinRecTime time.Duration `json:",default=2s"` + // MaxRecTime 最大重连时间间隔 默认60s + MaxRecTime time.Duration `json:",default=60s"` + // RecFactor 每次重连失败继续重连的时间间隔递增的乘数因子,递增到最大重连时间间隔为止 + RecFactor float64 `json:",default=1.5"` + } + + // WSClientFullConf 完整的客户端配置 + WSClientFullConf struct { + WSClientConf + BackoffConf + } +) diff --git a/config/server_ws.go b/config/server_ws.go index bc098fa..967026f 100644 --- a/config/server_ws.go +++ b/config/server_ws.go @@ -29,7 +29,13 @@ type ( WSServerFullConf struct { WSServerConf // check origin - CheckOrigin func(*http.Request) bool `json:",optional"` + CheckOrigin func(*http.Request) bool + // PingHandler Ping + PingHandler func(appData string) + // PongHandler Pong + PongHandler func(appData string) + // CloseHandler Close + CloseHandler func(closeCode int, closeText string) error } ) diff --git a/config/ws.go b/config/ws.go new file mode 100644 index 0000000..85d9284 --- /dev/null +++ b/config/ws.go @@ -0,0 +1,14 @@ +package config + +type ( + WSEvent struct { + // 连接成功回调 + OnConnected func() + // 连接异常回调,在准备进行连接的过程中发生异常时触发 + OnConnectError func(err error) + // 连接断开回调,网络异常,服务端掉线等情况时触发 + OnDisconnected func(err error) + // 连接关闭回调,服务端发起关闭信号或客户端主动关闭时触发 + OnClose func(code int, text string) + } +) diff --git a/connection/connection.go b/connection/connection.go index fa0d846..a32be1b 100644 --- a/connection/connection.go +++ b/connection/connection.go @@ -241,9 +241,27 @@ func (r *Connection) read() { _ = r.Close() }() buf := make([]byte, 4096) + + var wsConn *WSConn + if r.typ == ConnTypeWS { + wsConn = r.conn.(*WSConn) + } + for { - n, err := r.conn.Read(buf) - //nlog.Debugf("receive data %v", buf[:n]) + var ( + err error + n int + msgTyp int + ) + if r.typ == ConnTypeWS { + var bb []byte + if msgTyp, bb, err = wsConn.ReadMessage(); err == nil { + copy(buf, bb) + n = len(bb) + } + } else { + n, err = r.conn.Read(buf) + } if err != nil { nlog.Errorf("%s [readLoop] Read message error: %s, session will be closed immediately", r.conf.LogPrefix, err.Error()) @@ -261,8 +279,9 @@ func (r *Connection) read() { return } + //nlog.Debugf("receive data %v", buf[:n]) // warning: 为性能考虑,复用slice处理数据,buf传入后必须要copy再处理 - packets, err := r.packer.Unpack(buf[:n]) + packets, err := r.packer.Unpack(msgTyp, buf[:n]) if err != nil { nlog.Errorf("%s unpack err: %s", r.conf.LogPrefix, err.Error()) } diff --git a/event/events.go b/event/events.go new file mode 100644 index 0000000..41463db --- /dev/null +++ b/event/events.go @@ -0,0 +1,39 @@ +package event + +import "git.noahlan.cn/noahlan/nnet/connection" + +type ( + ConnFn func(conn *connection.Connection) + ErrFn func(err error) + + // ConnEvents 连接事件 + ConnEvents interface { + // OnConnected 连接成功回调 + OnConnected(h ConnFn) + // OnConnectError 连接异常回调, 在准备进行连接的过程中发生异常时触发 + OnConnectError(err error) + // OnDisconnected 连接断开回调,网络异常,服务端掉线等情况时触发 + OnDisconnected(conn *connection.Connection, err error) + // OnClose 连接关闭回调,服务端发起关闭信号或客户端主动关闭时触发 + OnClose(details any, err error) + } + + // MessageEvents 消息事件 + MessageEvents interface { + // OnSentError 发送消息异常回调 + OnSentError(details any, err error) + // OnReceiveError 接收消息异常回调 + OnReceiveError(details any, err error) + } +) + +type Manager struct { + ConnEvents + MessageEvents + + onConnected []OnConnectedFn +} + +func NewEventManager() *Manager { + return &Manager{} +} diff --git a/go.mod b/go.mod index 663d554..f38bca0 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( ) require ( - git.noahlan.cn/noahlan/ntool v1.1.1 + git.noahlan.cn/noahlan/ntool v1.1.2 github.com/goburrow/serial v0.1.0 google.golang.org/protobuf v1.28.2-0.20220831092852-f930b1dc76e8 ) diff --git a/go.sum b/go.sum index 3d608ce..0b38f63 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ git.noahlan.cn/noahlan/ntool v1.1.1 h1:XtixcgmHj/BQ/9VQXFzSoQ7boKF+Q6gfjlrngFD5G0o= git.noahlan.cn/noahlan/ntool v1.1.1/go.mod h1:pzoXErnQDLaHhvEHOGDoINf5VP1MDiU8NAcnaMEhxa4= +git.noahlan.cn/noahlan/ntool v1.1.2 h1:XwQgR5BubZNstt6UEjyaY/k3Qsipph1C7dztKWs/RK4= +git.noahlan.cn/noahlan/ntool v1.1.2/go.mod h1:pzoXErnQDLaHhvEHOGDoINf5VP1MDiU8NAcnaMEhxa4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/packet/entry.go b/packet/entry.go new file mode 100644 index 0000000..8d02546 --- /dev/null +++ b/packet/entry.go @@ -0,0 +1,14 @@ +package packet + +// Entry 入口原始数据 +type Entry struct { + Header any + Raw []byte +} + +func NewEntry(header any, raw []byte) *Entry { + return &Entry{ + Header: header, + Raw: raw, + } +} diff --git a/packet/packet.go b/packet/packet.go index 7cf3c21..32e5980 100644 --- a/packet/packet.go +++ b/packet/packet.go @@ -16,16 +16,4 @@ type ( GetLen() uint64 // 数据帧长度 8bytes,根据实际情况进行转换 GetBody() []byte // 数据 Body } - - // Packer 数据帧 封包/解包 - Packer interface { - // Pack 封包,将原始数据构造为二进制流数据帧 - Pack(header any, data []byte) ([]byte, error) - - // Unpack 解包 - Unpack(data []byte) ([]IPacket, error) - } - - // PackerBuilder Packer构建器 - PackerBuilder func() Packer ) diff --git a/packet/pakcer.go b/packet/pakcer.go new file mode 100644 index 0000000..4dcbe08 --- /dev/null +++ b/packet/pakcer.go @@ -0,0 +1,15 @@ +package packet + +type ( + // Packer 数据帧 封包/解包 + Packer interface { + // Pack 封包,将原始数据构造为二进制流数据帧 + Pack(header any, data []byte) ([]byte, error) + + // Unpack 解包 + Unpack(header any, data []byte) ([]IPacket, error) + } + + // PackerBuilder Packer构建器 + PackerBuilder func() Packer +) diff --git a/server_ws.go b/server_ws.go index fbba3e4..f4595a1 100644 --- a/server_ws.go +++ b/server_ws.go @@ -46,8 +46,34 @@ func (ngin *Engine) ListenWebsocket(conf config.WSServerFullConf, opts ...WsConf return nil } -func (ngin *Engine) handleWS(conn *websocket.Conn) { +func (ngin *Engine) handleWS(conn *websocket.Conn, conf config.WSServerFullConf) { wsConn := connection.NewWSConn(conn) + + //defaultCloseHandler := conn.CloseHandler() + //conn.SetCloseHandler(func(code int, text string) error { + // result := defaultCloseHandler(code, text) + // //wsConn.Close() + // return result + //}) + + // ping + defaultPingHandler := wsConn.PingHandler() + wsConn.SetPingHandler(func(appData string) error { + if conf.PingHandler != nil { + conf.PingHandler(appData) + } + return defaultPingHandler(appData) + }) + + // pong + defaultPongHandler := wsConn.PongHandler() + wsConn.SetPongHandler(func(appData string) error { + if conf.PongHandler != nil { + conf.PongHandler(appData) + } + return defaultPongHandler(appData) + }) + ngin.handle(wsConn) } @@ -68,7 +94,7 @@ func (ngin *Engine) upgradeWebsocket(conf config.WSServerFullConf) { return } err = ngin.goPool.Submit(func() { - ngin.handleWS(conn) + ngin.handleWS(conn, conf) }) if err != nil { nlog.Errorf("%s submit conn pool err: %v", ngin.LogPrefix(), err.Error())