From dca483dc324e679ccfd17f27d6cfbf4222240944 Mon Sep 17 00:00:00 2001 From: NorthLan <6995syu@163.com> Date: Fri, 28 Oct 2022 18:02:02 +0800 Subject: [PATCH] =?UTF-8?q?wip:=20=E5=88=9D=E6=AD=A5=E8=AE=BE=E8=AE=A1?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/base.go | 13 ++++ component/component.go | 9 +++ component/options.go | 26 ++++++++ component/service.go | 98 ++++++++++++++++++++++++++++++ component/util.go | 63 +++++++++++++++++++ go.mod | 8 ++- go.sum | 4 ++ interfaces/i_router.go | 5 ++ interfaces/i_session.go | 36 +++++------ net/handler.go | 66 ++++++++++++++++++++ net/options.go | 9 +++ net/request.go | 37 ++++++++++++ net/server.go | 124 ++++++++++++++++++++++++++++++++++++++ net/session.go | 130 ++++++++++++++++++++++++++++++++++++++++ net/session_mgr.go | 40 +++++++++++++ net/vars.go | 14 +++++ 16 files changed, 662 insertions(+), 20 deletions(-) create mode 100644 component/base.go create mode 100644 component/component.go create mode 100644 component/options.go create mode 100644 component/service.go create mode 100644 component/util.go create mode 100644 interfaces/i_router.go create mode 100644 net/handler.go create mode 100644 net/options.go create mode 100644 net/request.go create mode 100644 net/server.go create mode 100644 net/session.go create mode 100644 net/session_mgr.go create mode 100644 net/vars.go diff --git a/component/base.go b/component/base.go new file mode 100644 index 0000000..67460fe --- /dev/null +++ b/component/base.go @@ -0,0 +1,13 @@ +package component + +var _ Component = (*Base)(nil) + +// Base 空组件,实现组件时,优先嵌入此基类,然后根据基类方法进行重写 +// 可方便实现Component,无需每个Component都要实现两个方法 +type Base struct{} + +func (*Base) OnInit() { +} + +func (*Base) OnShutdown() { +} diff --git a/component/component.go b/component/component.go new file mode 100644 index 0000000..e85e8a2 --- /dev/null +++ b/component/component.go @@ -0,0 +1,9 @@ +package component + +// Component 组件接口 +type Component interface { + // OnInit 初始化组件时调用. + OnInit() + // OnShutdown 停止组件时调用. + OnShutdown() +} diff --git a/component/options.go b/component/options.go new file mode 100644 index 0000000..7021c14 --- /dev/null +++ b/component/options.go @@ -0,0 +1,26 @@ +package component + +type ( + options struct { + serviceName string // 自定义服务名 + methodNameFunc func(string) string // 自定义方法名钩子 + } + + Option func(options *options) +) + +// WithServiceName 覆盖默认生成的服务名称 +func WithServiceName(name string) Option { + return func(options *options) { + options.serviceName = name + } +} + +// WithMethodNameFunc 覆盖默认生成的方法名 +// 当前仅支持一些基本策略,如: strings.ToUpper/strings.ToLower +// 或自行根据方法名判断后进行重写 +func WithMethodNameFunc(fn func(string) string) Option { + return func(options *options) { + options.methodNameFunc = fn + } +} diff --git a/component/service.go b/component/service.go new file mode 100644 index 0000000..2e26295 --- /dev/null +++ b/component/service.go @@ -0,0 +1,98 @@ +package component + +import ( + "fmt" + "reflect" +) + +type ( + // Handler 消息处理器,当前仅支持单个自定义参数 + Handler struct { + Receiver reflect.Value // 方法接收者 + Method reflect.Method // 方法存根 + Type reflect.Type // 方法参数类型 + IsRawArg bool // 数据是否需要被序列化,true代表不需要 + } + + // Service 服务,绑定消息处理器用以处理发生的消息 + Service struct { + Name string // 服务名称 + Type reflect.Type // 接收者类型 + Receiver reflect.Value // 该服务下所有方法的接收者 + Handlers map[string]*Handler // 该服务下属的所有方法 + Options options // options + } +) + +func NewService(comp Component, opts []Option) *Service { + s := &Service{ + Type: reflect.TypeOf(comp), + Receiver: reflect.ValueOf(comp), + } + + // apply options + for _, opt := range opts { + opt(&s.Options) + } + + if name := s.Options.serviceName; name != "" { + s.Name = name + } else { + s.Name = reflect.Indirect(s.Receiver).Type().Name() + } + return s +} + +// suitableHandlerMethods 反射装填指定type-service的所有认定为handlerMethod的方法 +func (s *Service) suitableHandlerMethods(typ reflect.Type) map[string]*Handler { + methods := make(map[string]*Handler) + for m := 0; m < typ.NumMethod(); m++ { + method := typ.Method(m) + mt := method.Type + mn := method.Name + if isHandlerMethod(method) { + raw := false + if mt.In(2) == typeOfBytes { + raw = true + } + // rewrite handler name + if s.Options.methodNameFunc != nil { + mn = s.Options.methodNameFunc(mn) + } + methods[mn] = &Handler{Method: method, Type: mt.In(2), IsRawArg: raw} + } + } + return methods +} + +// ExtractHandler 反射提取满足以下条件的方法 +// - 两个显示入参 +// - 第一个是 *net.request +// - 另一个是 []byte 或者 任意指针类型 pointer +func (s *Service) ExtractHandler() error { + typeName := reflect.Indirect(s.Receiver).Type().Name() + if typeName == "" { + return fmt.Errorf("no service name for type %s", s.Type.String()) + } + if !isExported(typeName) { + return fmt.Errorf("type %s is not exported", typeName) + } + + // suitable handlers + s.Handlers = s.suitableHandlerMethods(s.Type) + + if len(s.Handlers) == 0 { + method := s.suitableHandlerMethods(reflect.PtrTo(s.Type)) + if len(method) == 0 { + return fmt.Errorf("type %s has no exported methods of suitable type (hint: pass a pointer to value of that type)", s.Name) + } else { + return fmt.Errorf("type %s has no exported methods of suitable type") + } + } + + for _, handler := range s.Handlers { + handler.Receiver = s.Receiver + } + + return nil +} diff --git a/component/util.go b/component/util.go new file mode 100644 index 0000000..c899a1b --- /dev/null +++ b/component/util.go @@ -0,0 +1,63 @@ +package component + +import ( + "git.noahlan.cn/northlan/nnet/net" + "reflect" + "unicode" + "unicode/utf8" +) + +var ( + typeOfError = reflect.TypeOf((*error)(nil)).Elem() + typeOfBytes = reflect.TypeOf(([]byte)(nil)) + typeOfRequest = reflect.TypeOf(net.Request{}) +) + +func isExported(name string) bool { + w, _ := utf8.DecodeRuneInString(name) + return unicode.IsUpper(w) +} + +func isExportedOrBuiltinType(t reflect.Type) bool { + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + // PkgPath will be non-empty even for an exported type, + // so we need to check the type name as well. + return isExported(t.Name()) || t.PkgPath() == "" +} + +func isHandlerMethod(method reflect.Method) bool { + mt := method.Type + // 必须是可导出的 + if isExportedOrBuiltinType(mt) { + return false + } + + // 必须具有3个入参: receiver, *Request, []byte/pointer + // receiver指代 func (*receiver) xxx() 的receiver部分 + if mt.NumIn() != 3 { + return false + } + + // 至少要有一个出参 且是error + if mt.NumOut() < 1 { + return false + } + + // 第一个显式入参必须是*Request + if t1 := mt.In(1); t1.Kind() != reflect.Ptr || t1 != typeOfRequest { + return false + } + + // 第二个显式入参必须是 []byte 或者 任意pointer + if t2 := mt.In(2); t2.Kind() != reflect.Ptr || t2 != typeOfBytes { + return false + } + + // 最后一个出参必须是error + if o1 := mt.Out(mt.NumOut() - 1); o1 != typeOfError { + return false + } + return true +} diff --git a/go.mod b/go.mod index c5361ad..7f88823 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,9 @@ module git.noahlan.cn/northlan/nnet -go 1.18 +go 1.19 -require google.golang.org/protobuf v1.28.1 // indirect +require ( + github.com/gorilla/websocket v1.5.0 // indirect + github.com/panjf2000/ants/v2 v2.6.0 // indirect + google.golang.org/protobuf v1.28.1 // indirect +) diff --git a/go.sum b/go.sum index c4b2520..7e0a492 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,9 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/panjf2000/ants/v2 v2.6.0 h1:xOSpw42m+BMiJ2I33we7h6fYzG4DAlpE1xyI7VS2gxU= +github.com/panjf2000/ants/v2 v2.6.0/go.mod h1:cU93usDlihJZ5CfRGNDYsiBYvoilLvBF5Qp/BT2GNRE= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= diff --git a/interfaces/i_router.go b/interfaces/i_router.go new file mode 100644 index 0000000..2b0e766 --- /dev/null +++ b/interfaces/i_router.go @@ -0,0 +1,5 @@ +package interfaces + +// IRouter 路由接口 +type IRouter interface { +} diff --git a/interfaces/i_session.go b/interfaces/i_session.go index c65a7a7..e7ab629 100644 --- a/interfaces/i_session.go +++ b/interfaces/i_session.go @@ -1,21 +1,21 @@ package interfaces -// ISessionData Session数据接口 -type ISessionData interface { - // Remove 通过key移除数据 - Remove(key string) - // Set 设置数据 - Set(key string, value interface{}) +// ISessionAttribute Session数据接口 +type ISessionAttribute interface { + // Attribute 获取指定key的值 + Attribute(key string) interface{} + // Keys 获取所有Key + Keys() []string // Exists key是否存在 Exists(key string) bool - // Value 获取指定key的值 - Value(key string) interface{} - // Data 获取所有数据 - Data() map[string]interface{} - // SetData 保存所有数据 - SetData(data map[string]interface{}) - // Clear 清理数据 - Clear() + // Attributes 获取所有数据 + Attributes() map[string]interface{} + // RemoveAttribute 通过key移除数据 + RemoveAttribute(key string) + // SetAttribute 设置数据 + SetAttribute(key string, value interface{}) + // Invalidate 使当前Session无效,并且解除所有与之绑定的对象 + Invalidate() } // ISession session接口 @@ -23,9 +23,9 @@ type ISession interface { // ID Session ID ID() int64 // UID 用户自行绑定UID,默认与SessionID一致 - UID() interface{} + UID() string // Bind 绑定用户ID - Bind(uid interface{}) - // ISessionData Session数据抽象方法 - ISessionData + Bind(uid string) + // ISessionAttribute Session数据抽象方法 + ISessionAttribute } diff --git a/net/handler.go b/net/handler.go new file mode 100644 index 0000000..ec2c7da --- /dev/null +++ b/net/handler.go @@ -0,0 +1,66 @@ +package net + +import ( + "fmt" + "git.noahlan.cn/northlan/nnet/component" + "github.com/panjf2000/ants/v2" +) + +type Handler struct { + allServices map[string]*component.Service // 所有注册的Service + allHandlers map[string]*component.Handler // 所有注册的Handler + + workerSize int64 // 业务工作Worker数量 + taskQueue []chan *Request // 工作协程的消息队列 +} + +func NewHandler() *Handler { + return &Handler{ + allServices: make(map[string]*component.Service), + allHandlers: make(map[string]*component.Handler), + // TODO 读取配置 + workerSize: 10, + taskQueue: make([]chan *Request, 10), + } +} + +func (h *Handler) register(comp component.Component, opts []component.Option) error { + s := component.NewService(comp, opts) + + p, _ := ants.NewPool() + p.Submit(func() { + + }) + + if _, ok := h.allServices[s.Name]; ok { + return fmt.Errorf("handler: service already defined: %s", s.Name) + } + + if err := s.ExtractHandler(); err != nil { + return err + } + + h.allServices[s.Name] = s + + // 拷贝一份所有handlers + for name, handler := range s.Handlers { + handleName := fmt.Sprintf("%s.%s", s.Name, name) + // TODO print log + h.allHandlers[handleName] = handler + } + + return nil +} + +// DoWorker 将工作交给worker处理 +func (h *Handler) DoWorker(request *Request) { + // 根据sessionID平均分配worker处理 + workerId := request.Session.ID() % h.workerSize + fmt.Printf("sessionID %d to workerID %d\n", request.Session.ID(), workerId) + // 入队 + h.taskQueue[workerId] <- request +} + +func (h *Handler) handle(request *Request) { + +} diff --git a/net/options.go b/net/options.go new file mode 100644 index 0000000..4eb8f7e --- /dev/null +++ b/net/options.go @@ -0,0 +1,9 @@ +package net + +type Option func(server *Server) + +func WithXXX() Option { + return func(server *Server) { + + } +} diff --git a/net/request.go b/net/request.go new file mode 100644 index 0000000..8743322 --- /dev/null +++ b/net/request.go @@ -0,0 +1,37 @@ +package net + +import ( + "git.noahlan.cn/northlan/nnet/interfaces" + "net" +) + +type Request struct { + session interfaces.ISession // Session + + server *Server // Server reference + conn net.Conn // low-level conn fd + status Status // 连接状态 +} + +func newRequest(server *Server, conn net.Conn) *Request { + r := &Request{ + server: server, + conn: conn, + status: StatusStart, + } + + r.session = newSession() + return r +} + +func (r *Request) Status() Status { + return r.status +} + +func (r *Request) ID() int64 { + return r.session.ID() +} + +func (r *Request) Session() interfaces.ISession { + return r.session +} diff --git a/net/server.go b/net/server.go new file mode 100644 index 0000000..b79289d --- /dev/null +++ b/net/server.go @@ -0,0 +1,124 @@ +package net + +import ( + "errors" + "fmt" + "github.com/gorilla/websocket" + "github.com/panjf2000/ants/v2" + "net" + "net/http" +) + +// Server TCP-Server +type Server struct { + // Name 服务端名称,默认为 n-net + Name string + // protocol 协议名 + // "tcp", "tcp4", "tcp6", "unix" or "unixpacket" + // 若只想开启IPv4, 使用tcp4即可 + protocol string + // address 服务地址 + // 地址可直接使用hostname,但强烈不建议这样做,可能会同时监听多个本地IP + // 如果端口号不填或端口号为0,例如:"127.0.0.1:" 或 ":0",服务端将选择随机可用端口 + address string + // 一些其它的东西 .etc.. + handler *Handler + sessionMgr *SessionMgr + pool *ants.Pool +} + +func newServer(opts ...Option) *Server { + s := &Server{ + Name: "", + protocol: "", + address: "", + handler: nil, + sessionMgr: nil, + } + + for _, opt := range opts { + opt(s) + } + + s.pool, _ = ants.NewPool(0) + + return s +} + +func (s *Server) listenAndServe() { + listener, err := net.Listen(s.protocol, s.address) + if err != nil { + panic(err) + } + + // 监听成功,服务已启动 + // TODO log + defer listener.Close() + + go func() { + for { + conn, err := listener.Accept() + if err != nil { + if errors.Is(err, net.ErrClosed) { + // 服务端网络错误 + // TODO print log + return + } + // TODO log + continue + } + + // TODO 最大连接限制 + //if s.ConnMgr.Len() >= utils.GlobalObject.MaxConn { + // conn.Close() + // continue + //} + + r := newRequest(s, conn) + + s.pool.Submit(func() { + s.handler.handle(r) + }) + } + }() +} + +func (s *Server) listenAndServeWS() { + upgrade := websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: nil, + EnableCompression: false, + } + http.HandleFunc(fmt.Sprintf("/%s/", "path"), func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrade.Upgrade(w, r, nil) + if err != nil { + // TODO upgrade failure, uri=r.requestURI err=err.Error() + return + } + // TODO s.handler.handleWS(conn) + }) + if err := http.ListenAndServe(s.address, nil); err != nil { + panic(err) + } +} + +func (s *Server) listenAndServeWSTLS() { + upgrade := websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: nil, + EnableCompression: false, + } + http.HandleFunc(fmt.Sprintf("/%s/", "path"), func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrade.Upgrade(w, r, nil) + if err != nil { + // TODO upgrade failure, uri=r.requestURI err=err.Error() + return + } + // TODO s.handler.handleWS(conn) + }) + if err := http.ListenAndServeTLS(s.address, "", "", nil); err != nil { + panic(err) + } +} diff --git a/net/session.go b/net/session.go new file mode 100644 index 0000000..5b18bad --- /dev/null +++ b/net/session.go @@ -0,0 +1,130 @@ +package net + +import ( + "git.noahlan.cn/northlan/nnet/interfaces" + "sync" + "sync/atomic" +) + +var _ interfaces.ISession = (*Session)(nil) + +type Session struct { + sync.RWMutex // 数据锁 + + id int64 // Session全局唯一ID + uid string // 用户ID,不绑定的情况下与sid一致 + data map[string]interface{} // session数据存储(内存) +} + +func newSession() interfaces.ISession { + return &Session{ + id: sessionIDMgrInstance.SessionID(), + uid: "", + data: make(map[string]interface{}), + } +} + +func (s *Session) ID() int64 { + return s.id +} + +func (s *Session) UID() string { + return s.uid +} + +func (s *Session) Bind(uid string) { + s.uid = uid +} + +func (s *Session) Attribute(key string) interface{} { + s.RLock() + defer s.RUnlock() + + return s.data[key] +} + +func (s *Session) Keys() []string { + s.RLock() + defer s.RUnlock() + + keys := make([]string, 0, len(s.data)) + for k := range s.data { + keys = append(keys, k) + } + return keys +} + +func (s *Session) Exists(key string) bool { + s.RLock() + defer s.RUnlock() + + _, has := s.data[key] + return has +} + +func (s *Session) Attributes() map[string]interface{} { + s.RLock() + defer s.RUnlock() + + return s.data +} + +func (s *Session) RemoveAttribute(key string) { + s.Lock() + defer s.Unlock() + + delete(s.data, key) +} + +func (s *Session) SetAttribute(key string, value interface{}) { + s.Lock() + defer s.Unlock() + + s.data[key] = value +} + +func (s *Session) Invalidate() { + s.Lock() + defer s.Unlock() + + s.id = 0 + s.uid = "" + s.data = make(map[string]interface{}) +} + +var sessionIDMgrInstance = newSessionIDMgr() + +type sessionIDMgr struct { + count int64 + sid int64 +} + +func newSessionIDMgr() *sessionIDMgr { + return &sessionIDMgr{} +} + +// Increment the connection count +func (c *sessionIDMgr) Increment() { + atomic.AddInt64(&c.count, 1) +} + +// Decrement the connection count +func (c *sessionIDMgr) Decrement() { + atomic.AddInt64(&c.count, -1) +} + +// Count returns the connection numbers in current +func (c *sessionIDMgr) Count() int64 { + return atomic.LoadInt64(&c.count) +} + +// Reset the connection service status +func (c *sessionIDMgr) Reset() { + atomic.StoreInt64(&c.count, 0) + atomic.StoreInt64(&c.sid, 0) +} + +// SessionID returns the session id +func (c *sessionIDMgr) SessionID() int64 { + return atomic.AddInt64(&c.sid, 1) +} diff --git a/net/session_mgr.go b/net/session_mgr.go new file mode 100644 index 0000000..cb313fc --- /dev/null +++ b/net/session_mgr.go @@ -0,0 +1,40 @@ +package net + +import ( + "git.noahlan.cn/northlan/nnet/interfaces" + "sync" +) + +type SessionMgr struct { + sync.RWMutex + sessions map[int64]interfaces.ISession +} + +func (m *SessionMgr) storeSession(s interfaces.ISession) { + m.Lock() + defer m.Unlock() + + m.sessions[s.ID()] = s +} + +func (m *SessionMgr) findSession(sid int64) interfaces.ISession { + m.RLock() + defer m.RUnlock() + + return m.sessions[sid] +} + +func (m *SessionMgr) findOrCreateSession(sid int64) interfaces.ISession { + m.RLock() + s, ok := m.sessions[sid] + m.RUnlock() + + if !ok { + s = newSession() + + m.Lock() + m.sessions[s.ID()] = s + m.Unlock() + } + return s +} diff --git a/net/vars.go b/net/vars.go new file mode 100644 index 0000000..a7231d9 --- /dev/null +++ b/net/vars.go @@ -0,0 +1,14 @@ +package net + +type Status uint8 + +const ( + // StatusStart 开始阶段 + StatusStart Status = iota + 1 + // StatusPrepare 准备阶段 + StatusPrepare + // StatusWorking 工作阶段 + StatusWorking + // StatusClosed 连接关闭 + StatusClosed +)