wip: 初步设计。

main
NorthLan 2 years ago
parent e16ec86acd
commit dca483dc32

@ -0,0 +1,13 @@
package component
var _ Component = (*Base)(nil)
// Base 空组件,实现组件时,优先嵌入此基类,然后根据基类方法进行重写
// 可方便实现Component无需每个Component都要实现两个方法
type Base struct{}
func (*Base) OnInit() {
}
func (*Base) OnShutdown() {
}

@ -0,0 +1,9 @@
package component
// Component 组件接口
type Component interface {
// OnInit 初始化组件时调用.
OnInit()
// OnShutdown 停止组件时调用.
OnShutdown()
}

@ -0,0 +1,26 @@
package component
type (
options struct {
serviceName string // 自定义服务名
methodNameFunc func(string) string // 自定义方法名钩子
}
Option func(options *options)
)
// WithServiceName 覆盖默认生成的服务名称
func WithServiceName(name string) Option {
return func(options *options) {
options.serviceName = name
}
}
// WithMethodNameFunc 覆盖默认生成的方法名
// 当前仅支持一些基本策略,如: strings.ToUpper/strings.ToLower
// 或自行根据方法名判断后进行重写
func WithMethodNameFunc(fn func(string) string) Option {
return func(options *options) {
options.methodNameFunc = fn
}
}

@ -0,0 +1,98 @@
package component
import (
"fmt"
"reflect"
)
type (
// Handler 消息处理器,当前仅支持单个自定义参数
Handler struct {
Receiver reflect.Value // 方法接收者
Method reflect.Method // 方法存根
Type reflect.Type // 方法参数类型
IsRawArg bool // 数据是否需要被序列化true代表不需要
}
// Service 服务,绑定消息处理器用以处理发生的消息
Service struct {
Name string // 服务名称
Type reflect.Type // 接收者类型
Receiver reflect.Value // 该服务下所有方法的接收者
Handlers map[string]*Handler // 该服务下属的所有方法
Options options // options
}
)
func NewService(comp Component, opts []Option) *Service {
s := &Service{
Type: reflect.TypeOf(comp),
Receiver: reflect.ValueOf(comp),
}
// apply options
for _, opt := range opts {
opt(&s.Options)
}
if name := s.Options.serviceName; name != "" {
s.Name = name
} else {
s.Name = reflect.Indirect(s.Receiver).Type().Name()
}
return s
}
// suitableHandlerMethods 反射装填指定type-service的所有认定为handlerMethod的方法
func (s *Service) suitableHandlerMethods(typ reflect.Type) map[string]*Handler {
methods := make(map[string]*Handler)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mt := method.Type
mn := method.Name
if isHandlerMethod(method) {
raw := false
if mt.In(2) == typeOfBytes {
raw = true
}
// rewrite handler name
if s.Options.methodNameFunc != nil {
mn = s.Options.methodNameFunc(mn)
}
methods[mn] = &Handler{Method: method, Type: mt.In(2), IsRawArg: raw}
}
}
return methods
}
// ExtractHandler 反射提取满足以下条件的方法
// - 两个显示入参
// - 第一个是 *net.request
// - 另一个是 []byte 或者 任意指针类型 pointer
func (s *Service) ExtractHandler() error {
typeName := reflect.Indirect(s.Receiver).Type().Name()
if typeName == "" {
return fmt.Errorf("no service name for type %s", s.Type.String())
}
if !isExported(typeName) {
return fmt.Errorf("type %s is not exported", typeName)
}
// suitable handlers
s.Handlers = s.suitableHandlerMethods(s.Type)
if len(s.Handlers) == 0 {
method := s.suitableHandlerMethods(reflect.PtrTo(s.Type))
if len(method) == 0 {
return fmt.Errorf("type %s has no exported methods of suitable type (hint: pass a pointer to value of that type)", s.Name)
} else {
return fmt.Errorf("type %s has no exported methods of suitable type")
}
}
for _, handler := range s.Handlers {
handler.Receiver = s.Receiver
}
return nil
}

