You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
nnet/engine.go

312 lines
8.4 KiB
Go

package nnet
import (
"git.noahlan.cn/noahlan/nnet/config"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/event"
"git.noahlan.cn/noahlan/nnet/packet"
rt "git.noahlan.cn/noahlan/nnet/router"
"git.noahlan.cn/noahlan/nnet/session"
"git.noahlan.cn/noahlan/ntool/ndef"
"git.noahlan.cn/noahlan/ntool/nlog"
"net"
"time"
)
// Engine is the core of the network framework. It embeds the engine
// configuration and holds the router, the packet/serialization helpers and
// the connection, event and session-ID managers used while serving
// connections.
type Engine struct {
config.EngineConf // engine configuration (embedded)
middlewares []rt.Middleware // middlewares added through Use
routes []rt.Route // routes added through AddRoutes
router rt.Router // message handler that packets are dispatched to
dieChan chan struct{} // application exit signal; closed by Stop
packerBuilder packet.PackerBuilder // builds the packer (pack/unpack) handed to each served connection
serializer ndef.Serializer // message serialization / deserialization; may be nil
connMgr *conn.ConnManager // connection manager
evtMgr event.EventManager // event manager
sessIdMgr *session.IDMgr // session ID manager
}
// NewEngine creates an Engine from conf.
//
// The engine starts with an empty middleware and route list, a default
// router, fresh connection/event/session-ID managers and a packer builder
// that yields a nil packer. Every RunOption in opts is then applied in order.
// Finally, SendChannelSize and WriteChannelSize fall back to the package
// defaults when they are still zero after the options ran.
func NewEngine(conf config.EngineConf, opts ...RunOption) *Engine {
	engine := new(Engine)
	engine.EngineConf = conf
	engine.middlewares = make([]rt.Middleware, 0)
	engine.routes = make([]rt.Route, 0)
	engine.router = rt.NewDefaultRouter()
	engine.dieChan = make(chan struct{})
	engine.connMgr = conn.NewConnManager()
	engine.evtMgr = event.NewEventManager()
	engine.sessIdMgr = session.NewSessionIDMgr()
	// Placeholder builder: options are expected to replace it when a real
	// packer is needed; until then it hands out nil.
	engine.packerBuilder = func() packet.Packer {
		return nil
	}

	for i := range opts {
		opts[i](engine)
	}

	// Defaults are applied after the options so an option may set the sizes.
	cfg := &engine.EngineConf
	if cfg.SendChannelSize == 0 {
		cfg.SendChannelSize = config.DefaultSendChannelSize
	}
	if cfg.WriteChannelSize == 0 {
		cfg.WriteChannelSize = config.DefaultWriteChannelSize
	}
	return engine
}
// Use appends the given middlewares, in order, to the engine's middleware
// list.
func (ngin *Engine) Use(middleware ...rt.Middleware) {
	if len(middleware) == 0 {
		return
	}
	ngin.middlewares = append(ngin.middlewares, middleware...)
}
// AddRoutes records rs on the engine and registers them with the router.
//
// Only the routes passed to this call are bound. Re-binding the whole
// accumulated list here would register every earlier route again on each
// call. Binding stops at the first failure and the error (nil on success) is
// handed to nlog.Must.
func (ngin *Engine) AddRoutes(rs ...rt.Route) {
	ngin.routes = append(ngin.routes, rs...)

	var err error
	for _, r := range rs {
		if err = ngin.bindRoute(r); err != nil {
			break
		}
	}
	nlog.Must(err)
}
// bindRoutes binds every route currently stored on the engine, in order, and
// returns the first error reported by bindRoute (nil when all succeed).
func (ngin *Engine) bindRoutes() error {
	routes := ngin.routes
	for i := 0; i < len(routes); i++ {
		err := ngin.bindRoute(routes[i])
		if err != nil {
			return err
		}
	}
	return nil
}
// bindRoute registers a single route (its matcher and handler) with the
// router and returns the router's error, if any.
func (ngin *Engine) bindRoute(route rt.Route) error {
	// TODO default middleware
	mwChain := rt.NewChain()
	// Convert and append every engine middleware to the chain.
	for i := range ngin.middlewares {
		mwChain.Append(rt.ConvertMiddleware(ngin.middlewares[i]))
	}
	// NOTE(review): the chain built above is not handed to Register, so the
	// route's handler is registered as-is — confirm whether the middlewares
	// are meant to wrap route.Handler here.
	return ngin.router.Register(route.Matches, route.Handler)
}
// setup prepares the engine for serving; currently that only means binding
// all stored routes. It returns the first binding error, or nil.
func (ngin *Engine) setup() error {
	return ngin.bindRoutes()
}
// Stop logs that the server is stopping, asks the connection manager to visit
// its connections with a callback that closes each one it is given, and then
// closes dieChan to signal application exit.
//
// NOTE(review): dieChan is closed unconditionally, so calling Stop a second
// time panics (close of a closed channel).
func (ngin *Engine) Stop() {
	nlog.Infof("%s server is stopping...", ngin.LogPrefix())

	// Close errors are deliberately ignored: this is best-effort shutdown.
	// NOTE(review): the meaning of the callback's bool result is defined by
	// PeekConn, which is not visible here — confirm that returning false does
	// not end the iteration after the first connection.
	closeConn := func(_ int64, c *conn.Connection) bool {
		_ = c.Close()
		return false
	}
	ngin.connMgr.PeekConn(closeConn)

	close(ngin.dieChan)
}
// handle wraps a raw net.Conn in a managed connection with a fresh session
// ID, fires the connected event, stores the connection in the default group
// and starts serving it. The managed connection is returned.
//
// The connection is stored BEFORE serveConn is called. serveConn starts the
// goroutine that removes the connection from the manager when it dies. If
// that goroutine ran before Store, a connection that fails immediately would
// be removed first and stored afterwards, leaving a stale entry behind.
func (ngin *Engine) handle(rawC net.Conn) *conn.Connection {
	nc := conn.NewConnection(ngin.sessIdMgr.SessionID(), rawC, ngin.SendChannelSize, ngin.WriteChannelSize)
	ngin.evtMgr.OnConnected(nc)

	err := ngin.connMgr.Store(conn.DefaultGroupName, nc)
	nlog.Must(err)

	ngin.serveConn(nc, ngin.packerBuilder())
	return nc
}
// serveConn starts the three goroutines that service a connection: the read
// loop, the write loop and a watcher that waits for the connection's die
// signal, then removes the connection from the manager and fires the close
// event.
func (ngin *Engine) serveConn(nc *conn.Connection, packer packet.Packer) {
	go ngin.readLoop(nc, packer)
	go ngin.writeLoop(nc, packer)

	go func() {
		// Block until the connection signals that it is dead.
		<-nc.ChDie()

		if ngin.ShallLogDebug() {
			nlog.Debugf("%s Close connection, ID=%d, Remote=%s", ngin.LogPrefix(), nc.ID(), nc.Conn().RemoteAddr().String())
		}
		// The removal error is ignored; the close event fires regardless.
		_ = ngin.connMgr.Remove(nc)
		ngin.evtMgr.OnClose(nc)
	}()
}
// readLoop reads from the connection until it is closed or a read fails, and
// dispatches every received packet through processPacket. The connection is
// closed when the loop exits.
//
// WebSocket connections deliver whole messages through WsConn().ReadMessage;
// other connections are read as a byte stream into a reusable 4096-byte
// buffer and split into packets by packer.
//
// WebSocket payloads are passed on as the slice returned by ReadMessage
// rather than being copied into the fixed buffer. Copying truncated any
// message longer than the buffer while still slicing the buffer to the full
// message length, which panics. It also made packets share a buffer that is
// overwritten by the next read while handlers may still be running.
func (ngin *Engine) readLoop(nc *conn.Connection, packer packet.Packer) {
	defer func() {
		_ = nc.Close()
		//if ngin.ShallLogDebug() {
		//	nlog.Debugf("%s [readLoop] connection read goroutine exit, ID=%d, UID=%s, Remote=%s",
		//		ngin.LogPrefix(), nc.ID(), nc.Session().UID(), nc.Conn().RemoteAddr())
		//}
	}()

	// Scratch buffer for stream reads only.
	buf := make([]byte, 4096)
	for {
		// Non-blocking check for the connection close signal.
		select {
		case <-nc.ChDie():
			return
		default:
		}

		if ngin.Deadline != 0 {
			_ = nc.Conn().SetDeadline(time.Now().Add(ngin.Deadline))
		}
		if ngin.ReadDeadline != 0 {
			_ = nc.Conn().SetReadDeadline(time.Now().Add(ngin.ReadDeadline))
		}

		var (
			err    error
			data   []byte
			msgTyp int
		)
		// WebSocket compatibility: read a full message instead of raw bytes.
		isWS := nc.Type() == conn.ConnTypeWS
		if isWS {
			msgTyp, data, err = nc.WsConn().ReadMessage()
		} else {
			var n int
			n, err = nc.Conn().Read(buf)
			data = buf[:n]
		}
		if err != nil {
			ngin.evtMgr.OnDisconnected(nc, err)
			// TODO reconnect after disconnect (client side only)
			nlog.Errorf("%s [readLoop] Read message error: %s, session will be closed immediately",
				ngin.LogPrefix(), err.Error())
			return
		}
		if len(data) == 0 {
			ngin.evtMgr.OnReceiveError(nc, conn.ErrReceiveZero)
			nlog.Errorf("%s [readLoop] Read empty message, session will be closed immediately",
				ngin.LogPrefix())
			return
		}

		if isWS {
			ngin.processPacket(nc, packet.NewWSPacket(msgTyp, data))
			continue
		}

		if packer == nil {
			ngin.evtMgr.OnReceiveError(nc, conn.ErrNoPacker)
			nlog.Errorf("%s [readLoop] unexpected error: packer is nil", ngin.LogPrefix())
			return
		}
		// Warning: the buffer is reused across reads for performance, so the
		// packer must copy the bytes it wants to keep before processing them.
		packets, err := packer.Unpack(data)
		if err != nil {
			// An unpack error is reported but does not end the loop; any
			// packets returned alongside it are still dispatched below.
			ngin.evtMgr.OnReceiveError(nc, conn.ErrUnpack)
			nlog.Errorf("%s unpack err: %s", ngin.LogPrefix(), err.Error())
		}
		for _, p := range packets {
			ngin.processPacket(nc, p)
		}
	}
}
// writeLoop delivers outgoing data for a connection until the connection's
// die signal fires. The connection is closed when the loop exits.
//
// Two sources are serviced:
//   - ChSend carries messages whose payload is serialized (when a serializer
//     is configured) and then either written as a WebSocket message or packed
//     by packer and written to the raw connection.
//   - ChWrite carries already-packed bytes that are written as-is.
//
// A failure while handling one message is reported through OnSendError and
// logged; the loop then moves on to the next message.
//
// Packed bytes are written directly instead of being re-queued on ChWrite.
// This loop is the consumer of ChWrite in this file, so sending to it from
// here would block forever once the channel buffer is full.
func (ngin *Engine) writeLoop(nc *conn.Connection, packer packet.Packer) {
	defer func() {
		_ = nc.Close()
		//if ngin.ShallLogDebug() {
		//	nlog.Debugf("%s [writeLoop] connection write goroutine exit, ID=%d, UID=%s, Remote=%s",
		//		ngin.LogPrefix(), nc.ID(), nc.Session().UID(), nc.Conn().RemoteAddr())
		//}
	}()

	// writeRaw refreshes the deadlines, writes raw to the underlying
	// connection and fires the matching send / send-error event.
	writeRaw := func(raw []byte) {
		if ngin.Deadline != 0 {
			_ = nc.Conn().SetDeadline(time.Now().Add(ngin.Deadline))
		}
		if ngin.WriteDeadline != 0 {
			_ = nc.Conn().SetWriteDeadline(time.Now().Add(ngin.WriteDeadline))
		}
		if _, err := nc.Conn().Write(raw); err != nil {
			ngin.evtMgr.OnSendError(nc, raw, conn.ErrSend)
			nlog.Errorf("%s [writeLoop] write data err: %v", ngin.LogPrefix(), err)
			return
		}
		ngin.evtMgr.OnSend(nc, raw)
	}

	for {
		select {
		case data := <-nc.ChSend():
			// Marshal the packet body. Without a serializer the payload must
			// already be a []byte.
			if ngin.serializer == nil {
				if _, ok := data.Payload.([]byte); !ok {
					ngin.evtMgr.OnSendError(nc, data, conn.ErrSendPayload)
					nlog.Errorf("%s [writeLoop] serializer is nil, but payload type not []byte", ngin.LogPrefix())
					continue
				}
			} else {
				payload, err := ngin.serializer.Marshal(data.Payload)
				if err != nil {
					ngin.evtMgr.OnSendError(nc, data, conn.ErrSendMarshal)
					nlog.Errorf("%s [writeLoop] message body marshal err: %v", ngin.LogPrefix(), err)
					continue
				}
				data.Payload = payload
			}

			// WebSocket compatibility: the header carries the message type.
			if nc.Type() == conn.ConnTypeWS {
				messageTyp, ok := data.Header.(int)
				if !ok {
					ngin.evtMgr.OnSendError(nc, data, conn.ErrSendWSType)
					nlog.Errorf("%s [writeLoop] websocket message type not found", ngin.LogPrefix())
					continue
				}
				if ngin.Deadline != 0 {
					_ = nc.Conn().SetDeadline(time.Now().Add(ngin.Deadline))
				}
				if ngin.WriteDeadline != 0 {
					_ = nc.Conn().SetWriteDeadline(time.Now().Add(ngin.WriteDeadline))
				}
				if err := nc.WsConn().WriteMessage(messageTyp, data.Payload.([]byte)); err != nil {
					ngin.evtMgr.OnSendError(nc, data, conn.ErrSend)
					nlog.Errorf("%s [writeLoop] write data err: %v", ngin.LogPrefix(), err)
					continue
				}
				ngin.evtMgr.OnSend(nc, data)
				continue
			}

			// Stream connection: pack header + payload, then write the bytes.
			if packer == nil {
				ngin.evtMgr.OnSendError(nc, data, conn.ErrNoPacker)
				nlog.Errorf("%s [writeLoop] unexpected error: packer is nil", ngin.LogPrefix())
				continue
			}
			p, err := packer.Pack(data.Header, data.Payload.([]byte))
			if err != nil {
				ngin.evtMgr.OnSendError(nc, data, conn.ErrPack)
				nlog.Errorf("%s [writeLoop] pack err: %v", ngin.LogPrefix(), err)
				continue
			}
			writeRaw(p)
		case data := <-nc.ChWrite():
			// Already-packed bytes queued for writing.
			writeRaw(data)
		case <-nc.ChDie(): // connection close signal
			return
		}
	}
}
// processPacket fires the receive event for p and, when the connection is in
// the working state, hands the packet to the router on a new goroutine.
// Packets that arrive in any other state are only reported through the event.
func (ngin *Engine) processPacket(nc *conn.Connection, p packet.IPacket) {
	ngin.evtMgr.OnReceive(nc, p)

	if nc.Status() != conn.StatusWorking {
		return
	}
	// Handle the packet asynchronously so the read loop is not blocked.
	go ngin.router.Handle(nc, p)
}