wip: 又又加了一些新东西。

main
NorthLan 2 years ago
parent 6ec0070ebe
commit 115166cb11

@ -0,0 +1,18 @@
package message
type BinarySerializer struct {
}
func NewBinarySerializer() Serializer {
return &BinarySerializer{}
}
func (b *BinarySerializer) Marshal(i interface{}) ([]byte, error) {
//TODO implement me
panic("implement me")
}
func (b *BinarySerializer) Unmarshal(bytes []byte, i interface{}) error {
//TODO implement me
panic("implement me")
}

@ -0,0 +1,19 @@
package message
type (
// Marshaler 序列化
Marshaler interface {
Marshal(interface{}) ([]byte, error)
}
// Unmarshaler 反序列化
Unmarshaler interface {
Unmarshal([]byte, interface{}) error
}
// Serializer 消息 序列化/反序列化仅针对payload
Serializer interface {
Marshaler
Unmarshaler
}
)

@ -0,0 +1,29 @@
package nface
import "net"
const (
// StatusStart 开始阶段
StatusStart int32 = iota + 1
// StatusPrepare 准备阶段
StatusPrepare
// StatusWorking 工作阶段
StatusWorking
// StatusClosed 连接关闭
StatusClosed
)
type IConnection interface {
// Status 获取连接状态
Status() int32
// SetStatus 设置连接状态
SetStatus(s int32)
// Conn 获取底层网络连接
Conn() net.Conn
// ID 获取连接ID
ID() int64
// Session 获取当前连接绑定的Session
Session() ISession
// Close 关闭连接
Close() error
}

