You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
nnet/connection/connection.go

319 lines
6.8 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package connection
import (
"errors"
"fmt"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/nnet/session"
"git.noahlan.cn/noahlan/ntool/ndef"
"git.noahlan.cn/noahlan/ntool/nlog"
"github.com/panjf2000/ants/v2"
"net"
"sync/atomic"
)
// ErrCloseClosedSession is returned by Close when the connection has already
// been marked as closed.
var ErrCloseClosedSession = errors.New("close closed session")

// ErrBrokenPipe reports that the low-level connection has broken. Send and
// SendBytes return it when enqueueing a message panics.
var ErrBrokenPipe = errors.New("broken low-level pipe")
// Connection lifecycle states. They are stored in Connection.status and are
// numbered consecutively starting at 1.
const (
	// StatusStart is the start stage; NewConnection creates connections in
	// this state.
	StatusStart int32 = 1 + iota
	// StatusPrepare is the preparation stage.
	StatusPrepare
	// StatusPending is the stage in which the connection waits to start working.
	StatusPending
	// StatusWorking is the working stage; received packets are dispatched to
	// the handler only while the connection is in this state.
	StatusWorking
	// StatusClosed marks the connection as closed.
	StatusClosed
)
// ConnType identifies the kind of transport behind a Connection.
type ConnType int

// Supported transport kinds.
const (
	// ConnTypeTCP is a TCP connection; it is the default chosen by NewConnection.
	ConnTypeTCP ConnType = iota
	// ConnTypeWS is a WebSocket connection.
	ConnTypeWS
	// ConnTypeSerial is a serial connection.
	ConnTypeSerial
)
// Connection couples a low-level net.Conn with the session, codec and
// channels needed to read packets from it and write messages to it.
type Connection struct {
	conf       Config                                      // configuration
	session    *session.Session                            // session bound to this connection
	pool       *ants.Pool                                  // pool that runs the read/write loops and packet handlers
	status     int32                                       // connection status (one of the Status* constants)
	conn       net.Conn                                    // low-level conn fd
	typ        ConnType                                    // connection type
	packer     packet.Packer                               // packs outgoing and unpacks incoming packets
	serializer ndef.Serializer                             // message serializer/deserializer; nil means payloads must already be []byte
	pipeline   Pipeline                                    // connection lifecycle management (inbound/outbound processing); may be nil
	handleFn   func(conn *Connection, pkg packet.IPacket) // message handler
	lastMid    uint64                                      // ID of the most recent message
	chDie      chan struct{}                               // stop channel, closed when the connection is closed
	chSend     chan PendingMessage                         // outgoing channel for structured messages
	chWrite    chan []byte                                 // outgoing channel for raw binary messages
}

// packetFn is the signature of the handler invoked for each received packet.
type packetFn func(conn *Connection, pkg packet.IPacket)

// Config holds the logging options of a Connection.
type Config struct {
	LogDebug  bool   // when true, debug-level log lines are emitted
	LogPrefix string // prefix placed at the start of every log line
}

