package core import ( "errors" "fmt" "git.noahlan.cn/northlan/nnet/internal/pool" "git.noahlan.cn/northlan/nnet/log" "git.noahlan.cn/northlan/nnet/packet" "git.noahlan.cn/northlan/nnet/scheduler" "git.noahlan.cn/northlan/nnet/session" "net" "sync/atomic" "time" ) var ( ErrCloseClosedSession = errors.New("close closed session") // ErrBrokenPipe represents the low-level connection has broken. ErrBrokenPipe = errors.New("broken low-level pipe") // ErrBufferExceed indicates that the current session buffer is full and // can not receive more data. ErrBufferExceed = errors.New("session send buffer exceed") ) const ( // StatusStart 开始阶段 StatusStart int32 = iota + 1 // StatusPrepare 准备阶段 StatusPrepare // StatusPending 等待工作阶段 StatusPending // StatusWorking 工作阶段 StatusWorking // StatusClosed 连接关闭 StatusClosed ) type ( Connection struct { session *session.Session // Session ngin *engine // engine conn net.Conn // low-level conn fd status int32 // 连接状态 lastMid uint64 // 最近一次消息ID // TODO 考虑独立出去作为一个中间件 lastHeartbeatAt int64 // 最近一次心跳时间 chDie chan struct{} // 停止通道 chSend chan pendingMessage // 消息发送通道(结构化消息) chWrite chan []byte // 消息发送通道(二进制消息) } pendingMessage struct { header interface{} payload interface{} } ) func newConn(server *engine, conn net.Conn) *Connection { r := &Connection{ conn: conn, ngin: server, status: StatusStart, lastHeartbeatAt: time.Now().Unix(), chDie: make(chan struct{}), chSend: make(chan pendingMessage, 128), chWrite: make(chan []byte, 128), } // binding session r.session = session.NewSession() return r } func (r *Connection) Send(header, payload interface{}) (err error) { defer func() { if e := recover(); e != nil { err = ErrBrokenPipe } }() r.chSend <- pendingMessage{ header: header, payload: payload, } return err } func (r *Connection) SendBytes(data []byte) (err error) { defer func() { if e := recover(); e != nil { err = ErrBrokenPipe } }() r.chWrite <- data return err } func (r *Connection) Status() int32 { return atomic.LoadInt32(&r.status) } func (r *Connection) SetStatus(s int32) { atomic.StoreInt32(&r.status, s) } func (r *Connection) Conn() net.Conn { return r.conn } func (r *Connection) ID() int64 { return r.session.ID() } func (r *Connection) SetLastHeartbeatAt(t int64) { atomic.StoreInt64(&r.lastHeartbeatAt, t) } func (r *Connection) Session() *session.Session { return r.session } func (r *Connection) LastMID() uint64 { return r.lastMid } func (r *Connection) SetLastMID(mid uint64) { atomic.StoreUint64(&r.lastMid, mid) } func (r *Connection) serve() { _ = pool.SubmitConn(func() { r.write() }) _ = pool.SubmitWorker(func() { r.read() }) } func (r *Connection) write() { ticker := time.NewTicker(r.ngin.heartbeatInterval) defer func() { ticker.Stop() close(r.chSend) close(r.chWrite) _ = r.Close() log.Debugf("Connection write goroutine exit, ConnID=%d, SessionUID=%s", r.ID(), r.session.UID()) }() for { select { case <-ticker.C: // TODO heartbeat enable control deadline := time.Now().Add(-2 * r.ngin.heartbeatInterval).Unix() if atomic.LoadInt64(&r.lastHeartbeatAt) < deadline { log.Debugf("Session heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&r.lastHeartbeatAt), deadline) return } // TODO heartbeat data r.chWrite <- []byte{} case data := <-r.chSend: // marshal packet body (data) if r.ngin.serializer == nil { if _, ok := data.payload.([]byte); !ok { log.Errorf("serializer is nil, but payload type not []byte") break } } else { payload, err := r.ngin.serializer.Marshal(data.payload) if err != nil { log.Errorf("message body marshal err: %v", err) break } data.payload = payload } // invoke pipeline if pipe := r.ngin.pipeline; pipe != nil { err := pipe.Outbound().Process(r, data) if err != nil { log.Errorf("broken pipeline err: %s", err.Error()) break } } // packet pack data p, err := r.ngin.packer.Pack(data.header, data.payload.([]byte)) if err != nil { log.Error(err.Error()) break } r.chWrite <- p case data := <-r.chWrite: // 回写数据 if _, err := r.conn.Write(data); err != nil { log.Error(err.Error()) return } case <-r.chDie: // connection close signal return case <-r.ngin.dieChan: // application quit signal return } } } func (r *Connection) read() { defer func() { r.Close() }() buf := make([]byte, 4096) for { n, err := r.conn.Read(buf) if err != nil { log.Errorf("Read message error: %s, session will be closed immediately", err.Error()) return } if r.ngin.packer == nil { log.Errorf("unexpected error: packer is nil") return } // warning: 为性能考虑,复用slice处理数据,buf传入后必须要copy再处理 packets, err := r.ngin.packer.Unpack(buf[:n]) if err != nil { log.Error(err.Error()) } // packets 处理 for _, p := range packets { if err := r.processPacket(p); err != nil { log.Error(err.Error()) continue } } } } func (r *Connection) processPacket(packet packet.IPacket) error { if pipe := r.ngin.pipeline; pipe != nil { err := pipe.Inbound().Process(r, packet) if err != nil { return errors.New(fmt.Sprintf("pipeline process failed: %v", err.Error())) } } // packet processor err := r.ngin.processor.Process(r, packet) if err != nil { return err } if r.Status() == StatusWorking { // HandleFunc _ = pool.SubmitWorker(func() { r.ngin.handler.Handle(r, packet) }) } return err } func (r *Connection) Close() error { if r.Status() == StatusClosed { return ErrCloseClosedSession } r.SetStatus(StatusClosed) log.Debugf("close connection, ID: %d", r.ID()) select { case <-r.chDie: default: close(r.chDie) scheduler.PushTask(func() { Lifetime.Close(r) }) } return r.conn.Close() }