|
|
|
|
package connection
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"errors"
|
|
|
|
|
"fmt"
|
|
|
|
|
"git.noahlan.cn/noahlan/nnet/packet"
|
|
|
|
|
"git.noahlan.cn/noahlan/nnet/session"
|
|
|
|
|
"git.noahlan.cn/noahlan/ntool/ndef"
|
|
|
|
|
"git.noahlan.cn/noahlan/ntool/nlog"
|
|
|
|
|
"github.com/panjf2000/ants/v2"
|
|
|
|
|
"net"
|
|
|
|
|
"sync/atomic"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// ErrCloseClosedSession is returned by Connection.Close when the connection
// has already been marked as closed.
var ErrCloseClosedSession = errors.New("close closed session")

// ErrBrokenPipe reports that the low-level connection has broken. Send and
// SendBytes return it when pushing onto an outgoing channel panics.
var ErrBrokenPipe = errors.New("broken low-level pipe")
|
|
|
|
|
|
|
|
|
|
// Connection lifecycle states. They are stored in Connection.status and read
// and written through sync/atomic, hence the explicit int32 type.
const (
	// StatusStart is the start phase; NewConnection initialises the status to it.
	StatusStart int32 = 1 + iota
	// StatusPrepare is the preparation phase.
	StatusPrepare
	// StatusPending is the phase in which the connection waits to start working.
	StatusPending
	// StatusWorking is the working phase; received packets are dispatched to
	// the handler only in this state.
	StatusWorking
	// StatusClosed marks the connection as closed.
	StatusClosed
)
|
|
|
|
|
|
|
|
|
|
// ConnType identifies the kind of transport underneath a Connection.
type ConnType int