@ -0,0 +1,63 @@
package component
import (
"git.noahlan.cn/northlan/nnet/net"
"reflect"
"unicode"
"unicode/utf8"
)
var (
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
typeOfBytes = reflect.TypeOf(([]byte)(nil))
typeOfRequest = reflect.TypeOf(net.Request{})
)
func isExported(name string) bool {
w, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(w)
}
func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return isExported(t.Name()) || t.PkgPath() == ""
}
func isHandlerMethod(method reflect.Method) bool {
mt := method.Type
// 必须是可导出的
if isExportedOrBuiltinType(mt) {
return false
}
// 必须具有3个入参: receiver, *Request, []byte/pointer
// receiver指代 func (*receiver) xxx() 的receiver部分
if mt.NumIn() != 3 {
return false
}
// 至少要有一个出参 且是error
if mt.NumOut() < 1 {
return false
}
// 第一个显式入参必须是*Request
if t1 := mt.In(1); t1.Kind() != reflect.Ptr || t1 != typeOfRequest {
return false
}
// 第二个显式入参必须是 []byte 或者 任意pointer
if t2 := mt.In(2); t2.Kind() != reflect.Ptr || t2 != typeOfBytes {
return false
}
// 最后一个出参必须是error
if o1 := mt.Out(mt.NumOut() - 1); o1 != typeOfError {
return false
}
return true
}

@ -1,5 +1,9 @@
module git.noahlan.cn/northlan/nnet
go 1.18
go 1.19
require google.golang.org/protobuf v1.28.1 // indirect
require (
github.com/gorilla/websocket v1.5.0 // indirect
github.com/panjf2000/ants/v2 v2.6.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
)

@ -1,5 +1,9 @@
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/panjf2000/ants/v2 v2.6.0 h1:xOSpw42m+BMiJ2I33we7h6fYzG4DAlpE1xyI7VS2gxU=
github.com/panjf2000/ants/v2 v2.6.0/go.mod h1:cU93usDlihJZ5CfRGNDYsiBYvoilLvBF5Qp/BT2GNRE=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=

@ -0,0 +1,5 @@
package interfaces
// IRouter 路由接口
type IRouter interface {
}

