From 7de6cee1804872c06c75918204f662c060ee421e Mon Sep 17 00:00:00 2001 From: NoahLan <6995syu@163.com> Date: Thu, 4 May 2023 10:44:22 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=B0=86=E5=85=A8=E5=B1=80=E7=9A=84?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E7=AE=A1=E7=90=86=E5=99=A8=E4=B8=8Ennet?= =?UTF-8?q?=E8=B7=AF=E7=94=B1=E4=BF=AE=E6=94=B9=E4=B8=BA=E5=B1=80=E9=83=A8?= =?UTF-8?q?=EF=BC=8C=E4=BB=A5=E8=BE=BE=E5=88=B0=E5=A4=9A=E5=AE=9E=E4=BE=8B?= =?UTF-8?q?=E7=9A=84=E7=9B=AE=E7=9A=84=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.go | 8 +++++++- conn/conn_mgr.go | 10 ++++------ core/connection.go | 6 +++--- core/engine.go | 28 ++++++++++++++++++---------- core/nnet.go | 13 +++++++++---- internal/env/env.go | 8 -------- protocol/client_pipeline_nnet.go | 4 ++-- protocol/nnet.go | 30 +++++++++++++----------------- protocol/packer_nnet.go | 18 ++++++++++-------- protocol/pipeline_nnet.go | 3 ++- protocol/router_nnet.go | 13 +++++++++++-- scheduler/scheduler.go | 9 ++++++--- test/test_nnet.go | 2 +- 13 files changed, 86 insertions(+), 66 deletions(-) delete mode 100644 internal/env/env.go diff --git a/config/config.go b/config/config.go index c5600f1..ecf6652 100644 --- a/config/config.go +++ b/config/config.go @@ -1,6 +1,9 @@ package config -import "git.noahlan.cn/noahlan/ntools-go/core/pool" +import ( + "git.noahlan.cn/noahlan/ntools-go/core/pool" + "time" +) const ( // DevMode means development mode. @@ -28,6 +31,9 @@ type ( Addr string `json:",default=0.0.0.0"` // Name 服务端名称,默认为n-net Name string `json:",default=n-net"` + // TaskTimerPrecision // 全局任务的timer间隔 + TaskTimerPrecision time.Duration `json:",default=1s"` + // Mode 运行模式 Mode string `json:",default=dev,options=[dev,test,prod]"` } ) diff --git a/conn/conn_mgr.go b/conn/conn_mgr.go index 8f0b131..fd85b64 100644 --- a/conn/conn_mgr.go +++ b/conn/conn_mgr.go @@ -5,8 +5,6 @@ import ( "sync" ) -var ConnManager *Manager - type Manager struct { sync.RWMutex @@ -16,8 +14,8 @@ type Manager struct { conns map[int64]entity.NetworkEntity } -func init() { - ConnManager = &Manager{ +func NewManager() *Manager { + return &Manager{ RWMutex: sync.RWMutex{}, groups: make(map[string]*Group), conns: make(map[int64]entity.NetworkEntity), @@ -25,7 +23,7 @@ func init() { } // Store 保存连接,同时加入到指定分组,若给定分组名为空,则不进行分组操作 -func (m *Manager) Store(groupName string, s entity.NetworkEntity) { +func (m *Manager) Store(groupName string, s entity.NetworkEntity) error { m.Lock() m.conns[s.Session().ID()] = s m.Unlock() @@ -34,7 +32,7 @@ func (m *Manager) Store(groupName string, s entity.NetworkEntity) { if !ok { group = m.NewGroup(groupName) } - _ = group.Add(s) + return group.Add(s) } // NewGroup 新增分组,若分组已存在,则返回现有分组 diff --git a/core/connection.go b/core/connection.go index 4f08486..b47e8a7 100644 --- a/core/connection.go +++ b/core/connection.go @@ -141,7 +141,7 @@ func (r *connection) write() { close(r.chWrite) _ = r.Close() - nlog.Debugf("connection write goroutine exit, ConnID=%d, SessionUID=%s", r.ID(), r.session.UID()) + nlog.Debugf("[writeLoop] connection write goroutine exit, ConnID=%d, SessionUID=%s", r.ID(), r.session.UID()) }() for { @@ -202,12 +202,12 @@ func (r *connection) read() { n, err := r.conn.Read(buf) //nlog.Debugf("receive data %v", buf[:n]) if err != nil { - nlog.Errorf("Read message error: %s, session will be closed immediately", err.Error()) + nlog.Errorf("[readLoop] Read message error: %s, session will be closed immediately", err.Error()) return } if r.packer == nil { - nlog.Errorf("unexpected error: packer is nil") + nlog.Errorf("[readLoop] unexpected error: packer is nil") return } diff --git a/core/engine.go b/core/engine.go index 586bf3e..4d2b95f 100644 --- a/core/engine.go +++ b/core/engine.go @@ -47,8 +47,10 @@ type ( packerFn packet.NewPackerFunc // 封包、拆包器 serializer serialize.Serializer // 消息 序列化/反序列化 - retryInterval time.Duration // 消息重试间隔时长 - wsOpt wsOptions // websocket + wsOpt wsOptions // websocket + + taskTimerPrecision time.Duration + connManager *conn2.Manager } wsOptions struct { @@ -62,11 +64,13 @@ type ( func newEngine(conf config.EngineConf) *engine { s := &engine{ - conf: conf, - dieChan: make(chan struct{}), - pipeline: pipeline.New(), - middlewares: make([]Middleware, 0), - routes: make([]Route, 0), + conf: conf, + dieChan: make(chan struct{}), + pipeline: pipeline.New(), + middlewares: make([]Middleware, 0), + routes: make([]Route, 0), + taskTimerPrecision: conf.TaskTimerPrecision, + connManager: conn2.NewManager(), } pool.InitPool(conf.Pool) @@ -113,7 +117,7 @@ func (ng *engine) dial(addr string, router Router) (entity.NetworkEntity, error) if err := ng.bindRoutes(router); err != nil { return nil, err } - go scheduler.Schedule() + go scheduler.Schedule(ng.taskTimerPrecision) // connection conn, err := net.Dial("tcp", addr) @@ -123,6 +127,9 @@ func (ng *engine) dial(addr string, router Router) (entity.NetworkEntity, error) c.serve() // hook Lifetime.Open(c) + // connection manager + err = ng.connManager.Store(conn2.DefaultGroupName, c) + nlog.Must(err) // 连接成功,客户端已启动 nlog.Infof("now connect to %s.", addr) @@ -136,7 +143,7 @@ func (ng *engine) serve(router Router) error { if err := ng.bindRoutes(router); err != nil { return err } - go scheduler.Schedule() + go scheduler.Schedule(ng.taskTimerPrecision) defer func() { nlog.Info("NNet is stopping...") @@ -248,7 +255,8 @@ func (ng *engine) handleWS(conn *websocket.Conn) { func (ng *engine) handle(conn net.Conn) { c := newConnection(ng, conn) - conn2.ConnManager.Store(conn2.DefaultGroupName, c) + err := ng.connManager.Store(conn2.DefaultGroupName, c) + nlog.Must(err) c.serve() // hook diff --git a/core/nnet.go b/core/nnet.go index e27c10f..c8c1ee6 100644 --- a/core/nnet.go +++ b/core/nnet.go @@ -2,8 +2,8 @@ package core import ( "git.noahlan.cn/noahlan/nnet/config" + "git.noahlan.cn/noahlan/nnet/conn" "git.noahlan.cn/noahlan/nnet/entity" - "git.noahlan.cn/noahlan/nnet/internal/env" "git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/nnet/pipeline" "git.noahlan.cn/noahlan/nnet/serialize" @@ -110,6 +110,11 @@ func (s *NNet) Pipeline() pipeline.Pipeline { return s.ngin.pipeline } +// ConnManager returns connection manager +func (s *NNet) ConnManager() *conn.Manager { + return s.ngin.connManager +} + // ToMiddleware converts the given handler to a Middleware. func ToMiddleware(handler func(next Handler) Handler) Middleware { return func(next HandlerFunc) HandlerFunc { @@ -174,15 +179,15 @@ func WithSerializer(s serialize.Serializer) RunOption { } } -// WithTimerPrecision 设置Timer精度 +// WithTimerPrecision 设置Timer精度,需在 Start 或 Dial 之前执行 // 注:精度需大于1ms, 并且不能在运行时更改 // 默认精度是 time.Second func WithTimerPrecision(precision time.Duration) RunOption { if precision < time.Millisecond { panic("time precision can not less than a Millisecond") } - return func(_ *NNet) { - env.TimerPrecision = precision + return func(s *NNet) { + s.ngin.taskTimerPrecision = precision } } diff --git a/internal/env/env.go b/internal/env/env.go deleted file mode 100644 index 89d5adc..0000000 --- a/internal/env/env.go +++ /dev/null @@ -1,8 +0,0 @@ -package env - -import "time" - -var ( - // TimerPrecision indicates the precision of timer, default is time.Second - TimerPrecision = time.Second -) diff --git a/protocol/client_pipeline_nnet.go b/protocol/client_pipeline_nnet.go index 3828105..42e3784 100644 --- a/protocol/client_pipeline_nnet.go +++ b/protocol/client_pipeline_nnet.go @@ -6,13 +6,13 @@ import ( "fmt" "git.noahlan.cn/noahlan/nnet/core" "git.noahlan.cn/noahlan/nnet/entity" + "git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/ntools-go/core/nlog" ) type OnReadyFunc func() -func WithNNetClientPipeline(onReady OnReadyFunc) core.RunOption { - packer := NewNNetPacker() +func WithNNetClientPipeline(onReady OnReadyFunc, packer packet.Packer) core.RunOption { return func(server *core.NNet) { server.Pipeline().Inbound().PushFront(func(entity entity.NetworkEntity, v interface{}) error { pkg, ok := v.(*NNetPacket) diff --git a/protocol/nnet.go b/protocol/nnet.go index 01f7418..dad98a9 100644 --- a/protocol/nnet.go +++ b/protocol/nnet.go @@ -36,20 +36,13 @@ type ( } ) -var routeMap *RouteMap - -func init() { - routeMap = &RouteMap{ - Routes: make(map[string]uint16), - Codes: make(map[uint16]string), - } -} - func WithNNetClientProtocol(onReady OnReadyFunc) []core.RunOption { + router := NewNNetRouter().(*nNetRouter) + packer := NewNNetPacker(router.routeMap) opts := []core.RunOption{ - WithNNetClientPipeline(onReady), - core.WithRouter(NewNNetRouter()), - core.WithPacker(func() packet.Packer { return NewNNetPacker() }), + WithNNetClientPipeline(onReady, packer), + core.WithRouter(router), + core.WithPacker(func() packet.Packer { return NewNNetPacker(router.routeMap) }), } return opts @@ -61,19 +54,22 @@ func WithNNetProtocol(config NNetConfig) []core.RunOption { return nil } } + router := NewNNetRouter().(*nNetRouter) handshakeAckData := &HandshakeResp{ Heartbeat: int64(config.HeartbeatInterval.Seconds()), - RouteMap: routeMap, + RouteMap: router.routeMap, } + router.routeMap.Routes["hahah"] = 222 + + packer := NewNNetPacker(router.routeMap) opts := []core.RunOption{ - WithNNetPipeline(handshakeAckData, config.HandshakeValidator), - core.WithRouter(NewNNetRouter()), - core.WithPacker(func() packet.Packer { return NewNNetPacker() }), + WithNNetPipeline(handshakeAckData, config.HandshakeValidator, packer), + core.WithRouter(router), + core.WithPacker(func() packet.Packer { return NewNNetPacker(router.routeMap) }), } if config.HeartbeatInterval.Seconds() > 0 { - packer := NewNNetPacker() hbd, err := packer.Pack(Heartbeat, nil) nlog.Must(err) diff --git a/protocol/packer_nnet.go b/protocol/packer_nnet.go index 2b409e7..f6f94f2 100644 --- a/protocol/packer_nnet.go +++ b/protocol/packer_nnet.go @@ -8,10 +8,11 @@ import ( ) type NNetPacker struct { - buf *bytes.Buffer - size int // 最近一次 length - typ byte // 最近一次 packet type - flag byte // 最近一次 flag + buf *bytes.Buffer + size int // 最近一次 length + typ byte // 最近一次 packet type + flag byte // 最近一次 flag + routeMap *RouteMap } // packer constants. @@ -35,9 +36,10 @@ var ( ErrWrongPacketType = errors.New("wrong packet type") ) -func NewNNetPacker() *NNetPacker { +func NewNNetPacker(routeMap *RouteMap) *NNetPacker { p := &NNetPacker{ - buf: bytes.NewBuffer(nil), + buf: bytes.NewBuffer(nil), + routeMap: routeMap, } p.resetFlags() return p @@ -82,7 +84,7 @@ func (d *NNetPacker) Pack(header interface{}, data []byte) ([]byte, error) { // flag flag := byte(h.MsgType << 1) // 编译器提示,此处 byte 转换不能删 - code, compressed := routeMap.Routes[h.Route] + code, compressed := d.routeMap.Routes[h.Route] if compressed { flag |= msgRouteCompressMask } @@ -180,7 +182,7 @@ func (d *NNetPacker) Unpack(data []byte) ([]packet.IPacket, error) { if d.flag&msgRouteCompressMask == 1 { p.compressed = true code := binary.BigEndian.Uint16(d.buf.Next(2)) - route, ok := routeMap.Codes[code] + route, ok := d.routeMap.Codes[code] if !ok { return nil, ErrRouteInfoNotFound } diff --git a/protocol/pipeline_nnet.go b/protocol/pipeline_nnet.go index beab5e4..47adadb 100644 --- a/protocol/pipeline_nnet.go +++ b/protocol/pipeline_nnet.go @@ -6,6 +6,7 @@ import ( "fmt" "git.noahlan.cn/noahlan/nnet/core" "git.noahlan.cn/noahlan/nnet/entity" + "git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/ntools-go/core/nlog" ) @@ -17,8 +18,8 @@ type ( func WithNNetPipeline( handshakeResp *HandshakeResp, validator HandshakeValidatorFunc, + packer packet.Packer, ) core.RunOption { - packer := NewNNetPacker() return func(server *core.NNet) { server.Pipeline().Inbound().PushFront(func(entity entity.NetworkEntity, v interface{}) error { pkg, ok := v.(*NNetPacket) diff --git a/protocol/router_nnet.go b/protocol/router_nnet.go index 4833afc..c1e5df4 100644 --- a/protocol/router_nnet.go +++ b/protocol/router_nnet.go @@ -27,13 +27,22 @@ type ( } nNetRouter struct { + routeMap *RouteMap handlers map[string]core.Handler notFound core.Handler } ) +func NewRouteMap() *RouteMap { + return &RouteMap{ + Routes: make(map[string]uint16), + Codes: make(map[uint16]string), + } +} + func NewNNetRouter() core.Router { return &nNetRouter{ + routeMap: NewRouteMap(), handlers: make(map[string]core.Handler), } } @@ -63,8 +72,8 @@ func (r *nNetRouter) Register(matches interface{}, handler core.Handler) error { } r.handlers[match.Route] = handler // routeMap - routeMap.Routes[match.Route] = match.Code - routeMap.Codes[match.Code] = match.Route + r.routeMap.Routes[match.Route] = match.Code + r.routeMap.Codes[match.Code] = match.Route return nil } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 365c2aa..fb1ed8a 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -1,7 +1,6 @@ package scheduler import ( - "git.noahlan.cn/noahlan/nnet/internal/env" "git.noahlan.cn/noahlan/ntools-go/core/nlog" "runtime/debug" "sync/atomic" @@ -39,12 +38,16 @@ func try(f func()) { f() } -func Schedule() { +func Schedule(timerPrecision time.Duration) { if atomic.AddInt32(&started, 1) != 1 { return } - ticker := time.NewTicker(env.TimerPrecision) + if timerPrecision.Seconds() == 0 { + timerPrecision = time.Second + } + + ticker := time.NewTicker(timerPrecision) defer func() { ticker.Stop() close(chExit) diff --git a/test/test_nnet.go b/test/test_nnet.go index 542ed2a..c47317f 100644 --- a/test/test_nnet.go +++ b/test/test_nnet.go @@ -98,7 +98,7 @@ func runClient(addr string) (client *core.Client, et entity.NetworkEntity) { }) nlog.Must(err) - packer := protocol.NewNNetPacker() + packer := protocol.NewNNetPacker(protocol.NewRouteMap()) hsd, err := packer.Pack(protocol.Header{ PacketType: protocol.Handshake, MessageHeader: protocol.MessageHeader{