// PendingMessage is a structured message (header plus payload) queued for
// serialization, packing and writing.
type PendingMessage struct {
	header  any
	payload any
}
// NewConnection builds a Connection around conn.
//
// A new session is created for id, the packer is obtained by calling
// packerBuilder, and both outgoing channels are buffered with 128 slots. The
// connection starts in StatusStart. Its type defaults to ConnTypeTCP and is
// switched to ConnTypeWS or ConnTypeSerial when conn is a *WSConn or a
// *SerialConn respectively.
func NewConnection(
	id int64,
	conn net.Conn,
	pool *ants.Pool,
	conf Config,
	packerBuilder packet.PackerBuilder,
	serializer ndef.Serializer,
	pipeline Pipeline,
	handleFn packetFn) *Connection {
	c := &Connection{
		conf:       conf,
		session:    session.NewSession(id),
		pool:       pool,
		status:     StatusStart,
		conn:       conn,
		typ:        ConnTypeTCP,
		packer:     packerBuilder(),
		serializer: serializer,
		pipeline:   pipeline,
		handleFn:   handleFn,
		chDie:      make(chan struct{}),
		chSend:     make(chan PendingMessage, 128),
		chWrite:    make(chan []byte, 128),
	}

	// Refine the transport type from the concrete connection implementation.
	switch conn.(type) {
	case *WSConn:
		c.typ = ConnTypeWS
	case *SerialConn:
		c.typ = ConnTypeSerial
	}
	return c
}
// Send queues a structured message (header and payload) on the send channel.
// It blocks while the channel buffer is full. If the enqueue panics (for
// example because the channel has been closed) the panic is recovered and
// ErrBrokenPipe is returned instead.
func (r *Connection) Send(header, payload any) (err error) {
	defer func() {
		if recover() != nil {
			err = ErrBrokenPipe
		}
	}()

	msg := PendingMessage{header: header, payload: payload}
	r.chSend <- msg
	return nil
}
// SendBytes queues an already-encoded binary message on the write channel.
// It blocks while the channel buffer is full. If the enqueue panics (for
// example because the channel has been closed) the panic is recovered and
// ErrBrokenPipe is returned instead.
func (r *Connection) SendBytes(data []byte) (err error) {
	defer func() {
		if recover() != nil {
			err = ErrBrokenPipe
		}
	}()

	r.chWrite <- data
	return nil
}
// Status atomically loads and returns the current connection status.
func (r *Connection) Status() int32 {
	current := atomic.LoadInt32(&r.status)
	return current
}
// SetStatus atomically stores s as the connection status.
func (r *Connection) SetStatus(s int32) {
	statusAddr := &r.status
	atomic.StoreInt32(statusAddr, s)
}
// Conn returns the low-level connection together with its transport type.
func (r *Connection) Conn() (net.Conn, ConnType) {
	raw, kind := r.conn, r.typ
	return raw, kind
}
// ID returns the ID of the session bound to this connection.
func (r *Connection) ID() int64 {
	sess := r.session
	return sess.ID()
}
// Session returns the session bound to this connection.
func (r *Connection) Session() *session.Session {
	sess := r.session
	return sess
}
// LastMID returns the ID of the most recent message.
//
// The value is loaded atomically so that it pairs with the atomic store in
// SetLastMID; a plain read would race with concurrent writers.
func (r *Connection) LastMID() uint64 {
	return atomic.LoadUint64(&r.lastMid)
}
// SetLastMID atomically records mid as the ID of the most recent message.
func (r *Connection) SetLastMID(mid uint64) {
	midAddr := &r.lastMid
	atomic.StoreUint64(midAddr, mid)
}
// Serve starts the connection by submitting the write loop and then the read
// loop to the pool.
//
// If the pool rejects either task the failure is logged and the connection is
// closed, because a connection with only one (or neither) loop running cannot
// make progress and would otherwise stay open unserved.
func (r *Connection) Serve() {
	if err := r.pool.Submit(r.write); err != nil {
		nlog.Errorf("%s submit write loop err: %v", r.conf.LogPrefix, err)
		_ = r.Close()
		return
	}
	if err := r.pool.Submit(r.read); err != nil {
		nlog.Errorf("%s submit read loop err: %v", r.conf.LogPrefix, err)
		// Closing also signals the already-running write loop to exit.
		_ = r.Close()
	}
}
// write is the connection's write loop. It runs until the die channel is
// closed and handles two kinds of work:
//
//   - structured messages from chSend are serialized (when a serializer is
//     configured), passed through the outbound pipeline (when one is
//     configured), packed, and queued on chWrite;
//   - raw frames from chWrite are written to the low-level connection.
//
// Failures to serialize, pack or write are logged and the loop carries on
// with the next message. On exit both outgoing channels are closed and the
// connection is closed.
func (r *Connection) write() {
	defer func() {
		close(r.chSend)
		close(r.chWrite)
		_ = r.Close()
		if r.conf.LogDebug {
			nlog.Debugf("%s [writeLoop] connection write goroutine exit, ConnID=%d, SessionUID=%s",
				r.conf.LogPrefix, r.ID(), r.session.UID())
		}
	}()

	// flush writes one frame to the low-level connection; a failure is logged
	// and otherwise ignored.
	flush := func(frame []byte) {
		if _, err := r.conn.Write(frame); err != nil {
			nlog.Errorf("%s write data err: %s", r.conf.LogPrefix, err.Error())
		}
	}

	for {
		select {
		case msg := <-r.chSend:
			// Marshal the packet body.
			if r.serializer == nil {
				if _, ok := msg.payload.([]byte); !ok {
					nlog.Errorf("%s serializer is nil, but payload type not []byte", r.conf.LogPrefix)
					continue
				}
			} else {
				body, err := r.serializer.Marshal(msg.payload)
				if err != nil {
					nlog.Errorf("%s message body marshal err: %v", r.conf.LogPrefix, err)
					continue
				}
				msg.payload = body
			}

			// Invoke the outbound pipeline; an error is logged but does not
			// stop the message from being sent.
			if pipe := r.pipeline; pipe != nil {
				if err := pipe.Outbound().Process(r, msg); err != nil {
					nlog.Errorf("%s pipeline err: %s", r.conf.LogPrefix, err.Error())
				}
			}

			// Pack header and body into a wire frame.
			frame, err := r.packer.Pack(msg.header, msg.payload.([]byte))
			if err != nil {
				nlog.Errorf("%s pack err: %s", r.conf.LogPrefix, err.Error())
				continue
			}

			// Queue the frame behind whatever is already pending. This
			// goroutine is the only consumer of chWrite, so a plain blocking
			// send would deadlock once the buffer is full; instead, pending
			// frames are flushed until the new one fits.
		enqueue:
			for {
				select {
				case r.chWrite <- frame:
					break enqueue
				case pending := <-r.chWrite:
					flush(pending)
				}
			}
		case frame := <-r.chWrite:
			// Write the data back to the peer.
			flush(frame)
		case <-r.chDie: // connection close signal
			return
			// TODO
			//case <-r.ngin.dieChan: // application quit signal
			//	return
		}
	}
}
// read is the connection's read loop. It repeatedly reads from the low-level
// connection into a reusable 4096-byte buffer, unpacks the bytes into packets
// and hands each packet to processPacket.
//
// The loop ends, and the connection is closed, when a read fails, a read
// returns zero bytes, or no packer is configured. Unpack and per-packet
// processing errors are only logged.
func (r *Connection) read() {
	defer func() { _ = r.Close() }()

	chunk := make([]byte, 4096)
	for {
		size, readErr := r.conn.Read(chunk)
		switch {
		case readErr != nil:
			nlog.Errorf("%s [readLoop] Read message error: %s, session will be closed immediately",
				r.conf.LogPrefix, readErr.Error())
			return
		case size == 0:
			nlog.Errorf("%s [readLoop] Read empty message, session will be closed immediately",
				r.conf.LogPrefix)
			return
		case r.packer == nil:
			nlog.Errorf("%s [readLoop] unexpected error: packer is nil", r.conf.LogPrefix)
			return
		}

		// Warning: the buffer is reused across reads for performance, so the
		// packer must copy the data it is given before processing it.
		pkts, unpackErr := r.packer.Unpack(chunk[:size])
		if unpackErr != nil {
			nlog.Errorf("%s unpack err: %s", r.conf.LogPrefix, unpackErr.Error())
		}

		// Process whatever packets were produced.
		for _, pkt := range pkts {
			procErr := r.processPacket(pkt)
			if procErr != nil {
				nlog.Errorf("%s process packet err: %s", r.conf.LogPrefix, procErr.Error())
			}
		}
	}
}
// processPacket runs one received packet through the inbound pipeline (when
// one is configured) and, if the connection is in StatusWorking, submits the
// packet handler to the pool.
//
// It returns an error when the pipeline rejects the packet or when the pool
// refuses the handler task; in both cases the underlying error is wrapped so
// callers can inspect it with errors.Is / errors.As. Packets received in any
// other status are passed through the pipeline only and nil is returned.
func (r *Connection) processPacket(packet packet.IPacket) error {
	if pipe := r.pipeline; pipe != nil {
		if err := pipe.Inbound().Process(r, packet); err != nil {
			return fmt.Errorf("pipeline process failed: %w", err)
		}
	}

	if r.Status() != StatusWorking {
		return nil
	}

	// Handle the packet asynchronously on the pool.
	if err := r.pool.Submit(func() { r.handleFn(r, packet) }); err != nil {
		return fmt.Errorf("submit packet handler: %w", err)
	}
	return nil
}
// DieChan returns the channel that is closed when the connection is closed.
func (r *Connection) DieChan() chan struct{} {
	die := r.chDie
	return die
}
// Close marks the connection as closed, closes the die channel, closes the
// session and finally closes the low-level connection, returning that
// connection's close error.
//
// Only the first call performs the shutdown; later calls return
// ErrCloseClosedSession. The status is switched with a single atomic swap so
// that concurrent callers (the read and write loops both close the connection
// when they exit) cannot both pass the "already closed" check and close the
// die channel twice.
func (r *Connection) Close() error {
	if atomic.SwapInt32(&r.status, StatusClosed) == StatusClosed {
		return ErrCloseClosedSession
	}

	if r.conf.LogDebug {
		nlog.Debugf("%s close connection, ID: %d", r.conf.LogPrefix, r.ID())
	}

	// The channel is reachable through DieChan, so tolerate it having been
	// closed already.
	select {
	case <-r.chDie:
	default:
		close(r.chDie)
	}

	r.session.Close()
	return r.conn.Close()
}