// Package connection wraps a low-level net.Conn with a session, a packer,
// an optional serializer and a pipeline, and drives its read and write loops.
package connection

import (
	"errors"
	"fmt"
	"net"
	"sync/atomic"

	"git.noahlan.cn/noahlan/nnet/packet"
	"git.noahlan.cn/noahlan/nnet/session"
	"git.noahlan.cn/noahlan/ntool/ndef"
	"git.noahlan.cn/noahlan/ntool/nlog"
	"github.com/panjf2000/ants/v2"
)

var (
	// ErrCloseClosedSession is returned by Close when the connection has
	// already been closed.
	ErrCloseClosedSession = errors.New("close closed session")
	// ErrBrokenPipe represents the low-level connection has broken.
	ErrBrokenPipe = errors.New("broken low-level pipe")
)

const (
	// StatusStart is the start phase.
	StatusStart int32 = iota + 1
	// StatusPrepare is the preparation phase.
	StatusPrepare
	// StatusPending is the phase waiting for work.
	StatusPending
	// StatusWorking is the working phase; only in this phase are inbound
	// packets dispatched to the message handler.
	StatusWorking
	// StatusClosed means the connection has been closed.
	StatusClosed
)

// ConnType identifies the kind of low-level connection behind a Connection.
type ConnType int

const (
	ConnTypeTCP    ConnType = iota // TCP connection (the default)
	ConnTypeWS                     // Websocket connection (conn is a *WSConn)
	ConnTypeSerial                 // Serial connection (conn is a *SerialConn)
)

type (
	// Connection couples a low-level net.Conn with the session, packer,
	// serializer and pipeline needed to exchange packets over it.
	Connection struct {
		conf    Config           // configuration
		session *session.Session // session bound to this connection
		pool    *ants.Pool       // pool that runs the read/write loops and packet handlers

		status int32    // connection status; accessed atomically
		conn   net.Conn // low-level conn fd
		typ    ConnType // connection type

		packer     packet.Packer   // packs outgoing and unpacks incoming packets
		serializer ndef.Serializer // message (de)serializer; may be nil when payloads are already []byte
		pipeline   Pipeline        // connection lifecycle pipeline; may be nil

		handleFn func(conn *Connection, pkg packet.IPacket) // message handler

		lastMid uint64 // most recent message ID; accessed atomically

		chDie   chan struct{}       // closed to signal the connection is shutting down
		chSend  chan PendingMessage // outbound structured messages
		chWrite chan []byte         // outbound raw (already packed) bytes
	}

	// packetFn handles one unpacked packet received on conn.
	packetFn func(conn *Connection, pkg packet.IPacket)

	// Config holds the logging options of a Connection.
	Config struct {
		LogDebug  bool   // enables debug-level log output
		LogPrefix string // prefix prepended to every log line
	}

	// PendingMessage is a structured message queued for sending.
	PendingMessage struct {
		header  any
		payload any
	}
)

// NewConnection builds a Connection around conn in StatusStart.
//
// The connection type defaults to ConnTypeTCP and is switched to ConnTypeWS
// or ConnTypeSerial when conn is a *WSConn or *SerialConn respectively.
// The returned connection does nothing until Serve is called.
func NewConnection(
	id int64,
	conn net.Conn,
	pool *ants.Pool,
	conf Config,
	packerBuilder packet.PackerBuilder,
	serializer ndef.Serializer,
	pipeline Pipeline,
	handleFn packetFn) *Connection {
	r := &Connection{
		conf:       conf,
		session:    session.NewSession(id),
		pool:       pool,
		status:     StatusStart,
		conn:       conn,
		typ:        ConnTypeTCP,
		packer:     packerBuilder(),
		serializer: serializer,
		pipeline:   pipeline,
		handleFn:   handleFn,
		lastMid:    0,
		chDie:      make(chan struct{}),
		chSend:     make(chan PendingMessage, 128),
		chWrite:    make(chan []byte, 128),
	}

	switch conn.(type) {
	case *WSConn:
		r.typ = ConnTypeWS
	case *SerialConn:
		r.typ = ConnTypeSerial
	}
	return r
}

