You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
148 lines
2.6 KiB
Go
148 lines
2.6 KiB
Go
package conn
|
|
|
|
import (
|
|
"git.noahlan.cn/noahlan/nnet/session"
|
|
"net"
|
|
"sync/atomic"
|
|
)
|
|
|
|
type (
|
|
Connection struct {
|
|
session *session.Session // Session
|
|
|
|
status int32 // 连接状态
|
|
conn net.Conn // low-level conn fd
|
|
typ ConnType // 连接类型
|
|
|
|
lastMid uint64 // 最近一次消息ID
|
|
|
|
chDie chan struct{} // 停止通道
|
|
chSend chan PendingMessage // 消息发送通道(结构化消息)
|
|
chWrite chan []byte // 消息发送通道(二进制消息)
|
|
}
|
|
)
|
|
|
|
func NewConnection(id int64, rawC net.Conn) *Connection {
|
|
r := &Connection{
|
|
session: session.NewSession(id),
|
|
|
|
status: StatusStart,
|
|
conn: rawC,
|
|
typ: ConnTypeTCP,
|
|
|
|
lastMid: 0,
|
|
|
|
chDie: make(chan struct{}),
|
|
chSend: make(chan PendingMessage, 128),
|
|
chWrite: make(chan []byte, 128),
|
|
}
|
|
if _, ok := rawC.(*WSConn); ok {
|
|
r.typ = ConnTypeWS
|
|
return r
|
|
}
|
|
if _, ok := rawC.(*SerialConn); ok {
|
|
r.typ = ConnTypeSerial
|
|
return r
|
|
}
|
|
return r
|
|
}
|
|
|
|
func (r *Connection) Send(header, payload any) (err error) {
|
|
defer func() {
|
|
if e := recover(); e != nil {
|
|
err = ErrBrokenPipe
|
|
}
|
|
}()
|
|
r.chSend <- PendingMessage{
|
|
Header: header,
|
|
Payload: payload,
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (r *Connection) SendBytes(data []byte) (err error) {
|
|
defer func() {
|
|
if e := recover(); e != nil {
|
|
err = ErrBrokenPipe
|
|
}
|
|
}()
|
|
r.chWrite <- data
|
|
return err
|
|
}
|
|
|
|
func (r *Connection) Status() int32 {
|
|
return atomic.LoadInt32(&r.status)
|
|
}
|
|
|
|
func (r *Connection) SetStatus(s int32) {
|
|
atomic.StoreInt32(&r.status, s)
|
|
}
|
|
|
|
func (r *Connection) Type() ConnType {
|
|
return r.typ
|
|
}
|
|
|
|
func (r *Connection) Conn() net.Conn {
|
|
return r.conn
|
|
}
|
|
|
|
func (r *Connection) WsConn() *WSConn {
|
|
if r.typ == ConnTypeWS {
|
|
return r.conn.(*WSConn)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *Connection) SerialConn() *SerialConn {
|
|
if r.typ == ConnTypeSerial {
|
|
return r.conn.(*SerialConn)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *Connection) ID() int64 {
|
|
return r.session.ID()
|
|
}
|
|
|
|
func (r *Connection) Session() *session.Session {
|
|
return r.session
|
|
}
|
|
|
|
func (r *Connection) LastMID() uint64 {
|
|
return r.lastMid
|
|
}
|
|
|
|
func (r *Connection) SetLastMID(mid uint64) {
|
|
atomic.StoreUint64(&r.lastMid, mid)
|
|
}
|
|
|
|
func (r *Connection) ChDie() chan struct{} {
|
|
return r.chDie
|
|
}
|
|
|
|
func (r *Connection) ChSend() chan PendingMessage {
|
|
return r.chSend
|
|
}
|
|
|
|
func (r *Connection) ChWrite() chan []byte {
|
|
return r.chWrite
|
|
}
|
|
|
|
func (r *Connection) Close() error {
|
|
if r.Status() == StatusClosed {
|
|
return ErrCloseClosedSession
|
|
}
|
|
r.SetStatus(StatusClosed)
|
|
|
|
select {
|
|
case <-r.chDie:
|
|
close(r.chSend)
|
|
close(r.chWrite)
|
|
default:
|
|
close(r.chDie)
|
|
}
|
|
|
|
r.session.Close()
|
|
return r.conn.Close()
|
|
}
|