You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
nnet/engine.go

323 lines
8.5 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package nnet
import (
"git.noahlan.cn/noahlan/nnet/config"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/event"
"git.noahlan.cn/noahlan/nnet/packet"
rt "git.noahlan.cn/noahlan/nnet/router"
"git.noahlan.cn/noahlan/nnet/session"
"git.noahlan.cn/noahlan/ntool/ndef"
"git.noahlan.cn/noahlan/ntool/nlog"
"github.com/panjf2000/ants/v2"
"math"
"net"
"time"
)
// Engine 引擎
type Engine struct {
config.EngineConf // 引擎配置
middlewares []rt.Middleware // 中间件
routes []rt.Route // 路由
router rt.Router // 消息处理器
dieChan chan struct{} // 应用程序退出信号
packerBuilder packet.PackerBuilder // 封包、拆包器
serializer ndef.Serializer // 消息 序列化/反序列化
pool *ants.Pool // goroutine池
connMgr *conn.ConnManager // 连接管理器
evtMgr event.EventManager // 事件管理器
sessIdMgr *session.IDMgr // SessionId管理器
}
func NewEngine(conf config.EngineConf, opts ...RunOption) *Engine {
ngin := &Engine{
EngineConf: conf,
middlewares: make([]rt.Middleware, 0),
routes: make([]rt.Route, 0),
router: rt.NewDefaultRouter(),
dieChan: make(chan struct{}),
connMgr: conn.NewConnManager(),
sessIdMgr: session.NewSessionIDMgr(),
packerBuilder: func() packet.Packer {
return nil
},
}
for _, opt := range opts {
opt(ngin)
}
if ngin.pool == nil {
ngin.pool, _ = ants.NewPool(math.MaxInt32)
}
ngin.evtMgr = event.NewEventManager(ngin.pool)
return ngin
}
func (ngin *Engine) Pool() *ants.Pool {
return ngin.pool
}
func (ngin *Engine) Use(middleware ...rt.Middleware) {
ngin.middlewares = append(ngin.middlewares, middleware...)
}
func (ngin *Engine) AddRoutes(rs ...rt.Route) {
ngin.routes = append(ngin.routes, rs...)
err := ngin.bindRoutes()
nlog.Must(err)
}
func (ngin *Engine) bindRoutes() error {
for _, fr := range ngin.routes {
if err := ngin.bindRoute(fr); err != nil {
return err
}
}
return nil
}
func (ngin *Engine) bindRoute(route rt.Route) error {
// TODO default middleware
chain := rt.NewChain()
// build chain
for _, middleware := range ngin.middlewares {
chain.Append(rt.ConvertMiddleware(middleware))
}
return ngin.router.Register(route.Matches, route.Handler)
}
func (ngin *Engine) setup() error {
if err := ngin.bindRoutes(); err != nil {
return err
}
return nil
}
func (ngin *Engine) Stop() {
nlog.Infof("%s server is stopping...", ngin.LogPrefix())
ngin.connMgr.PeekConn(func(_ int64, c *conn.Connection) bool {
_ = c.Close()
return false
})
close(ngin.dieChan)
}
func (ngin *Engine) handle(rawC net.Conn) *conn.Connection {
nc := conn.NewConnection(ngin.sessIdMgr.SessionID(), rawC)
ngin.evtMgr.OnConnected(nc)
ngin.serveConn(nc, ngin.packerBuilder())
err := ngin.connMgr.Store(conn.DefaultGroupName, nc)
nlog.Must(err)
return nc
}
func (ngin *Engine) serveConn(nc *conn.Connection, packer packet.Packer) {
_ = ngin.pool.Submit(func() {
ngin.readLoop(nc, packer)
})
_ = ngin.pool.Submit(func() {
ngin.writeLoop(nc, packer)
})
_ = ngin.pool.Submit(func() {
select {
case <-nc.ChDie():
if ngin.ShallLogDebug() {
nlog.Debugf("%s Close connection, ID=%d, Remote=%s", ngin.LogPrefix(), nc.ID(), nc.Conn().RemoteAddr().String())
}
_ = ngin.connMgr.Remove(nc)
ngin.evtMgr.OnClose(nc)
}
})
}
func (ngin *Engine) readLoop(nc *conn.Connection, packer packet.Packer) {
defer func() {
_ = nc.Close()
//if ngin.ShallLogDebug() {
// nlog.Debugf("%s [readLoop] connection read goroutine exit, ID=%d, UID=%s, Remote=%s",
// ngin.LogPrefix(), nc.ID(), nc.Session().UID(), nc.Conn().RemoteAddr())
//}
}()
buf := make([]byte, 4096)
for {
select {
case <-nc.ChDie(): // connection close signal
return
default:
if ngin.Deadline != 0 {
_ = nc.Conn().SetDeadline(time.Now().Add(ngin.Deadline))
}
if ngin.ReadDeadline != 0 {
_ = nc.Conn().SetReadDeadline(time.Now().Add(ngin.ReadDeadline))
}
var (
err error
n int
msgTyp int
)
// 兼容websocket
if nc.Type() == conn.ConnTypeWS {
var bb []byte
if msgTyp, bb, err = nc.WsConn().ReadMessage(); err == nil {
copy(buf, bb)
n = len(bb)
}
} else {
n, err = nc.Conn().Read(buf)
}
if err != nil {
ngin.evtMgr.OnDisconnected(nc, err)
// TODO 断线重连 (仅限客户端)
nlog.Errorf("%s [readLoop] Read message error: %s, session will be closed immediately",
ngin.LogPrefix(), err.Error())
return
}
if n == 0 {
ngin.evtMgr.OnReceiveError(nc, conn.ErrReceiveZero)
nlog.Errorf("%s [readLoop] Read empty message, session will be closed immediately",
ngin.LogPrefix())
return
}
// 兼容websocket
if nc.Type() == conn.ConnTypeWS {
ngin.processPacket(nc, packet.NewWSPacket(msgTyp, buf[:n]))
} else {
if packer == nil {
ngin.evtMgr.OnReceiveError(nc, conn.ErrNoPacker)
nlog.Errorf("%s [readLoop] unexpected error: packer is nil", ngin.LogPrefix())
return
}
//nlog.Debugf("receive data %v", buf[:n])
// warning: 为性能考虑复用slice处理数据buf传入后必须要copy再处理
packets, err := packer.Unpack(buf[:n])
if err != nil {
ngin.evtMgr.OnReceiveError(nc, conn.ErrUnpack)
nlog.Errorf("%s unpack err: %s", ngin.LogPrefix(), err.Error())
}
// packets 处理
for _, p := range packets {
ngin.processPacket(nc, p)
}
}
}
}
}
func (ngin *Engine) writeLoop(nc *conn.Connection, packer packet.Packer) {
defer func() {
_ = nc.Close()
//if ngin.ShallLogDebug() {
// nlog.Debugf("%s [writeLoop] connection write goroutine exit, ID=%d, UID=%s, Remote=%s",
// ngin.LogPrefix(), nc.ID(), nc.Session().UID(), nc.Conn().RemoteAddr())
//}
}()
for {
select {
case data := <-nc.ChSend():
// marshal packet body (data)
if ngin.serializer == nil {
if _, ok := data.Payload.([]byte); !ok {
ngin.evtMgr.OnSendError(nc, data, conn.ErrSendPayload)
nlog.Errorf("%s [writeLoop] serializer is nil, but payload type not []byte", ngin.LogPrefix())
break
}
} else {
payload, err := ngin.serializer.Marshal(data.Payload)
if err != nil {
ngin.evtMgr.OnSendError(nc, data, conn.ErrSendMarshal)
nlog.Errorf("%s [writeLoop] message body marshal err: %v", ngin.LogPrefix(), err)
break
}
data.Payload = payload
}
// 对websocket的兼容
if nc.Type() == conn.ConnTypeWS {
messageTyp, ok := data.Header.(int)
if !ok {
ngin.evtMgr.OnSendError(nc, data, conn.ErrSendWSType)
nlog.Errorf("%s [writeLoop] websocket message type not found", ngin.LogPrefix())
break
}
// deadline
if ngin.Deadline != 0 {
_ = nc.Conn().SetDeadline(time.Now().Add(ngin.Deadline))
}
if ngin.WriteDeadline != 0 {
_ = nc.Conn().SetWriteDeadline(time.Now().Add(ngin.WriteDeadline))
}
err := nc.WsConn().WriteMessage(messageTyp, data.Payload.([]byte))
if err != nil {
ngin.evtMgr.OnSendError(nc, data, conn.ErrSend)
nlog.Errorf("%s [writeLoop] write data err: %v", ngin.LogPrefix(), err)
break
}
// event
ngin.evtMgr.OnSend(nc, data)
} else {
// packet pack data
if packer == nil {
ngin.evtMgr.OnSendError(nc, data, conn.ErrNoPacker)
nlog.Errorf("%s [writeLoop] unexpected error: packer is nil", ngin.LogPrefix())
break
}
p, err := packer.Pack(data.Header, data.Payload.([]byte))
if err != nil {
ngin.evtMgr.OnSendError(nc, data, conn.ErrPack)
nlog.Errorf("%s [writeLoop] pack err: %v", ngin.LogPrefix(), err)
break
}
nc.ChWrite() <- p
}
case data := <-nc.ChWrite():
// 回写数据
if ngin.Deadline != 0 {
_ = nc.Conn().SetDeadline(time.Now().Add(ngin.Deadline))
}
if ngin.WriteDeadline != 0 {
_ = nc.Conn().SetWriteDeadline(time.Now().Add(ngin.WriteDeadline))
}
if _, err := nc.Conn().Write(data); err != nil {
ngin.evtMgr.OnSendError(nc, data, conn.ErrSend)
nlog.Errorf("%s [writeLoop] write data err: %v", ngin.LogPrefix(), err)
break
}
// event
ngin.evtMgr.OnSend(nc, data)
//nlog.Debugf("write data %v", data)
case <-nc.ChDie(): // connection close signal
return
}
}
}
func (ngin *Engine) processPacket(nc *conn.Connection, p packet.IPacket) {
// event
ngin.evtMgr.OnReceive(nc, p)
if nc.Status() == conn.StatusWorking {
// 处理包消息
_ = ngin.pool.Submit(func() {
ngin.router.Handle(nc, p)
})
}
}