You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
nnet/core/engine.go

250 lines
5.5 KiB
Go

package core
import (
"errors"
"git.noahlan.cn/northlan/nnet/internal/pool"
"git.noahlan.cn/northlan/nnet/log"
"git.noahlan.cn/northlan/nnet/packet"
"git.noahlan.cn/northlan/nnet/scheduler"
"git.noahlan.cn/northlan/nnet/serialize"
"git.noahlan.cn/northlan/nnet/session"
"github.com/gorilla/websocket"
"net"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
)
func NotFound(conn *Connection, packet packet.IPacket) {
log.Error("handler not found")
conn.SendBytes([]byte("handler not found"))
}
func NotFoundHandler() Handler {
return HandlerFunc(NotFound)
}
type (
// engine TCP-engine
engine struct {
conf EngineConf // conf 配置
middlewares []Middleware // 中间件
routes []Route // 路由
// handler 消息处理器
handler Handler
// dieChan 应用程序退出信号
dieChan chan struct{}
// sessionMgr session管理器
sessionMgr *session.Manager
pipeline Pipeline // 消息管道
packer packet.Packer // 封包、拆包器
processor Processor // 数据包处理器
serializer serialize.Serializer // 消息 序列化/反序列化
retryInterval time.Duration // 消息重试间隔时长
heartbeatInterval time.Duration // 心跳间隔0表示不进行心跳
wsOpt wsOptions // websocket
}
wsOptions struct {
IsWebsocket bool // 是否为websocket服务端
WebsocketPath string // ws地址(ws://127.0.0.1/WebsocketPath)
TLSCertificate string // TLS 证书地址 (websocket)
TLSKey string // TLS 证书key地址 (websocket)
CheckOrigin func(*http.Request) bool // check origin
}
)
func newEngine(conf EngineConf) *engine {
s := &engine{
conf: conf,
dieChan: make(chan struct{}),
sessionMgr: session.NewSessionMgr(),
}
pool.InitPool(10000)
return s
}
func (ng *engine) use(middleware Middleware) {
ng.middlewares = append(ng.middlewares, middleware)
}
func (ng *engine) addRoutes(route ...Route) {
ng.routes = append(ng.routes, route...)
}
func (ng *engine) bindRoutes(router Router) error {
for _, fr := range ng.routes {
if err := ng.bindRoute(router, fr); err != nil {
return err
}
}
return nil
}
func (ng *engine) bindRoute(router Router, route Route) error {
// TODO default middleware
chain := newChain()
// build chain
for _, middleware := range ng.middlewares {
chain.Append(convertMiddleware(middleware))
}
return router.Register(route.Matches, route.Handler)
}
func convertMiddleware(ware Middleware) func(Handler) Handler {
return func(next Handler) Handler {
return ware(next.Handle)
}
}
func (ng *engine) serve(router Router) error {
ng.handler = router
if err := ng.bindRoutes(router); err != nil {
return err
}
go func() {
if ng.wsOpt.IsWebsocket {
if len(ng.wsOpt.TLSCertificate) != 0 && len(ng.wsOpt.TLSKey) != 0 {
ng.listenAndServeWSTLS()
} else {
ng.listenAndServeWS()
}
} else {
ng.listenAndServe()
}
}()
go scheduler.Schedule()
sg := make(chan os.Signal)
signal.Notify(sg, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGTERM)
select {
case <-ng.dieChan:
log.Info("Server will shutdown in a few seconds")
case s := <-sg:
log.Infof("Server got signal: %ng", s)
}
log.Info("Server is stopping...")
ng.shutdown()
scheduler.Close()
return nil
}
func (ng *engine) close() {
close(ng.dieChan)
}
func (ng *engine) shutdown() {
}
func (ng *engine) listenAndServe() {
listener, err := net.Listen(ng.conf.Protocol, ng.conf.Addr)
if err != nil {
panic(err)
}
// 监听成功,服务已启动
log.Infof("now listening %s on %s.", ng.conf.Protocol, ng.conf.Addr)
defer func() {
listener.Close()
ng.close()
}()
for {
conn, err := listener.Accept()
if err != nil {
if errors.Is(err, net.ErrClosed) {
log.Errorf("服务器网络错误 %+v", err)
return
}
log.Errorf("监听错误 %v", err)
continue
}
err = pool.SubmitConn(func() {
ng.handle(conn)
})
if err != nil {
log.Errorf("submit conn pool err: %ng", err.Error())
continue
}
}
}
func (ng *engine) listenAndServeWS() {
ng.setupWS()
if err := http.ListenAndServe(ng.conf.Addr, nil); err != nil {
log.Fatal(err.Error())
}
}
func (ng *engine) listenAndServeWSTLS() {
ng.setupWS()
if err := http.ListenAndServeTLS(ng.conf.Addr, ng.wsOpt.TLSCertificate, ng.wsOpt.TLSKey, nil); err != nil {
log.Fatal(err.Error())
}
}
func (ng *engine) setupWS() {
upgrade := websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: ng.wsOpt.CheckOrigin,
}
http.HandleFunc("/"+strings.TrimPrefix(ng.wsOpt.WebsocketPath, "/"), func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrade.Upgrade(w, r, nil)
if err != nil {
log.Errorf("Upgrade failure, URI=%ng, Error=%ng", r.RequestURI, err.Error())
return
}
err = pool.SubmitConn(func() {
ng.handleWS(conn)
})
if err != nil {
log.Fatalf("submit conn pool err: %ng", err.Error())
}
})
}
func (ng *engine) handleWS(conn *websocket.Conn) {
c, err := newWSConn(conn)
if err != nil {
log.Error(err)
return
}
ng.handle(c)
}
func (ng *engine) handle(conn net.Conn) {
connection := newConn(ng, conn)
ng.sessionMgr.StoreSession(connection.Session())
connection.serve()
// hook
}
func (ng *engine) notFoundHandler(next Handler) Handler {
return HandlerFunc(func(conn *Connection, packet packet.IPacket) {
h := next
if next == nil {
h = NotFoundHandler()
}
// TODO write to client
h.Handle(conn, packet)
})
}