refactor: 取消pipeline/lifetime/scheduler,添加事件处理机制。

main v1.2.0
NoahLan 1 year ago
parent e9bb90ba8c
commit d39b9921cf

@ -1,23 +1,23 @@
package nnet
import (
"git.noahlan.cn/noahlan/nnet/connection"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/ntool/nlog"
"net"
)
// DialTCP 连接服务器
func (ngin *Engine) DialTCP(addr string) (*connection.Connection, error) {
func (ngin *Engine) DialTCP(addr string) (*conn.Connection, error) {
err := ngin.setup()
if err != nil {
nlog.Errorf("%s failed to setup server, err:%v", ngin.LogPrefix(), err)
return nil, err
}
conn, err := net.Dial("tcp", addr)
rc, err := net.Dial("tcp", addr)
nlog.Must(err)
nlog.Infof("%s now connect to %s...", ngin.LogPrefix(), addr)
return ngin.handle(conn), nil
return ngin.handle(rc), nil
}

@ -1 +1,65 @@
package nnet
import (
"crypto/tls"
"git.noahlan.cn/noahlan/nnet/config"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/ntool/nlog"
"github.com/gorilla/websocket"
"github.com/jpillora/backoff"
"net/http"
"time"
)
// DialWebsocket websocket方式 连接服务器
func (ngin *Engine) DialWebsocket(url string, conf config.WSClientFullConf, evtOpts ...WsEventOption) (*conn.Connection, error) {
for _, opt := range evtOpts {
opt(conf.WSEvent)
}
ngin.ReadDeadline = conf.ReadDeadline
ngin.WriteDeadline = conf.WriteDeadline
err := ngin.setup()
if err != nil {
nlog.Errorf("%s failed to setup server, err:%v", ngin.LogPrefix(), err)
return nil, err
}
dialer := websocket.Dialer{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
HandshakeTimeout: conf.HandshakeTimeout,
ReadBufferSize: conf.ReadBufferSize,
WriteBufferSize: conf.WriteBufferSize,
EnableCompression: conf.Compression,
}
// 连接重试
b := &backoff.Backoff{
Factor: conf.RecFactor,
Jitter: true,
Min: conf.MinRecTime,
Max: conf.MaxRecTime,
}
var wsConn *websocket.Conn
for {
nextRec := b.Duration()
wsConn, _, err = dialer.Dial(url, http.Header{})
if err != nil {
ngin.evtMgr.OnConnectError(err)
time.Sleep(nextRec)
continue
}
if conf.ReadLimit != 0 {
wsConn.SetReadLimit(conf.ReadLimit)
}
break
}
nlog.Infof("%s now connect to %s...", ngin.LogPrefix(), url)
return ngin.handleWS(wsConn, conf.WSEvent), nil
}

@ -4,12 +4,6 @@ import "time"
type (
WSClientConf struct {
// Url 连接地址
Url string `json:",default=0.0.0.0:9876,env=WS_URL"`
// ReadBufferSize 读缓冲区大小
ReadBufferSize int `json:",default=2048"`
// WriteBufferSize 写缓冲区大小
WriteBufferSize int `json:",default=1024"`
// ReadLimit 单条消息支持的最大消息长度,默认 8MB
ReadLimit int64 `json:",default=8192"`
// WriteDeadline 写超时默认5s
@ -30,7 +24,9 @@ type (
// WSClientFullConf 完整的客户端配置
WSClientFullConf struct {
WSConf
WSClientConf
BackoffConf
WSEvent `json:"-"`
}
)

@ -22,6 +22,12 @@ type (
Mode string `json:",default=dev,options=[dev,test,prod]"`
// Name 引擎名称
Name string `json:",default=NL,env=ENGINE_NAME"`
// ReadDeadline 读数据超时时长,0为不超时
ReadDeadline time.Duration `json:",default=0s"`
// WriteDeadline 写数据超时时长,0为不超时
WriteDeadline time.Duration `json:",default=0s"`
// Deadline 读+写数据超时时长,0为不超时
Deadline time.Duration `json:",default=0s"`
}
)

@ -6,7 +6,7 @@ import (
)
type (
WSServerConf struct {
WSConf struct {
// Addr 服务地址
// 地址可直接使用hostname,但强烈不建议这样做,可能会同时监听多个本地IP
// 如果端口号不填或端口号为0例如"127.0.0.1:" 或 ":0",服务端将选择随机可用端口
@ -27,18 +27,13 @@ type (
TLSKey string `json:",optional"`
}
WSServerFullConf struct {
WSServerConf
WSConf
WSEvent `json:"-"`
// check origin
CheckOrigin func(*http.Request) bool
// PingHandler Ping
PingHandler func(appData string)
// PongHandler Pong
PongHandler func(appData string)
// CloseHandler Close
CloseHandler func(closeCode int, closeText string) error
CheckOrigin func(*http.Request) bool `json:"-"`
}
)
func (c WSServerConf) IsTLS() bool {
func (c WSConf) IsTLS() bool {
return len(c.TLSCertificate) > 0 && len(c.TLSKey) > 0
}

@ -2,13 +2,11 @@ package config
type (
WSEvent struct {
// 连接成功回调
OnConnected func()
// 连接异常回调,在准备进行连接的过程中发生异常时触发
OnConnectError func(err error)
// 连接断开回调,网络异常,服务端掉线等情况时触发
OnDisconnected func(err error)
// 连接关闭回调,服务端发起关闭信号或客户端主动关闭时触发
OnClose func(code int, text string)
// PingHandler Ping
PingHandler func(appData string)
// PongHandler Pong
PongHandler func(appData string)
// CloseHandler Close
CloseHandler func(closeCode int, closeText string) error
}
)

@ -0,0 +1,147 @@
package conn
import (
"git.noahlan.cn/noahlan/nnet/session"
"net"
"sync/atomic"
)
type (
Connection struct {
session *session.Session // Session
status int32 // 连接状态
conn net.Conn // low-level conn fd
typ ConnType // 连接类型
lastMid uint64 // 最近一次消息ID
chDie chan struct{} // 停止通道
chSend chan PendingMessage // 消息发送通道(结构化消息)
chWrite chan []byte // 消息发送通道(二进制消息)
}
)
func NewConnection(id int64, rawC net.Conn) *Connection {
r := &Connection{
session: session.NewSession(id),
status: StatusStart,
conn: rawC,
typ: ConnTypeTCP,
lastMid: 0,
chDie: make(chan struct{}),
chSend: make(chan PendingMessage, 128),
chWrite: make(chan []byte, 128),
}
if _, ok := rawC.(*WSConn); ok {
r.typ = ConnTypeWS
return r
}
if _, ok := rawC.(*SerialConn); ok {
r.typ = ConnTypeSerial
return r
}
return r
}
func (r *Connection) Send(header, payload any) (err error) {
defer func() {
if e := recover(); e != nil {
err = ErrBrokenPipe
}
}()
r.chSend <- PendingMessage{
Header: header,
Payload: payload,
}
return err
}
func (r *Connection) SendBytes(data []byte) (err error) {
defer func() {
if e := recover(); e != nil {
err = ErrBrokenPipe
}
}()
r.chWrite <- data
return err
}
func (r *Connection) Status() int32 {
return atomic.LoadInt32(&r.status)
}
func (r *Connection) SetStatus(s int32) {
atomic.StoreInt32(&r.status, s)
}
func (r *Connection) Type() ConnType {
return r.typ
}
func (r *Connection) Conn() net.Conn {
return r.conn
}
func (r *Connection) WsConn() *WSConn {
if r.typ == ConnTypeWS {
return r.conn.(*WSConn)
}
return nil
}
func (r *Connection) SerialConn() *SerialConn {
if r.typ == ConnTypeSerial {
return r.conn.(*SerialConn)
}
return nil
}
func (r *Connection) ID() int64 {
return r.session.ID()
}
func (r *Connection) Session() *session.Session {
return r.session
}
func (r *Connection) LastMID() uint64 {
return r.lastMid
}
func (r *Connection) SetLastMID(mid uint64) {
atomic.StoreUint64(&r.lastMid, mid)
}
func (r *Connection) ChDie() chan struct{} {
return r.chDie
}
func (r *Connection) ChSend() chan PendingMessage {
return r.chSend
}
func (r *Connection) ChWrite() chan []byte {
return r.chWrite
}
func (r *Connection) Close() error {
if r.Status() == StatusClosed {
return ErrCloseClosedSession
}
r.SetStatus(StatusClosed)
select {
case <-r.chDie:
close(r.chSend)
close(r.chWrite)
default:
close(r.chDie)
}
r.session.Close()
return r.conn.Close()
}

@ -0,0 +1,20 @@
package conn
import "errors"
var (
ErrCloseClosedSession = errors.New("close closed session")
// ErrBrokenPipe represents the low-level connection has broken.
ErrBrokenPipe = errors.New("broken low-level pipe")
ErrSendPayload = errors.New("serializer is nil, but payload type not []byte")
ErrSendMarshal = errors.New("message body marshal err")
ErrSend = errors.New("send err")
ErrSendWSType = errors.New("websocket message type err")
ErrPack = errors.New("pack err")
ErrUnpack = errors.New("unPacker err")
ErrNoPacker = errors.New("no packer")
ErrReceiveZero = errors.New("receive zero")
)

@ -1,4 +1,4 @@
package connection
package conn
import (
"errors"

@ -1,10 +1,10 @@
package connection
package conn
import (
"sync"
)
type Manager struct {
type ConnManager struct {
sync.RWMutex
// 分组
@ -13,8 +13,8 @@ type Manager struct {
conns map[int64]*Connection
}
func NewManager() *Manager {
return &Manager{
func NewConnManager() *ConnManager {
return &ConnManager{
RWMutex: sync.RWMutex{},
groups: make(map[string]*Group),
conns: make(map[int64]*Connection),
@ -22,7 +22,7 @@ func NewManager() *Manager {
}
// Store 保存连接,同时加入到指定分组,若给定分组名为空,则不进行分组操作
func (m *Manager) Store(groupName string, c *Connection) error {
func (m *ConnManager) Store(groupName string, c *Connection) error {
m.Lock()
m.conns[c.Session().ID()] = c
m.Unlock()
@ -34,7 +34,7 @@ func (m *Manager) Store(groupName string, c *Connection) error {
return group.Add(c)
}
func (m *Manager) Remove(c *Connection) error {
func (m *ConnManager) Remove(c *Connection) error {
m.Lock()
defer m.Unlock()
delete(m.conns, c.Session().ID())
@ -49,7 +49,7 @@ func (m *Manager) Remove(c *Connection) error {
return nil
}
func (m *Manager) RemoveFromGroup(groupName string, c *Connection) error {
func (m *ConnManager) RemoveFromGroup(groupName string, c *Connection) error {
m.Lock()
delete(m.conns, c.Session().ID())
m.Unlock()
@ -63,7 +63,7 @@ func (m *Manager) RemoveFromGroup(groupName string, c *Connection) error {
}
// NewGroup 新增分组,若分组已存在,则返回现有分组
func (m *Manager) NewGroup(name string) *Group {
func (m *ConnManager) NewGroup(name string) *Group {
m.Lock()
defer m.Unlock()
@ -79,7 +79,7 @@ func (m *Manager) NewGroup(name string) *Group {
}
// FindGroup 查找分组
func (m *Manager) FindGroup(name string) (*Group, bool) {
func (m *ConnManager) FindGroup(name string) (*Group, bool) {
m.RLock()
defer m.RUnlock()
@ -88,7 +88,7 @@ func (m *Manager) FindGroup(name string) (*Group, bool) {
}
// FindConn 根据连接ID找到连接
func (m *Manager) FindConn(id int64) (*Connection, bool) {
func (m *ConnManager) FindConn(id int64) (*Connection, bool) {
m.RLock()
defer m.RUnlock()
@ -97,7 +97,7 @@ func (m *Manager) FindConn(id int64) (*Connection, bool) {
}
// FindConnByUID 根据连接绑定的UID找到连接
func (m *Manager) FindConnByUID(uid string) (*Connection, bool) {
func (m *ConnManager) FindConnByUID(uid string) (*Connection, bool) {
m.RLock()
defer m.RUnlock()
@ -111,7 +111,7 @@ func (m *Manager) FindConnByUID(uid string) (*Connection, bool) {
// PeekConn 循环所有连接
// fn 返回true跳过循环反之一直循环
func (m *Manager) PeekConn(fn func(id int64, c *Connection) bool) {
func (m *ConnManager) PeekConn(fn func(id int64, c *Connection) bool) {
m.RLock()
defer m.RUnlock()

@ -1,4 +1,4 @@
package connection
package conn
import (
"git.noahlan.cn/noahlan/ntool/nlog"

@ -1,4 +1,4 @@
package connection
package conn
import (
"fmt"

@ -0,0 +1,27 @@
package conn
const (
// StatusStart 开始阶段
StatusStart int32 = iota + 1
// StatusPrepare 准备阶段
StatusPrepare
// StatusPending 等待工作阶段
StatusPending
// StatusWorking 工作阶段
StatusWorking
// StatusClosed 连接关闭
StatusClosed
)
type ConnType int
const (
ConnTypeTCP ConnType = iota // TCP connection
ConnTypeWS // Websocket connection
ConnTypeSerial // Websocket connection
)
type PendingMessage struct {
Header any
Payload any
}

@ -1,4 +1,4 @@
package connection
package conn
import (
"github.com/gorilla/websocket"

@ -1,337 +0,0 @@
package connection
import (
"errors"
"fmt"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/nnet/session"
"git.noahlan.cn/noahlan/ntool/ndef"
"git.noahlan.cn/noahlan/ntool/nlog"
"github.com/panjf2000/ants/v2"
"net"
"sync/atomic"
)
var (
ErrCloseClosedSession = errors.New("close closed session")
// ErrBrokenPipe represents the low-level connection has broken.
ErrBrokenPipe = errors.New("broken low-level pipe")
)
const (
// StatusStart 开始阶段
StatusStart int32 = iota + 1
// StatusPrepare 准备阶段
StatusPrepare
// StatusPending 等待工作阶段
StatusPending
// StatusWorking 工作阶段
StatusWorking
// StatusClosed 连接关闭
StatusClosed
)
type ConnType int
const (
ConnTypeTCP ConnType = iota // TCP connection
ConnTypeWS // Websocket connection
ConnTypeSerial // Websocket connection
)
type (
Connection struct {
conf Config // 配置
session *session.Session // Session
pool *ants.Pool // 连接池
status int32 // 连接状态
conn net.Conn // low-level conn fd
typ ConnType // 连接类型
packer packet.Packer // 封包、拆包器
serializer ndef.Serializer // 消息序列化/反序列化器
pipeline Pipeline // 连接生命周期管理
handleFn func(conn *Connection, pkg packet.IPacket) // 消息处理方法
lastMid uint64 // 最近一次消息ID
chDie chan struct{} // 停止通道
chSend chan PendingMessage // 消息发送通道(结构化消息)
chWrite chan []byte // 消息发送通道(二进制消息)
}
packetFn func(conn *Connection, pkg packet.IPacket)
Config struct {
LogDebug bool
LogPrefix string
}
PendingMessage struct {
header any
payload any
}
)
func NewConnection(
id int64,
conn net.Conn,
pool *ants.Pool,
conf Config,
packerBuilder packet.PackerBuilder,
serializer ndef.Serializer,
pipeline Pipeline,
handleFn packetFn) *Connection {
r := &Connection{
conf: conf,
session: session.NewSession(id),
pool: pool,
status: StatusStart,
conn: conn,
typ: ConnTypeTCP,
packer: packerBuilder(),
serializer: serializer,
pipeline: pipeline,
handleFn: handleFn,
lastMid: 0,
chDie: make(chan struct{}),
chSend: make(chan PendingMessage, 128),
chWrite: make(chan []byte, 128),
}
if _, ok := conn.(*WSConn); ok {
r.typ = ConnTypeWS
return r
}
if _, ok := conn.(*SerialConn); ok {
r.typ = ConnTypeSerial
return r
}
return r
}
func (r *Connection) Send(header, payload any) (err error) {
defer func() {
if e := recover(); e != nil {
err = ErrBrokenPipe
}
}()
r.chSend <- PendingMessage{
header: header,
payload: payload,
}
return err
}
func (r *Connection) SendBytes(data []byte) (err error) {
defer func() {
if e := recover(); e != nil {
err = ErrBrokenPipe
}
}()
r.chWrite <- data
return err
}
func (r *Connection) Status() int32 {
return atomic.LoadInt32(&r.status)
}
func (r *Connection) SetStatus(s int32) {
atomic.StoreInt32(&r.status, s)
}
func (r *Connection) Conn() (net.Conn, ConnType) {
return r.conn, r.typ
}
func (r *Connection) ID() int64 {
return r.session.ID()
}
func (r *Connection) Session() *session.Session {
return r.session
}
func (r *Connection) LastMID() uint64 {
return r.lastMid
}
func (r *Connection) SetLastMID(mid uint64) {
atomic.StoreUint64(&r.lastMid, mid)
}
func (r *Connection) Serve() {
_ = r.pool.Submit(func() {
r.write()
})
_ = r.pool.Submit(func() {
r.read()
})
}
func (r *Connection) write() {
defer func() {
close(r.chSend)
close(r.chWrite)
_ = r.Close()
if r.conf.LogDebug {
nlog.Debugf("%s [writeLoop] connection write goroutine exit, ConnID=%d, SessionUID=%s",
r.conf.LogPrefix, r.ID(), r.session.UID())
}
}()
for {
select {
case data := <-r.chSend:
// marshal packet body (data)
if r.serializer == nil {
if _, ok := data.payload.([]byte); !ok {
nlog.Errorf("%s serializer is nil, but payload type not []byte", r.conf.LogPrefix)
break
}
} else {
payload, err := r.serializer.Marshal(data.payload)
if err != nil {
nlog.Errorf("%s message body marshal err: %v", r.conf.LogPrefix, err)
break
}
data.payload = payload
}
// invoke pipeline
if pipe := r.pipeline; pipe != nil {
err := pipe.Outbound().Process(r, data)
if err != nil {
nlog.Errorf("%s pipeline err: %s", r.conf.LogPrefix, err.Error())
}
}
// packet pack data
p, err := r.packer.Pack(data.header, data.payload.([]byte))
if err != nil {
nlog.Errorf("%s pack err: %s", r.conf.LogPrefix, err.Error())
break
}
r.chWrite <- p
case data := <-r.chWrite:
// 回写数据
if _, err := r.conn.Write(data); err != nil {
nlog.Errorf("%s write data err: %s", r.conf.LogPrefix, err.Error())
break
}
//nlog.Debugf("write data %v", data)
case <-r.chDie: // connection close signal
return
// TODO
//case <-r.ngin.dieChan: // application quit signal
// return
}
}
}
func (r *Connection) read() {
defer func() {
_ = r.Close()
}()
buf := make([]byte, 4096)
var wsConn *WSConn
if r.typ == ConnTypeWS {
wsConn = r.conn.(*WSConn)
}
for {
var (
err error
n int
msgTyp int
)
if r.typ == ConnTypeWS {
var bb []byte
if msgTyp, bb, err = wsConn.ReadMessage(); err == nil {
copy(buf, bb)
n = len(bb)
}
} else {
n, err = r.conn.Read(buf)
}
if err != nil {
nlog.Errorf("%s [readLoop] Read message error: %s, session will be closed immediately",
r.conf.LogPrefix, err.Error())
return
}
if n == 0 {
nlog.Errorf("%s [readLoop] Read empty message, session will be closed immediately",
r.conf.LogPrefix)
return
}
if r.packer == nil {
nlog.Errorf("%s [readLoop] unexpected error: packer is nil", r.conf.LogPrefix)
return
}
//nlog.Debugf("receive data %v", buf[:n])
// warning: 为性能考虑复用slice处理数据buf传入后必须要copy再处理
packets, err := r.packer.Unpack(msgTyp, buf[:n])
if err != nil {
nlog.Errorf("%s unpack err: %s", r.conf.LogPrefix, err.Error())
}
// packets 处理
for _, p := range packets {
if err := r.processPacket(p); err != nil {
nlog.Errorf("%s process packet err: %s", r.conf.LogPrefix, err.Error())
continue
}
}
}
}
func (r *Connection) processPacket(packet packet.IPacket) error {
if pipe := r.pipeline; pipe != nil {
err := pipe.Inbound().Process(r, packet)
if err != nil {
return errors.New(fmt.Sprintf("pipeline process failed: %v", err.Error()))
}
}
if r.Status() == StatusWorking {
// 处理包消息
_ = r.pool.Submit(func() {
r.handleFn(r, packet)
})
}
return nil
}
func (r *Connection) DieChan() chan struct{} {
return r.chDie
}
func (r *Connection) Close() error {
if r.Status() == StatusClosed {
return ErrCloseClosedSession
}
r.SetStatus(StatusClosed)
if r.conf.LogDebug {
nlog.Debugf("%s close connection, ID: %d", r.conf.LogPrefix, r.ID())
}
select {
case <-r.chDie:
default:
close(r.chDie)
}
r.session.Close()
return r.conn.Close()
}

@ -1,83 +0,0 @@
package connection
import (
"sync"
)
type (
Func func(c *Connection, v any) error
// Pipeline 消息管道
Pipeline interface {
Outbound() Channel
Inbound() Channel
}
pipeline struct {
outbound, inbound *pipelineChannel
}
Channel interface {
PushFront(h Func)
PushBack(h Func)
Process(c *Connection, v any) error
}
pipelineChannel struct {
mu sync.RWMutex
handlers []Func
}
)
func NewPipeline() Pipeline {
return &pipeline{
outbound: &pipelineChannel{},
inbound: &pipelineChannel{},
}
}
func (p *pipeline) Outbound() Channel {
return p.outbound
}
func (p *pipeline) Inbound() Channel {
return p.inbound
}
// PushFront 将func压入slice首位
func (p *pipelineChannel) PushFront(h Func) {
p.mu.Lock()
defer p.mu.Unlock()
handlers := make([]Func, len(p.handlers)+1)
handlers[0] = h
copy(handlers[1:], p.handlers)
p.handlers = handlers
}
// PushBack 将func压入slice末位
func (p *pipelineChannel) PushBack(h Func) {
p.mu.Lock()
defer p.mu.Unlock()
p.handlers = append(p.handlers, h)
}
// Process 处理所有的pipeline方法
func (p *pipelineChannel) Process(c *Connection, v any) error {
if len(p.handlers) < 1 {
return nil
}
p.mu.RLock()
defer p.mu.RUnlock()
for _, handler := range p.handlers {
err := handler(c, v)
if err != nil {
return err
}
}
return nil
}

@ -2,17 +2,17 @@ package nnet
import (
"git.noahlan.cn/noahlan/nnet/config"
"git.noahlan.cn/noahlan/nnet/connection"
"git.noahlan.cn/noahlan/nnet/lifetime"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/event"
"git.noahlan.cn/noahlan/nnet/packet"
rt "git.noahlan.cn/noahlan/nnet/router"
"git.noahlan.cn/noahlan/nnet/scheduler"
"git.noahlan.cn/noahlan/nnet/session"
"git.noahlan.cn/noahlan/ntool/ndef"
"git.noahlan.cn/noahlan/ntool/nlog"
"github.com/panjf2000/ants/v2"
"math"
"net"
"time"
)
// Engine 引擎
@ -22,37 +22,35 @@ type Engine struct {
routes []rt.Route // 路由
router rt.Router // 消息处理器
dieChan chan struct{} // 应用程序退出信号
pipeline connection.Pipeline // 消息管道
packerBuilder packet.PackerBuilder // 封包、拆包器
serializer ndef.Serializer // 消息 序列化/反序列化
goPool *ants.Pool // goroutine池
connManager *connection.Manager // 连接管理器
lifetime *lifetime.Mgr // 生命周期
pool *ants.Pool // goroutine池
connMgr *conn.ConnManager // 连接管理器
evtMgr event.EventManager // 事件管理器
sessIdMgr *session.IDMgr // SessionId管理器
}
func NewEngine(conf config.EngineConf, opts ...RunOption) *Engine {
ngin := &Engine{
EngineConf: conf,
middlewares: make([]rt.Middleware, 0),
routes: make([]rt.Route, 0),
router: rt.NewDefaultRouter(),
packerBuilder: nil,
serializer: nil,
dieChan: make(chan struct{}),
pipeline: connection.NewPipeline(),
connManager: connection.NewManager(),
lifetime: lifetime.NewLifetime(),
sessIdMgr: session.NewSessionIDMgr(),
goPool: nil,
EngineConf: conf,
middlewares: make([]rt.Middleware, 0),
routes: make([]rt.Route, 0),
router: rt.NewDefaultRouter(),
dieChan: make(chan struct{}),
connMgr: conn.NewConnManager(),
evtMgr: event.NewEventManager(),
sessIdMgr: session.NewSessionIDMgr(),
packerBuilder: func() packet.Packer {
return nil
},
}
for _, opt := range opts {
opt(ngin)
}
if ngin.goPool == nil {
ngin.goPool, _ = ants.NewPool(math.MaxInt32)
if ngin.pool == nil {
ngin.pool, _ = ants.NewPool(math.MaxInt32)
}
return ngin
@ -91,46 +89,229 @@ func (ngin *Engine) setup() error {
if err := ngin.bindRoutes(); err != nil {
return err
}
if err := ngin.goPool.Submit(func() {
scheduler.Schedule(ngin.TaskTimerPrecision)
}); err != nil {
return err
}
return nil
}
func (ngin *Engine) Stop() {
nlog.Infof("%s is stopping...", ngin.LogPrefix())
nlog.Infof("%s server is stopping...", ngin.LogPrefix())
ngin.connMgr.PeekConn(func(_ int64, c *conn.Connection) bool {
_ = c.Close()
return false
})
close(ngin.dieChan)
scheduler.Close()
}
func (ngin *Engine) handle(conn net.Conn) *connection.Connection {
nc := connection.NewConnection(
ngin.sessIdMgr.SessionID(),
conn,
ngin.goPool,
connection.Config{LogDebug: ngin.ShallLogDebug(), LogPrefix: ngin.LogPrefix()},
ngin.packerBuilder, ngin.serializer, ngin.pipeline,
ngin.router.Handle,
)
func (ngin *Engine) handle(rawC net.Conn) *conn.Connection {
nc := conn.NewConnection(ngin.sessIdMgr.SessionID(), rawC)
ngin.evtMgr.OnConnected(nc)
nc.Serve()
ngin.serveConn(nc, ngin.packerBuilder())
err := ngin.connManager.Store(connection.DefaultGroupName, nc)
err := ngin.connMgr.Store(conn.DefaultGroupName, nc)
nlog.Must(err)
return nc
}
// dieChan
go func() {
// lifetime
ngin.lifetime.Open(nc)
func (ngin *Engine) serveConn(nc *conn.Connection, packer packet.Packer) {
_ = ngin.pool.Submit(func() {
ngin.readLoop(nc, packer)
})
_ = ngin.pool.Submit(func() {
ngin.writeLoop(nc, packer)
})
_ = ngin.pool.Submit(func() {
select {
case <-nc.DieChan():
scheduler.PushTask(func() { ngin.lifetime.Close(nc) })
_ = ngin.connManager.Remove(nc)
case <-nc.ChDie():
if ngin.ShallLogDebug() {
nlog.Debugf("%s Close connection, ID=%d, Remote=%s", ngin.LogPrefix(), nc.ID(), nc.Conn().RemoteAddr().String())
}
_ = ngin.connMgr.Remove(nc)
ngin.evtMgr.OnClose(nc)
}
})
}
func (ngin *Engine) readLoop(nc *conn.Connection, packer packet.Packer) {
defer func() {
_ = nc.Close()
//if ngin.ShallLogDebug() {
// nlog.Debugf("%s [readLoop] connection read goroutine exit, ID=%d, UID=%s, Remote=%s",
// ngin.LogPrefix(), nc.ID(), nc.Session().UID(), nc.Conn().RemoteAddr())
//}
}()
return nc
buf := make([]byte, 4096)
for {
select {
case <-nc.ChDie(): // connection close signal
return
default:
if ngin.Deadline != 0 {
_ = nc.Conn().SetDeadline(time.Now().Add(ngin.Deadline))
}
if ngin.ReadDeadline != 0 {
_ = nc.Conn().SetReadDeadline(time.Now().Add(ngin.ReadDeadline))
}
var (
err error
n int
msgTyp int
)
// 兼容websocket
if nc.Type() == conn.ConnTypeWS {
var bb []byte
if msgTyp, bb, err = nc.WsConn().ReadMessage(); err == nil {
copy(buf, bb)
n = len(bb)
}
} else {
n, err = nc.Conn().Read(buf)
}
if err != nil {
ngin.evtMgr.OnDisconnected(nc, err)
// TODO 断线重连 (仅限客户端)
nlog.Errorf("%s [readLoop] Read message error: %s, session will be closed immediately",
ngin.LogPrefix(), err.Error())
return
}
if n == 0 {
ngin.evtMgr.OnReceiveError(nc, conn.ErrReceiveZero)
nlog.Errorf("%s [readLoop] Read empty message, session will be closed immediately",
ngin.LogPrefix())
return
}
// 兼容websocket
if nc.Type() == conn.ConnTypeWS {
ngin.processPacket(nc, packet.NewWSPacket(msgTyp, buf[:n]))
} else {
if packer == nil {
ngin.evtMgr.OnReceiveError(nc, conn.ErrNoPacker)
nlog.Errorf("%s [readLoop] unexpected error: packer is nil", ngin.LogPrefix())
return
}
//nlog.Debugf("receive data %v", buf[:n])
// warning: 为性能考虑复用slice处理数据buf传入后必须要copy再处理
packets, err := packer.Unpack(buf[:n])
if err != nil {
ngin.evtMgr.OnReceiveError(nc, conn.ErrUnpack)
nlog.Errorf("%s unpack err: %s", ngin.LogPrefix(), err.Error())
}
// packets 处理
for _, p := range packets {
ngin.processPacket(nc, p)
}
}
}
}
}
func (ngin *Engine) writeLoop(nc *conn.Connection, packer packet.Packer) {
defer func() {
_ = nc.Close()
//if ngin.ShallLogDebug() {
// nlog.Debugf("%s [writeLoop] connection write goroutine exit, ID=%d, UID=%s, Remote=%s",
// ngin.LogPrefix(), nc.ID(), nc.Session().UID(), nc.Conn().RemoteAddr())
//}
}()
for {
select {
case data := <-nc.ChSend():
// marshal packet body (data)
if ngin.serializer == nil {
if _, ok := data.Payload.([]byte); !ok {
ngin.evtMgr.OnSendError(nc, data, conn.ErrSendPayload)
nlog.Errorf("%s [writeLoop] serializer is nil, but payload type not []byte", ngin.LogPrefix())
break
}
} else {
payload, err := ngin.serializer.Marshal(data.Payload)
if err != nil {
ngin.evtMgr.OnSendError(nc, data, conn.ErrSendMarshal)
nlog.Errorf("%s [writeLoop] message body marshal err: %v", ngin.LogPrefix(), err)
break
}
data.Payload = payload
}
// 对websocket的兼容
if nc.Type() == conn.ConnTypeWS {
messageTyp, ok := data.Header.(int)
if !ok {
ngin.evtMgr.OnSendError(nc, data, conn.ErrSendWSType)
nlog.Errorf("%s [writeLoop] websocket message type not found", ngin.LogPrefix())
break
}
// deadline
if ngin.Deadline != 0 {
_ = nc.Conn().SetDeadline(time.Now().Add(ngin.Deadline))
}
if ngin.WriteDeadline != 0 {
_ = nc.Conn().SetWriteDeadline(time.Now().Add(ngin.WriteDeadline))
}
err := nc.WsConn().WriteMessage(messageTyp, data.Payload.([]byte))
if err != nil {
ngin.evtMgr.OnSendError(nc, data, conn.ErrSend)
nlog.Errorf("%s [writeLoop] write data err: %v", ngin.LogPrefix(), err)
break
}
// event
ngin.evtMgr.OnSend(nc, data)
} else {
// packet pack data
if packer == nil {
ngin.evtMgr.OnSendError(nc, data, conn.ErrNoPacker)
nlog.Errorf("%s [writeLoop] unexpected error: packer is nil", ngin.LogPrefix())
break
}
p, err := packer.Pack(data.Header, data.Payload.([]byte))
if err != nil {
ngin.evtMgr.OnSendError(nc, data, conn.ErrPack)
nlog.Errorf("%s [writeLoop] pack err: %v", ngin.LogPrefix(), err)
break
}
nc.ChWrite() <- p
}
case data := <-nc.ChWrite():
// 回写数据
if ngin.Deadline != 0 {
_ = nc.Conn().SetDeadline(time.Now().Add(ngin.Deadline))
}
if ngin.WriteDeadline != 0 {
_ = nc.Conn().SetWriteDeadline(time.Now().Add(ngin.WriteDeadline))
}
if _, err := nc.Conn().Write(data); err != nil {
ngin.evtMgr.OnSendError(nc, data, conn.ErrSend)
nlog.Errorf("%s [writeLoop] write data err: %v", ngin.LogPrefix(), err)
break
}
// event
ngin.evtMgr.OnSend(nc, data)
//nlog.Debugf("write data %v", data)
case <-nc.ChDie(): // connection close signal
return
}
}
}
func (ngin *Engine) processPacket(nc *conn.Connection, p packet.IPacket) {
// event
ngin.evtMgr.OnReceive(nc, p)
if nc.Status() == conn.StatusWorking {
// 处理包消息
_ = ngin.pool.Submit(func() {
ngin.router.Handle(nc, p)
})
}
}

@ -0,0 +1,292 @@
package event
import (
"errors"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/ntool/nlog"
)
var ErrEventTypeIllegal = errors.New("EventType illegal")
type EvtType string
const (
EvtOnConnected = "OnConnected"
EvtOnConnectError = "OnConnectError"
EvtOnDisconnected = "OnDisconnected"
EvtOnClose = "OnClose"
EvtOnSend = "OnSend"
EvtOnSendError = "OnSendError"
EvtOnReceive = "OnReceive"
EvtOnReceiveError = "OnReceiveError"
)
type (
OnConnectedFn func(nc *conn.Connection)
OnConnectErrorFn func(err error)
OnDisconnectedFn func(nc *conn.Connection, err error)
OnCloseFn func(conn *conn.Connection)
OnSendFn func(nc *conn.Connection, v any)
OnSendErrorFn func(nc *conn.Connection, v any, err error)
OnReceiveFn func(nc *conn.Connection, p packet.IPacket)
OnReceiveErrorFn func(nc *conn.Connection, err error)
Event interface {
// OnConnected 连接成功回调
OnConnected(nc *conn.Connection)
// OnConnectError 连接异常回调, 在准备进行连接的过程中发生异常时触发
OnConnectError(err error)
// OnDisconnected 连接断开回调,网络异常,服务端掉线等情况时触发
OnDisconnected(nc *conn.Connection, err error)
// OnClose 连接关闭回调,服务端发起关闭信号或客户端主动关闭时触发
OnClose(nc *conn.Connection)
// OnSend 消息发送回调,消息序列化后的回调
OnSend(nc *conn.Connection, v any)
// OnSendError 发送消息异常回调
OnSendError(nc *conn.Connection, v any, err error)
// OnReceive 消息接收回调,消息解包后的回调
OnReceive(nc *conn.Connection, p packet.IPacket)
// OnReceiveError 接收消息异常回调
OnReceiveError(nc *conn.Connection, err error)
}
EventManager interface {
Event
// RegisterEventFront 向头部注册事件处理器
RegisterEventFront(evtType EvtType, fn any)
// RegisterEvent 注册事件处理器
RegisterEvent(evtType EvtType, fn any)
}
eventManager struct {
onConnected []OnConnectedFn
onConnectError []OnConnectErrorFn
onDisconnected []OnDisconnectedFn
onClose []OnCloseFn
onSend []OnSendFn
onSendError []OnSendErrorFn
onReceive []OnReceiveFn
onReceiveError []OnReceiveErrorFn
}
)
///////////////// type-align
var _ Event = (*eventManager)(nil)
func NewEventManager() EventManager {
return &eventManager{
onConnected: make([]OnConnectedFn, 0),
onConnectError: make([]OnConnectErrorFn, 0),
onDisconnected: make([]OnDisconnectedFn, 0),
onClose: make([]OnCloseFn, 0),
onSend: make([]OnSendFn, 0),
onSendError: make([]OnSendErrorFn, 0),
onReceive: make([]OnReceiveFn, 0),
onReceiveError: make([]OnReceiveErrorFn, 0),
}
}
func (m *eventManager) registerEvent(evtType EvtType, fn any, front bool) {
switch evtType {
case EvtOnConnected:
if f, ok := fn.(OnConnectedFn); ok {
if front {
fns := make([]OnConnectedFn, len(m.onConnected)+1)
fns[0] = f
copy(fns[1:], m.onConnected)
m.onConnected = fns
} else {
m.onConnected = append(m.onConnected, f)
}
} else {
nlog.Error(ErrEventTypeIllegal)
return
}
case EvtOnConnectError:
if f, ok := fn.(OnConnectErrorFn); ok {
if front {
fns := make([]OnConnectErrorFn, len(m.onConnectError)+1)
fns[0] = f
copy(fns[1:], m.onConnectError)
m.onConnectError = fns
} else {
m.onConnectError = append(m.onConnectError, f)
}
} else {
nlog.Error(ErrEventTypeIllegal)
return
}
case EvtOnDisconnected:
if f, ok := fn.(OnDisconnectedFn); ok {
if front {
fns := make([]OnDisconnectedFn, len(m.onDisconnected)+1)
fns[0] = f
copy(fns[1:], m.onDisconnected)
m.onDisconnected = fns
} else {
m.onDisconnected = append(m.onDisconnected, f)
}
} else {
nlog.Error(ErrEventTypeIllegal)
return
}
case EvtOnClose:
if f, ok := fn.(OnCloseFn); ok {
if front {
fns := make([]OnCloseFn, len(m.onClose)+1)
fns[0] = f
copy(fns[1:], m.onClose)
m.onClose = fns
} else {
m.onClose = append(m.onClose, f)
}
} else {
nlog.Error(ErrEventTypeIllegal)
return
}
case EvtOnSend:
if f, ok := fn.(OnSendFn); ok {
if front {
fns := make([]OnSendFn, len(m.onSend)+1)
fns[0] = f
copy(fns[1:], m.onSend)
m.onSend = fns
} else {
m.onSend = append(m.onSend, f)
}
} else {
nlog.Error(ErrEventTypeIllegal)
return
}
case EvtOnSendError:
if f, ok := fn.(OnSendErrorFn); ok {
if front {
fns := make([]OnSendErrorFn, len(m.onSendError)+1)
fns[0] = f
copy(fns[1:], m.onSendError)
m.onSendError = fns
} else {
m.onSendError = append(m.onSendError, f)
}
} else {
nlog.Error(ErrEventTypeIllegal)
return
}
case EvtOnReceive:
if f, ok := fn.(OnReceiveFn); ok {
if front {
fns := make([]OnReceiveFn, len(m.onReceive)+1)
fns[0] = f
copy(fns[1:], m.onReceive)
m.onReceive = fns
} else {
m.onReceive = append(m.onReceive, f)
}
} else {
nlog.Error(ErrEventTypeIllegal)
return
}
case EvtOnReceiveError:
if f, ok := fn.(OnReceiveErrorFn); ok {
if front {
fns := make([]OnReceiveErrorFn, len(m.onReceiveError)+1)
fns[0] = f
copy(fns[1:], m.onReceiveError)
m.onReceiveError = fns
} else {
m.onReceiveError = append(m.onReceiveError, f)
}
} else {
nlog.Error(ErrEventTypeIllegal)
return
}
}
nlog.Infof("Register event [EvtType: %s] successfully", evtType)
}
func (m *eventManager) RegisterEventFront(evtType EvtType, fn any) {
m.registerEvent(evtType, fn, true)
}
func (m *eventManager) RegisterEvent(evtType EvtType, fn any) {
m.registerEvent(evtType, fn, false)
}
func (m *eventManager) OnConnected(nc *conn.Connection) {
if len(m.onConnected) == 0 {
return
}
for _, fn := range m.onConnected {
fn(nc)
}
}
func (m *eventManager) OnConnectError(err error) {
if len(m.onConnectError) == 0 {
return
}
for _, fn := range m.onConnectError {
fn(err)
}
}
func (m *eventManager) OnDisconnected(nc *conn.Connection, err error) {
if len(m.onDisconnected) == 0 {
return
}
for _, fn := range m.onDisconnected {
fn(nc, err)
}
}
func (m *eventManager) OnClose(nc *conn.Connection) {
if len(m.onClose) == 0 {
return
}
for _, fn := range m.onClose {
fn(nc)
}
}
func (m *eventManager) OnSend(nc *conn.Connection, v any) {
if len(m.onSend) == 0 {
return
}
for _, fn := range m.onSend {
fn(nc, v)
}
}
func (m *eventManager) OnSendError(nc *conn.Connection, v any, err error) {
if len(m.onSendError) == 0 {
return
}
for _, fn := range m.onSendError {
fn(nc, v, err)
}
}
func (m *eventManager) OnReceive(nc *conn.Connection, p packet.IPacket) {
if len(m.onReceive) == 0 {
return
}
for _, fn := range m.onReceive {
fn(nc, p)
}
}
func (m *eventManager) OnReceiveError(nc *conn.Connection, err error) {
if len(m.onReceiveError) == 0 {
return
}
for _, fn := range m.onReceiveError {
fn(nc, err)
}
}

@ -1,39 +0,0 @@
package event
import "git.noahlan.cn/noahlan/nnet/connection"
type (
ConnFn func(conn *connection.Connection)
ErrFn func(err error)
// ConnEvents 连接事件
ConnEvents interface {
// OnConnected 连接成功回调
OnConnected(h ConnFn)
// OnConnectError 连接异常回调, 在准备进行连接的过程中发生异常时触发
OnConnectError(err error)
// OnDisconnected 连接断开回调,网络异常,服务端掉线等情况时触发
OnDisconnected(conn *connection.Connection, err error)
// OnClose 连接关闭回调,服务端发起关闭信号或客户端主动关闭时触发
OnClose(details any, err error)
}
// MessageEvents 消息事件
MessageEvents interface {
// OnSentError 发送消息异常回调
OnSentError(details any, err error)
// OnReceiveError 接收消息异常回调
OnReceiveError(details any, err error)
}
)
type Manager struct {
ConnEvents
MessageEvents
onConnected []OnConnectedFn
}
func NewEventManager() *Manager {
return &Manager{}
}

@ -16,6 +16,7 @@ require (
require (
github.com/gofrs/uuid/v5 v5.0.0 // indirect
github.com/gookit/color v1.5.3 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect

@ -18,6 +18,8 @@ github.com/gookit/color v1.5.3 h1:twfIhZs4QLCtimkP7MOxlF3A0U/5cDPseRT9M/+2SCE=
github.com/gookit/color v1.5.3/go.mod h1:NUzwzeehUfl7GIb36pqId+UGmRfQcU/WiiyTTeNjHtE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=

@ -1,54 +0,0 @@
package lifetime
import (
"git.noahlan.cn/noahlan/nnet/connection"
)
type (
Handler func(conn *connection.Connection)
Lifetime interface {
OnClosed(h Handler)
OnOpen(h Handler)
}
Mgr struct {
onOpen []Handler
onClosed []Handler
}
)
func NewLifetime() *Mgr {
return &Mgr{
onOpen: make([]Handler, 0),
onClosed: make([]Handler, 0),
}
}
func (lt *Mgr) OnClosed(h Handler) {
lt.onClosed = append(lt.onClosed, h)
}
func (lt *Mgr) OnOpen(h Handler) {
lt.onOpen = append(lt.onOpen, h)
}
func (lt *Mgr) Open(conn *connection.Connection) {
if len(lt.onOpen) <= 0 {
return
}
for _, handler := range lt.onOpen {
handler(conn)
}
}
func (lt *Mgr) Close(conn *connection.Connection) {
if len(lt.onClosed) <= 0 {
return
}
for _, handler := range lt.onClosed {
handler(conn)
}
}

@ -2,7 +2,8 @@ package middleware
import (
"git.noahlan.cn/noahlan/nnet"
"git.noahlan.cn/noahlan/nnet/connection"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/event"
"git.noahlan.cn/noahlan/nnet/packet"
rt "git.noahlan.cn/noahlan/nnet/router"
"git.noahlan.cn/noahlan/ntool/nlog"
@ -13,10 +14,10 @@ import (
type HeartbeatMiddleware struct {
lastAt int64
interval time.Duration
hbdFn func(conn *connection.Connection) []byte
hbdFn func(conn *conn.Connection) []byte
}
func WithHeartbeat(interval time.Duration, hbdFn func(conn *connection.Connection) []byte) nnet.RunOption {
func WithHeartbeat(interval time.Duration, hbdFn func(conn *conn.Connection) []byte) nnet.RunOption {
m := &HeartbeatMiddleware{
lastAt: time.Now().Unix(),
interval: interval,
@ -28,10 +29,10 @@ func WithHeartbeat(interval time.Duration, hbdFn func(conn *connection.Connectio
}
return func(ngin *nnet.Engine) {
ngin.Lifetime().OnOpen(m.start)
ngin.EventManager().RegisterEvent(event.EvtOnConnected, m.start)
ngin.Use(func(next rt.HandlerFunc) rt.HandlerFunc {
return func(conn *connection.Connection, pkg packet.IPacket) {
return func(conn *conn.Connection, pkg packet.IPacket) {
m.handle(conn, pkg)
next(conn, pkg)
@ -40,7 +41,7 @@ func WithHeartbeat(interval time.Duration, hbdFn func(conn *connection.Connectio
}
}
func (m *HeartbeatMiddleware) start(conn *connection.Connection) {
func (m *HeartbeatMiddleware) start(conn *conn.Connection) {
ticker := time.NewTicker(m.interval)
defer func() {
@ -64,6 +65,6 @@ func (m *HeartbeatMiddleware) start(conn *connection.Connection) {
}
}
func (m *HeartbeatMiddleware) handle(_ *connection.Connection, _ packet.IPacket) {
func (m *HeartbeatMiddleware) handle(_ *conn.Connection, _ packet.IPacket) {
atomic.StoreInt64(&m.lastAt, time.Now().Unix())
}

@ -1,8 +1,8 @@
package nnet
import (
"git.noahlan.cn/noahlan/nnet/connection"
"git.noahlan.cn/noahlan/nnet/lifetime"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/event"
"git.noahlan.cn/noahlan/nnet/packet"
rt "git.noahlan.cn/noahlan/nnet/router"
"git.noahlan.cn/noahlan/ntool/ndef"
@ -16,19 +16,14 @@ type (
RunOption func(ngin *Engine)
)
// Pipeline returns inner pipeline
func (ngin *Engine) Pipeline() connection.Pipeline {
return ngin.pipeline
}
// Lifetime returns lifetime interface.
func (ngin *Engine) Lifetime() lifetime.Lifetime {
return ngin.lifetime
// EventManager returns EventManager.
func (ngin *Engine) EventManager() event.EventManager {
return ngin.evtMgr
}
// ConnManager returns connection manager
func (ngin *Engine) ConnManager() *connection.Manager {
return ngin.connManager
func (ngin *Engine) ConnManager() *conn.ConnManager {
return ngin.connMgr
}
//////////////////////// Options
@ -82,33 +77,13 @@ func WithSerializer(s ndef.Serializer) RunOption {
// WithPool 设置使用自定义的工作池
func WithPool(pl *ants.Pool) RunOption {
return func(ngin *Engine) {
ngin.goPool = pl
ngin.pool = pl
}
}
// WithPoolCfg 设置工作池配置
func WithPoolCfg(cfg npool.Config) RunOption {
return func(ngin *Engine) {
ngin.goPool, _ = ants.NewPool(cfg.PoolSize, ants.WithOptions(cfg.Options()))
}
}
//////////////////// Pipeline
// WithPipeline 使用自定义 pipeline
func WithPipeline(pipeline connection.Pipeline) RunOption {
return func(ngin *Engine) {
ngin.pipeline = pipeline
}
}
type PipelineOption func(opts connection.Pipeline)
// WithPipelineOpt 使用默认Pipeline并设置其配置
func WithPipelineOpt(opts ...func(connection.Pipeline)) RunOption {
return func(ngin *Engine) {
for _, opt := range opts {
opt(ngin.pipeline)
}
ngin.pool, _ = ants.NewPool(cfg.PoolSize, ants.WithOptions(cfg.Options()))
}
}

@ -1,14 +0,0 @@
package packet
// Entry 入口原始数据
type Entry struct {
Header any
Raw []byte
}
func NewEntry(header any, raw []byte) *Entry {
return &Entry{
Header: header,
Raw: raw,
}
}

@ -7,7 +7,7 @@ type (
Pack(header any, data []byte) ([]byte, error)
// Unpack 解包
Unpack(header any, data []byte) ([]IPacket, error)
Unpack(data []byte) ([]IPacket, error)
}
// PackerBuilder Packer构建器

@ -0,0 +1,37 @@
package packet
import "fmt"
type WSPacket struct {
MessageType int
Len uint64
Raw []byte
}
func NewWSPacket(typ int, data []byte) IPacket {
l := len(data)
raw := make([]byte, l)
copy(raw, data)
return &WSPacket{
MessageType: typ,
Len: uint64(l),
Raw: raw,
}
}
func (p *WSPacket) GetHeader() any {
return p.MessageType
}
func (p *WSPacket) GetLen() uint64 {
return p.Len
}
func (p *WSPacket) GetBody() []byte {
return p.Raw
}
func (p *WSPacket) String() string {
return fmt.Sprintf("MessageType=%d, Len=%d, RawStr=%s", p.MessageType, p.Len, string(p.Raw))
}

@ -0,0 +1,70 @@
package nnet
import (
"encoding/json"
"fmt"
"git.noahlan.cn/noahlan/nnet"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/event"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/ntool/nlog"
)
type OnReadyFunc func()
func WithNNetClientEvents(onReady OnReadyFunc, packer packet.Packer) nnet.RunOption {
return func(ngin *nnet.Engine) {
ngin.EventManager().RegisterEventFront(event.EvtOnReceive, onReceiveEvent(ngin, onReady, packer))
}
}
func onReceiveEvent(ngin *nnet.Engine, onReady OnReadyFunc, packer packet.Packer) event.OnReceiveFn {
return func(nc *conn.Connection, p packet.IPacket) {
pkg, ok := p.(*Packet)
if !ok {
nlog.Error(packet.ErrWrongPacketType)
return
}
// Server to client
switch pkg.PacketType {
case Handshake:
var handshakeData HandshakeResp
err := json.Unmarshal(pkg.Data, &handshakeData)
nlog.Must(err)
hrd, _ := packer.Pack(Header{
PacketType: HandshakeAck,
MessageHeader: MessageHeader{},
}, nil)
if err := nc.SendBytes(hrd); err != nil {
return
}
nc.SetStatus(conn.StatusWorking)
// onReady
if onReady != nil {
onReady()
}
if ngin.ShallLogDebug() {
nlog.Debugf("connection handshake Id=%d, Remote=%s", nc.Session().ID(), nc.Conn().RemoteAddr())
}
case Kick:
_ = nc.Close()
case Data:
status := nc.Status()
if status != conn.StatusWorking {
nlog.Errorf(fmt.Sprintf("receive data on socket which not yet ACK, session will be closed immediately, remote=%s",
nc.Conn().RemoteAddr()))
return
}
var lastMid uint64
switch pkg.MsgType {
case Response:
lastMid = pkg.ID
case Notify:
lastMid = 0
}
nc.SetLastMID(lastMid)
}
}
}

@ -1,65 +0,0 @@
package nnet
import (
"encoding/json"
"errors"
"fmt"
"git.noahlan.cn/noahlan/nnet"
"git.noahlan.cn/noahlan/nnet/connection"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/ntool/nlog"
)
type OnReadyFunc func()
func WithNNetClientPipeline(onReady OnReadyFunc, packer packet.Packer) nnet.RunOption {
return func(ngin *nnet.Engine) {
ngin.Pipeline().Inbound().PushFront(func(conn *connection.Connection, v any) error {
pkg, ok := v.(*Packet)
if !ok {
return packet.ErrWrongPacketType
}
nc, _ := conn.Conn()
// Server to client
switch pkg.PacketType {
case Handshake:
var handshakeData HandshakeResp
err := json.Unmarshal(pkg.Data, &handshakeData)
nlog.Must(err)
hrd, _ := packer.Pack(Header{
PacketType: HandshakeAck,
MessageHeader: MessageHeader{},
}, nil)
if err := conn.SendBytes(hrd); err != nil {
return err
}
conn.SetStatus(connection.StatusWorking)
// onReady
if onReady != nil {
onReady()
}
nlog.Debugf("connection handshake Id=%d, Remote=%s", conn.Session().ID(), nc.RemoteAddr())
case Kick:
_ = conn.Close()
case Data:
status := conn.Status()
if status != connection.StatusWorking {
return errors.New(fmt.Sprintf("receive data on socket which not yet ACK, session will be closed immediately, remote=%s",
nc.RemoteAddr()))
}
var lastMid uint64
switch pkg.MsgType {
case Response:
lastMid = pkg.ID
case Notify:
lastMid = 0
}
conn.SetLastMID(lastMid)
}
return nil
})
}
}

@ -0,0 +1,85 @@
package nnet
import (
"encoding/json"
"git.noahlan.cn/noahlan/nnet"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/event"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/ntool/nlog"
)
type (
HandshakeValidatorFunc func(*HandshakeReq) error
HandshakeAckPayloadFunc func() any
)
func withNNetEvents(
handshakeResp *HandshakeResp,
validator HandshakeValidatorFunc,
packer packet.Packer,
) nnet.RunOption {
return func(ngin *nnet.Engine) {
ngin.EventManager().RegisterEventFront(event.EvtOnReceive, onServerReceiveEvent(handshakeResp, validator, packer))
}
}
func onServerReceiveEvent(
handshakeResp *HandshakeResp,
validator HandshakeValidatorFunc,
packer packet.Packer, ) event.OnReceiveFn {
return func(nc *conn.Connection, p packet.IPacket) {
pkg, ok := p.(*Packet)
if !ok {
nlog.Error(packet.ErrWrongPacketType)
return
}
switch pkg.PacketType {
case Handshake:
var handshakeData HandshakeReq
err := json.Unmarshal(pkg.Data, &handshakeData)
nlog.Must(err)
if err := validator(&handshakeData); err != nil {
nlog.Error(err)
return
}
handshakeResp.Payload = handshakeData.Payload
data, err := json.Marshal(handshakeResp)
nlog.Must(err)
hrd, _ := packer.Pack(Header{
PacketType: Handshake,
MessageHeader: MessageHeader{},
}, data)
if err := nc.SendBytes(hrd); err != nil {
nlog.Error(err)
return
}
nc.SetStatus(conn.StatusPrepare)
nlog.Debugf("connection handshake Id=%d, Remote=%s", nc.Session().ID(), nc.Conn().RemoteAddr())
case HandshakeAck:
nc.SetStatus(conn.StatusPending)
nlog.Debugf("receive handshake ACK Id=%d, Remote=%s", nc.Session().ID(), nc.Conn().RemoteAddr())
case Data:
if nc.Status() < conn.StatusPending {
nlog.Errorf("receive data on socket which not yet ACK, session will be closed immediately, remote=%s",
nc.Conn().RemoteAddr())
return
}
nc.SetStatus(conn.StatusWorking)
var lastMid uint64
switch pkg.MsgType {
case Request:
lastMid = pkg.ID
case Notify:
lastMid = 0
default:
nlog.Errorf("Invalid message type: %s ", pkg.MsgType.String())
}
nc.SetLastMID(lastMid)
}
}
}

@ -2,7 +2,7 @@ package nnet
import (
"git.noahlan.cn/noahlan/nnet"
"git.noahlan.cn/noahlan/nnet/connection"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/middleware"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/ntool/nlog"
@ -40,7 +40,7 @@ func WithNNetClientProtocol(onReady OnReadyFunc) []nnet.RunOption {
router := NewRouter().(*nRouter)
packer := NewPacker(router.routeMap)
opts := []nnet.RunOption{
WithNNetClientPipeline(onReady, packer),
WithNNetClientEvents(onReady, packer),
nnet.WithRouter(router),
nnet.WithPackerBuilder(func() packet.Packer { return NewPacker(router.routeMap) }),
}
@ -62,7 +62,7 @@ func WithNNetProtocol(config Config) []nnet.RunOption {
packer := NewPacker(router.routeMap)
opts := []nnet.RunOption{
withNNetPipeline(handshakeAckData, config.HandshakeValidator, packer),
withNNetEvents(handshakeAckData, config.HandshakeValidator, packer),
nnet.WithRouter(router),
nnet.WithPackerBuilder(func() packet.Packer { return NewPacker(router.routeMap) }),
}
@ -71,7 +71,7 @@ func WithNNetProtocol(config Config) []nnet.RunOption {
hbd, err := packer.Pack(Heartbeat, nil)
nlog.Must(err)
opts = append(opts, middleware.WithHeartbeat(config.HeartbeatInterval, func(_ *connection.Connection) []byte {
opts = append(opts, middleware.WithHeartbeat(config.HeartbeatInterval, func(_ *conn.Connection) []byte {
return hbd
}))
}

@ -1,78 +0,0 @@
package nnet
import (
"encoding/json"
"errors"
"fmt"
"git.noahlan.cn/noahlan/nnet"
"git.noahlan.cn/noahlan/nnet/connection"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/ntool/nlog"
)
type (
HandshakeValidatorFunc func(*HandshakeReq) error
HandshakeAckPayloadFunc func() any
)
func withNNetPipeline(
handshakeResp *HandshakeResp,
validator HandshakeValidatorFunc,
packer packet.Packer,
) nnet.RunOption {
return func(ngin *nnet.Engine) {
ngin.Pipeline().Inbound().PushFront(func(conn *connection.Connection, v any) error {
pkg, ok := v.(*Packet)
if !ok {
return packet.ErrWrongPacketType
}
nc, _ := conn.Conn()
switch pkg.PacketType {
case Handshake:
var handshakeData HandshakeReq
err := json.Unmarshal(pkg.Data, &handshakeData)
nlog.Must(err)
if err := validator(&handshakeData); err != nil {
return err
}
handshakeResp.Payload = handshakeData.Payload
data, err := json.Marshal(handshakeResp)
nlog.Must(err)
hrd, _ := packer.Pack(Header{
PacketType: Handshake,
MessageHeader: MessageHeader{},
}, data)
if err := conn.SendBytes(hrd); err != nil {
return err
}
conn.SetStatus(connection.StatusPrepare)
nlog.Debugf("connection handshake Id=%d, Remote=%s", conn.Session().ID(), nc.RemoteAddr())
case HandshakeAck:
conn.SetStatus(connection.StatusPending)
nlog.Debugf("receive handshake ACK Id=%d, Remote=%s", conn.Session().ID(), nc.RemoteAddr())
case Data:
if conn.Status() < connection.StatusPending {
return errors.New(fmt.Sprintf("receive data on socket which not yet ACK, session will be closed immediately, remote=%s",
nc.RemoteAddr()))
}
conn.SetStatus(connection.StatusWorking)
var lastMid uint64
switch pkg.MsgType {
case Request:
lastMid = pkg.ID
case Notify:
lastMid = 0
default:
return fmt.Errorf("Invalid message type: %s ", pkg.MsgType.String())
}
conn.SetLastMID(lastMid)
}
return nil
})
}
}

@ -3,7 +3,7 @@ package nnet
import (
"errors"
"fmt"
"git.noahlan.cn/noahlan/nnet/connection"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/packet"
rt "git.noahlan.cn/noahlan/nnet/router"
"git.noahlan.cn/noahlan/ntool/nlog"
@ -42,7 +42,7 @@ func NewRouter() rt.Router {
}
}
func (r *nRouter) Handle(conn *connection.Connection, p packet.IPacket) {
func (r *nRouter) Handle(conn *conn.Connection, p packet.IPacket) {
pkg, ok := p.(*Packet)
if !ok {
nlog.Error(packet.ErrWrongPacketType)

@ -0,0 +1,22 @@
package plain
import (
"git.noahlan.cn/noahlan/nnet"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/event"
"git.noahlan.cn/noahlan/nnet/packet"
)
func withEvents() nnet.RunOption {
return func(ngin *nnet.Engine) {
ngin.EventManager().RegisterEventFront(event.EvtOnReceive, onReceiveEvent())
}
}
func onReceiveEvent() event.OnReceiveFn {
return func(nc *conn.Connection, _ packet.IPacket) {
if nc.Status() != conn.StatusWorking {
nc.SetStatus(conn.StatusWorking)
}
}
}

@ -1,22 +0,0 @@
package plain
import (
"git.noahlan.cn/noahlan/nnet"
"git.noahlan.cn/noahlan/nnet/connection"
"git.noahlan.cn/noahlan/nnet/packet"
)
func withPipeline() nnet.RunOption {
return func(ngin *nnet.Engine) {
ngin.Pipeline().Inbound().PushFront(func(conn *connection.Connection, v any) error {
_, ok := v.(*Packet)
if !ok {
return packet.ErrWrongPacketType
}
if conn.Status() != connection.StatusWorking {
conn.SetStatus(connection.StatusWorking)
}
return nil
})
}
}

@ -7,7 +7,7 @@ import (
func WithPlainProtocol() []nnet.RunOption {
opts := []nnet.RunOption{
withPipeline(),
withEvents(),
nnet.WithRouter(NewRouter()),
nnet.WithPackerBuilder(func() packet.Packer { return NewPacker() }),
}

@ -1,7 +1,7 @@
package plain
import (
"git.noahlan.cn/noahlan/nnet/connection"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/nnet/router"
"git.noahlan.cn/noahlan/ntool/nlog"
@ -16,21 +16,16 @@ func NewRouter() router.Router {
return &Router{}
}
func (r *Router) Handle(conn *connection.Connection, pkg packet.IPacket) {
p, ok := pkg.(*Packet)
if !ok {
nlog.Error(packet.ErrWrongPacketType)
return
}
func (r *Router) Handle(nc *conn.Connection, pkg packet.IPacket) {
if r.plainHandler == nil {
if r.notFound == nil {
nlog.Error("message handler not found")
return
}
r.notFound.Handle(conn, p)
r.notFound.Handle(nc, pkg)
return
}
r.plainHandler.Handle(conn, p)
r.plainHandler.Handle(nc, pkg)
}
func (r *Router) Register(_ any, handler router.Handler) error {

@ -1,17 +1,17 @@
package router
import (
"git.noahlan.cn/noahlan/nnet/connection"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/ntool/nlog"
)
type (
Handler interface {
Handle(c *connection.Connection, pkg packet.IPacket)
Handle(c *conn.Connection, pkg packet.IPacket)
}
// HandlerFunc 消息处理方法
HandlerFunc func(conn *connection.Connection, pkg packet.IPacket)
HandlerFunc func(conn *conn.Connection, pkg packet.IPacket)
Middleware func(next HandlerFunc) HandlerFunc
@ -29,13 +29,13 @@ type (
Constructor func(Handler) Handler
)
func notFound(conn *connection.Connection, _ packet.IPacket) {
func notFound(conn *conn.Connection, _ packet.IPacket) {
nlog.Error("handler not found")
_ = conn.SendBytes([]byte("404"))
}
func NotFoundHandler(next Handler) Handler {
return HandlerFunc(func(c *connection.Connection, packet packet.IPacket) {
return HandlerFunc(func(c *conn.Connection, packet packet.IPacket) {
h := next
if next == nil {
h = HandlerFunc(notFound)
@ -45,7 +45,7 @@ func NotFoundHandler(next Handler) Handler {
})
}
func (f HandlerFunc) Handle(c *connection.Connection, pkg packet.IPacket) {
func (f HandlerFunc) Handle(c *conn.Connection, pkg packet.IPacket) {
f(c, pkg)
}
@ -94,7 +94,7 @@ func NewDefaultRouter() Router {
return &plainRouter{}
}
func (p *plainRouter) Handle(c *connection.Connection, pkg packet.IPacket) {
func (p *plainRouter) Handle(c *conn.Connection, pkg packet.IPacket) {
if p.handler == nil {
return
}

@ -1,7 +1,7 @@
package nnet
import (
"git.noahlan.cn/noahlan/nnet/connection"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/ntool/nlog"
"github.com/goburrow/serial"
"sync"
@ -30,7 +30,7 @@ func (ngin *Engine) ListenSerial(conf serial.Config) error {
var wg sync.WaitGroup
wg.Add(1)
ngin.handle(connection.NewSerialConn(port, &conf))
ngin.handle(conn.NewSerialConn(port, &conf))
go func() {
for {

@ -25,7 +25,7 @@ func (ngin *Engine) ListenTCP(conf config.TCPServerConf) error {
ngin.Stop()
}()
for {
conn, err := listener.Accept()
rc, err := listener.Accept()
if err != nil {
if errors.Is(err, net.ErrClosed) {
nlog.Errorf("%s connection closed, err:%v", ngin.LogPrefix(), err)
@ -35,8 +35,8 @@ func (ngin *Engine) ListenTCP(conf config.TCPServerConf) error {
continue
}
err = ngin.goPool.Submit(func() {
ngin.handle(conn)
err = ngin.pool.Submit(func() {
ngin.handle(rc)
})
if err != nil {
nlog.Errorf("%s submit conn pool err: %ng", ngin.LogPrefix(), err.Error())

@ -3,7 +3,6 @@ package nnet
import (
"fmt"
"git.noahlan.cn/noahlan/nnet/config"
"git.noahlan.cn/noahlan/nnet/connection"
"git.noahlan.cn/noahlan/ntool/nlog"
"github.com/gorilla/websocket"
"net/http"
@ -11,19 +10,22 @@ import (
"strings"
)
type WsConfOption func(conf config.WSServerFullConf)
type WsServerOption func(conf config.WSServerFullConf)
func WithWSCheckOrigin(fn func(*http.Request) bool) WsConfOption {
func WithWSCheckOrigin(fn func(*http.Request) bool) WsServerOption {
return func(conf config.WSServerFullConf) {
conf.CheckOrigin = fn
}
}
// ListenWebsocket 开始监听Websocket
func (ngin *Engine) ListenWebsocket(conf config.WSServerFullConf, opts ...WsConfOption) error {
for _, opt := range opts {
func (ngin *Engine) ListenWebsocket(conf config.WSServerFullConf, serverOpts []WsServerOption, evtOpts ...WsEventOption) error {
for _, opt := range serverOpts {
opt(conf)
}
for _, opt := range evtOpts {
opt(conf.WSEvent)
}
err := ngin.setup()
if err != nil {
@ -46,37 +48,6 @@ func (ngin *Engine) ListenWebsocket(conf config.WSServerFullConf, opts ...WsConf
return nil
}
func (ngin *Engine) handleWS(conn *websocket.Conn, conf config.WSServerFullConf) {
wsConn := connection.NewWSConn(conn)
//defaultCloseHandler := conn.CloseHandler()
//conn.SetCloseHandler(func(code int, text string) error {
// result := defaultCloseHandler(code, text)
// //wsConn.Close()
// return result
//})
// ping
defaultPingHandler := wsConn.PingHandler()
wsConn.SetPingHandler(func(appData string) error {
if conf.PingHandler != nil {
conf.PingHandler(appData)
}
return defaultPingHandler(appData)
})
// pong
defaultPongHandler := wsConn.PongHandler()
wsConn.SetPongHandler(func(appData string) error {
if conf.PongHandler != nil {
conf.PongHandler(appData)
}
return defaultPongHandler(appData)
})
ngin.handle(wsConn)
}
func (ngin *Engine) upgradeWebsocket(conf config.WSServerFullConf) {
upgrade := websocket.Upgrader{
HandshakeTimeout: conf.HandshakeTimeout,
@ -88,13 +59,13 @@ func (ngin *Engine) upgradeWebsocket(conf config.WSServerFullConf) {
path := fmt.Sprintf("/%s", strings.TrimPrefix(conf.Path, "/"))
http.HandleFunc(path, func(writer http.ResponseWriter, request *http.Request) {
conn, err := upgrade.Upgrade(writer, request, nil)
wc, err := upgrade.Upgrade(writer, request, nil)
if err != nil {
nlog.Errorf("%s Upgrade failure, URI=%ng, Error=%ng", ngin.LogPrefix(), request.RequestURI, err.Error())
return
}
err = ngin.goPool.Submit(func() {
ngin.handleWS(conn, conf)
err = ngin.pool.Submit(func() {
_ = ngin.handleWS(wc, conf.WSEvent)
})
if err != nil {
nlog.Errorf("%s submit conn pool err: %v", ngin.LogPrefix(), err.Error())

@ -4,7 +4,8 @@ import (
"encoding/json"
"git.noahlan.cn/noahlan/nnet"
"git.noahlan.cn/noahlan/nnet/config"
"git.noahlan.cn/noahlan/nnet/connection"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/event"
"git.noahlan.cn/noahlan/nnet/packet"
protocol_nnet "git.noahlan.cn/noahlan/nnet/protocol/nnet"
rt "git.noahlan.cn/noahlan/nnet/router"
@ -14,6 +15,10 @@ import (
"time"
)
var ttt event.OnDisconnectedFn = func(nc *conn.Connection, err error) {
nlog.Debugf("ttt %v", err)
}
func runServer(addr string) {
nginOpts := make([]nnet.RunOption, 0)
nginOpts = append(nginOpts, nnet.WithPoolCfg(npool.Config{
@ -28,6 +33,9 @@ func runServer(addr string) {
HeartbeatInterval: 0,
HandshakeValidator: nil,
})...)
nginOpts = append(nginOpts, func(ngin *nnet.Engine) {
ngin.EventManager().RegisterEvent(event.EvtOnDisconnected, ttt)
})
ngin := nnet.NewEngine(config.EngineConf{
TaskTimerPrecision: 0,
Mode: "dev",
@ -38,7 +46,7 @@ func runServer(addr string) {
Route: "ping",
Code: 1,
},
Handler: func(conn *connection.Connection, pkg packet.IPacket) {
Handler: func(conn *conn.Connection, pkg packet.IPacket) {
nlog.Info("client ping, server pong -> ")
err := conn.Send(protocol_nnet.Header{
PacketType: protocol_nnet.Data,
@ -64,7 +72,7 @@ func runServer(addr string) {
}
func runClient(addr string) (*nnet.Engine, *connection.Connection) {
func runClient(addr string) (*nnet.Engine, *conn.Connection) {
chReady := make(chan struct{})
nginOpts := make([]nnet.RunOption, 0)
@ -90,11 +98,11 @@ func runClient(addr string) (*nnet.Engine, *connection.Connection) {
Route: "test.client",
Code: 1,
},
Handler: func(conn *connection.Connection, pkg packet.IPacket) {
Handler: func(conn *conn.Connection, pkg packet.IPacket) {
nlog.Info("client hahaha")
},
})
conn, err := ngin.Dial(addr)
conn, err := ngin.DialTCP(addr)
nlog.Must(err)
handshake, err := json.Marshal(&protocol_nnet.HandshakeReq{

@ -1,7 +1,7 @@
package main
import (
"git.noahlan.cn/noahlan/nnet/connection"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/nnet/protocol/nnet"
rt "git.noahlan.cn/noahlan/nnet/router"
@ -21,7 +21,7 @@ func TestClient(t *testing.T) {
Route: "pong",
Code: 2,
},
Handler: func(conn *connection.Connection, pkg packet.IPacket) {
Handler: func(conn *conn.Connection, pkg packet.IPacket) {
nlog.Info("server pong, client ping ->")
_ = et.Send(nnet.Header{
PacketType: nnet.Data,

@ -0,0 +1,69 @@
package main
import (
"git.noahlan.cn/noahlan/nnet"
"git.noahlan.cn/noahlan/nnet/config"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/nnet/protocol/plain"
rt "git.noahlan.cn/noahlan/nnet/router"
"git.noahlan.cn/noahlan/ntool/nlog"
"time"
)
func runWSServer(addr, path string) {
nginOpts := make([]nnet.RunOption, 0)
nginOpts = append(nginOpts, plain.WithPlainProtocol()...)
ngin := nnet.NewEngine(config.EngineConf{
Mode: config.DevMode,
Name: "DevNL",
}, nginOpts...)
ngin.AddRoutes(rt.Route{
Matches: nil,
Handler: func(conn *conn.Connection, pkg packet.IPacket) {
nlog.Debugf("route fn: %v", pkg)
},
})
defer ngin.Stop()
err := ngin.ListenWebsocket(config.WSServerFullConf{
WSConf: config.WSConf{
Addr: addr,
Path: path,
HandshakeTimeout: time.Second * 5,
},
WSEvent: config.WSEvent{},
}, nil)
if err != nil {
return
}
}
func runWSClient(addr string) (*nnet.Engine, *conn.Connection) {
//chReady := make(chan struct{})
nginOpts := make([]nnet.RunOption, 0)
nginOpts = append(nginOpts, plain.WithPlainProtocol()...)
//var onReadyFn event.OnConnectedFn = func(nc *conn.Connection) {
// chReady <- struct{}{}
//}
//nginOpts = append(nginOpts, func(ngin *nnet.Engine) {
// ngin.EventManager().RegisterEvent(event.EvtOnConnected, onReadyFn)
//})
ngin := nnet.NewEngine(config.EngineConf{
Mode: config.DevMode,
Name: "DevNL-Client",
}, nginOpts...)
nc, err := ngin.DialWebsocket(addr, config.WSClientFullConf{})
nlog.Must(err)
return ngin, nc
}

@ -0,0 +1,24 @@
package main
import (
"github.com/gorilla/websocket"
"sync"
"testing"
)
func TestWSServer(t *testing.T) {
runWSServer("0.0.0.0:14725", "/ws")
}
func TestWSClient(t *testing.T) {
ngin, nc := runWSClient("ws://127.0.0.1:14725/ws")
_ = nc.Send(websocket.TextMessage, []byte("hello world!"))
ngin.LogPrefix()
var wg sync.WaitGroup
wg.Add(1)
wg.Wait()
}

61
ws.go

@ -0,0 +1,61 @@
package nnet
import (
"git.noahlan.cn/noahlan/nnet/config"
"git.noahlan.cn/noahlan/nnet/conn"
"github.com/gorilla/websocket"
)
type WsEventOption func(conf config.WSEvent)
func WithPingHandler(fn func(appData string)) WsEventOption {
return func(conf config.WSEvent) {
conf.PingHandler = fn
}
}
func WithPongHandler(fn func(appData string)) WsEventOption {
return func(conf config.WSEvent) {
conf.PongHandler = fn
}
}
func WithCloseHandler(fn func(closeCode int, closeText string) error) WsEventOption {
return func(conf config.WSEvent) {
conf.CloseHandler = fn
}
}
func (ngin *Engine) handleWS(wc *websocket.Conn, conf config.WSEvent) *conn.Connection {
wsConn := conn.NewWSConn(wc)
nc := ngin.handle(wsConn)
defaultCloseHandler := wsConn.CloseHandler()
wsConn.SetCloseHandler(func(code int, text string) error {
result := defaultCloseHandler(code, text)
_ = wsConn.Close()
ngin.evtMgr.OnClose(nc)
return result
})
// ping
defaultPingHandler := wsConn.PingHandler()
wsConn.SetPingHandler(func(appData string) error {
if conf.PingHandler != nil {
conf.PingHandler(appData)
}
return defaultPingHandler(appData)
})
// pong
defaultPongHandler := wsConn.PongHandler()
wsConn.SetPongHandler(func(appData string) error {
if conf.PongHandler != nil {
conf.PongHandler(appData)
}
return defaultPongHandler(appData)
})
return nc
}
Loading…
Cancel
Save