From 5779bb7989276a746b38752229c1f4823ce44db5 Mon Sep 17 00:00:00 2001 From: NorthLan <6995syu@163.com> Date: Mon, 31 Oct 2022 17:54:36 +0800 Subject: [PATCH] =?UTF-8?q?wip:=20=E5=8F=88=E5=8A=A0=E4=BA=86=E4=B8=80?= =?UTF-8?q?=E4=BA=9B=E6=96=B0=E4=B8=9C=E8=A5=BF=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/hub.go | 20 +++ component/util.go | 3 +- interfaces/i_message.go | 4 - interfaces/i_packet.go | 7 - log/logger.go | 88 +++++++++++++ message/message.go | 11 ++ net/options.go | 9 -- net/request.go | 37 ------ net/server.go | 124 ------------------ net/session_mgr.go | 40 ------ {interfaces => nface}/i_router.go | 2 +- {interfaces => nface}/i_session.go | 2 +- {net => nnet}/handler.go | 27 +--- nnet/options.go | 57 ++++++++ nnet/pool.go | 27 ++++ nnet/request.go | 63 +++++++++ nnet/server.go | 201 +++++++++++++++++++++++++++++ {net => nnet}/vars.go | 2 +- nnet/ws.go | 114 ++++++++++++++++ packet/packet.go | 48 +++++++ pipeline/pipeline.go | 83 ++++++++++++ {net => session}/session.go | 7 +- session/session_mgr.go | 47 +++++++ 23 files changed, 771 insertions(+), 252 deletions(-) create mode 100644 component/hub.go delete mode 100644 interfaces/i_message.go delete mode 100644 interfaces/i_packet.go create mode 100644 log/logger.go create mode 100644 message/message.go delete mode 100644 net/options.go delete mode 100644 net/request.go delete mode 100644 net/server.go delete mode 100644 net/session_mgr.go rename {interfaces => nface}/i_router.go (73%) rename {interfaces => nface}/i_session.go (97%) rename {net => nnet}/handler.go (61%) create mode 100644 nnet/options.go create mode 100644 nnet/pool.go create mode 100644 nnet/request.go create mode 100644 nnet/server.go rename {net => nnet}/vars.go (94%) create mode 100644 nnet/ws.go create mode 100644 packet/packet.go create mode 100644 pipeline/pipeline.go rename {net => session}/session.go (93%) create mode 100644 session/session_mgr.go diff --git a/component/hub.go b/component/hub.go new file mode 100644 index 0000000..6d1495e --- /dev/null +++ b/component/hub.go @@ -0,0 +1,20 @@ +package component + +type CompWithOptions struct { + Comp Component + Opts []Option +} + +type Components struct { + comps []CompWithOptions +} + +// Register 全局注册组件,必须在服务启动之前初始化 +func (cs *Components) Register(c Component, options ...Option) { + cs.comps = append(cs.comps, CompWithOptions{c, options}) +} + +// List 获取所有已注册组件 +func (cs *Components) List() []CompWithOptions { + return cs.comps +} diff --git a/component/util.go b/component/util.go index c899a1b..964080c 100644 --- a/component/util.go +++ b/component/util.go @@ -1,7 +1,6 @@ package component import ( - "git.noahlan.cn/northlan/nnet/net" "reflect" "unicode" "unicode/utf8" @@ -10,7 +9,7 @@ import ( var ( typeOfError = reflect.TypeOf((*error)(nil)).Elem() typeOfBytes = reflect.TypeOf(([]byte)(nil)) - typeOfRequest = reflect.TypeOf(net.Request{}) + typeOfRequest = reflect.TypeOf(nnet.Request{}) ) func isExported(name string) bool { diff --git a/interfaces/i_message.go b/interfaces/i_message.go deleted file mode 100644 index 415de52..0000000 --- a/interfaces/i_message.go +++ /dev/null @@ -1,4 +0,0 @@ -package interfaces - -type IMessage interface { -} diff --git a/interfaces/i_packet.go b/interfaces/i_packet.go deleted file mode 100644 index 34e6cbf..0000000 --- a/interfaces/i_packet.go +++ /dev/null @@ -1,7 +0,0 @@ -package interfaces - -// IPacket 面向TCP连接中的数据流,网络数据包抽象接口 -type IPacket interface { - // HeaderLen TCP数据帧头部长度,固定长度值 - HeaderLen() int -} diff --git a/log/logger.go b/log/logger.go new file mode 100644 index 0000000..a03f4be --- /dev/null +++ b/log/logger.go @@ -0,0 +1,88 @@ +package log + +import ( + "log" + "os" +) + +type Logger interface { + Debugf(format string, v ...interface{}) + Debug(v ...interface{}) + Info(v ...interface{}) + Infof(format string, v ...interface{}) + Error(v ...interface{}) + Errorf(format string, v ...interface{}) + Panic(v ...interface{}) + Panicf(format string, v ...interface{}) +} + +func init() { + SetLogger(newInnerLogger()) +} + +var ( + Debugf func(format string, v ...interface{}) + Debug func(v ...interface{}) + Info func(v ...interface{}) + Infof func(format string, v ...interface{}) + Error func(v ...interface{}) + Errorf func(format string, v ...interface{}) + Panic func(v ...interface{}) + Panicf func(format string, v ...interface{}) +) + +func SetLogger(logger Logger) { + if logger == nil { + return + } + Debugf = logger.Debugf + Debug = logger.Debug + Info = logger.Info + Infof = logger.Infof + Error = logger.Error + Errorf = logger.Errorf + Panic = logger.Panic + Panicf = logger.Panicf +} + +type innerLogger struct { + log *log.Logger +} + +func newInnerLogger() Logger { + return &innerLogger{ + log: log.New(os.Stderr, "[N-Net] ", log.LstdFlags|log.Lshortfile), + } +} + +func (i *innerLogger) Debugf(format string, v ...interface{}) { + i.log.Printf(format, v) +} + +func (i *innerLogger) Debug(v ...interface{}) { + i.log.Println(v) +} + +func (i *innerLogger) Info(v ...interface{}) { + i.log.Println(v) +} + +func (i *innerLogger) Infof(format string, v ...interface{}) { + i.log.Printf(format, v) +} + +func (i *innerLogger) Error(v ...interface{}) { + i.log.Println(v) +} + +func (i *innerLogger) Errorf(format string, v ...interface{}) { + i.log.Printf(format, v) +} + +func (i *innerLogger) Panic(v ...interface{}) { + i.log.Panic(v) +} + +func (i *innerLogger) Panicf(format string, v ...interface{}) { + i.log.Panicf(format, v) +} diff --git a/message/message.go b/message/message.go new file mode 100644 index 0000000..4c5720e --- /dev/null +++ b/message/message.go @@ -0,0 +1,11 @@ +package message + +type Header struct { +} + +type Message struct { + Type byte // 消息类型 + ID uint64 // 消息ID + Header []byte // 消息头原始数据 + Payload []byte // 数据 +} diff --git a/net/options.go b/net/options.go deleted file mode 100644 index 4eb8f7e..0000000 --- a/net/options.go +++ /dev/null @@ -1,9 +0,0 @@ -package net - -type Option func(server *Server) - -func WithXXX() Option { - return func(server *Server) { - - } -} diff --git a/net/request.go b/net/request.go deleted file mode 100644 index 8743322..0000000 --- a/net/request.go +++ /dev/null @@ -1,37 +0,0 @@ -package net - -import ( - "git.noahlan.cn/northlan/nnet/interfaces" - "net" -) - -type Request struct { - session interfaces.ISession // Session - - server *Server // Server reference - conn net.Conn // low-level conn fd - status Status // 连接状态 -} - -func newRequest(server *Server, conn net.Conn) *Request { - r := &Request{ - server: server, - conn: conn, - status: StatusStart, - } - - r.session = newSession() - return r -} - -func (r *Request) Status() Status { - return r.status -} - -func (r *Request) ID() int64 { - return r.session.ID() -} - -func (r *Request) Session() interfaces.ISession { - return r.session -} diff --git a/net/server.go b/net/server.go deleted file mode 100644 index b79289d..0000000 --- a/net/server.go +++ /dev/null @@ -1,124 +0,0 @@ -package net - -import ( - "errors" - "fmt" - "github.com/gorilla/websocket" - "github.com/panjf2000/ants/v2" - "net" - "net/http" -) - -// Server TCP-Server -type Server struct { - // Name 服务端名称,默认为 n-net - Name string - // protocol 协议名 - // "tcp", "tcp4", "tcp6", "unix" or "unixpacket" - // 若只想开启IPv4, 使用tcp4即可 - protocol string - // address 服务地址 - // 地址可直接使用hostname,但强烈不建议这样做,可能会同时监听多个本地IP - // 如果端口号不填或端口号为0,例如:"127.0.0.1:" 或 ":0",服务端将选择随机可用端口 - address string - // 一些其它的东西 .etc.. - handler *Handler - sessionMgr *SessionMgr - pool *ants.Pool -} - -func newServer(opts ...Option) *Server { - s := &Server{ - Name: "", - protocol: "", - address: "", - handler: nil, - sessionMgr: nil, - } - - for _, opt := range opts { - opt(s) - } - - s.pool, _ = ants.NewPool(0) - - return s -} - -func (s *Server) listenAndServe() { - listener, err := net.Listen(s.protocol, s.address) - if err != nil { - panic(err) - } - - // 监听成功,服务已启动 - // TODO log - defer listener.Close() - - go func() { - for { - conn, err := listener.Accept() - if err != nil { - if errors.Is(err, net.ErrClosed) { - // 服务端网络错误 - // TODO print log - return - } - // TODO log - continue - } - - // TODO 最大连接限制 - //if s.ConnMgr.Len() >= utils.GlobalObject.MaxConn { - // conn.Close() - // continue - //} - - r := newRequest(s, conn) - - s.pool.Submit(func() { - s.handler.handle(r) - }) - } - }() -} - -func (s *Server) listenAndServeWS() { - upgrade := websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: nil, - EnableCompression: false, - } - http.HandleFunc(fmt.Sprintf("/%s/", "path"), func(w http.ResponseWriter, r *http.Request) { - conn, err := upgrade.Upgrade(w, r, nil) - if err != nil { - // TODO upgrade failure, uri=r.requestURI err=err.Error() - return - } - // TODO s.handler.handleWS(conn) - }) - if err := http.ListenAndServe(s.address, nil); err != nil { - panic(err) - } -} - -func (s *Server) listenAndServeWSTLS() { - upgrade := websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: nil, - EnableCompression: false, - } - http.HandleFunc(fmt.Sprintf("/%s/", "path"), func(w http.ResponseWriter, r *http.Request) { - conn, err := upgrade.Upgrade(w, r, nil) - if err != nil { - // TODO upgrade failure, uri=r.requestURI err=err.Error() - return - } - // TODO s.handler.handleWS(conn) - }) - if err := http.ListenAndServeTLS(s.address, "", "", nil); err != nil { - panic(err) - } -} diff --git a/net/session_mgr.go b/net/session_mgr.go deleted file mode 100644 index cb313fc..0000000 --- a/net/session_mgr.go +++ /dev/null @@ -1,40 +0,0 @@ -package net - -import ( - "git.noahlan.cn/northlan/nnet/interfaces" - "sync" -) - -type SessionMgr struct { - sync.RWMutex - sessions map[int64]interfaces.ISession -} - -func (m *SessionMgr) storeSession(s interfaces.ISession) { - m.Lock() - defer m.Unlock() - - m.sessions[s.ID()] = s -} - -func (m *SessionMgr) findSession(sid int64) interfaces.ISession { - m.RLock() - defer m.RUnlock() - - return m.sessions[sid] -} - -func (m *SessionMgr) findOrCreateSession(sid int64) interfaces.ISession { - m.RLock() - s, ok := m.sessions[sid] - m.RUnlock() - - if !ok { - s = newSession() - - m.Lock() - m.sessions[s.ID()] = s - m.Unlock() - } - return s -} diff --git a/interfaces/i_router.go b/nface/i_router.go similarity index 73% rename from interfaces/i_router.go rename to nface/i_router.go index 2b0e766..55449a6 100644 --- a/interfaces/i_router.go +++ b/nface/i_router.go @@ -1,4 +1,4 @@ -package interfaces +package nface // IRouter 路由接口 type IRouter interface { diff --git a/interfaces/i_session.go b/nface/i_session.go similarity index 97% rename from interfaces/i_session.go rename to nface/i_session.go index e7ab629..7a30606 100644 --- a/interfaces/i_session.go +++ b/nface/i_session.go @@ -1,4 +1,4 @@ -package interfaces +package nface // ISessionAttribute Session数据接口 type ISessionAttribute interface { diff --git a/net/handler.go b/nnet/handler.go similarity index 61% rename from net/handler.go rename to nnet/handler.go index ec2c7da..7250120 100644 --- a/net/handler.go +++ b/nnet/handler.go @@ -1,37 +1,29 @@ -package net +package nnet import ( "fmt" "git.noahlan.cn/northlan/nnet/component" - "github.com/panjf2000/ants/v2" + "git.noahlan.cn/northlan/nnet/pipeline" ) type Handler struct { + server *Server // Server 引用 + pipeline pipeline.Pipeline // 通道 + allServices map[string]*component.Service // 所有注册的Service allHandlers map[string]*component.Handler // 所有注册的Handler - - workerSize int64 // 业务工作Worker数量 - taskQueue []chan *Request // 工作协程的消息队列 } func NewHandler() *Handler { return &Handler{ allServices: make(map[string]*component.Service), allHandlers: make(map[string]*component.Handler), - // TODO 读取配置 - workerSize: 10, - taskQueue: make([]chan *Request, 10), } } func (h *Handler) register(comp component.Component, opts []component.Option) error { s := component.NewService(comp, opts) - p, _ := ants.NewPool() - p.Submit(func() { - - }) - if _, ok := h.allServices[s.Name]; ok { return fmt.Errorf("handler: service already defined: %s", s.Name) } @@ -52,15 +44,6 @@ func (h *Handler) register(comp component.Component, opts []component.Option) er return nil } -// DoWorker 将工作交给worker处理 -func (h *Handler) DoWorker(request *Request) { - // 根据sessionID平均分配worker处理 - workerId := request.Session.ID() % h.workerSize - fmt.Printf("sessionID %d to workerID %d\n", request.Session.ID(), workerId) - // 入队 - h.taskQueue[workerId] <- request -} - func (h *Handler) handle(request *Request) { } diff --git a/nnet/options.go b/nnet/options.go new file mode 100644 index 0000000..4829a33 --- /dev/null +++ b/nnet/options.go @@ -0,0 +1,57 @@ +package nnet + +import ( + "git.noahlan.cn/northlan/nnet/component" + "git.noahlan.cn/northlan/nnet/log" + "git.noahlan.cn/northlan/nnet/pipeline" + "time" +) + +type Option func(options *Options) +type WSOption func(opts *WSOptions) + +func WithLogger(logger log.Logger) Option { + return func(_ *Options) { + log.SetLogger(logger) + } +} + +func WithPipeline(pipeline pipeline.Pipeline) Option { + return func(options *Options) { + options.Pipeline = pipeline + } +} + +func WithComponents(components *component.Components) Option { + return func(options *Options) { + options.Components = components + } +} + +func WithHeartbeatInterval(d time.Duration) Option { + return func(options *Options) { + options.HeartbeatInterval = d + } +} + +func WithWebsocket(wsOpts ...WSOption) Option { + return func(options *Options) { + for _, opt := range wsOpts { + opt(&options.WS) + } + options.WS.IsWebsocket = true + } +} + +func WithWSPath(path string) WSOption { + return func(opts *WSOptions) { + opts.WebsocketPath = path + } +} + +func WithWSTLS(certificate, key string) WSOption { + return func(opts *WSOptions) { + opts.TLSCertificate = certificate + opts.TLSKey = key + } +} diff --git a/nnet/pool.go b/nnet/pool.go new file mode 100644 index 0000000..c1ef4ce --- /dev/null +++ b/nnet/pool.go @@ -0,0 +1,27 @@ +package nnet + +import "github.com/panjf2000/ants/v2" + +var pool *Pool + +type Pool struct { + connPool *ants.Pool + workerPool *ants.Pool +} + +func initPool(size int) { + p := &Pool{} + + p.connPool, _ = ants.NewPool(size, ants.WithNonblocking(true)) + p.workerPool, _ = ants.NewPool(size*2, ants.WithNonblocking(true)) + + pool = p +} + +func (p *Pool) SubmitConn(h func()) error { + return p.connPool.Submit(h) +} + +func (p *Pool) SubmitWorker(h func()) error { + return p.workerPool.Submit(h) +} diff --git a/nnet/request.go b/nnet/request.go new file mode 100644 index 0000000..63d15ef --- /dev/null +++ b/nnet/request.go @@ -0,0 +1,63 @@ +package nnet + +import ( + "git.noahlan.cn/northlan/nnet/nface" + "git.noahlan.cn/northlan/nnet/pipeline" + "git.noahlan.cn/northlan/nnet/session" + "github.com/gorilla/websocket" + "net" + "time" +) + +type Request struct { + session nface.ISession // Session + + conn net.Conn // low-level conn fd + status Status // 连接状态 + lastMid uint64 // 最近一次消息ID + lastHeartbeatAt int64 // 最近一次心跳时间 + + chDie chan struct{} // 停止通道 + chSend chan []byte // 消息发送通道 + + pipeline pipeline.Pipeline // 消息管道 +} + +func newRequest(conn net.Conn, pipeline pipeline.Pipeline) *Request { + r := &Request{ + conn: conn, + status: StatusStart, + + lastHeartbeatAt: time.Now().Unix(), + + chDie: make(chan struct{}), + chSend: make(chan []byte), + + pipeline: pipeline, + } + + // binding session + r.session = session.New() + return r +} + +func newRequestWS(conn *websocket.Conn, pipeline pipeline.Pipeline) *Request { + c, err := newWSConn(conn) + if err != nil { + // TODO panic ? + panic(err) + } + return newRequest(c, pipeline) +} + +func (r *Request) Status() Status { + return r.status +} + +func (r *Request) ID() int64 { + return r.session.ID() +} + +func (r *Request) Session() nface.ISession { + return r.session +} diff --git a/nnet/server.go b/nnet/server.go new file mode 100644 index 0000000..db3da24 --- /dev/null +++ b/nnet/server.go @@ -0,0 +1,201 @@ +package nnet + +import ( + "errors" + "fmt" + "git.noahlan.cn/northlan/nnet/component" + "git.noahlan.cn/northlan/nnet/log" + "git.noahlan.cn/northlan/nnet/pipeline" + "git.noahlan.cn/northlan/nnet/session" + "github.com/gorilla/websocket" + "net" + "net/http" + "os" + "os/signal" + "syscall" + "time" +) + +type ( + Options struct { + Name string // 服务端名,默认为n-net + Pipeline pipeline.Pipeline // 消息管道 + RetryInterval time.Duration // 消息重试间隔时长 + Components *component.Components // 组件库 + + HeartbeatInterval time.Duration // 心跳间隔,0表示不进行心跳 + WS WSOptions // websocket + } + + WSOptions struct { + IsWebsocket bool // 是否为websocket服务端 + WebsocketPath string // ws地址(ws://127.0.0.1/WebsocketPath) + TLSCertificate string // TLS 证书地址 (websocket) + TLSKey string // TLS 证书key地址 (websocket) + } +) + +// Server TCP-Server +type Server struct { + // Options 参数列表 + Options + // protocol 协议名 + // "tcp", "tcp4", "tcp6", "unix" or "unixpacket" + // 若只想开启IPv4, 使用tcp4即可 + protocol string + // address 服务地址 + // 地址可直接使用hostname,但强烈不建议这样做,可能会同时监听多个本地IP + // 如果端口号不填或端口号为0,例如:"127.0.0.1:" 或 ":0",服务端将选择随机可用端口 + address string + // handler 消息处理器 + handler *Handler + // sessionMgr session管理器 + sessionMgr *session.Manager +} + +func NewServer(protocol, addr string, opts ...Option) *Server { + options := Options{ + Components: &component.Components{}, + WS: WSOptions{}, + } + s := &Server{ + Options: options, + protocol: protocol, + address: addr, + } + + for _, opt := range opts { + opt(&s.Options) + } + + s.handler = NewHandler() + s.sessionMgr = session.NewManager() + + initPool(0) + + return s +} + +func (s *Server) Serve() { + components := s.Components.List() + for _, c := range components { + err := s.handler.register(c.Comp, c.Opts) + if err != nil { + // TODO Log and return + return + } + } + + // Initialize components + for _, c := range components { + c.Comp.OnInit() + } + + go func() { + if s.WS.IsWebsocket { + if len(s.WS.TLSCertificate) != 0 && len(s.WS.TLSKey) != 0 { + s.listenAndServeWSTLS() + } else { + s.listenAndServeWS() + } + } else { + s.listenAndServe() + } + }() + + sg := make(chan os.Signal) + signal.Notify(sg, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGTERM) + + select { + //case <-env.Die: + // log.Println("The app will shutdown in a few seconds") + case s := <-sg: + log.Infof("server got signal", s) + } + + log.Infof("server is stopping...") + s.Shutdown() + // TODO close +} + +func (s *Server) Shutdown() { + components := s.Components.List() + compLen := len(components) + for i := compLen - 1; i >= 0; i-- { + components[i].Comp.OnShutdown() + } +} + +func (s *Server) listenAndServe() { + listener, err := net.Listen(s.protocol, s.address) + if err != nil { + panic(err) + } + + // 监听成功,服务已启动 + // TODO log + defer listener.Close() + + go func() { + for { + conn, err := listener.Accept() + if err != nil { + if errors.Is(err, net.ErrClosed) { + log.Error("服务器网络错误", err) + return + } + log.Errorf("监听错误 %v", err) + continue + } + + err = pool.SubmitConn(func() { + r := newRequest(conn, s.Pipeline) + s.handler.handle(r) + }) + if err != nil { + // TODO Log + continue + } + } + }() +} + +func (s *Server) listenAndServeWS() { + upgrade := websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: nil, + EnableCompression: false, + } + http.HandleFunc(fmt.Sprintf("/%s/", "path"), func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrade.Upgrade(w, r, nil) + if err != nil { + // TODO upgrade failure, uri=r.requestURI err=err.Error() + return + } + // TODO s.handler.handleWS(conn) + }) + if err := http.ListenAndServe(s.address, nil); err != nil { + panic(err) + } +} + +func (s *Server) listenAndServeWSTLS() { + upgrade := websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: nil, + EnableCompression: false, + } + http.HandleFunc(fmt.Sprintf("/%s/", "path"), func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrade.Upgrade(w, r, nil) + if err != nil { + // TODO upgrade failure, uri=r.requestURI err=err.Error() + return + } + // TODO s.handler.handleWS(conn) + }) + if err := http.ListenAndServeTLS(s.address, "", "", nil); err != nil { + panic(err) + } +} diff --git a/net/vars.go b/nnet/vars.go similarity index 94% rename from net/vars.go rename to nnet/vars.go index a7231d9..b2046e2 100644 --- a/net/vars.go +++ b/nnet/vars.go @@ -1,4 +1,4 @@ -package net +package nnet type Status uint8 diff --git a/nnet/ws.go b/nnet/ws.go new file mode 100644 index 0000000..b3a3fec --- /dev/null +++ b/nnet/ws.go @@ -0,0 +1,114 @@ +package nnet + +import ( + "github.com/gorilla/websocket" + "io" + "net" + "time" +) + +// wsConn 封装 websocket.Conn 并实现所有 net.Conn 接口 +// 兼容所有使用 net.Conn 的方法 +type wsConn struct { + conn *websocket.Conn + typ int // message type + reader io.Reader +} + +// newWSConn 新建wsConn +func newWSConn(conn *websocket.Conn) (*wsConn, error) { + c := &wsConn{conn: conn} + + t, r, err := conn.NextReader() + if err != nil { + return nil, err + } + c.typ = t + c.reader = r + return c, nil +} + +// Read reads data from the connection. +// Read can be made to time out and return an Error with Timeout() == true +// after a fixed time limit; see SetDeadline and SetReadDeadline. +func (c *wsConn) Read(b []byte) (int, error) { + n, err := c.reader.Read(b) + if err != nil && err != io.EOF { + return n, err + } else if err == io.EOF { + _, r, err := c.conn.NextReader() + if err != nil { + return 0, err + } + c.reader = r + } + + return n, nil +} + +// Write writes data to the connection. +// Write can be made to time out and return an Error with Timeout() == true +// after a fixed time limit; see SetDeadline and SetWriteDeadline. +func (c *wsConn) Write(b []byte) (int, error) { + err := c.conn.WriteMessage(websocket.BinaryMessage, b) + if err != nil { + return 0, err + } + + return len(b), nil +} + +// Close closes the connection. +// Any blocked Read or Write operations will be unblocked and return errors. +func (c *wsConn) Close() error { + return c.conn.Close() +} + +// LocalAddr returns the local network address. +func (c *wsConn) LocalAddr() net.Addr { + return c.conn.LocalAddr() +} + +// RemoteAddr returns the remote network address. +func (c *wsConn) RemoteAddr() net.Addr { + return c.conn.RemoteAddr() +} + +// SetDeadline sets the read and write deadlines associated +// with the connection. It is equivalent to calling both +// SetReadDeadline and SetWriteDeadline. +// +// A deadline is an absolute time after which I/O operations +// fail with a timeout (see type Error) instead of +// blocking. The deadline applies to all future and pending +// I/O, not just the immediately following call to Read or +// Write. After a deadline has been exceeded, the connection +// can be refreshed by setting a deadline in the future. +// +// An idle timeout can be implemented by repeatedly extending +// the deadline after successful Read or Write calls. +// +// A zero value for t means I/O operations will not time out. +func (c *wsConn) SetDeadline(t time.Time) error { + if err := c.conn.SetReadDeadline(t); err != nil { + return err + } + + return c.conn.SetWriteDeadline(t) +} + +// SetReadDeadline sets the deadline for future Read calls +// and any currently-blocked Read call. +// A zero value for t means Read will not time out. +func (c *wsConn) SetReadDeadline(t time.Time) error { + return c.conn.SetReadDeadline(t) +} + +// SetWriteDeadline sets the deadline for future Write calls +// and any currently-blocked Write call. +// Even if write times out, it may return n > 0, indicating that +// some data was successfully written. +// A zero value for t means Write will not time out. +func (c *wsConn) SetWriteDeadline(t time.Time) error { + return c.conn.SetWriteDeadline(t) +} diff --git a/packet/packet.go b/packet/packet.go new file mode 100644 index 0000000..8f2da8c --- /dev/null +++ b/packet/packet.go @@ -0,0 +1,48 @@ +package packet + +import ( + "fmt" +) + +// Type 数据帧类型,如:握手,心跳,数据等 +type Type byte + +const ( + // Default 默认,暂无意义 + Default Type = iota + + // Handshake 握手数据(服务端主动发起) + Handshake = 0x01 + + // HandshakeAck 握手回复(客户端回复) + HandshakeAck = 0x02 + + // Heartbeat 心跳(服务端发起) + Heartbeat = 0x03 + + // Data 数据传输 + Data = 0x04 + + // Kick 服务端主动断开连接 + Kick = 0x05 +) + +type Packet struct { + Type Type // 数据帧 类型 + + HeaderLen uint32 // 数据帧头 长度 + HeaderRaw []byte // 头原始数据 + + DataLen uint32 // 数据长度 + DataRaw []byte // 原始数据 +} + +func New() *Packet { + return &Packet{ + Type: Default, + } +} + +func (p *Packet) String() string { + return fmt.Sprintf("Type: %d, HeaderLen: %d, DataLen: %d, Header: %s, Data: %s", p.Type, p.HeaderLen, p.DataLen, string(p.HeaderRaw), string(p.DataRaw)) +} diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go new file mode 100644 index 0000000..72b880d --- /dev/null +++ b/pipeline/pipeline.go @@ -0,0 +1,83 @@ +package pipeline + +import ( + "sync" +) + +type ( + Func func(request *nnet.Request) error + + // Pipeline 消息管道 + Pipeline interface { + Outbound() Channel + Inbound() Channel + } + + pipeline struct { + outbound, inbound *pipelineChannel + } + + Channel interface { + PushFront(h Func) + PushBack(h Func) + Process(request *nnet.Request) error + } + + pipelineChannel struct { + mu sync.RWMutex + handlers []Func + } +) + +func New() Pipeline { + return &pipeline{ + outbound: &pipelineChannel{}, + inbound: &pipelineChannel{}, + } +} + +func (p *pipeline) Outbound() Channel { + return p.outbound +} + +func (p *pipeline) Inbound() Channel { + return p.inbound +} + +// PushFront 将func压入slice首位 +func (p *pipelineChannel) PushFront(h Func) { + p.mu.Lock() + defer p.mu.Unlock() + + handlers := make([]Func, len(p.handlers)+1) + handlers[0] = h + copy(handlers[1:], p.handlers) + + p.handlers = handlers +} + +// PushBack 将func压入slice末位 +func (p *pipelineChannel) PushBack(h Func) { + p.mu.Lock() + defer p.mu.Unlock() + + p.handlers = append(p.handlers, h) +} + +// Process 处理所有的pipeline方法 +func (p *pipelineChannel) Process(request *nnet.Request) error { + p.mu.RLock() + defer p.mu.RUnlock() + + if len(p.handlers) < 1 { + return nil + } + + for _, handler := range p.handlers { + err := handler(request) + if err != nil { + return err + } + } + return nil +} diff --git a/net/session.go b/session/session.go similarity index 93% rename from net/session.go rename to session/session.go index 5b18bad..7719ba1 100644 --- a/net/session.go +++ b/session/session.go @@ -1,12 +1,11 @@ -package net +package session import ( - "git.noahlan.cn/northlan/nnet/interfaces" "sync" "sync/atomic" ) -var _ interfaces.ISession = (*Session)(nil) +var _ nface.ISession = (*Session)(nil) type Session struct { sync.RWMutex // 数据锁 @@ -16,7 +15,7 @@ type Session struct { data map[string]interface{} // session数据存储(内存) } -func newSession() interfaces.ISession { +func New() nface.ISession { return &Session{ id: sessionIDMgrInstance.SessionID(), uid: "", diff --git a/session/session_mgr.go b/session/session_mgr.go new file mode 100644 index 0000000..9475e43 --- /dev/null +++ b/session/session_mgr.go @@ -0,0 +1,47 @@ +package session + +import ( + "git.noahlan.cn/northlan/nnet/nface" + "sync" +) + +type Manager struct { + sync.RWMutex + sessions map[int64]nface.ISession +} + +func NewManager() *Manager { + return &Manager{ + RWMutex: sync.RWMutex{}, + sessions: make(map[int64]nface.ISession), + } +} + +func (m *Manager) storeSession(s nface.ISession) { + m.Lock() + defer m.Unlock() + + m.sessions[s.ID()] = s +} + +func (m *Manager) findSession(sid int64) nface.ISession { + m.RLock() + defer m.RUnlock() + + return m.sessions[sid] +} + +func (m *Manager) findOrCreateSession(sid int64) nface.ISession { + m.RLock() + s, ok := m.sessions[sid] + m.RUnlock() + + if !ok { + s = New() + + m.Lock() + m.sessions[s.ID()] = s + m.Unlock() + } + return s +}