diff --git a/conn/group.go b/conn/group.go index 3ad3aa1..b9d5b59 100644 --- a/conn/group.go +++ b/conn/group.go @@ -128,7 +128,7 @@ func (c *Group) Add(e entity.NetworkEntity) error { c.conns[id] = e } - nlog.Debugf("Add connection to group %s, ID=%d, UID=%d", c.name, sess.ID(), sess.UID()) + nlog.Debugf("Add connection to group %s, ID=%d, UID=%s", c.name, sess.ID(), sess.UID()) return nil } @@ -141,7 +141,7 @@ func (c *Group) Leave(e entity.NetworkEntity) error { return nil } sess := e.Session() - nlog.Debugf("Remove connection from group %s, UID=%d", c.name, sess.ID()) + nlog.Debugf("Remove connection from group %s, UID=%s", c.name, sess.UID()) c.mu.Lock() defer c.mu.Unlock() diff --git a/core/connection.go b/core/connection.go index f029a65..4f08486 100644 --- a/core/connection.go +++ b/core/connection.go @@ -183,6 +183,7 @@ func (r *connection) write() { nlog.Error(err.Error()) break } + //nlog.Debugf("write data %v", data) case <-r.chDie: // connection close signal return case <-r.ngin.dieChan: // application quit signal @@ -199,6 +200,7 @@ func (r *connection) read() { buf := make([]byte, 4096) for { n, err := r.conn.Read(buf) + //nlog.Debugf("receive data %v", buf[:n]) if err != nil { nlog.Errorf("Read message error: %s, session will be closed immediately", err.Error()) return diff --git a/core/engine.go b/core/engine.go index 90c5a0c..586bf3e 100644 --- a/core/engine.go +++ b/core/engine.go @@ -107,6 +107,29 @@ func convertMiddleware(ware Middleware) func(Handler) Handler { } } +func (ng *engine) dial(addr string, router Router) (entity.NetworkEntity, error) { + ng.handler = router + + if err := ng.bindRoutes(router); err != nil { + return nil, err + } + go scheduler.Schedule() + + // connection + conn, err := net.Dial("tcp", addr) + nlog.Must(err) + + c := newConnection(ng, conn) + c.serve() + // hook + Lifetime.Open(c) + + // 连接成功,客户端已启动 + nlog.Infof("now connect to %s.", addr) + + return c, nil +} + func (ng *engine) serve(router Router) error { ng.handler = router @@ -115,7 +138,7 @@ func (ng *engine) serve(router Router) error { } go scheduler.Schedule() defer func() { - nlog.Info("Server is stopping...") + nlog.Info("NNet is stopping...") ng.shutdown() scheduler.Close() @@ -136,9 +159,9 @@ func (ng *engine) serve(router Router) error { select { case <-ng.dieChan: - nlog.Info("Server will shutdown in a few seconds") + nlog.Info("NNet will shutdown in a few seconds") case s := <-sg: - nlog.Infof("Server got signal: %s", s) + nlog.Infof("NNet got signal: %s", s) } return nil @@ -153,15 +176,12 @@ func (ng *engine) shutdown() { func (ng *engine) listenAndServe() { listener, err := net.Listen(ng.conf.Protocol, ng.conf.Addr) - if err != nil { - panic(err) - } + nlog.Must(err) // 监听成功,服务已启动 nlog.Infof("now listening %s on %s.", ng.conf.Protocol, ng.conf.Addr) defer func() { _ = listener.Close() - ng.shutdown() ng.close() }() diff --git a/core/server.go b/core/nnet.go similarity index 70% rename from core/server.go rename to core/nnet.go index b37e0f0..e27c10f 100644 --- a/core/server.go +++ b/core/nnet.go @@ -2,6 +2,7 @@ package core import ( "git.noahlan.cn/noahlan/nnet/config" + "git.noahlan.cn/noahlan/nnet/entity" "git.noahlan.cn/noahlan/nnet/internal/env" "git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/nnet/pipeline" @@ -12,10 +13,18 @@ import ( ) type ( - // RunOption defines the method to customize a Server. - RunOption func(*Server) + // RunOption defines the method to customize a NNet. + RunOption func(*NNet) Server struct { + *NNet + } + + Client struct { + *NNet + } + + NNet struct { ngin *engine router Router } @@ -25,29 +34,39 @@ type ( // Be aware that later RunOption might overwrite previous one that write the same option. func NewServer(c config.EngineConf, opts ...RunOption) *Server { s := &Server{ - ngin: newEngine(c), - router: NewDefaultRouter(), + NNet: &NNet{ + ngin: newEngine(c), + router: NewDefaultRouter(), + }, } opts = append([]RunOption{WithNotFoundHandler(nil)}, opts...) for _, opt := range opts { - opt(s) + opt(s.NNet) } return s } -// AddRoutes add given routes into the Server. -func (s *Server) AddRoutes(rs []Route) { - s.ngin.addRoutes(rs...) -} +// NewClient returns a client with given config of c and options defined in opts. +// Be aware that later RunOption might overwrite previous one that write the same option. +func NewClient(c config.EngineConf, opts ...RunOption) *Client { + s := &Client{ + NNet: &NNet{ + ngin: newEngine(c), + router: NewDefaultRouter(), + }, + } -// AddRoute adds given route into the Server. -func (s *Server) AddRoute(r Route) { - s.AddRoutes([]Route{r}) + opts = append([]RunOption{WithNotFoundHandler(nil)}, opts...) + for _, opt := range opts { + opt(s.NNet) + } + + return s } -// Start starts the Server. +// Start starts the NNet. // Graceful shutdown is enabled by default. func (s *Server) Start() { if err := s.ngin.serve(s.router); err != nil { @@ -56,18 +75,38 @@ func (s *Server) Start() { } } -// Stop stops the Server. -func (s *Server) Stop() { +// Dial start the NNet client. +// Graceful shutdown is enabled by default. +func (c *Client) Dial(addr string) entity.NetworkEntity { + e, err := c.ngin.dial(addr, c.router) + nlog.Must(err) + return e +} + +// AddRoutes add given routes into the NNet. +func (s *NNet) AddRoutes(rs []Route) { + s.ngin.addRoutes(rs...) + err := s.ngin.bindRoutes(s.router) + nlog.Must(err) +} + +// AddRoute adds given route into the NNet. +func (s *NNet) AddRoute(r Route) { + s.AddRoutes([]Route{r}) +} + +// Stop stops the NNet. +func (s *NNet) Stop() { s.ngin.close() } -// Use adds the given middleware in the Server. -func (s *Server) Use(middleware ...Middleware) { +// Use adds the given middleware in the NNet. +func (s *NNet) Use(middleware ...Middleware) { s.ngin.use(middleware...) } // Pipeline returns inner pipeline -func (s *Server) Pipeline() pipeline.Pipeline { +func (s *NNet) Pipeline() pipeline.Pipeline { return s.ngin.pipeline } @@ -101,14 +140,14 @@ func WithMiddleware(middleware Middleware, rs ...Route) []Route { } func UseMiddleware(middleware ...Middleware) RunOption { - return func(server *Server) { + return func(server *NNet) { server.Use(middleware...) } } // WithNotFoundHandler returns a RunOption with not found handler set to given handler. func WithNotFoundHandler(handler Handler) RunOption { - return func(server *Server) { + return func(server *NNet) { notFoundHandler := server.ngin.notFoundHandler(handler) server.router.SetNotFoundHandler(notFoundHandler) } @@ -116,21 +155,21 @@ func WithNotFoundHandler(handler Handler) RunOption { // WithRouter 设置消息路由 func WithRouter(router Router) RunOption { - return func(server *Server) { + return func(server *NNet) { server.router = router } } // WithPacker 设置消息的 封包/解包 方式 func WithPacker(fn packet.NewPackerFunc) RunOption { - return func(server *Server) { + return func(server *NNet) { server.ngin.packerFn = fn } } // WithSerializer 设置消息的 序列化/反序列化 方式 func WithSerializer(s serialize.Serializer) RunOption { - return func(server *Server) { + return func(server *NNet) { server.ngin.serializer = s } } @@ -142,13 +181,13 @@ func WithTimerPrecision(precision time.Duration) RunOption { if precision < time.Millisecond { panic("time precision can not less than a Millisecond") } - return func(_ *Server) { + return func(_ *NNet) { env.TimerPrecision = precision } } func WithPipeline(pipeline pipeline.Pipeline) RunOption { - return func(server *Server) { + return func(server *NNet) { server.ngin.pipeline = pipeline } } @@ -156,7 +195,7 @@ func WithPipeline(pipeline pipeline.Pipeline) RunOption { type PipelineOption func(opts pipeline.Pipeline) func WithPipelineOpt(opts ...func(pipeline.Pipeline)) RunOption { - return func(server *Server) { + return func(server *NNet) { for _, opt := range opts { opt(server.ngin.pipeline) } @@ -167,7 +206,7 @@ type WSOption func(opts *wsOptions) // WithWebsocket 开启Websocket, 参数是websocket的相关参数 nnet.WSOption func WithWebsocket(wsOpts ...WSOption) RunOption { - return func(server *Server) { + return func(server *NNet) { for _, opt := range wsOpts { opt(&server.ngin.wsOpt) } diff --git a/core/nnet_test.go b/core/nnet_test.go new file mode 100644 index 0000000..6420c0b --- /dev/null +++ b/core/nnet_test.go @@ -0,0 +1,66 @@ +package core + +import ( + "fmt" + "git.noahlan.cn/noahlan/nnet/config" + "git.noahlan.cn/noahlan/nnet/entity" + "git.noahlan.cn/noahlan/nnet/packet" + "git.noahlan.cn/noahlan/nnet/protocol" + "git.noahlan.cn/noahlan/ntools-go/core/nlog" + "git.noahlan.cn/noahlan/ntools-go/core/pool" + "math" + "testing" + "time" +) + +func TestServer(t *testing.T) { + server := NewServer(config.EngineConf{ + ServerConf: config.ServerConf{ + Protocol: "tcp", + Addr: "0.0.0.0:6666", + Name: "testServer", + Mode: "dev", + }, + Pool: pool.Config{ + PoolSize: math.MaxInt32, + ExpiryDuration: time.Second, + PreAlloc: false, + MaxBlockingTasks: 0, + Nonblocking: false, + DisablePurge: false, + }, + }, protocol.WithNNetProtocol(protocol.NNetConfig{ + HeartbeatInterval: 0, + HandshakeValidator: nil, + })...) + + server.AddRoute(Route{ + Matches: protocol.Match{ + Route: "test", + Code: 1, + }, + Handler: func(entity entity.NetworkEntity, pkg packet.IPacket) { + fmt.Println(pkg) + p, ok := pkg.(*protocol.NNetPacket) + if !ok { + nlog.Error("wrong packet type") + return + } + + bd := []byte("服务器接收到数据为: " + string(p.GetBody())) + // 注:Response类型数据不需要Route(原地返回,客户端需等待) + _ = entity.Send(protocol.Header{ + PacketType: protocol.Data, + Length: uint32(len(bd)), + MessageHeader: protocol.MessageHeader{ + MsgType: protocol.Response, + ID: p.ID, + Route: p.Route, + }, + }, bd) + }, + }) + + defer server.Stop() + server.Start() +} diff --git a/core/server_test.go b/core/server_test.go deleted file mode 100644 index c3e3573..0000000 --- a/core/server_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package core - -import ( - "fmt" - "git.noahlan.cn/noahlan/nnet/packet" - "git.noahlan.cn/noahlan/ntools-go/core/nlog" - "testing" - "time" -) - -func TestServer(t *testing.T) { - server := NewServer(EngineConf{ - Protocol: "tcp", - Addr: ":12345", - Name: "N-Net", - Mode: DevMode, - }, - WithPacker(func() packet.Packer { - return packet.NewNNetPacker() - }), - WithSerializer(nil), - WithHeartbeatInterval(time.Hour), - WithProcessor(NewNNetProcessor()), - ) - - server.AddRoute(Route{ - Matches: "test", - Handler: func(conn *connection, pkg packet.IPacket) { - fmt.Println(pkg) - p, ok := pkg.(*packet.Packet) - if !ok { - nlog.Error("wrong packet type") - return - } - - bd := []byte("服务器接收到数据为: " + string(p.GetBody())) - // 注:Response类型数据不需要Route(原地返回,客户端需等待) - conn.Send(packet.Header{ - PacketType: packet.Data, - Length: uint32(len(bd)), - MessageHeader: packet.MessageHeader{ - MsgType: packet.Response, - ID: p.ID, - Route: p.Route, - }, - }, bd) - }, - }) - - defer server.Stop() - - server.Start() -} diff --git a/go.mod b/go.mod index 1467554..b42d2e0 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.20 require ( github.com/gorilla/websocket v1.5.0 - github.com/panjf2000/ants/v2 v2.7.3 + github.com/panjf2000/ants/v2 v2.7.3 // indirect ) require google.golang.org/protobuf v1.28.2-0.20220831092852-f930b1dc76e8 diff --git a/go.sum b/go.sum index c5b02db..c6aa715 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,3 @@ -git.noahlan.cn/noahlan/ntools-go/core v1.1.1 h1:icFPOTTpVYPa8NpNJteAwFBARPOuHE3695xZWNcAM2c= -git.noahlan.cn/noahlan/ntools-go/core v1.1.1/go.mod h1:UN8UVL5WoyMgqNcxKoAu0/J9d+1hH2Yco64MUtPdjFk= git.noahlan.cn/noahlan/ntools-go/core v1.1.3 h1:n4z0KaXmX/fmobavxCMc2vGJDoStbhNbm8AZugPEPGg= git.noahlan.cn/noahlan/ntools-go/core v1.1.3/go.mod h1:pmwee9V76Cyp6nVr3dPj5TpePLvRpc8C0ZgAzFIFAKU= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -21,8 +19,7 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/panjf2000/ants/v2 v2.6.0 h1:xOSpw42m+BMiJ2I33we7h6fYzG4DAlpE1xyI7VS2gxU= -github.com/panjf2000/ants/v2 v2.6.0/go.mod h1:cU93usDlihJZ5CfRGNDYsiBYvoilLvBF5Qp/BT2GNRE= +github.com/panjf2000/ants/v2 v2.7.3 h1:rHQ0hH0DQvuNUqqlWIMJtkMcDuL1uQAfpX2mIhQ5/s0= github.com/panjf2000/ants/v2 v2.7.3/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -40,6 +37,7 @@ go.opentelemetry.io/otel/trace v1.14.0 h1:wp2Mmvj41tDsyAJXiWDWpfNsOiIyd38fy85pyK go.opentelemetry.io/otel/trace v1.14.0/go.mod h1:8avnQLK+CG77yNLUae4ea2JDQ6iT+gozhnZjy/rw9G8= golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= diff --git a/middleware/heartbeat.go b/middleware/heartbeat.go index fa205e0..5c2e4e7 100644 --- a/middleware/heartbeat.go +++ b/middleware/heartbeat.go @@ -15,19 +15,19 @@ type HeartbeatMiddleware struct { hbdFn func(entity entity.NetworkEntity) []byte } -func WithHeartbeat(interval time.Duration, dataFn func(entity entity.NetworkEntity) []byte) core.RunOption { +func WithHeartbeat(interval time.Duration, hbdFn func(entity entity.NetworkEntity) []byte) core.RunOption { m := &HeartbeatMiddleware{ lastAt: time.Now().Unix(), interval: interval, - hbdFn: dataFn, + hbdFn: hbdFn, } - if dataFn == nil { + if hbdFn == nil { nlog.Error("dataFn must not be nil") panic("dataFn must not be nil") } core.Lifetime.OnOpen(m.start) - return func(server *core.Server) { + return func(server *core.NNet) { server.Use(func(next core.HandlerFunc) core.HandlerFunc { return func(entity entity.NetworkEntity, pkg packet.IPacket) { m.handle(entity, pkg) diff --git a/protocol/client_pipeline_nnet.go b/protocol/client_pipeline_nnet.go new file mode 100644 index 0000000..3828105 --- /dev/null +++ b/protocol/client_pipeline_nnet.go @@ -0,0 +1,65 @@ +package protocol + +import ( + "encoding/json" + "errors" + "fmt" + "git.noahlan.cn/noahlan/nnet/core" + "git.noahlan.cn/noahlan/nnet/entity" + "git.noahlan.cn/noahlan/ntools-go/core/nlog" +) + +type OnReadyFunc func() + +func WithNNetClientPipeline(onReady OnReadyFunc) core.RunOption { + packer := NewNNetPacker() + return func(server *core.NNet) { + server.Pipeline().Inbound().PushFront(func(entity entity.NetworkEntity, v interface{}) error { + pkg, ok := v.(*NNetPacket) + if !ok { + return ErrWrongPacketType + } + conn, _ := entity.Conn() + + // Server to client + switch pkg.PacketType { + case Handshake: + var handshakeData HandshakeResp + err := json.Unmarshal(pkg.Data, &handshakeData) + nlog.Must(err) + + hrd, _ := packer.Pack(Header{ + PacketType: HandshakeAck, + MessageHeader: MessageHeader{}, + }, nil) + if err := entity.SendBytes(hrd); err != nil { + return err + } + entity.SetStatus(core.StatusWorking) + // onReady + if onReady != nil { + onReady() + } + nlog.Debugf("connection handshake Id=%d, Remote=%s", entity.Session().ID(), conn.RemoteAddr()) + case Kick: + _ = entity.Close() + case Data: + status := entity.Status() + if status != core.StatusWorking { + return errors.New(fmt.Sprintf("receive data on socket which not yet ACK, session will be closed immediately, remote=%s", + conn.RemoteAddr())) + } + + var lastMid uint64 + switch pkg.MsgType { + case Response: + lastMid = pkg.ID + case Notify: + lastMid = 0 + } + entity.SetLastMID(lastMid) + } + return nil + }) + } +} diff --git a/protocol/nnet.go b/protocol/nnet.go index b230bd9..01f7418 100644 --- a/protocol/nnet.go +++ b/protocol/nnet.go @@ -1,7 +1,6 @@ package protocol import ( - "encoding/json" "git.noahlan.cn/noahlan/nnet/core" "git.noahlan.cn/noahlan/nnet/entity" "git.noahlan.cn/noahlan/nnet/middleware" @@ -12,12 +11,11 @@ import ( type ( NNetConfig struct { - HeartbeatInterval time.Duration - HandshakeValidator HandshakeValidatorFunc - HandshakeAckBuilder HandshakeAckBuilderFunc + HeartbeatInterval time.Duration + HandshakeValidator HandshakeValidatorFunc } - handshakeData struct { + HandshakeReq struct { Version string `json:"version"` // 客户端版本,服务器以此判断是否合适与客户端通信 Type string `json:"type"` // 客户端类型,与客户端版本号一起来确定客户端是否合适 ClientId string `json:"clientId"` // 客户端ID,服务器以此取值 @@ -27,39 +25,56 @@ type ( Payload interface{} `json:"payload,optional,omitempty"` } - HandshakeAckData struct { + HandshakeResp struct { // 心跳间隔,单位秒 0表示不需要心跳 Heartbeat int64 `json:"heartbeat"` - // 路由 - Routes map[string]uint16 `json:"routes"` // route map to code - Codes map[uint16]string `json:"codes"` // code map to route + *RouteMap // 透传信息 Payload interface{} `json:"payload,optional,omitempty"` } ) +var routeMap *RouteMap + +func init() { + routeMap = &RouteMap{ + Routes: make(map[string]uint16), + Codes: make(map[uint16]string), + } +} + +func WithNNetClientProtocol(onReady OnReadyFunc) []core.RunOption { + opts := []core.RunOption{ + WithNNetClientPipeline(onReady), + core.WithRouter(NewNNetRouter()), + core.WithPacker(func() packet.Packer { return NewNNetPacker() }), + } + + return opts +} + func WithNNetProtocol(config NNetConfig) []core.RunOption { if config.HandshakeValidator == nil { - config.HandshakeValidator = func(bytes []byte) error { return nil } - } - if config.HandshakeAckBuilder == nil { - config.HandshakeAckBuilder = func() ([]byte, error) { - defaultData := &HandshakeAckData{} - return json.Marshal(defaultData) + config.HandshakeValidator = func(data *HandshakeReq) error { + return nil } } + handshakeAckData := &HandshakeResp{ + Heartbeat: int64(config.HeartbeatInterval.Seconds()), + RouteMap: routeMap, + } opts := []core.RunOption{ - WithNNetPipeline(config.HandshakeAckBuilder, config.HandshakeValidator), + WithNNetPipeline(handshakeAckData, config.HandshakeValidator), core.WithRouter(NewNNetRouter()), core.WithPacker(func() packet.Packer { return NewNNetPacker() }), } if config.HeartbeatInterval.Seconds() > 0 { packer := NewNNetPacker() - hbd, err := packer.Pack(Handshake, nil) + hbd, err := packer.Pack(Heartbeat, nil) nlog.Must(err) opts = append(opts, middleware.WithHeartbeat(config.HeartbeatInterval, func(_ entity.NetworkEntity) []byte { diff --git a/protocol/packer_nnet.go b/protocol/packer_nnet.go index 4cbb5e4..2b409e7 100644 --- a/protocol/packer_nnet.go +++ b/protocol/packer_nnet.go @@ -35,11 +35,6 @@ var ( ErrWrongPacketType = errors.New("wrong packet type") ) -var ( - routes = make(map[string]uint16) // route map to code - codes = make(map[uint16]string) // code map to route -) - func NewNNetPacker() *NNetPacker { p := &NNetPacker{ buf: bytes.NewBuffer(nil), @@ -87,7 +82,7 @@ func (d *NNetPacker) Pack(header interface{}, data []byte) ([]byte, error) { // flag flag := byte(h.MsgType << 1) // 编译器提示,此处 byte 转换不能删 - code, compressed := routes[h.Route] + code, compressed := routeMap.Routes[h.Route] if compressed { flag |= msgRouteCompressMask } @@ -185,7 +180,7 @@ func (d *NNetPacker) Unpack(data []byte) ([]packet.IPacket, error) { if d.flag&msgRouteCompressMask == 1 { p.compressed = true code := binary.BigEndian.Uint16(d.buf.Next(2)) - route, ok := codes[code] + route, ok := routeMap.Codes[code] if !ok { return nil, ErrRouteInfoNotFound } diff --git a/protocol/pipeline_nnet.go b/protocol/pipeline_nnet.go index 4a83c16..beab5e4 100644 --- a/protocol/pipeline_nnet.go +++ b/protocol/pipeline_nnet.go @@ -1,6 +1,7 @@ package protocol import ( + "encoding/json" "errors" "fmt" "git.noahlan.cn/noahlan/nnet/core" @@ -9,13 +10,16 @@ import ( ) type ( - HandshakeValidatorFunc func([]byte) error - HandshakeAckBuilderFunc func() (interface{}, error) + HandshakeValidatorFunc func(*HandshakeReq) error + HandshakeAckPayloadFunc func() interface{} ) -func WithNNetPipeline(ackDataBuilder HandshakeAckBuilderFunc, validator HandshakeValidatorFunc) core.RunOption { +func WithNNetPipeline( + handshakeResp *HandshakeResp, + validator HandshakeValidatorFunc, +) core.RunOption { packer := NewNNetPacker() - return func(server *core.Server) { + return func(server *core.NNet) { server.Pipeline().Inbound().PushFront(func(entity entity.NetworkEntity, v interface{}) error { pkg, ok := v.(*NNetPacket) if !ok { @@ -25,13 +29,22 @@ func WithNNetPipeline(ackDataBuilder HandshakeAckBuilderFunc, validator Handshak switch pkg.PacketType { case Handshake: - if err := validator(pkg.Data); err != nil { + var handshakeData HandshakeReq + err := json.Unmarshal(pkg.Data, &handshakeData) + nlog.Must(err) + + if err := validator(&handshakeData); err != nil { return err } - data, err := ackDataBuilder() + handshakeResp.Payload = handshakeData.Payload + + data, err := json.Marshal(handshakeResp) nlog.Must(err) - hrd, _ := packer.Pack(Handshake, data) + hrd, _ := packer.Pack(Header{ + PacketType: Handshake, + MessageHeader: MessageHeader{}, + }, data) if err := entity.SendBytes(hrd); err != nil { return err } @@ -40,8 +53,6 @@ func WithNNetPipeline(ackDataBuilder HandshakeAckBuilderFunc, validator Handshak case HandshakeAck: entity.SetStatus(core.StatusPending) nlog.Debugf("receive handshake ACK Id=%d, Remote=%s", entity.Session().ID(), conn.RemoteAddr()) - case Heartbeat: - // Expected case Data: if entity.Status() < core.StatusPending { return errors.New(fmt.Sprintf("receive data on socket which not yet ACK, session will be closed immediately, remote=%s", diff --git a/protocol/router_nnet.go b/protocol/router_nnet.go index a692621..4833afc 100644 --- a/protocol/router_nnet.go +++ b/protocol/router_nnet.go @@ -2,16 +2,35 @@ package protocol import ( "errors" + "fmt" "git.noahlan.cn/noahlan/nnet/core" "git.noahlan.cn/noahlan/nnet/entity" "git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/ntools-go/core/nlog" ) -type nNetRouter struct { - handlers map[string]core.Handler - notFound core.Handler -} +var ( + Routes = make(map[string]uint16) // route map to code + Codes = make(map[uint16]string) // code map to route +) + +type ( + RouteMap struct { + // 路由 + Routes map[string]uint16 `json:"routes"` // route map to code + Codes map[uint16]string `json:"codes"` // code map to route + } + + Match struct { + Route string // 路由 + Code uint16 // 路由编码 + } + + nNetRouter struct { + handlers map[string]core.Handler + notFound core.Handler + } +) func NewNNetRouter() core.Router { return &nNetRouter{ @@ -38,11 +57,14 @@ func (r *nNetRouter) Handle(entity entity.NetworkEntity, p packet.IPacket) { } func (r *nNetRouter) Register(matches interface{}, handler core.Handler) error { - route, ok := matches.(string) + match, ok := matches.(Match) if !ok { - return errors.New("the type of matches must be string") + return errors.New(fmt.Sprintf("the type of matches must be %T", Match{})) } - r.handlers[route] = handler + r.handlers[match.Route] = handler + // routeMap + routeMap.Routes[match.Route] = match.Code + routeMap.Codes[match.Code] = match.Route return nil } diff --git a/test/test_nnet.go b/test/test_nnet.go new file mode 100644 index 0000000..542ed2a --- /dev/null +++ b/test/test_nnet.go @@ -0,0 +1,117 @@ +package main + +import ( + "encoding/json" + "git.noahlan.cn/noahlan/nnet/config" + "git.noahlan.cn/noahlan/nnet/core" + "git.noahlan.cn/noahlan/nnet/entity" + "git.noahlan.cn/noahlan/nnet/packet" + "git.noahlan.cn/noahlan/nnet/protocol" + "git.noahlan.cn/noahlan/ntools-go/core/nlog" + "git.noahlan.cn/noahlan/ntools-go/core/pool" + "math" + "time" +) + +func runServer(addr string) { + server := core.NewServer(config.EngineConf{ + ServerConf: config.ServerConf{ + Protocol: "tcp", + Addr: addr, + Name: "testServer", + Mode: "dev", + }, + Pool: pool.Config{ + PoolSize: math.MaxInt32, + ExpiryDuration: time.Second, + PreAlloc: false, + MaxBlockingTasks: 0, + Nonblocking: false, + DisablePurge: false, + }, + }, protocol.WithNNetProtocol(protocol.NNetConfig{ + HeartbeatInterval: 0, + HandshakeValidator: nil, + })...) + + server.AddRoutes([]core.Route{ + { + Matches: protocol.Match{ + Route: "ping", + Code: 1, + }, + Handler: func(et entity.NetworkEntity, pkg packet.IPacket) { + nlog.Info("client ping, server pong -> ") + err := et.Send(protocol.Header{ + PacketType: protocol.Data, + MessageHeader: protocol.MessageHeader{ + MsgType: protocol.Request, + ID: 1, + Route: "pong", + }, + }, []byte("1")) + nlog.Must(err) + }, + }, + }) + + defer server.Stop() + server.Start() +} + +func runClient(addr string) (client *core.Client, et entity.NetworkEntity) { + chReady := make(chan struct{}) + client = core.NewClient(config.EngineConf{ + Pool: pool.Config{ + PoolSize: math.MaxInt32, + ExpiryDuration: time.Second, + PreAlloc: false, + MaxBlockingTasks: 0, + Nonblocking: false, + DisablePurge: false, + }, + }, protocol.WithNNetClientProtocol(func() { + chReady <- struct{}{} + })...) + + client.AddRoutes([]core.Route{ + { + Matches: protocol.Match{ + Route: "test.client", + Code: 1, + }, + Handler: func(et entity.NetworkEntity, pkg packet.IPacket) { + nlog.Info("client hahaha") + }, + }, + }) + et = client.Dial(addr) + + handshake, err := json.Marshal(&protocol.HandshakeReq{ + Version: "1.0.0", + Type: "test", + ClientId: "a", + ClientSecret: "a", + Payload: map[string]string{ + "pl": "test-data", + }, + }) + nlog.Must(err) + + packer := protocol.NewNNetPacker() + hsd, err := packer.Pack(protocol.Header{ + PacketType: protocol.Handshake, + MessageHeader: protocol.MessageHeader{ + MsgType: 0, + ID: 0, + Route: "", + }, + }, handshake) + nlog.Must(err) + + err = et.SendBytes(hsd) + nlog.Must(err) + + <-chReady + return +} diff --git a/test/test_nnet_test.go b/test/test_nnet_test.go new file mode 100644 index 0000000..e01f104 --- /dev/null +++ b/test/test_nnet_test.go @@ -0,0 +1,50 @@ +package main + +import ( + "git.noahlan.cn/noahlan/nnet/core" + "git.noahlan.cn/noahlan/nnet/entity" + "git.noahlan.cn/noahlan/nnet/packet" + "git.noahlan.cn/noahlan/nnet/protocol" + "git.noahlan.cn/noahlan/ntools-go/core/nlog" + "sync" + "testing" +) + +func TestServer(t *testing.T) { + runServer("0.0.0.0:6666") +} + +func TestClient(t *testing.T) { + client, et := runClient("127.0.0.1:6666") + client.AddRoute(core.Route{ + Matches: protocol.Match{ + Route: "pong", + Code: 2, + }, + Handler: func(et entity.NetworkEntity, pkg packet.IPacket) { + nlog.Info("server pong, client ping ->") + _ = et.Send(protocol.Header{ + PacketType: protocol.Data, + MessageHeader: protocol.MessageHeader{ + MsgType: protocol.Request, + ID: 1, + Route: "ping", + }, + }, []byte("1")) + }, + }) + + _ = et.Send(protocol.Header{ + PacketType: protocol.Data, + MessageHeader: protocol.MessageHeader{ + MsgType: protocol.Request, + ID: 1, + Route: "ping", + }, + }, []byte("1")) + + var wg sync.WaitGroup + wg.Add(1) + + wg.Wait() +}