// Send queues a structured message (header + payload) for the write loop.
// It returns ErrBrokenPipe if the send channel has already been closed.
func (r *Connection) Send(header, payload any) (err error) {
	defer func() {
		// Sending on a closed channel panics; the write loop closes chSend
		// on exit, so translate that panic into ErrBrokenPipe.
		if e := recover(); e != nil {
			err = ErrBrokenPipe
		}
	}()
	r.chSend <- PendingMessage{
		header:  header,
		payload: payload,
	}
	return nil
}

// SendBytes queues raw bytes to be written to the connection as-is.
// It returns ErrBrokenPipe if the write channel has already been closed.
func (r *Connection) SendBytes(data []byte) (err error) {
	defer func() {
		// See Send: a send on the closed chWrite panics.
		if e := recover(); e != nil {
			err = ErrBrokenPipe
		}
	}()
	r.chWrite <- data
	return nil
}

// Status returns the current connection status.
func (r *Connection) Status() int32 {
	return atomic.LoadInt32(&r.status)
}

// SetStatus atomically replaces the connection status.
func (r *Connection) SetStatus(s int32) {
	atomic.StoreInt32(&r.status, s)
}

// Conn returns the low-level connection together with its type.
func (r *Connection) Conn() (net.Conn, ConnType) {
	return r.conn, r.typ
}

// ID returns the ID of the session bound to this connection.
func (r *Connection) ID() int64 {
	return r.session.ID()
}

// Session returns the session bound to this connection.
func (r *Connection) Session() *session.Session {
	return r.session
}

// LastMID returns the most recently recorded message ID.
func (r *Connection) LastMID() uint64 {
	// Loaded atomically to pair with the atomic store in SetLastMID.
	return atomic.LoadUint64(&r.lastMid)
}

// SetLastMID atomically records mid as the most recent message ID.
func (r *Connection) SetLastMID(mid uint64) {
	atomic.StoreUint64(&r.lastMid, mid)
}

// Serve submits the write loop and the read loop to the pool.
// If either loop cannot be scheduled the connection is closed, since it
// could not operate with only one of them running.
func (r *Connection) Serve() {
	if err := r.pool.Submit(r.write); err != nil {
		nlog.Errorf("%s submit write loop err: %v", r.conf.LogPrefix, err)
		_ = r.Close()
		return
	}
	if err := r.pool.Submit(r.read); err != nil {
		nlog.Errorf("%s submit read loop err: %v", r.conf.LogPrefix, err)
		// Closing signals chDie, which also stops the write loop started above.
		_ = r.Close()
	}
}

// write is the write loop: it packs structured messages from chSend, writes
// raw bytes from chWrite, and exits when chDie is closed. On exit it closes
// both send channels and the connection.
func (r *Connection) write() {
	defer func() {
		close(r.chSend)
		close(r.chWrite)
		// The read loop may have closed the connection already; the
		// resulting ErrCloseClosedSession is expected and ignored.
		_ = r.Close()
		if r.conf.LogDebug {
			nlog.Debugf("%s [writeLoop] connection write goroutine exit, ConnID=%d, SessionUID=%s",
				r.conf.LogPrefix, r.ID(), r.session.UID())
		}
	}()

	for {
		select {
		case msg := <-r.chSend:
			// Write the packed bytes directly. Re-queuing them on chWrite
			// would deadlock once chWrite is full, because this loop is
			// the only consumer of that channel.
			if data, ok := r.encode(msg); ok {
				r.flush(data)
			}
		case data := <-r.chWrite:
			r.flush(data)
		case <-r.chDie: // connection close signal
			return
			// TODO: also return on an application-wide quit signal.
		}
	}
}