@ -1,21 +1,21 @@
package interfaces
// ISessionData Session数据接口
type ISessionData interface {
// Remove 通过key移除数据
Remove(key string)
// Set 设置数据
Set(key string, value interface{})
// ISessionAttribute Session数据接口
type ISessionAttribute interface {
// Attribute 获取指定key的值
Attribute(key string) interface{}
// Keys 获取所有Key
Keys() []string
// Exists key是否存在
Exists(key string) bool
// Value 获取指定key的值
Value(key string) interface{}
// Data 获取所有数据
Data() map[string]interface{}
// SetData 保存所有数据
SetData(data map[string]interface{})
// Clear 清理数据
Clear()
// Attributes 获取所有数据
Attributes() map[string]interface{}
// RemoveAttribute 通过key移除数据
RemoveAttribute(key string)
// SetAttribute 设置数据
SetAttribute(key string, value interface{})
// Invalidate 使当前Session无效并且解除所有与之绑定的对象
Invalidate()
}
// ISession session接口
@ -23,9 +23,9 @@ type ISession interface {
// ID Session ID
ID() int64
// UID 用户自行绑定UID,默认与SessionID一致
UID() interface{}
UID() string
// Bind 绑定用户ID
Bind(uid interface{})
// ISessionData Session数据抽象方法
ISessionData
Bind(uid string)
// ISessionAttribute Session数据抽象方法
ISessionAttribute
}

@ -0,0 +1,66 @@
package net
import (
"fmt"
"git.noahlan.cn/northlan/nnet/component"
"github.com/panjf2000/ants/v2"
)
type Handler struct {
allServices map[string]*component.Service // 所有注册的Service
allHandlers map[string]*component.Handler // 所有注册的Handler
workerSize int64 // 业务工作Worker数量
taskQueue []chan *Request // 工作协程的消息队列
}
func NewHandler() *Handler {
return &Handler{
allServices: make(map[string]*component.Service),
allHandlers: make(map[string]*component.Handler),
// TODO 读取配置
workerSize: 10,
taskQueue: make([]chan *Request, 10),
}
}
func (h *Handler) register(comp component.Component, opts []component.Option) error {
s := component.NewService(comp, opts)
p, _ := ants.NewPool()
p.Submit(func() {
})
if _, ok := h.allServices[s.Name]; ok {
return fmt.Errorf("handler: service already defined: %s", s.Name)
}
if err := s.ExtractHandler(); err != nil {
return err
}
h.allServices[s.Name] = s
// 拷贝一份所有handlers
for name, handler := range s.Handlers {
handleName := fmt.Sprintf("%s.%s", s.Name, name)
// TODO print log
h.allHandlers[handleName] = handler
}
return nil
}
// DoWorker 将工作交给worker处理
func (h *Handler) DoWorker(request *Request) {
// 根据sessionID平均分配worker处理
workerId := request.Session.ID() % h.workerSize
fmt.Printf("sessionID %d to workerID %d\n", request.Session.ID(), workerId)
// 入队
h.taskQueue[workerId] <- request
}
func (h *Handler) handle(request *Request) {
}

@ -0,0 +1,9 @@
package net
type Option func(server *Server)
func WithXXX() Option {
return func(server *Server) {
}
}

@ -0,0 +1,37 @@
package net
import (
"git.noahlan.cn/northlan/nnet/interfaces"
"net"
)
type Request struct {
session interfaces.ISession // Session
server *Server // Server reference
conn net.Conn // low-level conn fd
status Status // 连接状态
}
func newRequest(server *Server, conn net.Conn) *Request {
r := &Request{
server: server,
conn: conn,
status: StatusStart,
}
r.session = newSession()
return r
}
func (r *Request) Status() Status {
return r.status
}
func (r *Request) ID() int64 {
return r.session.ID()
}
func (r *Request) Session() interfaces.ISession {
return r.session
}

@ -0,0 +1,124 @@
package net
import (
"errors"
"fmt"
"github.com/gorilla/websocket"
"github.com/panjf2000/ants/v2"
"net"
"net/http"
)
// Server TCP-Server
type Server struct {
// Name 服务端名称,默认为 n-net
Name string
// protocol 协议名
// "tcp", "tcp4", "tcp6", "unix" or "unixpacket"
// 若只想开启IPv4, 使用tcp4即可
protocol string
// address 服务地址
// 地址可直接使用hostname,但强烈不建议这样做,可能会同时监听多个本地IP
// 如果端口号不填或端口号为0例如"127.0.0.1:" 或 ":0",服务端将选择随机可用端口
address string
// 一些其它的东西 .etc..
handler *Handler
sessionMgr *SessionMgr
pool *ants.Pool
}
func newServer(opts ...Option) *Server {
s := &Server{
Name: "",
protocol: "",
address: "",
handler: nil,
sessionMgr: nil,
}
for _, opt := range opts {
opt(s)
}
s.pool, _ = ants.NewPool(0)
return s
}
func (s *Server) listenAndServe() {
listener, err := net.Listen(s.protocol, s.address)
if err != nil {
panic(err)
}
// 监听成功,服务已启动
// TODO log
defer listener.Close()
go func() {
for {
conn, err := listener.Accept()
if err != nil {
if errors.Is(err, net.ErrClosed) {
// 服务端网络错误
// TODO print log
return
}
// TODO log
continue
}
// TODO 最大连接限制
//if s.ConnMgr.Len() >= utils.GlobalObject.MaxConn {
// conn.Close()
// continue
//}
r := newRequest(s, conn)
s.pool.Submit(func() {
s.handler.handle(r)
})
}
}()
}
func (s *Server) listenAndServeWS() {
upgrade := websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: nil,
EnableCompression: false,
}
http.HandleFunc(fmt.Sprintf("/%s/", "path"), func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrade.Upgrade(w, r, nil)
if err != nil {
// TODO upgrade failure, uri=r.requestURI err=err.Error()
return
}
// TODO s.handler.handleWS(conn)
})
if err := http.ListenAndServe(s.address, nil); err != nil {
panic(err)
}
}
func (s *Server) listenAndServeWSTLS() {
upgrade := websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: nil,
EnableCompression: false,
}
http.HandleFunc(fmt.Sprintf("/%s/", "path"), func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrade.Upgrade(w, r, nil)
if err != nil {
// TODO upgrade failure, uri=r.requestURI err=err.Error()
return
}
// TODO s.handler.handleWS(conn)
})
if err := http.ListenAndServeTLS(s.address, "", "", nil); err != nil {
panic(err)
}
}

