diff --git a/message/binary_serializer.go b/message/binary_serializer.go new file mode 100644 index 0000000..5d30d74 --- /dev/null +++ b/message/binary_serializer.go @@ -0,0 +1,18 @@ +package message + +type BinarySerializer struct { +} + +func NewBinarySerializer() Serializer { + return &BinarySerializer{} +} + +func (b *BinarySerializer) Marshal(i interface{}) ([]byte, error) { + //TODO implement me + panic("implement me") +} + +func (b *BinarySerializer) Unmarshal(bytes []byte, i interface{}) error { + //TODO implement me + panic("implement me") +} diff --git a/message/serializer.go b/message/serializer.go new file mode 100644 index 0000000..7104335 --- /dev/null +++ b/message/serializer.go @@ -0,0 +1,19 @@ +package message + +type ( + // Marshaler 序列化 + Marshaler interface { + Marshal(interface{}) ([]byte, error) + } + + // Unmarshaler 反序列化 + Unmarshaler interface { + Unmarshal([]byte, interface{}) error + } + + // Serializer 消息 序列化/反序列化,仅针对payload + Serializer interface { + Marshaler + Unmarshaler + } +) diff --git a/nface/i_connection.go b/nface/i_connection.go new file mode 100644 index 0000000..358df0f --- /dev/null +++ b/nface/i_connection.go @@ -0,0 +1,29 @@ +package nface + +import "net" + +const ( + // StatusStart 开始阶段 + StatusStart int32 = iota + 1 + // StatusPrepare 准备阶段 + StatusPrepare + // StatusWorking 工作阶段 + StatusWorking + // StatusClosed 连接关闭 + StatusClosed +) + +type IConnection interface { + // Status 获取连接状态 + Status() int32 + // SetStatus 设置连接状态 + SetStatus(s int32) + // Conn 获取底层网络连接 + Conn() net.Conn + // ID 获取连接ID + ID() int64 + // Session 获取当前连接绑定的Session + Session() ISession + // Close 关闭连接 + Close() error +} diff --git a/nnet/connection.go b/nnet/connection.go new file mode 100644 index 0000000..35bb6cd --- /dev/null +++ b/nnet/connection.go @@ -0,0 +1,164 @@ +package nnet + +import ( + "errors" + "git.noahlan.cn/northlan/nnet/log" + "git.noahlan.cn/northlan/nnet/nface" + "git.noahlan.cn/northlan/nnet/packet" + "git.noahlan.cn/northlan/nnet/pipeline" + "git.noahlan.cn/northlan/nnet/session" + "github.com/gorilla/websocket" + "net" + "sync/atomic" + "time" +) + +var ( + ErrCloseClosedSession = errors.New("close closed session") +) + +type ( + Connection struct { + session nface.ISession // Session + server *Server // Server 引用 + + conn net.Conn // low-level conn fd + status int32 // 连接状态 + lastMid uint64 // 最近一次消息ID + lastHeartbeatAt int64 // 最近一次心跳时间 + + chDie chan struct{} // 停止通道 + chSend chan []byte // 消息发送通道 + + pipeline pipeline.Pipeline // 消息管道 + } + + pendingMessage struct { + typ byte // message type + route string // message route + mid uint64 // response message id + payload interface{} // payload + } +) + +func newConnection(server *Server, conn net.Conn, pipeline pipeline.Pipeline) nface.IConnection { + r := &Connection{ + conn: conn, + server: server, + status: nface.StatusStart, + + lastHeartbeatAt: time.Now().Unix(), + + chDie: make(chan struct{}), + chSend: make(chan pendingMessage, 2048), + + pipeline: pipeline, + } + + // binding session + r.session = session.New() + return r +} + +func newConnectionWS(server *Server, conn *websocket.Conn, pipeline pipeline.Pipeline) nface.IConnection { + c, err := newWSConn(conn) + if err != nil { + // TODO panic ? + panic(err) + } + return newConnection(server, c, pipeline) +} + +func (r *Connection) Status() int32 { + return atomic.LoadInt32(&r.status) +} + +func (r *Connection) SetStatus(s int32) { + atomic.StoreInt32(&r.status, s) +} + +func (r *Connection) Conn() net.Conn { + return r.conn +} + +func (r *Connection) ID() int64 { + return r.session.ID() +} + +func (r *Connection) Session() nface.ISession { + return r.session +} + +func (r *Connection) write() { + ticker := time.NewTicker(r.server.HeartbeatInterval) + + chWrite := make(chan []byte, 1024) + + defer func() { + ticker.Stop() + close(r.chSend) + close(chWrite) + _ = r.Close() + + log.Debugf("Session write goroutine exit, SessionID=%d, UID=%d", r.session.ID(), r.session.UID()) + }() + + for { + select { + case <-ticker.C: + deadline := time.Now().Add(-2 * r.server.HeartbeatInterval).Unix() + if atomic.LoadInt64(&r.lastHeartbeatAt) < deadline { + log.Debugf("Session heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&r.lastHeartbeatAt), deadline) + return + } + // TODO heartbeat data + chWrite <- []byte{} + case data := <-r.chSend: + // message marshal data + payload, err := r.server.Serializer.Marshal(data.payload) + if err != nil { + switch data.typ { + + } + break + } + // TODO new message and pipeline + + // TODO encode message ? message processor ? + + // packet pack data + p, err := r.server.Packer.Pack(packet.Data, payload) + if err != nil { + log.Error(err.Error()) + break + } + chWrite <- p + case data := <-chWrite: + // 回写数据 + if _, err := r.conn.Write(data); err != nil { + log.Error(err.Error()) + return + } + case <-r.chDie: // connection close signal + return + // TODO application quit signal + } + } +} + +func (r *Connection) Close() error { + if r.Status() == StatusClosed { + return ErrCloseClosedSession + } + r.SetStatus(StatusClosed) + + log.Debugf("close connection, ID: %d", r.ID()) + + select { + case <-r.chDie: + default: + close(r.chDie) + // TODO lifetime + } + return r.conn.Close() +} diff --git a/nnet/handler.go b/nnet/handler.go index a6f7d9e..11aaed5 100644 --- a/nnet/handler.go +++ b/nnet/handler.go @@ -3,19 +3,27 @@ package nnet import ( "fmt" "git.noahlan.cn/northlan/nnet/component" + "git.noahlan.cn/northlan/nnet/log" + "git.noahlan.cn/northlan/nnet/packet" "git.noahlan.cn/northlan/nnet/pipeline" + "net" + "time" ) type Handler struct { - server *Server // Server 引用 - pipeline pipeline.Pipeline // 通道 + server *Server // Server 引用 + pipeline pipeline.Pipeline // 通道 + processor packet.Processor // 数据包处理器 allServices map[string]*component.Service // 所有注册的Service allHandlers map[string]*component.Handler // 所有注册的Handler } -func NewHandler() *Handler { +func NewHandler(server *Server, pipeline pipeline.Pipeline, processor packet.Processor) *Handler { return &Handler{ + server: server, + pipeline: pipeline, + processor: processor, allServices: make(map[string]*component.Service), allHandlers: make(map[string]*component.Handler), } @@ -37,13 +45,57 @@ func (h *Handler) register(comp component.Component, opts []component.Option) er // 拷贝一份所有handlers for name, handler := range s.Handlers { handleName := fmt.Sprintf("%s.%s", s.Name, name) - // TODO print log + log.Debugf("register handler %s", handleName) h.allHandlers[handleName] = handler } return nil } -func (h *Handler) handle(request *Request) { - buf := make([]byte, 3) +func (h *Handler) handle(conn net.Conn) { + connection := newConnection(h.server, conn, h.pipeline) + h.server.sessionMgr.StoreSession(connection.Session()) + + _ = pool.SubmitConn(func() { + h.writeLoop(connection) + }) + + _ = pool.SubmitWorker(func() { + h.readLoop(connection) + }) + + // hook +} + +func (h *Handler) writeLoop(conn *Connection) { + +} + +func (h *Handler) readLoop(conn *Connection) { + buf := make([]byte, 4096) + for { + n, err := conn.conn.Read(buf) + if err != nil { + log.Errorf("Read message error: %s, session will be closed immediately", err.Error()) + return + } + packets, err := h.server.Packer.Unpack(buf) + if err != nil { + log.Error(err.Error()) + } + // packets 处理 + for _, p := range packets { + if err := h.processPackets(conn, p); err != nil { + log.Error(err.Error()) + return + } + } + } +} + +func (h *Handler) processPackets(conn *Connection, packets interface{}) error { + err := h.processor.ProcessPacket(conn, packets) + + conn.lastHeartbeatAt = time.Now().Unix() + return err } diff --git a/nnet/interface.go b/nnet/interface.go new file mode 100644 index 0000000..93328a5 --- /dev/null +++ b/nnet/interface.go @@ -0,0 +1 @@ +package nnet diff --git a/nnet/request.go b/nnet/request.go deleted file mode 100644 index 63d15ef..0000000 --- a/nnet/request.go +++ /dev/null @@ -1,63 +0,0 @@ -package nnet - -import ( - "git.noahlan.cn/northlan/nnet/nface" - "git.noahlan.cn/northlan/nnet/pipeline" - "git.noahlan.cn/northlan/nnet/session" - "github.com/gorilla/websocket" - "net" - "time" -) - -type Request struct { - session nface.ISession // Session - - conn net.Conn // low-level conn fd - status Status // 连接状态 - lastMid uint64 // 最近一次消息ID - lastHeartbeatAt int64 // 最近一次心跳时间 - - chDie chan struct{} // 停止通道 - chSend chan []byte // 消息发送通道 - - pipeline pipeline.Pipeline // 消息管道 -} - -func newRequest(conn net.Conn, pipeline pipeline.Pipeline) *Request { - r := &Request{ - conn: conn, - status: StatusStart, - - lastHeartbeatAt: time.Now().Unix(), - - chDie: make(chan struct{}), - chSend: make(chan []byte), - - pipeline: pipeline, - } - - // binding session - r.session = session.New() - return r -} - -func newRequestWS(conn *websocket.Conn, pipeline pipeline.Pipeline) *Request { - c, err := newWSConn(conn) - if err != nil { - // TODO panic ? - panic(err) - } - return newRequest(c, pipeline) -} - -func (r *Request) Status() Status { - return r.status -} - -func (r *Request) ID() int64 { - return r.session.ID() -} - -func (r *Request) Session() nface.ISession { - return r.session -} diff --git a/nnet/server.go b/nnet/server.go index db3da24..5077e21 100644 --- a/nnet/server.go +++ b/nnet/server.go @@ -5,6 +5,8 @@ import ( "fmt" "git.noahlan.cn/northlan/nnet/component" "git.noahlan.cn/northlan/nnet/log" + "git.noahlan.cn/northlan/nnet/message" + "git.noahlan.cn/northlan/nnet/packet" "git.noahlan.cn/northlan/nnet/pipeline" "git.noahlan.cn/northlan/nnet/session" "github.com/gorilla/websocket" @@ -18,10 +20,13 @@ import ( type ( Options struct { - Name string // 服务端名,默认为n-net - Pipeline pipeline.Pipeline // 消息管道 - RetryInterval time.Duration // 消息重试间隔时长 - Components *component.Components // 组件库 + Name string // 服务端名,默认为n-net + Pipeline pipeline.Pipeline // 消息管道 + RetryInterval time.Duration // 消息重试间隔时长 + Components *component.Components // 组件库 + Packer packet.Packer // 封包、拆包器 + PacketProcessor packet.Processor // 数据包处理器 + Serializer message.Serializer // 消息 序列化/反序列化 HeartbeatInterval time.Duration // 心跳间隔,0表示不进行心跳 WS WSOptions // websocket @@ -57,6 +62,7 @@ func NewServer(protocol, addr string, opts ...Option) *Server { options := Options{ Components: &component.Components{}, WS: WSOptions{}, + Packer: packet.NewDefaultPacker(), } s := &Server{ Options: options, @@ -68,7 +74,7 @@ func NewServer(protocol, addr string, opts ...Option) *Server { opt(&s.Options) } - s.handler = NewHandler() + s.handler = NewHandler(s, s.Options.Pipeline, s.Options.PacketProcessor) s.sessionMgr = session.NewManager() initPool(0) @@ -149,11 +155,10 @@ func (s *Server) listenAndServe() { } err = pool.SubmitConn(func() { - r := newRequest(conn, s.Pipeline) - s.handler.handle(r) + s.handler.handle(conn) }) if err != nil { - // TODO Log + log.Errorf("submit conn pool err: %s", err.Error()) continue } } diff --git a/nnet/vars.go b/nnet/vars.go deleted file mode 100644 index b2046e2..0000000 --- a/nnet/vars.go +++ /dev/null @@ -1,14 +0,0 @@ -package nnet - -type Status uint8 - -const ( - // StatusStart 开始阶段 - StatusStart Status = iota + 1 - // StatusPrepare 准备阶段 - StatusPrepare - // StatusWorking 工作阶段 - StatusWorking - // StatusClosed 连接关闭 - StatusClosed -) diff --git a/packet/interface.go b/packet/interface.go new file mode 100644 index 0000000..fdc2dc6 --- /dev/null +++ b/packet/interface.go @@ -0,0 +1,23 @@ +package packet + +import "git.noahlan.cn/northlan/nnet/nface" + +// Type 数据帧类型,如:握手,心跳,数据等 +type Type byte + +type ( + Packer interface { + // Pack 从原始raw bytes创建一个用于网络传输的 packet.Packet 结构 + Pack(typ Type, data []byte) ([]byte, error) + + // Unpack 解包 + Unpack(data []byte) ([]interface{}, error) + } + + // Processor 数据帧处理器,拆包之后的处理 + Processor interface { + // ProcessPacket 单个数据包处理方法 + // packet 为实际数据包,是 packet.Packer 的Unpack方法拆包出来的数据指针 + ProcessPacket(conn nface.IConnection, packet interface{}) error + } +) diff --git a/packet/packer.go b/packet/packer.go index d467d26..6f2f683 100644 --- a/packet/packer.go +++ b/packet/packer.go @@ -1,14 +1,16 @@ package packet -type Packer interface { - // Pack 从原始raw bytes创建一个用于网络传输的 packet.Packet 结构 - Pack(typ Type, data []byte) ([]byte, error) +import ( + "bytes" + "errors" +) - // Unpack 解包 - Unpack(data []byte) (*Packet, error) -} +var _ Packer = (*DefaultPacker)(nil) type DefaultPacker struct { + buf *bytes.Buffer + size int // 最近一次 长度 + typ byte // 最近一次 数据帧类型 } // Codec constants. @@ -17,8 +19,13 @@ const ( maxPacketSize = 64 * 1024 ) +var ErrPacketSizeExceed = errors.New("codec: packet size exceed") + func NewDefaultPacker() Packer { - return &DefaultPacker{} + return &DefaultPacker{ + buf: bytes.NewBuffer(nil), + size: -1, + } } func (d *DefaultPacker) Pack(typ Type, data []byte) ([]byte, error) { @@ -48,6 +55,69 @@ func (d *DefaultPacker) intToBytes(n uint32) []byte { return buf } -func (d *DefaultPacker) Unpack(data []byte) (*Packet, error) { - // header +func (d *DefaultPacker) Unpack(data []byte) ([]interface{}, error) { + d.buf.Write(data) // copy + + var ( + packets []interface{} + err error + ) + + // 检查包长度 + if d.buf.Len() < headLength { + return nil, err + } + + // 第一次拆包 + if d.size < 0 { + if err = d.readHeader(); err != nil { + return nil, err + } + } + + for d.size <= d.buf.Len() { + // 读取 + p := &Packet{ + Type: Type(d.typ), + Length: uint32(d.size), + Data: d.buf.Next(d.size), + } + packets = append(packets, p) + + // 剩余数据不满足至少一个数据帧,重置数据帧长度 + // 数据缓存内存 保留至 下一次进入本方法以继续拆包 + if d.buf.Len() < headLength { + d.size = -1 + break + } + // 读取下一个包 next + if err = d.readHeader(); err != nil { + return packets, err + } + } + return packets, nil +} + +func (d *DefaultPacker) readHeader() error { + header := d.buf.Next(headLength) + d.typ = header[0] + if d.typ < Handshake || d.typ > Kick { + return ErrWrongPacketType + } + d.size = d.bytesToInt(header[1:]) + + // 最大包限定 + if d.size > maxPacketSize { + return ErrPacketSizeExceed + } + return nil +} + +// Decode packet data length byte to int(Big end) +func (d *DefaultPacker) bytesToInt(b []byte) int { + result := 0 + for _, v := range b { + result = result<<8 + int(v) + } + return result } diff --git a/packet/packet.go b/packet/packet.go index 801501e..537b672 100644 --- a/packet/packet.go +++ b/packet/packet.go @@ -1,9 +1,8 @@ package packet -import "errors" - -// Type 数据帧类型,如:握手,心跳,数据等 -type Type byte +import ( + "errors" +) const ( // Default 默认,暂无意义 diff --git a/packet/processor.go b/packet/processor.go new file mode 100644 index 0000000..3a9b594 --- /dev/null +++ b/packet/processor.go @@ -0,0 +1,40 @@ +package packet + +import ( + "fmt" + "git.noahlan.cn/northlan/nnet/log" + "git.noahlan.cn/northlan/nnet/nface" +) + +type DefaultProcessor struct{} + +func NewDefaultProcessor() *DefaultProcessor { + return &DefaultProcessor{} +} + +func (d *DefaultProcessor) ProcessPacket(conn nface.IConnection, packet interface{}) error { + p := packet.(*Packet) + switch p.Type { + case Handshake: + // TODO validate handshake + if _, err := conn.Conn().Write([]byte{}); err != nil { + return err + } + conn.SetStatus(nface.StatusPrepare) + log.Debugf("Connection handshake Id=%d, Remote=%s", conn.ID(), conn.Conn().RemoteAddr()) + case HandshakeAck: + conn.SetStatus(nface.StatusWorking) + log.Debugf("Receive handshake ACK Id=%d, Remote=%s", conn.ID(), conn.Conn().RemoteAddr()) + + case Data: + if conn.Status() < nface.StatusWorking { + return fmt.Errorf("receive data on socket which not yet ACK, session will be closed immediately, remote=%s", + conn.Conn().RemoteAddr()) + } + // TODO message data 处理 + case Heartbeat: + // expected + } + + return nil +} diff --git a/session/session_mgr.go b/session/session_mgr.go index 9475e43..6821c23 100644 --- a/session/session_mgr.go +++ b/session/session_mgr.go @@ -17,21 +17,21 @@ func NewManager() *Manager { } } -func (m *Manager) storeSession(s nface.ISession) { +func (m *Manager) StoreSession(s nface.ISession) { m.Lock() defer m.Unlock() m.sessions[s.ID()] = s } -func (m *Manager) findSession(sid int64) nface.ISession { +func (m *Manager) FindSession(sid int64) nface.ISession { m.RLock() defer m.RUnlock() return m.sessions[sid] } -func (m *Manager) findOrCreateSession(sid int64) nface.ISession { +func (m *Manager) FindOrCreateSession(sid int64) nface.ISession { m.RLock() s, ok := m.sessions[sid] m.RUnlock()