package conn

import (
	"net"
	"sync/atomic"

	"git.noahlan.cn/noahlan/nnet/session"
)

// Connection couples a low-level net.Conn with its session, a lifecycle
// status, and the channels used to queue outbound messages.
type Connection struct {
	session *session.Session // session bound to this connection
	status  int32            // connection status; read and written atomically
	conn    net.Conn         // low-level connection
	typ     ConnType         // connection type
	lastMid uint64           // most recent message ID; read and written atomically

	chDie   chan struct{}       // stop channel
	chSend  chan PendingMessage // outbound queue for structured messages
	chWrite chan []byte         // outbound queue for raw binary messages
}

// NewConnection builds a Connection around rawC with a new session created
// from id. sendSize and writeSize are the buffer capacities of the structured
// and binary send channels. The connection type defaults to ConnTypeTCP and
// becomes ConnTypeWS or ConnTypeSerial when rawC is a *WSConn or *SerialConn.
func NewConnection(id int64, rawC net.Conn, sendSize, writeSize int) *Connection {
	r := &Connection{
		session: session.NewSession(id),
		status:  StatusStart,
		conn:    rawC,
		typ:     ConnTypeTCP,
		lastMid: 0,
		chDie:   make(chan struct{}),
		chSend:  make(chan PendingMessage, sendSize),
		chWrite: make(chan []byte, writeSize),
	}

	switch rawC.(type) {
	case *WSConn:
		r.typ = ConnTypeWS
	case *SerialConn:
		r.typ = ConnTypeSerial
	}
	return r
}

// Send queues a structured message made of header and payload on the send
// channel. It blocks while the channel buffer is full. Any panic raised by the
// channel send (for example, sending on a closed channel) is recovered and
// reported as ErrBrokenPipe.
func (r *Connection) Send(header, payload any) (err error) {
	defer func() {
		if e := recover(); e != nil {
			err = ErrBrokenPipe
		}
	}()

	r.chSend <- PendingMessage{
		Header:  header,
		Payload: payload,
	}
	return nil
}

// SendBytes queues raw binary data on the write channel. It blocks while the
// channel buffer is full. Any panic raised by the channel send (for example,
// sending on a closed channel) is recovered and reported as ErrBrokenPipe.
func (r *Connection) SendBytes(data []byte) (err error) {
	defer func() {
		if e := recover(); e != nil {
			err = ErrBrokenPipe
		}
	}()

	r.chWrite <- data
	return nil
}

// Status atomically loads the connection status.
func (r *Connection) Status() int32 {
	return atomic.LoadInt32(&r.status)
}

// SetStatus atomically stores s as the connection status.
func (r *Connection) SetStatus(s int32) {
	atomic.StoreInt32(&r.status, s)
}

// Type returns the connection type determined at construction.
func (r *Connection) Type() ConnType {
	return r.typ
}

// Conn returns the underlying low-level connection.
func (r *Connection) Conn() net.Conn {
	return r.conn
}

// WsConn returns the underlying connection as a *WSConn, or nil when the
// connection type is not ConnTypeWS.
func (r *Connection) WsConn() *WSConn {
	if r.typ != ConnTypeWS {
		return nil
	}
	// Comma-ok form: a mismatched dynamic type yields nil instead of a panic.
	ws, _ := r.conn.(*WSConn)
	return ws
}

// SerialConn returns the underlying connection as a *SerialConn, or nil when
// the connection type is not ConnTypeSerial.
func (r *Connection) SerialConn() *SerialConn {
	if r.typ != ConnTypeSerial {
		return nil
	}
	// Comma-ok form: a mismatched dynamic type yields nil instead of a panic.
	sc, _ := r.conn.(*SerialConn)
	return sc
}

// ID returns the ID of the session bound to this connection.
func (r *Connection) ID() int64 {
	return r.session.ID()
}

// Session returns the session bound to this connection.
func (r *Connection) Session() *session.Session {
	return r.session
}

// LastMID atomically loads the most recent message ID.
func (r *Connection) LastMID() uint64 {
	// Must be an atomic load to pair with the atomic store in SetLastMID;
	// a plain read here would be a data race.
	return atomic.LoadUint64(&r.lastMid)
}

// SetLastMID atomically stores mid as the most recent message ID.
func (r *Connection) SetLastMID(mid uint64) {
	atomic.StoreUint64(&r.lastMid, mid)
}

// ChDie returns the stop channel.
func (r *Connection) ChDie() chan struct{} {
	return r.chDie
}

// ChSend returns the structured-message send channel.
func (r *Connection) ChSend() chan PendingMessage {
	return r.chSend
}

// ChWrite returns the binary-message send channel.
func (r *Connection) ChWrite() chan []byte {
	return r.chWrite
}

// Close marks the connection as closed, signals shutdown, closes the session,
// and closes the underlying connection, returning the latter's error.
// It returns ErrCloseClosedSession when the connection is already closed.
func (r *Connection) Close() error {
	// Swap instead of load-then-store: the status check and the transition
	// happen in one atomic step, so concurrent callers cannot both pass the
	// check and close the same channel twice (which would panic).
	if atomic.SwapInt32(&r.status, StatusClosed) == StatusClosed {
		return ErrCloseClosedSession
	}

	select {
	case <-r.chDie:
		// The stop channel was already closed before Close ran (it is
		// reachable through ChDie); close the send queues instead.
		close(r.chSend)
		close(r.chWrite)
	default:
		close(r.chDie)
	}

	r.session.Close()
	return r.conn.Close()
}