You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
nnet/core/connection.go

284 lines
6.0 KiB
Go

package core
import (
"errors"
"fmt"
"git.noahlan.cn/northlan/nnet/internal/log"
"git.noahlan.cn/northlan/nnet/internal/packet"
"git.noahlan.cn/northlan/nnet/internal/pool"
"git.noahlan.cn/northlan/nnet/scheduler"
"git.noahlan.cn/northlan/nnet/session"
"net"
"sync/atomic"
"time"
)
var (
ErrCloseClosedSession = errors.New("close closed session")
// ErrBrokenPipe represents the low-level connection has broken.
ErrBrokenPipe = errors.New("broken low-level pipe")
// ErrBufferExceed indicates that the current session buffer is full and
// can not receive more data.
ErrBufferExceed = errors.New("session send buffer exceed")
)
const (
// StatusStart 开始阶段
StatusStart int32 = iota + 1
// StatusPrepare 准备阶段
StatusPrepare
// StatusPending 等待工作阶段
StatusPending
// StatusWorking 工作阶段
StatusWorking
// StatusClosed 连接关闭
StatusClosed
)
type (
Connection struct {
session *session.Session // Session
ngin *engine // engine
conn net.Conn // low-level conn fd
status int32 // 连接状态
lastMid uint64 // 最近一次消息ID
// TODO 考虑独立出去作为一个中间件
lastHeartbeatAt int64 // 最近一次心跳时间
chDie chan struct{} // 停止通道
chSend chan pendingMessage // 消息发送通道(结构化消息)
chWrite chan []byte // 消息发送通道(二进制消息)
}
pendingMessage struct {
header interface{}
payload interface{}
}
)
func newConn(server *engine, conn net.Conn) *Connection {
r := &Connection{
conn: conn,
ngin: server,
status: StatusStart,
lastHeartbeatAt: time.Now().Unix(),
chDie: make(chan struct{}),
chSend: make(chan pendingMessage, 128),
chWrite: make(chan []byte, 128),
}
// binding session
r.session = session.NewSession()
return r
}
func (r *Connection) Send(header, payload interface{}) (err error) {
defer func() {
if e := recover(); e != nil {
err = ErrBrokenPipe
}
}()
r.chSend <- pendingMessage{
header: header,
payload: payload,
}
return err
}
func (r *Connection) SendBytes(data []byte) (err error) {
defer func() {
if e := recover(); e != nil {
err = ErrBrokenPipe
}
}()
r.chWrite <- data
return err
}
func (r *Connection) Status() int32 {
return atomic.LoadInt32(&r.status)
}
func (r *Connection) SetStatus(s int32) {
atomic.StoreInt32(&r.status, s)
}
func (r *Connection) Conn() net.Conn {
return r.conn
}
func (r *Connection) ID() int64 {
return r.session.ID()
}
func (r *Connection) SetLastHeartbeatAt(t int64) {
atomic.StoreInt64(&r.lastHeartbeatAt, t)
}
func (r *Connection) Session() *session.Session {
return r.session
}
func (r *Connection) LastMID() uint64 {
return r.lastMid
}
func (r *Connection) SetLastMID(mid uint64) {
atomic.StoreUint64(&r.lastMid, mid)
}
func (r *Connection) serve() {
_ = pool.SubmitConn(func() {
r.write()
})
_ = pool.SubmitWorker(func() {
r.read()
})
}
func (r *Connection) write() {
ticker := time.NewTicker(r.ngin.heartbeatInterval)
defer func() {
ticker.Stop()
close(r.chSend)
close(r.chWrite)
_ = r.Close()
log.Debugf("Connection write goroutine exit, ConnID=%d, SessionUID=%s", r.ID(), r.session.UID())
}()
for {
select {
case <-ticker.C:
// TODO heartbeat enable control
deadline := time.Now().Add(-2 * r.ngin.heartbeatInterval).Unix()
if atomic.LoadInt64(&r.lastHeartbeatAt) < deadline {
log.Debugf("Session heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&r.lastHeartbeatAt), deadline)
return
}
// TODO heartbeat data
r.chWrite <- []byte{}
case data := <-r.chSend:
// marshal packet body (data)
if r.ngin.serializer == nil {
if _, ok := data.payload.([]byte); !ok {
log.Errorf("serializer is nil, but payload type not []byte")
break
}
} else {
payload, err := r.ngin.serializer.Marshal(data.payload)
if err != nil {
log.Errorf("message body marshal err: %v", err)
break
}
data.payload = payload
}
// invoke pipeline
if pipe := r.ngin.pipeline; pipe != nil {
err := pipe.Outbound().Process(r, data)
if err != nil {
log.Errorf("broken pipeline err: %s", err.Error())
break
}
}
// packet pack data
p, err := r.ngin.packer.Pack(data.header, data.payload.([]byte))
if err != nil {
log.Error(err.Error())
break
}
r.chWrite <- p
case data := <-r.chWrite:
// 回写数据
if _, err := r.conn.Write(data); err != nil {
log.Error(err.Error())
return
}
case <-r.chDie: // connection close signal
return
case <-r.ngin.dieChan: // application quit signal
return
}
}
}
func (r *Connection) read() {
defer func() {
r.Close()
}()
buf := make([]byte, 4096)
for {
n, err := r.conn.Read(buf)
if err != nil {
log.Errorf("Read message error: %s, session will be closed immediately", err.Error())
return
}
if r.ngin.packer == nil {
log.Errorf("unexpected error: packer is nil")
return
}
// warning: 为性能考虑复用slice处理数据buf传入后必须要copy再处理
packets, err := r.ngin.packer.Unpack(buf[:n])
if err != nil {
log.Error(err.Error())
}
// packets 处理
for _, p := range packets {
if err := r.processPacket(p); err != nil {
log.Error(err.Error())
continue
}
}
}
}
func (r *Connection) processPacket(packet packet.IPacket) error {
if pipe := r.ngin.pipeline; pipe != nil {
err := pipe.Inbound().Process(r, packet)
if err != nil {
return errors.New(fmt.Sprintf("pipeline process failed: %v", err.Error()))
}
}
// packet processor
err := r.ngin.processor.Process(r, packet)
if err != nil {
return err
}
if r.Status() == StatusWorking {
// HandleFunc
_ = pool.SubmitWorker(func() {
r.ngin.handler.Handle(r, packet)
})
}
return err
}
func (r *Connection) Close() error {
if r.Status() == StatusClosed {
return ErrCloseClosedSession
}
r.SetStatus(StatusClosed)
log.Debugf("close connection, ID: %d", r.ID())
select {
case <-r.chDie:
default:
close(r.chDie)
scheduler.PushTask(func() { Lifetime.Close(r) })
}
return r.conn.Close()
}