You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
nnet/core/connection.go

288 lines
6.1 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package core
import (
"errors"
"fmt"
"git.noahlan.cn/noahlan/nnet/internal/pool"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/nnet/scheduler"
"git.noahlan.cn/noahlan/nnet/session"
"git.noahlan.cn/noahlan/ntools-go/core/nlog"
"net"
"sync/atomic"
"time"
)
var (
// ErrCloseClosedSession is returned by Connection.Close when the connection
// has already been marked as closed.
ErrCloseClosedSession = errors.New("close closed session")
// ErrBrokenPipe indicates that the low-level connection is broken. It is
// what Send and SendBytes return when enqueueing a message panics.
ErrBrokenPipe = errors.New("broken low-level pipe")
// ErrBufferExceed indicates that the current session buffer is full and
// cannot receive more data.
ErrBufferExceed = errors.New("session send buffer exceed")
)
// Connection status values. They start at 1 (iota + 1), so the zero value of
// an int32 is not a valid status.
const (
// StatusStart is the start phase; newConn initialises connections with it.
StatusStart int32 = iota + 1
// StatusPrepare is the preparation phase.
StatusPrepare
// StatusPending is the phase in which the connection waits to start working.
StatusPending
// StatusWorking is the working phase; processPacket only dispatches packets
// to the handler while the connection is in this status.
StatusWorking
// StatusClosed means the connection has been closed.
StatusClosed
)
type (
	// Connection wraps one low-level network connection together with its
	// session, its packer and the channels consumed by the write loop.
	//
	// lastMid and lastHeartbeatAt are accessed through the 64-bit functions
	// of sync/atomic. They are deliberately kept at the very start of the
	// struct: on 32-bit platforms only the first word of an allocated struct
	// is guaranteed to be 64-bit aligned, and 64-bit atomic operations on a
	// misaligned word are not safe there. Do not move other fields in front
	// of them.
	Connection struct {
		lastMid uint64 // ID of the most recent message (accessed atomically)
		// TODO consider extracting heartbeat tracking into a middleware
		lastHeartbeatAt int64 // Unix time in seconds of the most recent heartbeat (accessed atomically)

		session *session.Session // session bound to this connection
		ngin    *engine          // owning engine
		status  int32            // connection status, one of the Status* constants (accessed atomically)
		conn    net.Conn         // low-level conn fd
		packer  packet.Packer    // packs outgoing and unpacks incoming packets

		chDie   chan struct{}       // closed to signal that the connection is stopping
		chSend  chan pendingMessage // outgoing message queue (structured messages)
		chWrite chan []byte         // outgoing message queue (raw bytes)
	}

	// pendingMessage is a structured message waiting in chSend to be
	// serialized, packed and written by the write loop.
	pendingMessage struct {
		header  interface{}
		payload interface{}
	}
)
// newConn creates the Connection that wraps conn on behalf of server.
//
// The connection starts in StatusStart, gets its own packer from
// server.packerFn, records the current time as its last heartbeat and is
// bound to a freshly created session. Both outgoing queues are buffered with
// room for 128 entries.
func newConn(server *engine, conn net.Conn) *Connection {
	c := new(Connection)
	c.ngin = server
	c.status = StatusStart
	c.conn = conn
	c.packer = server.packerFn()
	c.lastHeartbeatAt = time.Now().Unix()
	c.chDie = make(chan struct{})
	c.chSend = make(chan pendingMessage, 128)
	c.chWrite = make(chan []byte, 128)

	// bind the session last, once the connection itself is fully set up
	c.session = session.NewSession()
	return c
}
// Send queues a structured message (header plus payload) for the write loop.
//
// The call blocks while the chSend buffer is full. If enqueueing panics,
// which is what happens once the write loop has closed chSend on exit, the
// panic is recovered and ErrBrokenPipe is returned instead.
func (r *Connection) Send(header, payload interface{}) (err error) {
	defer func() {
		if recover() != nil {
			err = ErrBrokenPipe
		}
	}()

	msg := pendingMessage{header: header, payload: payload}
	r.chSend <- msg
	return nil
}
// SendBytes queues already-encoded bytes for the write loop, which writes
// them to the underlying conn unchanged.
//
// The call blocks while the chWrite buffer is full. If enqueueing panics,
// which is what happens once the write loop has closed chWrite on exit, the
// panic is recovered and ErrBrokenPipe is returned instead.
func (r *Connection) SendBytes(data []byte) (err error) {
	defer func() {
		if recover() != nil {
			err = ErrBrokenPipe
		}
	}()

	r.chWrite <- data
	return nil
}
// Status returns the connection status, loaded atomically.
func (r *Connection) Status() int32 {
	current := atomic.LoadInt32(&r.status)
	return current
}
// SetStatus atomically stores s as the connection status.
func (r *Connection) SetStatus(s int32) {
	statusAddr := &r.status
	atomic.StoreInt32(statusAddr, s)
}
// Conn returns the underlying low-level net.Conn.
func (r *Connection) Conn() net.Conn {
	raw := r.conn
	return raw
}
// ID returns the ID of the session bound to this connection.
func (r *Connection) ID() int64 {
	bound := r.session
	return bound.ID()
}
// SetLastHeartbeatAt atomically records t as the time of the latest
// heartbeat. The write loop compares this value against a deadline expressed
// in Unix seconds.
func (r *Connection) SetLastHeartbeatAt(t int64) {
	heartbeatAddr := &r.lastHeartbeatAt
	atomic.StoreInt64(heartbeatAddr, t)
}
// Session returns the session bound to this connection.
func (r *Connection) Session() *session.Session {
	bound := r.session
	return bound
}
// LastMID returns the ID of the most recent message.
//
// The value is loaded atomically so that it pairs with the atomic store in
// SetLastMID; a plain read here would race with concurrent writers.
func (r *Connection) LastMID() uint64 {
	return atomic.LoadUint64(&r.lastMid)
}
// SetLastMID atomically stores mid as the ID of the most recent message.
func (r *Connection) SetLastMID(mid uint64) {
	midAddr := &r.lastMid
	atomic.StoreUint64(midAddr, mid)
}
// serve starts the connection's write loop and read loop by submitting them
// to the shared pools.
//
// A failed submission is no longer ignored: without both loops the
// connection cannot work, so the failure is logged and the connection is
// closed. Closing also signals chDie, which stops a write loop that was
// already started.
func (r *Connection) serve() {
	if err := pool.SubmitConn(func() {
		r.write()
	}); err != nil {
		nlog.Errorf("submit write loop failed: %v", err)
		// Best effort: the submission failure is the error worth reporting.
		_ = r.Close()
		return
	}

	if err := pool.SubmitWorker(func() {
		r.read()
	}); err != nil {
		nlog.Errorf("submit read loop failed: %v", err)
		// Best effort: the submission failure is the error worth reporting.
		_ = r.Close()
	}
}
// write is the connection's write loop. It runs until the connection or the
// engine signals shutdown, the heartbeat times out, or a write to the
// underlying conn fails.
//
// On every iteration it handles one of:
//   - a heartbeat tick: the loop exits if no heartbeat was recorded within
//     two heartbeat intervals, otherwise it writes the (currently empty)
//     heartbeat data;
//   - a structured message from chSend: the payload is serialized, run
//     through the outbound pipeline, packed and written;
//   - raw bytes from chWrite: written as they are;
//   - a close signal from chDie or from the engine's dieChan.
//
// Heartbeats and packed messages are written to the conn directly rather
// than being pushed back into chWrite. This loop is the consumer of chWrite,
// so sending into it from here would block forever once its buffer is full.
//
// On exit the ticker is stopped, both outgoing channels are closed (which
// makes blocked or later Send/SendBytes calls return ErrBrokenPipe) and the
// connection is closed.
func (r *Connection) write() {
	ticker := time.NewTicker(r.ngin.heartbeatInterval)
	defer func() {
		ticker.Stop()
		close(r.chSend)
		close(r.chWrite)
		_ = r.Close()
		nlog.Debugf("Connection write goroutine exit, ConnID=%d, SessionUID=%s", r.ID(), r.session.UID())
	}()

	// flush writes data to the underlying conn and reports whether the loop
	// may keep running; a failed write is logged and ends the loop.
	flush := func(data []byte) bool {
		if _, err := r.conn.Write(data); err != nil {
			nlog.Error(err.Error())
			return false
		}
		return true
	}

	for {
		select {
		case <-ticker.C:
			// TODO heartbeat enable control
			deadline := time.Now().Add(-2 * r.ngin.heartbeatInterval).Unix()
			last := atomic.LoadInt64(&r.lastHeartbeatAt)
			if last < deadline {
				nlog.Debugf("Session heartbeat timeout, LastTime=%d, Deadline=%d", last, deadline)
				return
			}
			// TODO heartbeat data
			if !flush([]byte{}) {
				return
			}

		case data := <-r.chSend:
			// marshal packet body (data)
			if r.ngin.serializer == nil {
				// without a serializer only ready-made []byte payloads can be sent
				if _, ok := data.payload.([]byte); !ok {
					nlog.Errorf("serializer is nil, but payload type not []byte")
					continue
				}
			} else {
				payload, err := r.ngin.serializer.Marshal(data.payload)
				if err != nil {
					nlog.Errorf("message body marshal err: %v", err)
					continue
				}
				data.payload = payload
			}

			// invoke pipeline
			if pipe := r.ngin.pipeline; pipe != nil {
				if err := pipe.Outbound().Process(r, data); err != nil {
					nlog.Errorf("broken pipeline err: %s", err.Error())
					continue
				}
			}

			// packet pack data
			p, err := r.packer.Pack(data.header, data.payload.([]byte))
			if err != nil {
				nlog.Error(err.Error())
				continue
			}
			if !flush(p) {
				return
			}

		case data := <-r.chWrite:
			// write the raw bytes back to the peer
			if !flush(data) {
				return
			}

		case <-r.chDie: // connection close signal
			return

		case <-r.ngin.dieChan: // application quit signal
			return
		}
	}
}
// read is the connection's read loop. It reads from the underlying conn,
// unpacks what it received and hands every packet to processPacket.
//
// The loop ends, and the connection is closed, when reading fails or when
// the connection has no packer. Unpack and per-packet processing errors are
// only logged.
func (r *Connection) read() {
	defer r.Close()

	chunk := make([]byte, 4096)
	for {
		size, readErr := r.conn.Read(chunk)
		if readErr != nil {
			nlog.Errorf("Read message error: %s, session will be closed immediately", readErr.Error())
			return
		}

		if r.packer == nil {
			nlog.Errorf("unexpected error: packer is nil")
			return
		}

		// warning: the read buffer is reused across iterations for
		// performance, so the data passed to Unpack must be copied before it
		// is processed any further
		packets, unpackErr := r.packer.Unpack(chunk[:size])
		if unpackErr != nil {
			nlog.Error(unpackErr.Error())
		}

		// handle whatever packets were produced
		for _, pkt := range packets {
			procErr := r.processPacket(pkt)
			if procErr != nil {
				nlog.Error(procErr.Error())
			}
		}
	}
}
// processPacket runs a single inbound packet through the engine.
//
// The packet first passes the inbound pipeline (when one is configured) and
// then the packet processor. If the connection is in StatusWorking
// afterwards, the packet is handed to the handler on the worker pool.
//
// It returns the first error encountered: a pipeline error wrapped with
// context (the original error stays reachable through errors.Is/errors.As),
// the processor error as is, or a wrapped error when the handler task could
// not be submitted to the pool.
func (r *Connection) processPacket(packet packet.IPacket) error {
	if pipe := r.ngin.pipeline; pipe != nil {
		if err := pipe.Inbound().Process(r, packet); err != nil {
			return fmt.Errorf("pipeline process failed: %w", err)
		}
	}

	// packet processor
	if err := r.ngin.processor.Process(r, packet); err != nil {
		return err
	}

	if r.Status() == StatusWorking {
		// HandleFunc
		err := pool.SubmitWorker(func() {
			r.ngin.handler.Handle(r, packet)
		})
		if err != nil {
			return fmt.Errorf("submit handler task failed: %w", err)
		}
	}
	return nil
}
// Close marks the connection as closed, signals chDie, schedules the
// Lifetime close callback and closes the underlying conn.
//
// Close is safe to call from several goroutines: the status is switched to
// StatusClosed with a single atomic swap, so exactly one caller performs the
// shutdown and every other caller gets ErrCloseClosedSession. This matters
// because both the read loop and the write loop call Close when they exit; a
// separate check followed by a store would let both of them reach
// close(r.chDie), and closing a channel twice panics.
//
// It returns ErrCloseClosedSession if the connection was already closed,
// otherwise the result of closing the underlying conn.
func (r *Connection) Close() error {
	if atomic.SwapInt32(&r.status, StatusClosed) == StatusClosed {
		return ErrCloseClosedSession
	}

	nlog.Debugf("close connection, ID: %d", r.ID())

	select {
	case <-r.chDie:
		// already signalled; nothing more to announce
	default:
		close(r.chDie)
		scheduler.PushTask(func() { Lifetime.Close(r) })
	}

	return r.conn.Close()
}