You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
nnet/core/nnet.go

250 lines
5.7 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package core
import (
"git.noahlan.cn/noahlan/nnet/config"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/entity"
"git.noahlan.cn/noahlan/nnet/lifetime"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/nnet/pipeline"
"git.noahlan.cn/noahlan/nnet/serialize"
"git.noahlan.cn/noahlan/ntools-go/core/nlog"
"net/http"
"time"
)
type (
// RunOption defines the method to customize a NNet.
RunOption func(*NNet)
Server struct {
*NNet
}
Client struct {
*NNet
}
NNet struct {
ngin *engine
router Router
}
)
// NewServer returns a server with given config of c and options defined in opts.
// Be aware that later RunOption might overwrite previous one that write the same option.
func NewServer(c config.EngineConf, opts ...RunOption) *Server {
s := &Server{
NNet: &NNet{
ngin: newEngine(c),
router: NewDefaultRouter(),
},
}
opts = append([]RunOption{WithNotFoundHandler(nil)}, opts...)
for _, opt := range opts {
opt(s.NNet)
}
return s
}
// NewClient returns a client with given config of c and options defined in opts.
// Be aware that later RunOption might overwrite previous one that write the same option.
func NewClient(c config.EngineConf, opts ...RunOption) *Client {
s := &Client{
NNet: &NNet{
ngin: newEngine(c),
router: NewDefaultRouter(),
},
}
opts = append([]RunOption{WithNotFoundHandler(nil)}, opts...)
for _, opt := range opts {
opt(s.NNet)
}
return s
}
// Start starts the NNet.
// Graceful shutdown is enabled by default.
func (s *Server) Start() {
if err := s.ngin.serve(s.router); err != nil {
nlog.Error(err)
panic(err)
}
}
// Dial start the NNet client.
// Graceful shutdown is enabled by default.
func (c *Client) Dial(addr string) entity.NetworkEntity {
e, err := c.ngin.dial(addr, c.router)
nlog.Must(err)
return e
}
// AddRoutes add given routes into the NNet.
func (s *NNet) AddRoutes(rs []Route) {
s.ngin.addRoutes(rs...)
err := s.ngin.bindRoutes(s.router)
nlog.Must(err)
}
// AddRoute adds given route into the NNet.
func (s *NNet) AddRoute(r Route) {
s.AddRoutes([]Route{r})
}
// Stop stops the NNet.
func (s *NNet) Stop() {
s.ngin.close()
}
// Use adds the given middleware in the NNet.
func (s *NNet) Use(middleware ...Middleware) {
s.ngin.use(middleware...)
}
// Pipeline returns inner pipeline
func (s *NNet) Pipeline() pipeline.Pipeline {
return s.ngin.pipeline
}
// Lifetime returns lifetime interface.
func (s *NNet) Lifetime() lifetime.Lifetime {
return s.ngin.lifetime
}
// ConnManager returns connection manager
func (s *NNet) ConnManager() *conn.Manager {
return s.ngin.connManager
}
// ToMiddleware converts the given handler to a Middleware.
func ToMiddleware(handler func(next Handler) Handler) Middleware {
return func(next HandlerFunc) HandlerFunc {
return handler(next).Handle
}
}
// WithMiddlewares adds given middlewares to given routes.
func WithMiddlewares(ms []Middleware, rs ...Route) []Route {
for i := len(ms) - 1; i >= 0; i-- {
rs = WithMiddleware(ms[i], rs...)
}
return rs
}
// WithMiddleware adds given middleware to given route.
func WithMiddleware(middleware Middleware, rs ...Route) []Route {
routes := make([]Route, len(rs))
for i := range rs {
route := rs[i]
routes[i] = Route{
Matches: route.Matches,
Handler: middleware(route.Handler),
}
}
return routes
}
func UseMiddleware(middleware ...Middleware) RunOption {
return func(server *NNet) {
server.Use(middleware...)
}
}
// WithNotFoundHandler returns a RunOption with not found handler set to given handler.
func WithNotFoundHandler(handler Handler) RunOption {
return func(server *NNet) {
notFoundHandler := server.ngin.notFoundHandler(handler)
server.router.SetNotFoundHandler(notFoundHandler)
}
}
// WithRouter 设置消息路由
func WithRouter(router Router) RunOption {
return func(server *NNet) {
server.router = router
}
}
// WithPacker 设置消息的 封包/解包 方式
func WithPacker(fn packet.NewPackerFunc) RunOption {
return func(server *NNet) {
server.ngin.packerFn = fn
}
}
// WithSerializer 设置消息的 序列化/反序列化 方式
func WithSerializer(s serialize.Serializer) RunOption {
return func(server *NNet) {
server.ngin.serializer = s
}
}
// WithTimerPrecision 设置Timer精度需在 Start 或 Dial 之前执行
// 注精度需大于1ms, 并且不能在运行时更改
// 默认精度是 time.Second
func WithTimerPrecision(precision time.Duration) RunOption {
if precision < time.Millisecond {
panic("time precision can not less than a Millisecond")
}
return func(s *NNet) {
s.ngin.taskTimerPrecision = precision
}
}
func WithPipeline(pipeline pipeline.Pipeline) RunOption {
return func(server *NNet) {
server.ngin.pipeline = pipeline
}
}
type PipelineOption func(opts pipeline.Pipeline)
func WithPipelineOpt(opts ...func(pipeline.Pipeline)) RunOption {
return func(server *NNet) {
for _, opt := range opts {
opt(server.ngin.pipeline)
}
}
}
type WSOption func(opts *wsOptions)
// WithWebsocket 开启Websocket, 参数是websocket的相关参数 nnet.WSOption
func WithWebsocket(wsOpts ...WSOption) RunOption {
return func(server *NNet) {
for _, opt := range wsOpts {
opt(&server.ngin.wsOpt)
}
server.ngin.wsOpt.IsWebsocket = true
}
}
// WithWSPath 设置websocket的path
func WithWSPath(path string) WSOption {
return func(opts *wsOptions) {
opts.WebsocketPath = path
}
}
// WithWSTLSConfig 设置websocket的证书和密钥
func WithWSTLSConfig(certificate, key string) WSOption {
return func(opts *wsOptions) {
opts.TLSCertificate = certificate
opts.TLSKey = key
}
}
func WithWSCheckOriginFunc(fn func(*http.Request) bool) WSOption {
return func(opts *wsOptions) {
if fn != nil {
opts.CheckOrigin = fn
}
}
}