package nnet import ( "errors" "fmt" "git.noahlan.cn/northlan/nnet/component" "git.noahlan.cn/northlan/nnet/log" "git.noahlan.cn/northlan/nnet/pipeline" "git.noahlan.cn/northlan/nnet/session" "github.com/gorilla/websocket" "net" "net/http" "os" "os/signal" "syscall" "time" ) type ( Options struct { Name string // 服务端名,默认为n-net Pipeline pipeline.Pipeline // 消息管道 RetryInterval time.Duration // 消息重试间隔时长 Components *component.Components // 组件库 HeartbeatInterval time.Duration // 心跳间隔,0表示不进行心跳 WS WSOptions // websocket } WSOptions struct { IsWebsocket bool // 是否为websocket服务端 WebsocketPath string // ws地址(ws://127.0.0.1/WebsocketPath) TLSCertificate string // TLS 证书地址 (websocket) TLSKey string // TLS 证书key地址 (websocket) } ) // Server TCP-Server type Server struct { // Options 参数列表 Options // protocol 协议名 // "tcp", "tcp4", "tcp6", "unix" or "unixpacket" // 若只想开启IPv4, 使用tcp4即可 protocol string // address 服务地址 // 地址可直接使用hostname,但强烈不建议这样做,可能会同时监听多个本地IP // 如果端口号不填或端口号为0,例如:"127.0.0.1:" 或 ":0",服务端将选择随机可用端口 address string // handler 消息处理器 handler *Handler // sessionMgr session管理器 sessionMgr *session.Manager } func NewServer(protocol, addr string, opts ...Option) *Server { options := Options{ Components: &component.Components{}, WS: WSOptions{}, } s := &Server{ Options: options, protocol: protocol, address: addr, } for _, opt := range opts { opt(&s.Options) } s.handler = NewHandler() s.sessionMgr = session.NewManager() initPool(0) return s } func (s *Server) Serve() { components := s.Components.List() for _, c := range components { err := s.handler.register(c.Comp, c.Opts) if err != nil { // TODO Log and return return } } // Initialize components for _, c := range components { c.Comp.OnInit() } go func() { if s.WS.IsWebsocket { if len(s.WS.TLSCertificate) != 0 && len(s.WS.TLSKey) != 0 { s.listenAndServeWSTLS() } else { s.listenAndServeWS() } } else { s.listenAndServe() } }() sg := make(chan os.Signal) signal.Notify(sg, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGTERM) select { //case <-env.Die: // log.Println("The app will shutdown in a few seconds") case s := <-sg: log.Infof("server got signal", s) } log.Infof("server is stopping...") s.Shutdown() // TODO close } func (s *Server) Shutdown() { components := s.Components.List() compLen := len(components) for i := compLen - 1; i >= 0; i-- { components[i].Comp.OnShutdown() } } func (s *Server) listenAndServe() { listener, err := net.Listen(s.protocol, s.address) if err != nil { panic(err) } // 监听成功,服务已启动 // TODO log defer listener.Close() go func() { for { conn, err := listener.Accept() if err != nil { if errors.Is(err, net.ErrClosed) { log.Error("服务器网络错误", err) return } log.Errorf("监听错误 %v", err) continue } err = pool.SubmitConn(func() { r := newRequest(conn, s.Pipeline) s.handler.handle(r) }) if err != nil { // TODO Log continue } } }() } func (s *Server) listenAndServeWS() { upgrade := websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: nil, EnableCompression: false, } http.HandleFunc(fmt.Sprintf("/%s/", "path"), func(w http.ResponseWriter, r *http.Request) { conn, err := upgrade.Upgrade(w, r, nil) if err != nil { // TODO upgrade failure, uri=r.requestURI err=err.Error() return } // TODO s.handler.handleWS(conn) }) if err := http.ListenAndServe(s.address, nil); err != nil { panic(err) } } func (s *Server) listenAndServeWSTLS() { upgrade := websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: nil, EnableCompression: false, } http.HandleFunc(fmt.Sprintf("/%s/", "path"), func(w http.ResponseWriter, r *http.Request) { conn, err := upgrade.Upgrade(w, r, nil) if err != nil { // TODO upgrade failure, uri=r.requestURI err=err.Error() return } // TODO s.handler.handleWS(conn) }) if err := http.ListenAndServeTLS(s.address, "", "", nil); err != nil { panic(err) } }