@ -0,0 +1,164 @@
package nnet
import (
"errors"
"git.noahlan.cn/northlan/nnet/log"
"git.noahlan.cn/northlan/nnet/nface"
"git.noahlan.cn/northlan/nnet/packet"
"git.noahlan.cn/northlan/nnet/pipeline"
"git.noahlan.cn/northlan/nnet/session"
"github.com/gorilla/websocket"
"net"
"sync/atomic"
"time"
)
var (
ErrCloseClosedSession = errors.New("close closed session")
)
type (
Connection struct {
session nface.ISession // Session
server *Server // Server 引用
conn net.Conn // low-level conn fd
status int32 // 连接状态
lastMid uint64 // 最近一次消息ID
lastHeartbeatAt int64 // 最近一次心跳时间
chDie chan struct{} // 停止通道
chSend chan []byte // 消息发送通道
pipeline pipeline.Pipeline // 消息管道
}
pendingMessage struct {
typ byte // message type
route string // message route
mid uint64 // response message id
payload interface{} // payload
}
)
func newConnection(server *Server, conn net.Conn, pipeline pipeline.Pipeline) nface.IConnection {
r := &Connection{
conn: conn,
server: server,
status: nface.StatusStart,
lastHeartbeatAt: time.Now().Unix(),
chDie: make(chan struct{}),
chSend: make(chan pendingMessage, 2048),
pipeline: pipeline,
}
// binding session
r.session = session.New()
return r
}
func newConnectionWS(server *Server, conn *websocket.Conn, pipeline pipeline.Pipeline) nface.IConnection {
c, err := newWSConn(conn)
if err != nil {
// TODO panic ?
panic(err)
}
return newConnection(server, c, pipeline)
}
func (r *Connection) Status() int32 {
return atomic.LoadInt32(&r.status)
}
func (r *Connection) SetStatus(s int32) {
atomic.StoreInt32(&r.status, s)
}
func (r *Connection) Conn() net.Conn {
return r.conn
}
func (r *Connection) ID() int64 {
return r.session.ID()
}
func (r *Connection) Session() nface.ISession {
return r.session
}
func (r *Connection) write() {
ticker := time.NewTicker(r.server.HeartbeatInterval)
chWrite := make(chan []byte, 1024)
defer func() {
ticker.Stop()
close(r.chSend)
close(chWrite)
_ = r.Close()
log.Debugf("Session write goroutine exit, SessionID=%d, UID=%d", r.session.ID(), r.session.UID())
}()
for {
select {
case <-ticker.C:
deadline := time.Now().Add(-2 * r.server.HeartbeatInterval).Unix()
if atomic.LoadInt64(&r.lastHeartbeatAt) < deadline {
log.Debugf("Session heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&r.lastHeartbeatAt), deadline)
return
}
// TODO heartbeat data
chWrite <- []byte{}
case data := <-r.chSend:
// message marshal data
payload, err := r.server.Serializer.Marshal(data.payload)
if err != nil {
switch data.typ {
}
break
}
// TODO new message and pipeline
// TODO encode message ? message processor ?
// packet pack data
p, err := r.server.Packer.Pack(packet.Data, payload)
if err != nil {
log.Error(err.Error())
break
}
chWrite <- p
case data := <-chWrite:
// 回写数据
if _, err := r.conn.Write(data); err != nil {
log.Error(err.Error())
return
}
case <-r.chDie: // connection close signal
return
// TODO application quit signal
}
}
}
func (r *Connection) Close() error {
if r.Status() == StatusClosed {
return ErrCloseClosedSession
}
r.SetStatus(StatusClosed)
log.Debugf("close connection, ID: %d", r.ID())
select {
case <-r.chDie:
default:
close(r.chDie)
// TODO lifetime
}
return r.conn.Close()
}

@ -3,19 +3,27 @@ package nnet
import (
"fmt"
"git.noahlan.cn/northlan/nnet/component"
"git.noahlan.cn/northlan/nnet/log"
"git.noahlan.cn/northlan/nnet/packet"
"git.noahlan.cn/northlan/nnet/pipeline"
"net"
"time"
)
type Handler struct {
server *Server // Server 引用
pipeline pipeline.Pipeline // 通道
server *Server // Server 引用
pipeline pipeline.Pipeline // 通道
processor packet.Processor // 数据包处理器
allServices map[string]*component.Service // 所有注册的Service
allHandlers map[string]*component.Handler // 所有注册的Handler
}
func NewHandler() *Handler {
func NewHandler(server *Server, pipeline pipeline.Pipeline, processor packet.Processor) *Handler {
return &Handler{
server: server,
pipeline: pipeline,
processor: processor,
allServices: make(map[string]*component.Service),
allHandlers: make(map[string]*component.Handler),
}
@ -37,13 +45,57 @@ func (h *Handler) register(comp component.Component, opts []component.Option) er
// 拷贝一份所有handlers
for name, handler := range s.Handlers {
handleName := fmt.Sprintf("%s.%s", s.Name, name)
// TODO print log
log.Debugf("register handler %s", handleName)
h.allHandlers[handleName] = handler
}
return nil
}
func (h *Handler) handle(request *Request) {
buf := make([]byte, 3)
func (h *Handler) handle(conn net.Conn) {
connection := newConnection(h.server, conn, h.pipeline)
h.server.sessionMgr.StoreSession(connection.Session())
_ = pool.SubmitConn(func() {
h.writeLoop(connection)
})
_ = pool.SubmitWorker(func() {
h.readLoop(connection)
})
// hook
}
func (h *Handler) writeLoop(conn *Connection) {
}
func (h *Handler) readLoop(conn *Connection) {
buf := make([]byte, 4096)
for {
n, err := conn.conn.Read(buf)
if err != nil {
log.Errorf("Read message error: %s, session will be closed immediately", err.Error())
return
}
packets, err := h.server.Packer.Unpack(buf)
if err != nil {
log.Error(err.Error())
}
// packets 处理
for _, p := range packets {
if err := h.processPackets(conn, p); err != nil {
log.Error(err.Error())
return
}
}
}
}
func (h *Handler) processPackets(conn *Connection, packets interface{}) error {
err := h.processor.ProcessPacket(conn, packets)
conn.lastHeartbeatAt = time.Now().Unix()
return err
}

@ -0,0 +1 @@
package nnet

@ -1,63 +0,0 @@
package nnet
import (
"git.noahlan.cn/northlan/nnet/nface"
"git.noahlan.cn/northlan/nnet/pipeline"
"git.noahlan.cn/northlan/nnet/session"
"github.com/gorilla/websocket"
"net"
"time"
)
type Request struct {
session nface.ISession // Session
conn net.Conn // low-level conn fd
status Status // 连接状态
lastMid uint64 // 最近一次消息ID
lastHeartbeatAt int64 // 最近一次心跳时间
chDie chan struct{} // 停止通道
chSend chan []byte // 消息发送通道
pipeline pipeline.Pipeline // 消息管道
}
func newRequest(conn net.Conn, pipeline pipeline.Pipeline) *Request {
r := &Request{
conn: conn,
status: StatusStart,
lastHeartbeatAt: time.Now().Unix(),
chDie: make(chan struct{}),
chSend: make(chan []byte),
pipeline: pipeline,
}
// binding session
r.session = session.New()
return r
}
func newRequestWS(conn *websocket.Conn, pipeline pipeline.Pipeline) *Request {
c, err := newWSConn(conn)
if err != nil {
// TODO panic ?
panic(err)
}
return newRequest(c, pipeline)
}
func (r *Request) Status() Status {
return r.status
}
func (r *Request) ID() int64 {
return r.session.ID()
}
func (r *Request) Session() nface.ISession {
return r.session
}

@ -5,6 +5,8 @@ import (
"fmt"
"git.noahlan.cn/northlan/nnet/component"
"git.noahlan.cn/northlan/nnet/log"
"git.noahlan.cn/northlan/nnet/message"
"git.noahlan.cn/northlan/nnet/packet"
"git.noahlan.cn/northlan/nnet/pipeline"
"git.noahlan.cn/northlan/nnet/session"
"github.com/gorilla/websocket"
@ -18,10 +20,13 @@ import (
type (
Options struct {
Name string // 服务端名默认为n-net
Pipeline pipeline.Pipeline // 消息管道
RetryInterval time.Duration // 消息重试间隔时长
Components *component.Components // 组件库
Name string // 服务端名默认为n-net
Pipeline pipeline.Pipeline // 消息管道
RetryInterval time.Duration // 消息重试间隔时长
Components *component.Components // 组件库
Packer packet.Packer // 封包、拆包器
PacketProcessor packet.Processor // 数据包处理器
Serializer message.Serializer // 消息 序列化/反序列化
HeartbeatInterval time.Duration // 心跳间隔0表示不进行心跳
WS WSOptions // websocket
@ -57,6 +62,7 @@ func NewServer(protocol, addr string, opts ...Option) *Server {
options := Options{
Components: &component.Components{},
WS: WSOptions{},
Packer: packet.NewDefaultPacker(),
}
s := &Server{
Options: options,
@ -68,7 +74,7 @@ func NewServer(protocol, addr string, opts ...Option) *Server {
opt(&s.Options)
}
s.handler = NewHandler()
s.handler = NewHandler(s, s.Options.Pipeline, s.Options.PacketProcessor)
s.sessionMgr = session.NewManager()
initPool(0)
@ -149,11 +155,10 @@ func (s *Server) listenAndServe() {
}
err = pool.SubmitConn(func() {
r := newRequest(conn, s.Pipeline)
s.handler.handle(r)
s.handler.handle(conn)
})
if err != nil {
// TODO Log
log.Errorf("submit conn pool err: %s", err.Error())
continue
}
}

@ -1,14 +0,0 @@
package nnet
type Status uint8
const (
// StatusStart 开始阶段
StatusStart Status = iota + 1
// StatusPrepare 准备阶段
StatusPrepare
// StatusWorking 工作阶段
StatusWorking
// StatusClosed 连接关闭
StatusClosed
)

@ -0,0 +1,23 @@
package packet
import "git.noahlan.cn/northlan/nnet/nface"
// Type 数据帧类型,如:握手,心跳,数据等
type Type byte
type (
Packer interface {
// Pack 从原始raw bytes创建一个用于网络传输的 packet.Packet 结构
Pack(typ Type, data []byte) ([]byte, error)
// Unpack 解包
Unpack(data []byte) ([]interface{}, error)
}
// Processor 数据帧处理器,拆包之后的处理
Processor interface {
// ProcessPacket 单个数据包处理方法
// packet 为实际数据包,是 packet.Packer 的Unpack方法拆包出来的数据指针
ProcessPacket(conn nface.IConnection, packet interface{}) error
}
)

@ -1,14 +1,16 @@
package packet
type Packer interface {
// Pack 从原始raw bytes创建一个用于网络传输的 packet.Packet 结构
Pack(typ Type, data []byte) ([]byte, error)
import (
"bytes"
"errors"
)
// Unpack 解包
Unpack(data []byte) (*Packet, error)
}
var _ Packer = (*DefaultPacker)(nil)
type DefaultPacker struct {
buf *bytes.Buffer
size int // 最近一次 长度
typ byte // 最近一次 数据帧类型
}
// Codec constants.
@ -17,8 +19,13 @@ const (
maxPacketSize = 64 * 1024
)
var ErrPacketSizeExceed = errors.New("codec: packet size exceed")
func NewDefaultPacker() Packer {
return &DefaultPacker{}
return &DefaultPacker{
buf: bytes.NewBuffer(nil),
size: -1,
}
}
func (d *DefaultPacker) Pack(typ Type, data []byte) ([]byte, error) {
@ -48,6 +55,69 @@ func (d *DefaultPacker) intToBytes(n uint32) []byte {
return buf
}
func (d *DefaultPacker) Unpack(data []byte) (*Packet, error) {
// header
func (d *DefaultPacker) Unpack(data []byte) ([]interface{}, error) {
d.buf.Write(data) // copy
var (
packets []interface{}
err error
)
// 检查包长度
if d.buf.Len() < headLength {
return nil, err
}
// 第一次拆包
if d.size < 0 {
if err = d.readHeader(); err != nil {
return nil, err
}
}
for d.size <= d.buf.Len() {
// 读取
p := &Packet{
Type: Type(d.typ),
Length: uint32(d.size),
Data: d.buf.Next(d.size),
}
packets = append(packets, p)
// 剩余数据不满足至少一个数据帧,重置数据帧长度
// 数据缓存内存 保留至 下一次进入本方法以继续拆包
if d.buf.Len() < headLength {
d.size = -1
break
}
// 读取下一个包 next
if err = d.readHeader(); err != nil {
return packets, err
}
}
return packets, nil
}
func (d *DefaultPacker) readHeader() error {
header := d.buf.Next(headLength)
d.typ = header[0]
if d.typ < Handshake || d.typ > Kick {
return ErrWrongPacketType
}
d.size = d.bytesToInt(header[1:])
// 最大包限定
if d.size > maxPacketSize {
return ErrPacketSizeExceed
}
return nil
}
// Decode packet data length byte to int(Big end)
func (d *DefaultPacker) bytesToInt(b []byte) int {
result := 0
for _, v := range b {
result = result<<8 + int(v)
}
return result
}

@ -1,9 +1,8 @@
package packet
import "errors"
// Type 数据帧类型,如:握手,心跳,数据等
type Type byte
import (
"errors"
)
const (
// Default 默认,暂无意义

@ -0,0 +1,40 @@
package packet
import (
"fmt"
"git.noahlan.cn/northlan/nnet/log"
"git.noahlan.cn/northlan/nnet/nface"
)
type DefaultProcessor struct{}
func NewDefaultProcessor() *DefaultProcessor {
return &DefaultProcessor{}
}
func (d *DefaultProcessor) ProcessPacket(conn nface.IConnection, packet interface{}) error {
p := packet.(*Packet)
switch p.Type {
case Handshake:
// TODO validate handshake
if _, err := conn.Conn().Write([]byte{}); err != nil {
return err
}
conn.SetStatus(nface.StatusPrepare)
log.Debugf("Connection handshake Id=%d, Remote=%s", conn.ID(), conn.Conn().RemoteAddr())
case HandshakeAck:
conn.SetStatus(nface.StatusWorking)
log.Debugf("Receive handshake ACK Id=%d, Remote=%s", conn.ID(), conn.Conn().RemoteAddr())
case Data:
if conn.Status() < nface.StatusWorking {
return fmt.Errorf("receive data on socket which not yet ACK, session will be closed immediately, remote=%s",
conn.Conn().RemoteAddr())
}
// TODO message data 处理
case Heartbeat:
// expected
}
return nil
}

@ -17,21 +17,21 @@ func NewManager() *Manager {
}
}
func (m *Manager) storeSession(s nface.ISession) {
func (m *Manager) StoreSession(s nface.ISession) {
m.Lock()
defer m.Unlock()
m.sessions[s.ID()] = s
}
func (m *Manager) findSession(sid int64) nface.ISession {
func (m *Manager) FindSession(sid int64) nface.ISession {
m.RLock()
defer m.RUnlock()
return m.sessions[sid]
}
func (m *Manager) findOrCreateSession(sid int64) nface.ISession {
func (m *Manager) FindOrCreateSession(sid int64) nface.ISession {
m.RLock()
s, ok := m.sessions[sid]
m.RUnlock()

Loading…
Cancel
Save