|
|
|
|
package nnet
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"git.noahlan.cn/noahlan/nnet/config"
|
|
|
|
|
"git.noahlan.cn/noahlan/nnet/conn"
|
|
|
|
|
"git.noahlan.cn/noahlan/nnet/event"
|
|
|
|
|
"git.noahlan.cn/noahlan/nnet/packet"
|
|
|
|
|
rt "git.noahlan.cn/noahlan/nnet/router"
|
|
|
|
|
"git.noahlan.cn/noahlan/nnet/session"
|
|
|
|
|
"git.noahlan.cn/noahlan/ntool/ndef"
|
|
|
|
|
"git.noahlan.cn/noahlan/ntool/nlog"
|
|
|
|
|
"github.com/panjf2000/ants/v2"
|
|
|
|
|
"math"
|
|
|
|
|
"net"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Engine 引擎
|
|
|
|
|
type Engine struct {
|
|
|
|
|
config.EngineConf // 引擎配置
|
|
|
|
|
middlewares []rt.Middleware // 中间件
|
|
|
|
|
routes []rt.Route // 路由
|
|
|
|
|
router rt.Router // 消息处理器
|
|
|
|
|
dieChan chan struct{} // 应用程序退出信号
|
|
|
|
|
packerBuilder packet.PackerBuilder // 封包、拆包器
|
|
|
|
|
serializer ndef.Serializer // 消息 序列化/反序列化
|
|
|
|
|
pool *ants.Pool // goroutine池
|
|
|
|
|
connMgr *conn.ConnManager // 连接管理器
|
|
|
|
|
evtMgr event.EventManager // 事件管理器
|
|
|
|
|
sessIdMgr *session.IDMgr // SessionId管理器
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewEngine(conf config.EngineConf, opts ...RunOption) *Engine {
|
|
|
|
|
ngin := &Engine{
|
|
|
|
|
EngineConf: conf,
|
|
|
|
|
middlewares: make([]rt.Middleware, 0),
|
|
|
|
|
routes: make([]rt.Route, 0),
|
|
|
|
|
router: rt.NewDefaultRouter(),
|
|
|
|
|
dieChan: make(chan struct{}),
|
|
|
|
|
connMgr: conn.NewConnManager(),
|
|
|
|
|
evtMgr: event.NewEventManager(),
|
|
|
|
|
sessIdMgr: session.NewSessionIDMgr(),
|
|
|
|
|
packerBuilder: func() packet.Packer {
|
|
|
|
|
return nil
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, opt := range opts {
|
|
|
|
|
opt(ngin)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if ngin.pool == nil {
|
|
|
|
|
ngin.pool, _ = ants.NewPool(math.MaxInt32)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return ngin
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ngin *Engine) Pool() *ants.Pool {
|
|
|
|
|
return ngin.pool
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ngin *Engine) Use(middleware ...rt.Middleware) {
|
|
|
|
|
ngin.middlewares = append(ngin.middlewares, middleware...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ngin *Engine) AddRoutes(rs ...rt.Route) {
|
|
|
|
|
ngin.routes = append(ngin.routes, rs...)
|
|
|
|
|
err := ngin.bindRoutes()
|
|
|
|
|
nlog.Must(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ngin *Engine) bindRoutes() error {
|
|
|
|
|
for _, fr := range ngin.routes {
|
|
|
|
|
if err := ngin.bindRoute(fr); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ngin *Engine) bindRoute(route rt.Route) error {
|
|
|
|
|
// TODO default middleware
|
|
|
|
|
chain := rt.NewChain()
|
|
|
|
|
// build chain
|
|
|
|
|
for _, middleware := range ngin.middlewares {
|
|
|
|
|
chain.Append(rt.ConvertMiddleware(middleware))
|
|
|
|
|
}
|
|
|
|
|
return ngin.router.Register(route.Matches, route.Handler)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ngin *Engine) setup() error {
|
|
|
|
|
if err := ngin.bindRoutes(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ngin *Engine) Stop() {
|
|
|
|
|
nlog.Infof("%s server is stopping...", ngin.LogPrefix())
|
|
|
|
|
|
|
|
|
|
ngin.connMgr.PeekConn(func(_ int64, c *conn.Connection) bool {
|
|
|
|
|
_ = c.Close()
|
|
|
|
|
return false
|
|
|
|
|
})
|
|
|
|
|
close(ngin.dieChan)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ngin *Engine) handle(rawC net.Conn) *conn.Connection {
|
|
|
|
|
nc := conn.NewConnection(ngin.sessIdMgr.SessionID(), rawC, ngin.SendChannelSize, ngin.WriteChannelSize)
|
|
|
|
|
|
|
|
|
|
ngin.evtMgr.OnConnected(nc)
|
|
|
|
|
|
|
|
|
|
ngin.serveConn(nc, ngin.packerBuilder())
|
|
|
|
|
|
|
|
|
|
err := ngin.connMgr.Store(conn.DefaultGroupName, nc)
|
|
|
|
|
nlog.Must(err)
|
|
|
|
|
return nc
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ngin *Engine) serveConn(nc *conn.Connection, packer packet.Packer) {
|
|
|
|
|
_ = ngin.pool.Submit(func() {
|
|
|
|
|
ngin.readLoop(nc, packer)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
_ = ngin.pool.Submit(func() {
|
|
|
|
|
ngin.writeLoop(nc, packer)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
_ = ngin.pool.Submit(func() {
|
|
|
|
|
select {
|
|
|
|
|
case <-nc.ChDie():
|
|
|
|
|
if ngin.ShallLogDebug() {
|
|
|
|
|
nlog.Debugf("%s Close connection, ID=%d, Remote=%s", ngin.LogPrefix(), nc.ID(), nc.Conn().RemoteAddr().String())
|
|
|
|
|
}
|
|
|
|
|
_ = ngin.connMgr.Remove(nc)
|
|
|
|
|
ngin.evtMgr.OnClose(nc)
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ngin *Engine) readLoop(nc *conn.Connection, packer packet.Packer) {
|
|
|
|
|
defer func() {
|
|
|
|
|
_ = nc.Close()
|
|
|
|
|
|
|
|
|
|
//if ngin.ShallLogDebug() {
|
|
|
|
|
// nlog.Debugf("%s [readLoop] connection read goroutine exit, ID=%d, UID=%s, Remote=%s",
|
|
|
|
|
// ngin.LogPrefix(), nc.ID(), nc.Session().UID(), nc.Conn().RemoteAddr())
|
|
|
|
|
//}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
buf := make([]byte, 4096)
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-nc.ChDie(): // connection close signal
|
|
|
|
|
return
|
|
|
|
|
default:
|
|
|
|
|
if ngin.Deadline != 0 {
|
|
|
|
|
_ = nc.Conn().SetDeadline(time.Now().Add(ngin.Deadline))
|
|
|
|
|
}
|
|
|
|
|
if ngin.ReadDeadline != 0 {
|
|
|
|
|
_ = nc.Conn().SetReadDeadline(time.Now().Add(ngin.ReadDeadline))
|
|
|
|
|
}
|
|
|
|
|
var (
|
|
|
|
|
err error
|
|
|
|
|
n int
|
|
|
|
|
msgTyp int
|
|
|
|
|
)
|
|
|
|
|
// 兼容websocket
|
|
|
|
|
if nc.Type() == conn.ConnTypeWS {
|
|
|
|
|
var bb []byte
|
|
|
|
|
if msgTyp, bb, err = nc.WsConn().ReadMessage(); err == nil {
|
|
|
|
|
copy(buf, bb)
|
|
|
|
|
n = len(bb)
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
n, err = nc.Conn().Read(buf)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
ngin.evtMgr.OnDisconnected(nc, err)
|
|
|
|
|
// TODO 断线重连 (仅限客户端)
|
|
|
|
|
nlog.Errorf("%s [readLoop] Read message error: %s, session will be closed immediately",
|
|
|
|
|
ngin.LogPrefix(), err.Error())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if n == 0 {
|
|
|
|
|
ngin.evtMgr.OnReceiveError(nc, conn.ErrReceiveZero)
|
|
|
|
|
nlog.Errorf("%s [readLoop] Read empty message, session will be closed immediately",
|
|
|
|
|
ngin.LogPrefix())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 兼容websocket
|
|
|
|
|
if nc.Type() == conn.ConnTypeWS {
|
|
|
|
|
ngin.processPacket(nc, packet.NewWSPacket(msgTyp, buf[:n]))
|
|
|
|
|
} else {
|
|
|
|
|
if packer == nil {
|
|
|
|
|
ngin.evtMgr.OnReceiveError(nc, conn.ErrNoPacker)
|
|
|
|
|
nlog.Errorf("%s [readLoop] unexpected error: packer is nil", ngin.LogPrefix())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//nlog.Debugf("receive data %v", buf[:n])
|
|
|
|
|
// warning: 为性能考虑,复用slice处理数据,buf传入后必须要copy再处理
|
|
|
|
|
packets, err := packer.Unpack(buf[:n])
|
|
|
|
|
if err != nil {
|
|
|
|
|
ngin.evtMgr.OnReceiveError(nc, conn.ErrUnpack)
|
|
|
|
|
nlog.Errorf("%s unpack err: %s", ngin.LogPrefix(), err.Error())
|
|
|
|
|
}
|
|
|
|
|
// packets 处理
|
|
|
|
|
for _, p := range packets {
|
|
|
|
|
ngin.processPacket(nc, p)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ngin *Engine) writeLoop(nc *conn.Connection, packer packet.Packer) {
|
|
|
|
|
defer func() {
|
|
|
|
|
_ = nc.Close()
|
|
|
|
|
|
|
|
|
|
//if ngin.ShallLogDebug() {
|
|
|
|
|
// nlog.Debugf("%s [writeLoop] connection write goroutine exit, ID=%d, UID=%s, Remote=%s",
|
|
|
|
|
// ngin.LogPrefix(), nc.ID(), nc.Session().UID(), nc.Conn().RemoteAddr())
|
|
|
|
|
//}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case data := <-nc.ChSend():
|
|
|
|
|
// marshal packet body (data)
|
|
|
|
|
if ngin.serializer == nil {
|
|
|
|
|
if _, ok := data.Payload.([]byte); !ok {
|
|
|
|
|
ngin.evtMgr.OnSendError(nc, data, conn.ErrSendPayload)
|
|
|
|
|
nlog.Errorf("%s [writeLoop] serializer is nil, but payload type not []byte", ngin.LogPrefix())
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
payload, err := ngin.serializer.Marshal(data.Payload)
|
|
|
|
|
if err != nil {
|
|
|
|
|
ngin.evtMgr.OnSendError(nc, data, conn.ErrSendMarshal)
|
|
|
|
|
nlog.Errorf("%s [writeLoop] message body marshal err: %v", ngin.LogPrefix(), err)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
data.Payload = payload
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 对websocket的兼容
|
|
|
|
|
if nc.Type() == conn.ConnTypeWS {
|
|
|
|
|
messageTyp, ok := data.Header.(int)
|
|
|
|
|
if !ok {
|
|
|
|
|
ngin.evtMgr.OnSendError(nc, data, conn.ErrSendWSType)
|
|
|
|
|
nlog.Errorf("%s [writeLoop] websocket message type not found", ngin.LogPrefix())
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
// deadline
|
|
|
|
|
if ngin.Deadline != 0 {
|
|
|
|
|
_ = nc.Conn().SetDeadline(time.Now().Add(ngin.Deadline))
|
|
|
|
|
}
|
|
|
|
|
if ngin.WriteDeadline != 0 {
|
|
|
|
|
_ = nc.Conn().SetWriteDeadline(time.Now().Add(ngin.WriteDeadline))
|
|
|
|
|
}
|
|
|
|
|
err := nc.WsConn().WriteMessage(messageTyp, data.Payload.([]byte))
|
|
|
|
|
if err != nil {
|
|
|
|
|
ngin.evtMgr.OnSendError(nc, data, conn.ErrSend)
|
|
|
|
|
nlog.Errorf("%s [writeLoop] write data err: %v", ngin.LogPrefix(), err)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
// event
|
|
|
|
|
ngin.evtMgr.OnSend(nc, data)
|
|
|
|
|
} else {
|
|
|
|
|
// packet pack data
|
|
|
|
|
if packer == nil {
|
|
|
|
|
ngin.evtMgr.OnSendError(nc, data, conn.ErrNoPacker)
|
|
|
|
|
nlog.Errorf("%s [writeLoop] unexpected error: packer is nil", ngin.LogPrefix())
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
p, err := packer.Pack(data.Header, data.Payload.([]byte))
|
|
|
|
|
if err != nil {
|
|
|
|
|
ngin.evtMgr.OnSendError(nc, data, conn.ErrPack)
|
|
|
|
|
nlog.Errorf("%s [writeLoop] pack err: %v", ngin.LogPrefix(), err)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
nc.ChWrite() <- p
|
|
|
|
|
}
|
|
|
|
|
case data := <-nc.ChWrite():
|
|
|
|
|
// 回写数据
|
|
|
|
|
if ngin.Deadline != 0 {
|
|
|
|
|
_ = nc.Conn().SetDeadline(time.Now().Add(ngin.Deadline))
|
|
|
|
|
}
|
|
|
|
|
if ngin.WriteDeadline != 0 {
|
|
|
|
|
_ = nc.Conn().SetWriteDeadline(time.Now().Add(ngin.WriteDeadline))
|
|
|
|
|
}
|
|
|
|
|
if _, err := nc.Conn().Write(data); err != nil {
|
|
|
|
|
ngin.evtMgr.OnSendError(nc, data, conn.ErrSend)
|
|
|
|
|
nlog.Errorf("%s [writeLoop] write data err: %v", ngin.LogPrefix(), err)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
// event
|
|
|
|
|
ngin.evtMgr.OnSend(nc, data)
|
|
|
|
|
|
|
|
|
|
//nlog.Debugf("write data %v", data)
|
|
|
|
|
case <-nc.ChDie(): // connection close signal
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ngin *Engine) processPacket(nc *conn.Connection, p packet.IPacket) {
|
|
|
|
|
// event
|
|
|
|
|
ngin.evtMgr.OnReceive(nc, p)
|
|
|
|
|
|
|
|
|
|
if nc.Status() == conn.StatusWorking {
|
|
|
|
|
// 处理包消息
|
|
|
|
|
_ = ngin.pool.Submit(func() {
|
|
|
|
|
ngin.router.Handle(nc, p)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|