// Supported transport kinds.
const (
	// ConnTypeTCP is a plain TCP connection; it is the default chosen by NewConnection.
	ConnTypeTCP = ConnType(iota)
	// ConnTypeWS is a WebSocket connection.
	ConnTypeWS
	// ConnTypeSerial is a serial connection.
	ConnTypeSerial
)
|
|
|
|
|
|
|
|
|
|
type (
|
|
|
|
|
Connection struct {
|
|
|
|
|
conf Config // 配置
|
|
|
|
|
session *session.Session // Session
|
|
|
|
|
pool *ants.Pool // 连接池
|
|
|
|
|
|
|
|
|
|
status int32 // 连接状态
|
|
|
|
|
conn net.Conn // low-level conn fd
|
|
|
|
|
typ ConnType // 连接类型
|
|
|
|
|
|
|
|
|
|
packer packet.Packer // 封包、拆包器
|
|
|
|
|
serializer ndef.Serializer // 消息序列化/反序列化器
|
|
|
|
|
pipeline Pipeline // 连接生命周期管理
|
|
|
|
|
handleFn func(conn *Connection, pkg packet.IPacket) // 消息处理方法
|
|
|
|
|
|
|
|
|
|
lastMid uint64 // 最近一次消息ID
|
|
|
|
|
|
|
|
|
|
chDie chan struct{} // 停止通道
|
|
|
|
|
chSend chan PendingMessage // 消息发送通道(结构化消息)
|
|
|
|
|
chWrite chan []byte // 消息发送通道(二进制消息)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
packetFn func(conn *Connection, pkg packet.IPacket)
|
|
|
|
|
|
|
|
|
|
Config struct {
|
|
|
|
|
LogDebug bool
|
|
|
|
|
LogPrefix string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
PendingMessage struct {
|
|
|
|
|
header any
|
|
|
|
|
payload any
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
func NewConnection(
|
|
|
|
|
id int64,
|
|
|
|
|
conn net.Conn,
|
|
|
|
|
pool *ants.Pool,
|
|
|
|
|
conf Config,
|
|
|
|
|
packerBuilder packet.PackerBuilder,
|
|
|
|
|
serializer ndef.Serializer,
|
|
|
|
|
pipeline Pipeline,
|
|
|
|
|
handleFn packetFn) *Connection {
|
|
|
|
|
r := &Connection{
|
|
|
|
|
conf: conf,
|
|
|
|
|
session: session.NewSession(id),
|
|
|
|
|
pool: pool,
|
|
|
|
|
|
|
|
|
|
status: StatusStart,
|
|
|
|
|
conn: conn,
|
|
|
|
|
typ: ConnTypeTCP,
|
|
|
|
|
|
|
|
|
|
packer: packerBuilder(),
|
|
|
|
|
serializer: serializer,
|
|
|
|
|
pipeline: pipeline,
|
|
|
|
|
handleFn: handleFn,
|
|
|
|
|
|
|
|
|
|
lastMid: 0,
|
|
|
|
|
|
|
|
|
|
chDie: make(chan struct{}),
|
|
|
|
|
chSend: make(chan PendingMessage, 128),
|
|
|
|
|
chWrite: make(chan []byte, 128),
|
|
|
|
|
}
|
|
|
|
|
if _, ok := conn.(*WSConn); ok {
|
|
|
|
|
r.typ = ConnTypeWS
|
|
|
|
|
return r
|
|
|
|
|
}
|
|
|
|
|
if _, ok := conn.(*SerialConn); ok {
|
|
|
|
|
r.typ = ConnTypeSerial
|
|
|
|
|
return r
|
|
|
|
|
}
|
|
|
|
|
return r
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *Connection) Send(header, payload any) (err error) {
|
|
|
|
|
defer func() {
|
|
|
|
|
if e := recover(); e != nil {
|
|
|
|
|
err = ErrBrokenPipe
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
r.chSend <- PendingMessage{
|
|
|
|
|
header: header,
|
|
|
|
|
payload: payload,
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *Connection) SendBytes(data []byte) (err error) {
|
|
|
|
|
defer func() {
|
|
|
|
|
if e := recover(); e != nil {
|
|
|
|
|
err = ErrBrokenPipe
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
r.chWrite <- data
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *Connection) Status() int32 {
|
|
|
|
|
return atomic.LoadInt32(&r.status)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *Connection) SetStatus(s int32) {
|
|
|
|
|
atomic.StoreInt32(&r.status, s)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *Connection) Conn() (net.Conn, ConnType) {
|
|
|
|
|
return r.conn, r.typ
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *Connection) ID() int64 {
|
|
|
|
|
return r.session.ID()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *Connection) Session() *session.Session {
|
|
|
|
|
return r.session
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *Connection) LastMID() uint64 {
|
|
|
|
|
return r.lastMid
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *Connection) SetLastMID(mid uint64) {
|
|
|
|
|
atomic.StoreUint64(&r.lastMid, mid)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *Connection) Serve() {
|
|
|
|
|
_ = r.pool.Submit(func() {
|
|
|
|
|
r.write()
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
_ = r.pool.Submit(func() {
|
|
|
|
|
r.read()
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *Connection) write() {
|
|
|
|
|
defer func() {
|
|
|
|
|
close(r.chSend)
|
|
|
|
|
close(r.chWrite)
|
|
|
|
|
_ = r.Close()
|
|
|
|
|
|
|
|
|
|
if r.conf.LogDebug {
|
|
|
|
|
nlog.Debugf("%s [writeLoop] connection write goroutine exit, ConnID=%d, SessionUID=%s",
|
|
|
|
|
r.conf.LogPrefix, r.ID(), r.session.UID())
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case data := <-r.chSend:
|
|
|
|
|
// marshal packet body (data)
|
|
|
|
|
if r.serializer == nil {
|
|
|
|
|
if _, ok := data.payload.([]byte); !ok {
|
|
|
|
|
nlog.Errorf("%s serializer is nil, but payload type not []byte", r.conf.LogPrefix)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
payload, err := r.serializer.Marshal(data.payload)
|
|
|
|
|
if err != nil {
|
|
|
|
|
nlog.Errorf("%s message body marshal err: %v", r.conf.LogPrefix, err)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
data.payload = payload
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// invoke pipeline
|
|
|
|
|
if pipe := r.pipeline; pipe != nil {
|
|
|
|
|
err := pipe.Outbound().Process(r, data)
|
|
|
|
|
if err != nil {
|
|
|
|
|
nlog.Errorf("%s pipeline err: %s", r.conf.LogPrefix, err.Error())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// packet pack data
|
|
|
|
|
p, err := r.packer.Pack(data.header, data.payload.([]byte))
|
|
|
|
|
if err != nil {
|
|
|
|
|
nlog.Errorf("%s pack err: %s", r.conf.LogPrefix, err.Error())
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
r.chWrite <- p
|
|
|
|
|
case data := <-r.chWrite:
|
|
|
|
|
// 回写数据
|
|
|
|
|
if _, err := r.conn.Write(data); err != nil {
|
|
|
|
|
nlog.Errorf("%s write data err: %s", r.conf.LogPrefix, err.Error())
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
//nlog.Debugf("write data %v", data)
|
|
|
|
|
case <-r.chDie: // connection close signal
|
|
|
|
|
return
|
|
|
|
|
// TODO
|
|
|
|
|
//case <-r.ngin.dieChan: // application quit signal
|
|
|
|
|
// return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *Connection) read() {
|
|
|
|
|
defer func() {
|
|
|
|
|
_ = r.Close()
|
|
|
|
|
}()
|
|
|
|
|
buf := make([]byte, 4096)
|
|
|
|
|
|
|
|
|
|
var wsConn *WSConn
|
|
|
|
|
if r.typ == ConnTypeWS {
|
|
|
|
|
wsConn = r.conn.(*WSConn)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
var (
|
|
|
|
|
err error
|
|
|
|
|
n int
|
|
|
|
|
msgTyp int
|
|
|
|
|
)
|
|
|
|
|
if r.typ == ConnTypeWS {
|
|
|
|
|
var bb []byte
|
|
|
|
|
if msgTyp, bb, err = wsConn.ReadMessage(); err == nil {
|
|
|
|
|
copy(buf, bb)
|
|
|
|
|
n = len(bb)
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
n, err = r.conn.Read(buf)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
nlog.Errorf("%s [readLoop] Read message error: %s, session will be closed immediately",
|
|
|
|
|
r.conf.LogPrefix, err.Error())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if n == 0 {
|
|
|
|
|
nlog.Errorf("%s [readLoop] Read empty message, session will be closed immediately",
|
|
|
|
|
r.conf.LogPrefix)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if r.packer == nil {
|
|
|
|
|
nlog.Errorf("%s [readLoop] unexpected error: packer is nil", r.conf.LogPrefix)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//nlog.Debugf("receive data %v", buf[:n])
|
|
|
|
|
// warning: 为性能考虑,复用slice处理数据,buf传入后必须要copy再处理
|
|
|
|
|
packets, err := r.packer.Unpack(msgTyp, buf[:n])
|
|
|
|
|
if err != nil {
|
|
|
|
|
nlog.Errorf("%s unpack err: %s", r.conf.LogPrefix, err.Error())
|
|
|
|
|
}
|
|
|
|
|
// packets 处理
|
|
|
|
|
for _, p := range packets {
|
|
|
|
|
if err := r.processPacket(p); err != nil {
|
|
|
|
|
nlog.Errorf("%s process packet err: %s", r.conf.LogPrefix, err.Error())
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *Connection) processPacket(packet packet.IPacket) error {
|
|
|
|
|
if pipe := r.pipeline; pipe != nil {
|
|
|
|
|
err := pipe.Inbound().Process(r, packet)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.New(fmt.Sprintf("pipeline process failed: %v", err.Error()))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if r.Status() == StatusWorking {
|
|
|
|
|
// 处理包消息
|
|
|
|
|
_ = r.pool.Submit(func() {
|
|
|
|
|
r.handleFn(r, packet)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *Connection) DieChan() chan struct{} {
|
|
|
|
|
return r.chDie
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *Connection) Close() error {
|
|
|
|
|
if r.Status() == StatusClosed {
|
|
|
|
|
return ErrCloseClosedSession
|
|
|
|
|
}
|
|
|
|
|
r.SetStatus(StatusClosed)
|
|
|
|
|
|
|
|
|
|
if r.conf.LogDebug {
|
|
|
|
|
nlog.Debugf("%s close connection, ID: %d", r.conf.LogPrefix, r.ID())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case <-r.chDie:
|
|
|
|
|
default:
|
|
|
|
|
close(r.chDie)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
r.session.Close()
|
|
|
|
|
return r.conn.Close()
|
|
|
|
|
}
|