refactor: 重构nnet,高度扩展性,支持诸多特性。

main v0.4.0
NorthLan 2 years ago
parent 52eea69cd0
commit 87b0ae6cec

@ -128,7 +128,7 @@ func (c *Group) Add(e entity.NetworkEntity) error {
c.conns[id] = e c.conns[id] = e
} }
nlog.Debugf("Add connection to group %s, ID=%d, UID=%d", c.name, sess.ID(), sess.UID()) nlog.Debugf("Add connection to group %s, ID=%d, UID=%s", c.name, sess.ID(), sess.UID())
return nil return nil
} }
@ -141,7 +141,7 @@ func (c *Group) Leave(e entity.NetworkEntity) error {
return nil return nil
} }
sess := e.Session() sess := e.Session()
nlog.Debugf("Remove connection from group %s, UID=%d", c.name, sess.ID()) nlog.Debugf("Remove connection from group %s, UID=%s", c.name, sess.UID())
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()

@ -183,6 +183,7 @@ func (r *connection) write() {
nlog.Error(err.Error()) nlog.Error(err.Error())
break break
} }
//nlog.Debugf("write data %v", data)
case <-r.chDie: // connection close signal case <-r.chDie: // connection close signal
return return
case <-r.ngin.dieChan: // application quit signal case <-r.ngin.dieChan: // application quit signal
@ -199,6 +200,7 @@ func (r *connection) read() {
buf := make([]byte, 4096) buf := make([]byte, 4096)
for { for {
n, err := r.conn.Read(buf) n, err := r.conn.Read(buf)
//nlog.Debugf("receive data %v", buf[:n])
if err != nil { if err != nil {
nlog.Errorf("Read message error: %s, session will be closed immediately", err.Error()) nlog.Errorf("Read message error: %s, session will be closed immediately", err.Error())
return return

@ -107,6 +107,29 @@ func convertMiddleware(ware Middleware) func(Handler) Handler {
} }
} }
func (ng *engine) dial(addr string, router Router) (entity.NetworkEntity, error) {
ng.handler = router
if err := ng.bindRoutes(router); err != nil {
return nil, err
}
go scheduler.Schedule()
// connection
conn, err := net.Dial("tcp", addr)
nlog.Must(err)
c := newConnection(ng, conn)
c.serve()
// hook
Lifetime.Open(c)
// 连接成功,客户端已启动
nlog.Infof("now connect to %s.", addr)
return c, nil
}
func (ng *engine) serve(router Router) error { func (ng *engine) serve(router Router) error {
ng.handler = router ng.handler = router
@ -115,7 +138,7 @@ func (ng *engine) serve(router Router) error {
} }
go scheduler.Schedule() go scheduler.Schedule()
defer func() { defer func() {
nlog.Info("Server is stopping...") nlog.Info("NNet is stopping...")
ng.shutdown() ng.shutdown()
scheduler.Close() scheduler.Close()
@ -136,9 +159,9 @@ func (ng *engine) serve(router Router) error {
select { select {
case <-ng.dieChan: case <-ng.dieChan:
nlog.Info("Server will shutdown in a few seconds") nlog.Info("NNet will shutdown in a few seconds")
case s := <-sg: case s := <-sg:
nlog.Infof("Server got signal: %s", s) nlog.Infof("NNet got signal: %s", s)
} }
return nil return nil
@ -153,15 +176,12 @@ func (ng *engine) shutdown() {
func (ng *engine) listenAndServe() { func (ng *engine) listenAndServe() {
listener, err := net.Listen(ng.conf.Protocol, ng.conf.Addr) listener, err := net.Listen(ng.conf.Protocol, ng.conf.Addr)
if err != nil { nlog.Must(err)
panic(err)
}
// 监听成功,服务已启动 // 监听成功,服务已启动
nlog.Infof("now listening %s on %s.", ng.conf.Protocol, ng.conf.Addr) nlog.Infof("now listening %s on %s.", ng.conf.Protocol, ng.conf.Addr)
defer func() { defer func() {
_ = listener.Close() _ = listener.Close()
ng.shutdown()
ng.close() ng.close()
}() }()

@ -2,6 +2,7 @@ package core
import ( import (
"git.noahlan.cn/noahlan/nnet/config" "git.noahlan.cn/noahlan/nnet/config"
"git.noahlan.cn/noahlan/nnet/entity"
"git.noahlan.cn/noahlan/nnet/internal/env" "git.noahlan.cn/noahlan/nnet/internal/env"
"git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/nnet/pipeline" "git.noahlan.cn/noahlan/nnet/pipeline"
@ -12,10 +13,18 @@ import (
) )
type ( type (
// RunOption defines the method to customize a Server. // RunOption defines the method to customize a NNet.
RunOption func(*Server) RunOption func(*NNet)
Server struct { Server struct {
*NNet
}
Client struct {
*NNet
}
NNet struct {
ngin *engine ngin *engine
router Router router Router
} }
@ -25,29 +34,39 @@ type (
// Be aware that later RunOption might overwrite previous one that write the same option. // Be aware that later RunOption might overwrite previous one that write the same option.
func NewServer(c config.EngineConf, opts ...RunOption) *Server { func NewServer(c config.EngineConf, opts ...RunOption) *Server {
s := &Server{ s := &Server{
NNet: &NNet{
ngin: newEngine(c), ngin: newEngine(c),
router: NewDefaultRouter(), router: NewDefaultRouter(),
},
} }
opts = append([]RunOption{WithNotFoundHandler(nil)}, opts...) opts = append([]RunOption{WithNotFoundHandler(nil)}, opts...)
for _, opt := range opts { for _, opt := range opts {
opt(s) opt(s.NNet)
} }
return s return s
} }
// AddRoutes add given routes into the Server. // NewClient returns a client with given config of c and options defined in opts.
func (s *Server) AddRoutes(rs []Route) { // Be aware that later RunOption might overwrite previous one that write the same option.
s.ngin.addRoutes(rs...) func NewClient(c config.EngineConf, opts ...RunOption) *Client {
s := &Client{
NNet: &NNet{
ngin: newEngine(c),
router: NewDefaultRouter(),
},
} }
// AddRoute adds given route into the Server. opts = append([]RunOption{WithNotFoundHandler(nil)}, opts...)
func (s *Server) AddRoute(r Route) { for _, opt := range opts {
s.AddRoutes([]Route{r}) opt(s.NNet)
} }
// Start starts the Server. return s
}
// Start starts the NNet.
// Graceful shutdown is enabled by default. // Graceful shutdown is enabled by default.
func (s *Server) Start() { func (s *Server) Start() {
if err := s.ngin.serve(s.router); err != nil { if err := s.ngin.serve(s.router); err != nil {
@ -56,18 +75,38 @@ func (s *Server) Start() {
} }
} }
// Stop stops the Server. // Dial start the NNet client.
func (s *Server) Stop() { // Graceful shutdown is enabled by default.
func (c *Client) Dial(addr string) entity.NetworkEntity {
e, err := c.ngin.dial(addr, c.router)
nlog.Must(err)
return e
}
// AddRoutes add given routes into the NNet.
func (s *NNet) AddRoutes(rs []Route) {
s.ngin.addRoutes(rs...)
err := s.ngin.bindRoutes(s.router)
nlog.Must(err)
}
// AddRoute adds given route into the NNet.
func (s *NNet) AddRoute(r Route) {
s.AddRoutes([]Route{r})
}
// Stop stops the NNet.
func (s *NNet) Stop() {
s.ngin.close() s.ngin.close()
} }
// Use adds the given middleware in the Server. // Use adds the given middleware in the NNet.
func (s *Server) Use(middleware ...Middleware) { func (s *NNet) Use(middleware ...Middleware) {
s.ngin.use(middleware...) s.ngin.use(middleware...)
} }
// Pipeline returns inner pipeline // Pipeline returns inner pipeline
func (s *Server) Pipeline() pipeline.Pipeline { func (s *NNet) Pipeline() pipeline.Pipeline {
return s.ngin.pipeline return s.ngin.pipeline
} }
@ -101,14 +140,14 @@ func WithMiddleware(middleware Middleware, rs ...Route) []Route {
} }
func UseMiddleware(middleware ...Middleware) RunOption { func UseMiddleware(middleware ...Middleware) RunOption {
return func(server *Server) { return func(server *NNet) {
server.Use(middleware...) server.Use(middleware...)
} }
} }
// WithNotFoundHandler returns a RunOption with not found handler set to given handler. // WithNotFoundHandler returns a RunOption with not found handler set to given handler.
func WithNotFoundHandler(handler Handler) RunOption { func WithNotFoundHandler(handler Handler) RunOption {
return func(server *Server) { return func(server *NNet) {
notFoundHandler := server.ngin.notFoundHandler(handler) notFoundHandler := server.ngin.notFoundHandler(handler)
server.router.SetNotFoundHandler(notFoundHandler) server.router.SetNotFoundHandler(notFoundHandler)
} }
@ -116,21 +155,21 @@ func WithNotFoundHandler(handler Handler) RunOption {
// WithRouter 设置消息路由 // WithRouter 设置消息路由
func WithRouter(router Router) RunOption { func WithRouter(router Router) RunOption {
return func(server *Server) { return func(server *NNet) {
server.router = router server.router = router
} }
} }
// WithPacker 设置消息的 封包/解包 方式 // WithPacker 设置消息的 封包/解包 方式
func WithPacker(fn packet.NewPackerFunc) RunOption { func WithPacker(fn packet.NewPackerFunc) RunOption {
return func(server *Server) { return func(server *NNet) {
server.ngin.packerFn = fn server.ngin.packerFn = fn
} }
} }
// WithSerializer 设置消息的 序列化/反序列化 方式 // WithSerializer 设置消息的 序列化/反序列化 方式
func WithSerializer(s serialize.Serializer) RunOption { func WithSerializer(s serialize.Serializer) RunOption {
return func(server *Server) { return func(server *NNet) {
server.ngin.serializer = s server.ngin.serializer = s
} }
} }
@ -142,13 +181,13 @@ func WithTimerPrecision(precision time.Duration) RunOption {
if precision < time.Millisecond { if precision < time.Millisecond {
panic("time precision can not less than a Millisecond") panic("time precision can not less than a Millisecond")
} }
return func(_ *Server) { return func(_ *NNet) {
env.TimerPrecision = precision env.TimerPrecision = precision
} }
} }
func WithPipeline(pipeline pipeline.Pipeline) RunOption { func WithPipeline(pipeline pipeline.Pipeline) RunOption {
return func(server *Server) { return func(server *NNet) {
server.ngin.pipeline = pipeline server.ngin.pipeline = pipeline
} }
} }
@ -156,7 +195,7 @@ func WithPipeline(pipeline pipeline.Pipeline) RunOption {
type PipelineOption func(opts pipeline.Pipeline) type PipelineOption func(opts pipeline.Pipeline)
func WithPipelineOpt(opts ...func(pipeline.Pipeline)) RunOption { func WithPipelineOpt(opts ...func(pipeline.Pipeline)) RunOption {
return func(server *Server) { return func(server *NNet) {
for _, opt := range opts { for _, opt := range opts {
opt(server.ngin.pipeline) opt(server.ngin.pipeline)
} }
@ -167,7 +206,7 @@ type WSOption func(opts *wsOptions)
// WithWebsocket 开启Websocket, 参数是websocket的相关参数 nnet.WSOption // WithWebsocket 开启Websocket, 参数是websocket的相关参数 nnet.WSOption
func WithWebsocket(wsOpts ...WSOption) RunOption { func WithWebsocket(wsOpts ...WSOption) RunOption {
return func(server *Server) { return func(server *NNet) {
for _, opt := range wsOpts { for _, opt := range wsOpts {
opt(&server.ngin.wsOpt) opt(&server.ngin.wsOpt)
} }

@ -0,0 +1,66 @@
package core
import (
"fmt"
"git.noahlan.cn/noahlan/nnet/config"
"git.noahlan.cn/noahlan/nnet/entity"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/nnet/protocol"
"git.noahlan.cn/noahlan/ntools-go/core/nlog"
"git.noahlan.cn/noahlan/ntools-go/core/pool"
"math"
"testing"
"time"
)
func TestServer(t *testing.T) {
server := NewServer(config.EngineConf{
ServerConf: config.ServerConf{
Protocol: "tcp",
Addr: "0.0.0.0:6666",
Name: "testServer",
Mode: "dev",
},
Pool: pool.Config{
PoolSize: math.MaxInt32,
ExpiryDuration: time.Second,
PreAlloc: false,
MaxBlockingTasks: 0,
Nonblocking: false,
DisablePurge: false,
},
}, protocol.WithNNetProtocol(protocol.NNetConfig{
HeartbeatInterval: 0,
HandshakeValidator: nil,
})...)
server.AddRoute(Route{
Matches: protocol.Match{
Route: "test",
Code: 1,
},
Handler: func(entity entity.NetworkEntity, pkg packet.IPacket) {
fmt.Println(pkg)
p, ok := pkg.(*protocol.NNetPacket)
if !ok {
nlog.Error("wrong packet type")
return
}
bd := []byte("服务器接收到数据为: " + string(p.GetBody()))
// 注Response类型数据不需要Route原地返回客户端需等待
_ = entity.Send(protocol.Header{
PacketType: protocol.Data,
Length: uint32(len(bd)),
MessageHeader: protocol.MessageHeader{
MsgType: protocol.Response,
ID: p.ID,
Route: p.Route,
},
}, bd)
},
})
defer server.Stop()
server.Start()
}

@ -1,53 +0,0 @@
package core
import (
"fmt"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/ntools-go/core/nlog"
"testing"
"time"
)
func TestServer(t *testing.T) {
server := NewServer(EngineConf{
Protocol: "tcp",
Addr: ":12345",
Name: "N-Net",
Mode: DevMode,
},
WithPacker(func() packet.Packer {
return packet.NewNNetPacker()
}),
WithSerializer(nil),
WithHeartbeatInterval(time.Hour),
WithProcessor(NewNNetProcessor()),
)
server.AddRoute(Route{
Matches: "test",
Handler: func(conn *connection, pkg packet.IPacket) {
fmt.Println(pkg)
p, ok := pkg.(*packet.Packet)
if !ok {
nlog.Error("wrong packet type")
return
}
bd := []byte("服务器接收到数据为: " + string(p.GetBody()))
// 注Response类型数据不需要Route原地返回客户端需等待
conn.Send(packet.Header{
PacketType: packet.Data,
Length: uint32(len(bd)),
MessageHeader: packet.MessageHeader{
MsgType: packet.Response,
ID: p.ID,
Route: p.Route,
},
}, bd)
},
})
defer server.Stop()
server.Start()
}

@ -4,7 +4,7 @@ go 1.20
require ( require (
github.com/gorilla/websocket v1.5.0 github.com/gorilla/websocket v1.5.0
github.com/panjf2000/ants/v2 v2.7.3 github.com/panjf2000/ants/v2 v2.7.3 // indirect
) )
require google.golang.org/protobuf v1.28.2-0.20220831092852-f930b1dc76e8 require google.golang.org/protobuf v1.28.2-0.20220831092852-f930b1dc76e8

@ -1,5 +1,3 @@
git.noahlan.cn/noahlan/ntools-go/core v1.1.1 h1:icFPOTTpVYPa8NpNJteAwFBARPOuHE3695xZWNcAM2c=
git.noahlan.cn/noahlan/ntools-go/core v1.1.1/go.mod h1:UN8UVL5WoyMgqNcxKoAu0/J9d+1hH2Yco64MUtPdjFk=
git.noahlan.cn/noahlan/ntools-go/core v1.1.3 h1:n4z0KaXmX/fmobavxCMc2vGJDoStbhNbm8AZugPEPGg= git.noahlan.cn/noahlan/ntools-go/core v1.1.3 h1:n4z0KaXmX/fmobavxCMc2vGJDoStbhNbm8AZugPEPGg=
git.noahlan.cn/noahlan/ntools-go/core v1.1.3/go.mod h1:pmwee9V76Cyp6nVr3dPj5TpePLvRpc8C0ZgAzFIFAKU= git.noahlan.cn/noahlan/ntools-go/core v1.1.3/go.mod h1:pmwee9V76Cyp6nVr3dPj5TpePLvRpc8C0ZgAzFIFAKU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -21,8 +19,7 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/panjf2000/ants/v2 v2.6.0 h1:xOSpw42m+BMiJ2I33we7h6fYzG4DAlpE1xyI7VS2gxU= github.com/panjf2000/ants/v2 v2.7.3 h1:rHQ0hH0DQvuNUqqlWIMJtkMcDuL1uQAfpX2mIhQ5/s0=
github.com/panjf2000/ants/v2 v2.6.0/go.mod h1:cU93usDlihJZ5CfRGNDYsiBYvoilLvBF5Qp/BT2GNRE=
github.com/panjf2000/ants/v2 v2.7.3/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8= github.com/panjf2000/ants/v2 v2.7.3/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@ -40,6 +37,7 @@ go.opentelemetry.io/otel/trace v1.14.0 h1:wp2Mmvj41tDsyAJXiWDWpfNsOiIyd38fy85pyK
go.opentelemetry.io/otel/trace v1.14.0/go.mod h1:8avnQLK+CG77yNLUae4ea2JDQ6iT+gozhnZjy/rw9G8= go.opentelemetry.io/otel/trace v1.14.0/go.mod h1:8avnQLK+CG77yNLUae4ea2JDQ6iT+gozhnZjy/rw9G8=
golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A=
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=

@ -15,19 +15,19 @@ type HeartbeatMiddleware struct {
hbdFn func(entity entity.NetworkEntity) []byte hbdFn func(entity entity.NetworkEntity) []byte
} }
func WithHeartbeat(interval time.Duration, dataFn func(entity entity.NetworkEntity) []byte) core.RunOption { func WithHeartbeat(interval time.Duration, hbdFn func(entity entity.NetworkEntity) []byte) core.RunOption {
m := &HeartbeatMiddleware{ m := &HeartbeatMiddleware{
lastAt: time.Now().Unix(), lastAt: time.Now().Unix(),
interval: interval, interval: interval,
hbdFn: dataFn, hbdFn: hbdFn,
} }
if dataFn == nil { if hbdFn == nil {
nlog.Error("dataFn must not be nil") nlog.Error("dataFn must not be nil")
panic("dataFn must not be nil") panic("dataFn must not be nil")
} }
core.Lifetime.OnOpen(m.start) core.Lifetime.OnOpen(m.start)
return func(server *core.Server) { return func(server *core.NNet) {
server.Use(func(next core.HandlerFunc) core.HandlerFunc { server.Use(func(next core.HandlerFunc) core.HandlerFunc {
return func(entity entity.NetworkEntity, pkg packet.IPacket) { return func(entity entity.NetworkEntity, pkg packet.IPacket) {
m.handle(entity, pkg) m.handle(entity, pkg)

@ -0,0 +1,65 @@
package protocol
import (
"encoding/json"
"errors"
"fmt"
"git.noahlan.cn/noahlan/nnet/core"
"git.noahlan.cn/noahlan/nnet/entity"
"git.noahlan.cn/noahlan/ntools-go/core/nlog"
)
type OnReadyFunc func()
func WithNNetClientPipeline(onReady OnReadyFunc) core.RunOption {
packer := NewNNetPacker()
return func(server *core.NNet) {
server.Pipeline().Inbound().PushFront(func(entity entity.NetworkEntity, v interface{}) error {
pkg, ok := v.(*NNetPacket)
if !ok {
return ErrWrongPacketType
}
conn, _ := entity.Conn()
// Server to client
switch pkg.PacketType {
case Handshake:
var handshakeData HandshakeResp
err := json.Unmarshal(pkg.Data, &handshakeData)
nlog.Must(err)
hrd, _ := packer.Pack(Header{
PacketType: HandshakeAck,
MessageHeader: MessageHeader{},
}, nil)
if err := entity.SendBytes(hrd); err != nil {
return err
}
entity.SetStatus(core.StatusWorking)
// onReady
if onReady != nil {
onReady()
}
nlog.Debugf("connection handshake Id=%d, Remote=%s", entity.Session().ID(), conn.RemoteAddr())
case Kick:
_ = entity.Close()
case Data:
status := entity.Status()
if status != core.StatusWorking {
return errors.New(fmt.Sprintf("receive data on socket which not yet ACK, session will be closed immediately, remote=%s",
conn.RemoteAddr()))
}
var lastMid uint64
switch pkg.MsgType {
case Response:
lastMid = pkg.ID
case Notify:
lastMid = 0
}
entity.SetLastMID(lastMid)
}
return nil
})
}
}

@ -1,7 +1,6 @@
package protocol package protocol
import ( import (
"encoding/json"
"git.noahlan.cn/noahlan/nnet/core" "git.noahlan.cn/noahlan/nnet/core"
"git.noahlan.cn/noahlan/nnet/entity" "git.noahlan.cn/noahlan/nnet/entity"
"git.noahlan.cn/noahlan/nnet/middleware" "git.noahlan.cn/noahlan/nnet/middleware"
@ -14,10 +13,9 @@ type (
NNetConfig struct { NNetConfig struct {
HeartbeatInterval time.Duration HeartbeatInterval time.Duration
HandshakeValidator HandshakeValidatorFunc HandshakeValidator HandshakeValidatorFunc
HandshakeAckBuilder HandshakeAckBuilderFunc
} }
handshakeData struct { HandshakeReq struct {
Version string `json:"version"` // 客户端版本,服务器以此判断是否合适与客户端通信 Version string `json:"version"` // 客户端版本,服务器以此判断是否合适与客户端通信
Type string `json:"type"` // 客户端类型,与客户端版本号一起来确定客户端是否合适 Type string `json:"type"` // 客户端类型,与客户端版本号一起来确定客户端是否合适
ClientId string `json:"clientId"` // 客户端ID服务器以此取值 ClientId string `json:"clientId"` // 客户端ID服务器以此取值
@ -27,39 +25,56 @@ type (
Payload interface{} `json:"payload,optional,omitempty"` Payload interface{} `json:"payload,optional,omitempty"`
} }
HandshakeAckData struct { HandshakeResp struct {
// 心跳间隔,单位秒 0表示不需要心跳 // 心跳间隔,单位秒 0表示不需要心跳
Heartbeat int64 `json:"heartbeat"` Heartbeat int64 `json:"heartbeat"`
// 路由 *RouteMap
Routes map[string]uint16 `json:"routes"` // route map to code
Codes map[uint16]string `json:"codes"` // code map to route
// 透传信息 // 透传信息
Payload interface{} `json:"payload,optional,omitempty"` Payload interface{} `json:"payload,optional,omitempty"`
} }
) )
var routeMap *RouteMap
func init() {
routeMap = &RouteMap{
Routes: make(map[string]uint16),
Codes: make(map[uint16]string),
}
}
func WithNNetClientProtocol(onReady OnReadyFunc) []core.RunOption {
opts := []core.RunOption{
WithNNetClientPipeline(onReady),
core.WithRouter(NewNNetRouter()),
core.WithPacker(func() packet.Packer { return NewNNetPacker() }),
}
return opts
}
func WithNNetProtocol(config NNetConfig) []core.RunOption { func WithNNetProtocol(config NNetConfig) []core.RunOption {
if config.HandshakeValidator == nil { if config.HandshakeValidator == nil {
config.HandshakeValidator = func(bytes []byte) error { return nil } config.HandshakeValidator = func(data *HandshakeReq) error {
return nil
} }
if config.HandshakeAckBuilder == nil {
config.HandshakeAckBuilder = func() ([]byte, error) {
defaultData := &HandshakeAckData{}
return json.Marshal(defaultData)
} }
handshakeAckData := &HandshakeResp{
Heartbeat: int64(config.HeartbeatInterval.Seconds()),
RouteMap: routeMap,
} }
opts := []core.RunOption{ opts := []core.RunOption{
WithNNetPipeline(config.HandshakeAckBuilder, config.HandshakeValidator), WithNNetPipeline(handshakeAckData, config.HandshakeValidator),
core.WithRouter(NewNNetRouter()), core.WithRouter(NewNNetRouter()),
core.WithPacker(func() packet.Packer { return NewNNetPacker() }), core.WithPacker(func() packet.Packer { return NewNNetPacker() }),
} }
if config.HeartbeatInterval.Seconds() > 0 { if config.HeartbeatInterval.Seconds() > 0 {
packer := NewNNetPacker() packer := NewNNetPacker()
hbd, err := packer.Pack(Handshake, nil) hbd, err := packer.Pack(Heartbeat, nil)
nlog.Must(err) nlog.Must(err)
opts = append(opts, middleware.WithHeartbeat(config.HeartbeatInterval, func(_ entity.NetworkEntity) []byte { opts = append(opts, middleware.WithHeartbeat(config.HeartbeatInterval, func(_ entity.NetworkEntity) []byte {

@ -35,11 +35,6 @@ var (
ErrWrongPacketType = errors.New("wrong packet type") ErrWrongPacketType = errors.New("wrong packet type")
) )
var (
routes = make(map[string]uint16) // route map to code
codes = make(map[uint16]string) // code map to route
)
func NewNNetPacker() *NNetPacker { func NewNNetPacker() *NNetPacker {
p := &NNetPacker{ p := &NNetPacker{
buf: bytes.NewBuffer(nil), buf: bytes.NewBuffer(nil),
@ -87,7 +82,7 @@ func (d *NNetPacker) Pack(header interface{}, data []byte) ([]byte, error) {
// flag // flag
flag := byte(h.MsgType << 1) // 编译器提示,此处 byte 转换不能删 flag := byte(h.MsgType << 1) // 编译器提示,此处 byte 转换不能删
code, compressed := routes[h.Route] code, compressed := routeMap.Routes[h.Route]
if compressed { if compressed {
flag |= msgRouteCompressMask flag |= msgRouteCompressMask
} }
@ -185,7 +180,7 @@ func (d *NNetPacker) Unpack(data []byte) ([]packet.IPacket, error) {
if d.flag&msgRouteCompressMask == 1 { if d.flag&msgRouteCompressMask == 1 {
p.compressed = true p.compressed = true
code := binary.BigEndian.Uint16(d.buf.Next(2)) code := binary.BigEndian.Uint16(d.buf.Next(2))
route, ok := codes[code] route, ok := routeMap.Codes[code]
if !ok { if !ok {
return nil, ErrRouteInfoNotFound return nil, ErrRouteInfoNotFound
} }

@ -1,6 +1,7 @@
package protocol package protocol
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"git.noahlan.cn/noahlan/nnet/core" "git.noahlan.cn/noahlan/nnet/core"
@ -9,13 +10,16 @@ import (
) )
type ( type (
HandshakeValidatorFunc func([]byte) error HandshakeValidatorFunc func(*HandshakeReq) error
HandshakeAckBuilderFunc func() (interface{}, error) HandshakeAckPayloadFunc func() interface{}
) )
func WithNNetPipeline(ackDataBuilder HandshakeAckBuilderFunc, validator HandshakeValidatorFunc) core.RunOption { func WithNNetPipeline(
handshakeResp *HandshakeResp,
validator HandshakeValidatorFunc,
) core.RunOption {
packer := NewNNetPacker() packer := NewNNetPacker()
return func(server *core.Server) { return func(server *core.NNet) {
server.Pipeline().Inbound().PushFront(func(entity entity.NetworkEntity, v interface{}) error { server.Pipeline().Inbound().PushFront(func(entity entity.NetworkEntity, v interface{}) error {
pkg, ok := v.(*NNetPacket) pkg, ok := v.(*NNetPacket)
if !ok { if !ok {
@ -25,13 +29,22 @@ func WithNNetPipeline(ackDataBuilder HandshakeAckBuilderFunc, validator Handshak
switch pkg.PacketType { switch pkg.PacketType {
case Handshake: case Handshake:
if err := validator(pkg.Data); err != nil { var handshakeData HandshakeReq
err := json.Unmarshal(pkg.Data, &handshakeData)
nlog.Must(err)
if err := validator(&handshakeData); err != nil {
return err return err
} }
data, err := ackDataBuilder() handshakeResp.Payload = handshakeData.Payload
data, err := json.Marshal(handshakeResp)
nlog.Must(err) nlog.Must(err)
hrd, _ := packer.Pack(Handshake, data) hrd, _ := packer.Pack(Header{
PacketType: Handshake,
MessageHeader: MessageHeader{},
}, data)
if err := entity.SendBytes(hrd); err != nil { if err := entity.SendBytes(hrd); err != nil {
return err return err
} }
@ -40,8 +53,6 @@ func WithNNetPipeline(ackDataBuilder HandshakeAckBuilderFunc, validator Handshak
case HandshakeAck: case HandshakeAck:
entity.SetStatus(core.StatusPending) entity.SetStatus(core.StatusPending)
nlog.Debugf("receive handshake ACK Id=%d, Remote=%s", entity.Session().ID(), conn.RemoteAddr()) nlog.Debugf("receive handshake ACK Id=%d, Remote=%s", entity.Session().ID(), conn.RemoteAddr())
case Heartbeat:
// Expected
case Data: case Data:
if entity.Status() < core.StatusPending { if entity.Status() < core.StatusPending {
return errors.New(fmt.Sprintf("receive data on socket which not yet ACK, session will be closed immediately, remote=%s", return errors.New(fmt.Sprintf("receive data on socket which not yet ACK, session will be closed immediately, remote=%s",

@ -2,16 +2,35 @@ package protocol
import ( import (
"errors" "errors"
"fmt"
"git.noahlan.cn/noahlan/nnet/core" "git.noahlan.cn/noahlan/nnet/core"
"git.noahlan.cn/noahlan/nnet/entity" "git.noahlan.cn/noahlan/nnet/entity"
"git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/ntools-go/core/nlog" "git.noahlan.cn/noahlan/ntools-go/core/nlog"
) )
type nNetRouter struct { var (
Routes = make(map[string]uint16) // route map to code
Codes = make(map[uint16]string) // code map to route
)
type (
RouteMap struct {
// 路由
Routes map[string]uint16 `json:"routes"` // route map to code
Codes map[uint16]string `json:"codes"` // code map to route
}
Match struct {
Route string // 路由
Code uint16 // 路由编码
}
nNetRouter struct {
handlers map[string]core.Handler handlers map[string]core.Handler
notFound core.Handler notFound core.Handler
} }
)
func NewNNetRouter() core.Router { func NewNNetRouter() core.Router {
return &nNetRouter{ return &nNetRouter{
@ -38,11 +57,14 @@ func (r *nNetRouter) Handle(entity entity.NetworkEntity, p packet.IPacket) {
} }
func (r *nNetRouter) Register(matches interface{}, handler core.Handler) error { func (r *nNetRouter) Register(matches interface{}, handler core.Handler) error {
route, ok := matches.(string) match, ok := matches.(Match)
if !ok { if !ok {
return errors.New("the type of matches must be string") return errors.New(fmt.Sprintf("the type of matches must be %T", Match{}))
} }
r.handlers[route] = handler r.handlers[match.Route] = handler
// routeMap
routeMap.Routes[match.Route] = match.Code
routeMap.Codes[match.Code] = match.Route
return nil return nil
} }

@ -0,0 +1,117 @@
package main
import (
"encoding/json"
"git.noahlan.cn/noahlan/nnet/config"
"git.noahlan.cn/noahlan/nnet/core"
"git.noahlan.cn/noahlan/nnet/entity"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/nnet/protocol"
"git.noahlan.cn/noahlan/ntools-go/core/nlog"
"git.noahlan.cn/noahlan/ntools-go/core/pool"
"math"
"time"
)
func runServer(addr string) {
server := core.NewServer(config.EngineConf{
ServerConf: config.ServerConf{
Protocol: "tcp",
Addr: addr,
Name: "testServer",
Mode: "dev",
},
Pool: pool.Config{
PoolSize: math.MaxInt32,
ExpiryDuration: time.Second,
PreAlloc: false,
MaxBlockingTasks: 0,
Nonblocking: false,
DisablePurge: false,
},
}, protocol.WithNNetProtocol(protocol.NNetConfig{
HeartbeatInterval: 0,
HandshakeValidator: nil,
})...)
server.AddRoutes([]core.Route{
{
Matches: protocol.Match{
Route: "ping",
Code: 1,
},
Handler: func(et entity.NetworkEntity, pkg packet.IPacket) {
nlog.Info("client ping, server pong -> ")
err := et.Send(protocol.Header{
PacketType: protocol.Data,
MessageHeader: protocol.MessageHeader{
MsgType: protocol.Request,
ID: 1,
Route: "pong",
},
}, []byte("1"))
nlog.Must(err)
},
},
})
defer server.Stop()
server.Start()
}
func runClient(addr string) (client *core.Client, et entity.NetworkEntity) {
chReady := make(chan struct{})
client = core.NewClient(config.EngineConf{
Pool: pool.Config{
PoolSize: math.MaxInt32,
ExpiryDuration: time.Second,
PreAlloc: false,
MaxBlockingTasks: 0,
Nonblocking: false,
DisablePurge: false,
},
}, protocol.WithNNetClientProtocol(func() {
chReady <- struct{}{}
})...)
client.AddRoutes([]core.Route{
{
Matches: protocol.Match{
Route: "test.client",
Code: 1,
},
Handler: func(et entity.NetworkEntity, pkg packet.IPacket) {
nlog.Info("client hahaha")
},
},
})
et = client.Dial(addr)
handshake, err := json.Marshal(&protocol.HandshakeReq{
Version: "1.0.0",
Type: "test",
ClientId: "a",
ClientSecret: "a",
Payload: map[string]string{
"pl": "test-data",
},
})
nlog.Must(err)
packer := protocol.NewNNetPacker()
hsd, err := packer.Pack(protocol.Header{
PacketType: protocol.Handshake,
MessageHeader: protocol.MessageHeader{
MsgType: 0,
ID: 0,
Route: "",
},
}, handshake)
nlog.Must(err)
err = et.SendBytes(hsd)
nlog.Must(err)
<-chReady
return
}

@ -0,0 +1,50 @@
package main
import (
"git.noahlan.cn/noahlan/nnet/core"
"git.noahlan.cn/noahlan/nnet/entity"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/nnet/protocol"
"git.noahlan.cn/noahlan/ntools-go/core/nlog"
"sync"
"testing"
)
func TestServer(t *testing.T) {
runServer("0.0.0.0:6666")
}
func TestClient(t *testing.T) {
client, et := runClient("127.0.0.1:6666")
client.AddRoute(core.Route{
Matches: protocol.Match{
Route: "pong",
Code: 2,
},
Handler: func(et entity.NetworkEntity, pkg packet.IPacket) {
nlog.Info("server pong, client ping ->")
_ = et.Send(protocol.Header{
PacketType: protocol.Data,
MessageHeader: protocol.MessageHeader{
MsgType: protocol.Request,
ID: 1,
Route: "ping",
},
}, []byte("1"))
},
})
_ = et.Send(protocol.Header{
PacketType: protocol.Data,
MessageHeader: protocol.MessageHeader{
MsgType: protocol.Request,
ID: 1,
Route: "ping",
},
}, []byte("1"))
var wg sync.WaitGroup
wg.Add(1)
wg.Wait()
}
Loading…
Cancel
Save