You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
nnet/core/connection.go

276 lines
6.0 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package core
import (
"errors"
"fmt"
"git.noahlan.cn/noahlan/nnet/entity"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/nnet/scheduler"
"git.noahlan.cn/noahlan/ntools-go/core/nlog"
"git.noahlan.cn/noahlan/ntools-go/core/pool"
"net"
"sync/atomic"
)
var (
// ErrCloseClosedSession is returned by connection.Close when the connection
// status is already StatusClosed.
ErrCloseClosedSession = errors.New("close closed session")
// ErrBrokenPipe represents the low-level connection has broken.
// Send and SendBytes return it when enqueueing a message panics.
ErrBrokenPipe = errors.New("broken low-level pipe")
)
// Connection status values, numbered from 1 in declaration order.
const (
// StatusStart is the start phase; newConnection initializes connections to it.
StatusStart int32 = iota + 1
// StatusPrepare is the preparation phase.
StatusPrepare
// StatusPending is the phase in which the connection is waiting to start work.
StatusPending
// StatusWorking is the working phase; processPacket only dispatches packets
// to the handler while the connection is in this status.
StatusWorking
// StatusClosed means the connection has been closed.
StatusClosed
)
type (
// connection holds the state of a single client connection: the raw
// net.Conn, its bound session, the packer and the outgoing message queues.
connection struct {
session *session // session bound to this connection
ngin *engine // engine that owns this connection
status int32 // connection status (a Status* constant), accessed through sync/atomic
conn net.Conn // low-level conn fd
isWS bool // whether conn is a websocket connection (*WSConn)
packer packet.Packer // packs outgoing and unpacks incoming packets
lastMid uint64 // ID of the most recent message
chDie chan struct{} // stop channel, closed by Close
chSend chan PendingMessage // outgoing queue for structured messages
chWrite chan []byte // outgoing queue for raw binary data
}
// PendingMessage is a structured message queued by Send; the write loop
// marshals the payload and packs it together with the header.
PendingMessage struct {
header interface{}
payload interface{}
}
)
// newConnection builds the connection state for conn on behalf of server.
// The connection starts in StatusStart, gets its own packer from
// server.packerFn, two outgoing queues buffered to 128 entries, and a fresh
// session bound to it.
func newConnection(server *engine, conn net.Conn) *connection {
	// A *WSConn underneath marks this as a websocket connection.
	_, viaWS := conn.(*WSConn)

	c := &connection{
		ngin:    server,
		status:  StatusStart,
		conn:    conn,
		isWS:    viaWS,
		packer:  server.packerFn(),
		chDie:   make(chan struct{}),
		chSend:  make(chan PendingMessage, 128),
		chWrite: make(chan []byte, 128),
	}

	// The session needs the connection itself, so it is attached afterwards.
	c.session = newSession(c, server.sessIdMgr.SessionID())
	return c
}
// Send queues a structured message (header plus payload) for the write loop.
// It blocks while the queue is full. If enqueueing panics (the write loop
// closes the queue when it exits), the panic is swallowed and ErrBrokenPipe
// is returned instead.
func (r *connection) Send(header, payload interface{}) (err error) {
	defer func() {
		if recover() != nil {
			err = ErrBrokenPipe
		}
	}()

	msg := PendingMessage{header: header, payload: payload}
	r.chSend <- msg
	return nil
}
// SendBytes queues already-encoded bytes for the write loop, which writes
// them to the underlying conn unchanged. It blocks while the queue is full.
// If enqueueing panics (the write loop closes the queue when it exits), the
// panic is swallowed and ErrBrokenPipe is returned instead.
func (r *connection) SendBytes(data []byte) (err error) {
	defer func() {
		if recover() != nil {
			err = ErrBrokenPipe
		}
	}()

	r.chWrite <- data
	return nil
}
// Status atomically reads the current connection status.
func (r *connection) Status() int32 {
	current := atomic.LoadInt32(&r.status)
	return current
}
// SetStatus atomically replaces the connection status with s.
func (r *connection) SetStatus(s int32) {
	field := &r.status
	atomic.StoreInt32(field, s)
}
// Conn returns the underlying net.Conn together with a flag that is true
// when that conn is a websocket connection.
func (r *connection) Conn() (net.Conn, bool) {
	raw, ws := r.conn, r.isWS
	return raw, ws
}
// ID returns the connection's identifier, which is the ID of its session.
func (r *connection) ID() int64 {
	sess := r.session
	return sess.ID()
}
// Session exposes the session bound to this connection as an entity.Session.
func (r *connection) Session() entity.Session {
	var bound entity.Session = r.session
	return bound
}
// LastMID returns the ID of the most recent message recorded with SetLastMID.
//
// The value is read atomically so it pairs correctly with the atomic store in
// SetLastMID; a plain read here would be a data race when the two are called
// from different goroutines.
func (r *connection) LastMID() uint64 {
	return atomic.LoadUint64(&r.lastMid)
}
// SetLastMID atomically records mid as the ID of the most recent message.
func (r *connection) SetLastMID(mid uint64) {
	slot := &r.lastMid
	atomic.StoreUint64(slot, mid)
}
// serve hands the connection's write loop and then its read loop to
// pool.Submit (presumably a goroutine pool - confirm against the pool
// package). The results of both submissions are discarded.
func (r *connection) serve() {
	_ = pool.Submit(r.write)
	_ = pool.Submit(r.read)
}
// write is the connection's outbound loop. It runs until the connection's
// stop channel or the engine's dieChan fires.
//
// Structured messages taken from chSend are marshalled (when the engine has a
// serializer), run through the outbound pipeline, packed and written to the
// conn. Raw frames taken from chWrite are written unchanged.
//
// On exit it closes both queues, closes the connection and, when debug
// logging is enabled, logs the exit.
func (r *connection) write() {
	defer func() {
		// Closing the queues makes blocked or later Send/SendBytes calls
		// panic; they recover from that and report ErrBrokenPipe.
		close(r.chSend)
		close(r.chWrite)
		_ = r.Close()
		if r.ngin.shallLogDebug() {
			nlog.Debugf("%s [writeLoop] connection write goroutine exit, ConnID=%d, SessionUID=%s",
				r.ngin.logPrefix(), r.ID(), r.session.UID())
		}
	}()

	// flush writes one frame to the conn; a failed write is logged and the
	// loop keeps running.
	flush := func(frame []byte) {
		if _, err := r.conn.Write(frame); err != nil {
			nlog.Errorf("%s write data err: %s", r.ngin.logPrefix(), err.Error())
		}
	}

	for {
		select {
		case data := <-r.chSend:
			// Marshal the packet body.
			if r.ngin.serializer == nil {
				// Without a serializer only ready-made []byte payloads can be sent.
				if _, ok := data.payload.([]byte); !ok {
					nlog.Errorf("%s serializer is nil, but payload type not []byte", r.ngin.logPrefix())
					continue
				}
			} else {
				payload, err := r.ngin.serializer.Marshal(data.payload)
				if err != nil {
					nlog.Errorf("%s message body marshal err: %v", r.ngin.logPrefix(), err)
					continue
				}
				data.payload = payload
			}

			// Invoke the outbound pipeline; a pipeline error is logged but
			// does not stop the message from being sent.
			if pipe := r.ngin.pipeline; pipe != nil {
				if err := pipe.Outbound().Process(r, data); err != nil {
					nlog.Errorf("%s pipeline err: %s", r.ngin.logPrefix(), err.Error())
				}
			}

			// Pack header and body into a wire frame.
			p, err := r.packer.Pack(data.header, data.payload.([]byte))
			if err != nil {
				nlog.Errorf("%s pack err: %s", r.ngin.logPrefix(), err.Error())
				continue
			}

			// Write the frame directly instead of re-queueing it on chWrite:
			// this goroutine is the only consumer of chWrite, so sending to a
			// full chWrite from here would block the loop forever.
			flush(p)
		case data := <-r.chWrite:
			// Write raw data back to the peer.
			flush(data)
		case <-r.chDie: // connection close signal
			return
		case <-r.ngin.dieChan: // application quit signal
			return
		}
	}
}
// read is the connection's inbound loop. It reads from the conn into a
// reusable 4096-byte buffer, unpacks whatever arrived and feeds each packet
// to processPacket. The loop ends, and the connection is closed, on a read
// error, an empty read, or a missing packer.
func (r *connection) read() {
	defer func() {
		_ = r.Close()
	}()

	buf := make([]byte, 4096)
	for {
		size, readErr := r.conn.Read(buf)

		switch {
		case readErr != nil:
			nlog.Errorf("%s [readLoop] Read message error: %s, session will be closed immediately",
				r.ngin.logPrefix(), readErr.Error())
			return
		case size == 0:
			nlog.Errorf("%s [readLoop] Read empty message, session will be closed immediately",
				r.ngin.logPrefix())
			return
		case r.packer == nil:
			nlog.Errorf("%s [readLoop] unexpected error: packer is nil", r.ngin.logPrefix())
			return
		}

		// Warning: buf is reused across iterations for performance, so the
		// packer must copy the bytes it is given before processing them.
		packets, unpackErr := r.packer.Unpack(buf[:size])
		if unpackErr != nil {
			// An unpack error is only logged; any packets returned alongside
			// it are still processed below.
			nlog.Errorf("%s unpack err: %s", r.ngin.logPrefix(), unpackErr.Error())
		}

		// Handle every packet; a failure on one does not stop the rest.
		for _, p := range packets {
			if procErr := r.processPacket(p); procErr != nil {
				nlog.Errorf("%s process packet err: %s", r.ngin.logPrefix(), procErr.Error())
			}
		}
	}
}
// processPacket runs one inbound packet through the engine's inbound pipeline
// (when one is configured) and, if the connection is in StatusWorking, submits
// the packet to the engine's handler via pool.Submit.
//
// It returns a non-nil error only when the pipeline rejects the packet; the
// pipeline's error is wrapped so callers can inspect it with errors.Is/As.
// Packets that arrive while the connection is not in StatusWorking pass the
// pipeline but are not handed to the handler.
func (r *connection) processPacket(pkt packet.IPacket) error {
	if pipe := r.ngin.pipeline; pipe != nil {
		if err := pipe.Inbound().Process(r, pkt); err != nil {
			return fmt.Errorf("pipeline process failed: %w", err)
		}
	}

	if r.Status() != StatusWorking {
		return nil
	}

	// The submission result is discarded, as in the rest of this file.
	_ = pool.Submit(func() {
		r.ngin.handler.Handle(r, pkt)
	})
	return nil
}
// Close shuts the connection down: it marks it StatusClosed, closes the stop
// channel, schedules the engine's lifetime Close callback, removes the
// connection from the connection manager, closes the session and finally
// closes the underlying conn, returning that last error.
//
// Only the first call performs the teardown. Every later call, including one
// racing with the first from another goroutine, returns
// ErrCloseClosedSession.
func (r *connection) Close() error {
	// Swap in one atomic step so that exactly one caller observes a
	// non-closed previous status. The read and write loops both call Close
	// on exit; a separate check-then-set would let both through and close
	// chDie twice, which panics.
	if atomic.SwapInt32(&r.status, StatusClosed) == StatusClosed {
		return ErrCloseClosedSession
	}

	if r.ngin.shallLogDebug() {
		nlog.Debugf("%s close connection, ID: %d", r.ngin.logPrefix(), r.ID())
	}

	select {
	case <-r.chDie:
		// Stop channel already closed; nothing to signal.
	default:
		close(r.chDie)
		scheduler.PushTask(func() { r.ngin.lifetime.Close(r) })
	}

	// The result of removing the connection from the manager is discarded.
	_ = r.ngin.connManager.Remove(r)
	r.session.Close()
	return r.conn.Close()
}