package core import ( "errors" "fmt" "git.noahlan.cn/noahlan/nnet/entity" "git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/nnet/scheduler" "git.noahlan.cn/noahlan/ntools-go/core/nlog" "git.noahlan.cn/noahlan/ntools-go/core/pool" "net" "sync/atomic" ) var ( ErrCloseClosedSession = errors.New("close closed session") // ErrBrokenPipe represents the low-level connection has broken. ErrBrokenPipe = errors.New("broken low-level pipe") ) const ( // StatusStart 开始阶段 StatusStart int32 = iota + 1 // StatusPrepare 准备阶段 StatusPrepare // StatusPending 等待工作阶段 StatusPending // StatusWorking 工作阶段 StatusWorking // StatusClosed 连接关闭 StatusClosed ) type ( connection struct { session *session // Session ngin *engine // engine status int32 // 连接状态 conn net.Conn // low-level conn fd isWS bool // 是否为websocket packer packet.Packer // 封包、拆包器 lastMid uint64 // 最近一次消息ID chDie chan struct{} // 停止通道 chSend chan PendingMessage // 消息发送通道(结构化消息) chWrite chan []byte // 消息发送通道(二进制消息) } PendingMessage struct { header interface{} payload interface{} } ) func newConnection(server *engine, conn net.Conn) *connection { r := &connection{ ngin: server, status: StatusStart, conn: conn, packer: server.packerFn(), chDie: make(chan struct{}), chSend: make(chan PendingMessage, 128), chWrite: make(chan []byte, 128), } _, r.isWS = conn.(*WSConn) // binding session r.session = newSession(r) return r } func (r *connection) Send(header, payload interface{}) (err error) { defer func() { if e := recover(); e != nil { err = ErrBrokenPipe } }() r.chSend <- PendingMessage{ header: header, payload: payload, } return err } func (r *connection) SendBytes(data []byte) (err error) { defer func() { if e := recover(); e != nil { err = ErrBrokenPipe } }() r.chWrite <- data return err } func (r *connection) Status() int32 { return atomic.LoadInt32(&r.status) } func (r *connection) SetStatus(s int32) { atomic.StoreInt32(&r.status, s) } func (r *connection) Conn() (net.Conn, bool) { return r.conn, r.isWS } func (r *connection) ID() int64 { return r.session.ID() } func (r *connection) Session() entity.Session { return r.session } func (r *connection) LastMID() uint64 { return r.lastMid } func (r *connection) SetLastMID(mid uint64) { atomic.StoreUint64(&r.lastMid, mid) } func (r *connection) serve() { _ = pool.Submit(func() { r.write() }) _ = pool.Submit(func() { r.read() }) } func (r *connection) write() { defer func() { close(r.chSend) close(r.chWrite) _ = r.Close() nlog.Debugf("[writeLoop] connection write goroutine exit, ConnID=%d, SessionUID=%s", r.ID(), r.session.UID()) }() for { select { case data := <-r.chSend: // marshal packet body (data) if r.ngin.serializer == nil { if _, ok := data.payload.([]byte); !ok { nlog.Errorf("serializer is nil, but payload type not []byte") break } } else { payload, err := r.ngin.serializer.Marshal(data.payload) if err != nil { nlog.Errorf("message body marshal err: %v", err) break } data.payload = payload } // invoke pipeline if pipe := r.ngin.pipeline; pipe != nil { err := pipe.Outbound().Process(r, data) if err != nil { nlog.Errorf("pipeline err: %s", err.Error()) } } // packet pack data p, err := r.packer.Pack(data.header, data.payload.([]byte)) if err != nil { nlog.Error(err.Error()) break } r.chWrite <- p case data := <-r.chWrite: // 回写数据 if _, err := r.conn.Write(data); err != nil { nlog.Error(err.Error()) break } //nlog.Debugf("write data %v", data) case <-r.chDie: // connection close signal return case <-r.ngin.dieChan: // application quit signal return } } } func (r *connection) read() { defer func() { _ = r.Close() }() buf := make([]byte, 4096) for { n, err := r.conn.Read(buf) //nlog.Debugf("receive data %v", buf[:n]) if err != nil { nlog.Errorf("[readLoop] Read message error: %s, session will be closed immediately", err.Error()) return } if r.packer == nil { nlog.Errorf("[readLoop] unexpected error: packer is nil") return } // warning: 为性能考虑,复用slice处理数据,buf传入后必须要copy再处理 packets, err := r.packer.Unpack(buf[:n]) if err != nil { nlog.Error(err.Error()) } // packets 处理 for _, p := range packets { if err := r.processPacket(p); err != nil { nlog.Error(err.Error()) continue } } } } func (r *connection) processPacket(packet packet.IPacket) error { if pipe := r.ngin.pipeline; pipe != nil { err := pipe.Inbound().Process(r, packet) if err != nil { return errors.New(fmt.Sprintf("pipeline process failed: %v", err.Error())) } } if r.Status() == StatusWorking { // HandleFunc _ = pool.Submit(func() { r.ngin.handler.Handle(r, packet) }) } return nil } func (r *connection) Close() error { if r.Status() == StatusClosed { return ErrCloseClosedSession } r.SetStatus(StatusClosed) nlog.Debugf("close connection, ID: %d", r.ID()) select { case <-r.chDie: default: close(r.chDie) scheduler.PushTask(func() { Lifetime.Close(r) }) } r.session.Close() return r.conn.Close() }