// encode serializes, pipelines and packs msg into wire bytes. It logs the
// failure and reports false when the message must be dropped.
func (r *Connection) encode(msg PendingMessage) ([]byte, bool) {
	// Marshal the packet body.
	if r.serializer == nil {
		if _, ok := msg.payload.([]byte); !ok {
			nlog.Errorf("%s serializer is nil, but payload type not []byte", r.conf.LogPrefix)
			return nil, false
		}
	} else {
		payload, err := r.serializer.Marshal(msg.payload)
		if err != nil {
			nlog.Errorf("%s message body marshal err: %v", r.conf.LogPrefix, err)
			return nil, false
		}
		msg.payload = payload
	}

	// Invoke the outbound pipeline. A pipeline error is logged but does not
	// drop the message.
	if pipe := r.pipeline; pipe != nil {
		if err := pipe.Outbound().Process(r, msg); err != nil {
			nlog.Errorf("%s pipeline err: %s", r.conf.LogPrefix, err.Error())
		}
	}

	// Checked assertion so an unexpected payload type cannot panic the loop.
	body, ok := msg.payload.([]byte)
	if !ok {
		nlog.Errorf("%s pack err: payload type %T is not []byte", r.conf.LogPrefix, msg.payload)
		return nil, false
	}

	data, err := r.packer.Pack(msg.header, body)
	if err != nil {
		nlog.Errorf("%s pack err: %s", r.conf.LogPrefix, err.Error())
		return nil, false
	}
	return data, true
}

// flush writes data to the low-level connection and logs any write error.
func (r *Connection) flush(data []byte) {
	if _, err := r.conn.Write(data); err != nil {
		nlog.Errorf("%s write data err: %s", r.conf.LogPrefix, err.Error())
	}
}

// read is the read loop: it reads from the low-level connection, unpacks the
// bytes into packets and processes each one. It returns, closing the
// connection, on a read error, an empty read, or a nil packer.
func (r *Connection) read() {
	defer func() {
		// The write loop may have closed the connection already.
		_ = r.Close()
	}()

	buf := make([]byte, 4096)
	for {
		n, err := r.conn.Read(buf)
		if err != nil {
			nlog.Errorf("%s [readLoop] Read message error: %s, session will be closed immediately",
				r.conf.LogPrefix, err.Error())
			return
		}
		if n == 0 {
			nlog.Errorf("%s [readLoop] Read empty message, session will be closed immediately",
				r.conf.LogPrefix)
			return
		}

		if r.packer == nil {
			nlog.Errorf("%s [readLoop] unexpected error: packer is nil", r.conf.LogPrefix)
			return
		}

		// Warning: buf is reused across iterations for performance, so the
		// packer must copy any bytes it needs to keep beyond this call.
		packets, err := r.packer.Unpack(buf[:n])
		if err != nil {
			nlog.Errorf("%s unpack err: %s", r.conf.LogPrefix, err.Error())
		}

		// Process whatever packets were produced, even after an unpack error.
		for _, p := range packets {
			if err := r.processPacket(p); err != nil {
				nlog.Errorf("%s process packet err: %s", r.conf.LogPrefix, err.Error())
			}
		}
	}
}

// processPacket runs pkg through the inbound pipeline and, when the
// connection is in StatusWorking, submits it to the message handler on the
// pool. It returns an error if the pipeline rejects the packet or the
// handler cannot be scheduled.
func (r *Connection) processPacket(pkg packet.IPacket) error {
	if pipe := r.pipeline; pipe != nil {
		if err := pipe.Inbound().Process(r, pkg); err != nil {
			return fmt.Errorf("pipeline process failed: %w", err)
		}
	}
	if r.Status() != StatusWorking {
		return nil
	}
	// Handle the packet on the pool.
	if err := r.pool.Submit(func() { r.handleFn(r, pkg) }); err != nil {
		return fmt.Errorf("submit packet handler: %w", err)
	}
	return nil
}

// DieChan returns the channel that is closed when the connection shuts down.
func (r *Connection) DieChan() chan struct{} {
	return r.chDie
}

// Close marks the connection closed, signals chDie, closes the session and
// closes the low-level connection. It returns ErrCloseClosedSession if the
// connection was already closed, otherwise the error from the low-level Close.
func (r *Connection) Close() error {
	// Swap makes the closed transition atomic: the read and write loops
	// both call Close on exit, and only one of them may close chDie.
	if atomic.SwapInt32(&r.status, StatusClosed) == StatusClosed {
		return ErrCloseClosedSession
	}

	if r.conf.LogDebug {
		nlog.Debugf("%s close connection, ID: %d", r.conf.LogPrefix, r.ID())
	}

	// chDie is exposed through DieChan, so tolerate it being closed already.
	select {
	case <-r.chDie:
	default:
		close(r.chDie)
	}

	r.session.Close()
	return r.conn.Close()
}