@ -0,0 +1,130 @@
package net
import (
"git.noahlan.cn/northlan/nnet/interfaces"
"sync"
"sync/atomic"
)
var _ interfaces.ISession = (*Session)(nil)
type Session struct {
sync.RWMutex // 数据锁
id int64 // Session全局唯一ID
uid string // 用户ID不绑定的情况下与sid一致
data map[string]interface{} // session数据存储内存
}
func newSession() interfaces.ISession {
return &Session{
id: sessionIDMgrInstance.SessionID(),
uid: "",
data: make(map[string]interface{}),
}
}
func (s *Session) ID() int64 {
return s.id
}
func (s *Session) UID() string {
return s.uid
}
func (s *Session) Bind(uid string) {
s.uid = uid
}
func (s *Session) Attribute(key string) interface{} {
s.RLock()
defer s.RUnlock()
return s.data[key]
}
func (s *Session) Keys() []string {
s.RLock()
defer s.RUnlock()
keys := make([]string, 0, len(s.data))
for k := range s.data {
keys = append(keys, k)
}
return keys
}
func (s *Session) Exists(key string) bool {
s.RLock()
defer s.RUnlock()
_, has := s.data[key]
return has
}
func (s *Session) Attributes() map[string]interface{} {
s.RLock()
defer s.RUnlock()
return s.data
}
func (s *Session) RemoveAttribute(key string) {
s.Lock()
defer s.Unlock()
delete(s.data, key)
}
func (s *Session) SetAttribute(key string, value interface{}) {
s.Lock()
defer s.Unlock()
s.data[key] = value
}
func (s *Session) Invalidate() {
s.Lock()
defer s.Unlock()
s.id = 0
s.uid = ""
s.data = make(map[string]interface{})
}
var sessionIDMgrInstance = newSessionIDMgr()
type sessionIDMgr struct {
count int64
sid int64
}
func newSessionIDMgr() *sessionIDMgr {
return &sessionIDMgr{}
}
// Increment the connection count
func (c *sessionIDMgr) Increment() {
atomic.AddInt64(&c.count, 1)
}
// Decrement the connection count
func (c *sessionIDMgr) Decrement() {
atomic.AddInt64(&c.count, -1)
}
// Count returns the connection numbers in current
func (c *sessionIDMgr) Count() int64 {
return atomic.LoadInt64(&c.count)
}
// Reset the connection service status
func (c *sessionIDMgr) Reset() {
atomic.StoreInt64(&c.count, 0)
atomic.StoreInt64(&c.sid, 0)
}
// SessionID returns the session id
func (c *sessionIDMgr) SessionID() int64 {
return atomic.AddInt64(&c.sid, 1)
}

@ -0,0 +1,40 @@
package net
import (
"git.noahlan.cn/northlan/nnet/interfaces"
"sync"
)
type SessionMgr struct {
sync.RWMutex
sessions map[int64]interfaces.ISession
}
func (m *SessionMgr) storeSession(s interfaces.ISession) {
m.Lock()
defer m.Unlock()
m.sessions[s.ID()] = s
}
func (m *SessionMgr) findSession(sid int64) interfaces.ISession {
m.RLock()
defer m.RUnlock()
return m.sessions[sid]
}
func (m *SessionMgr) findOrCreateSession(sid int64) interfaces.ISession {
m.RLock()
s, ok := m.sessions[sid]
m.RUnlock()
if !ok {
s = newSession()
m.Lock()
m.sessions[s.ID()] = s
m.Unlock()
}
return s
}

@ -0,0 +1,14 @@
package net
type Status uint8
const (
// StatusStart 开始阶段
StatusStart Status = iota + 1
// StatusPrepare 准备阶段
StatusPrepare
// StatusWorking 工作阶段
StatusWorking
// StatusClosed 连接关闭
StatusClosed
)
Loading…
Cancel
Save