wip: 又加了一些新东西。

main
NorthLan 2 years ago
parent dca483dc32
commit 5779bb7989

@ -0,0 +1,20 @@
package component
type CompWithOptions struct {
Comp Component
Opts []Option
}
type Components struct {
comps []CompWithOptions
}
// Register 全局注册组件,必须在服务启动之前初始化
func (cs *Components) Register(c Component, options ...Option) {
cs.comps = append(cs.comps, CompWithOptions{c, options})
}
// List 获取所有已注册组件
func (cs *Components) List() []CompWithOptions {
return cs.comps
}

@ -1,7 +1,6 @@
package component
import (
"git.noahlan.cn/northlan/nnet/net"
"reflect"
"unicode"
"unicode/utf8"
@ -10,7 +9,7 @@ import (
var (
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
typeOfBytes = reflect.TypeOf(([]byte)(nil))
typeOfRequest = reflect.TypeOf(net.Request{})
typeOfRequest = reflect.TypeOf(nnet.Request{})
)
func isExported(name string) bool {

@ -1,4 +0,0 @@
package interfaces
type IMessage interface {
}

@ -1,7 +0,0 @@
package interfaces
// IPacket 面向TCP连接中的数据流网络数据包抽象接口
type IPacket interface {
// HeaderLen TCP数据帧头部长度固定长度值
HeaderLen() int
}

@ -0,0 +1,88 @@
package log
import (
"log"
"os"
)
type Logger interface {
Debugf(format string, v ...interface{})
Debug(v ...interface{})
Info(v ...interface{})
Infof(format string, v ...interface{})
Error(v ...interface{})
Errorf(format string, v ...interface{})
Panic(v ...interface{})
Panicf(format string, v ...interface{})
}
func init() {
SetLogger(newInnerLogger())
}
var (
Debugf func(format string, v ...interface{})
Debug func(v ...interface{})
Info func(v ...interface{})
Infof func(format string, v ...interface{})
Error func(v ...interface{})
Errorf func(format string, v ...interface{})
Panic func(v ...interface{})
Panicf func(format string, v ...interface{})
)
func SetLogger(logger Logger) {
if logger == nil {
return
}
Debugf = logger.Debugf
Debug = logger.Debug
Info = logger.Info
Infof = logger.Infof
Error = logger.Error
Errorf = logger.Errorf
Panic = logger.Panic
Panicf = logger.Panicf
}
type innerLogger struct {
log *log.Logger
}
func newInnerLogger() Logger {
return &innerLogger{
log: log.New(os.Stderr, "[N-Net] ", log.LstdFlags|log.Lshortfile),
}
}
func (i *innerLogger) Debugf(format string, v ...interface{}) {
i.log.Printf(format, v)
}
func (i *innerLogger) Debug(v ...interface{}) {
i.log.Println(v)
}
func (i *innerLogger) Info(v ...interface{}) {
i.log.Println(v)
}
func (i *innerLogger) Infof(format string, v ...interface{}) {
i.log.Printf(format, v)
}
func (i *innerLogger) Error(v ...interface{}) {
i.log.Println(v)
}
func (i *innerLogger) Errorf(format string, v ...interface{}) {
i.log.Printf(format, v)
}
func (i *innerLogger) Panic(v ...interface{}) {
i.log.Panic(v)
}
func (i *innerLogger) Panicf(format string, v ...interface{}) {
i.log.Panicf(format, v)
}

@ -0,0 +1,11 @@
package message
type Header struct {
}
type Message struct {
Type byte // 消息类型
ID uint64 // 消息ID
Header []byte // 消息头原始数据
Payload []byte // 数据
}

@ -1,9 +0,0 @@
package net
type Option func(server *Server)
func WithXXX() Option {
return func(server *Server) {
}
}

@ -1,37 +0,0 @@
package net
import (
"git.noahlan.cn/northlan/nnet/interfaces"
"net"
)
type Request struct {
session interfaces.ISession // Session
server *Server // Server reference
conn net.Conn // low-level conn fd
status Status // 连接状态
}
func newRequest(server *Server, conn net.Conn) *Request {
r := &Request{
server: server,
conn: conn,
status: StatusStart,
}
r.session = newSession()
return r
}
func (r *Request) Status() Status {
return r.status
}
func (r *Request) ID() int64 {
return r.session.ID()
}
func (r *Request) Session() interfaces.ISession {
return r.session
}

@ -1,124 +0,0 @@
package net
import (
"errors"
"fmt"
"github.com/gorilla/websocket"
"github.com/panjf2000/ants/v2"
"net"
"net/http"
)
// Server TCP-Server
type Server struct {
// Name 服务端名称,默认为 n-net
Name string
// protocol 协议名
// "tcp", "tcp4", "tcp6", "unix" or "unixpacket"
// 若只想开启IPv4, 使用tcp4即可
protocol string
// address 服务地址
// 地址可直接使用hostname,但强烈不建议这样做,可能会同时监听多个本地IP
// 如果端口号不填或端口号为0例如"127.0.0.1:" 或 ":0",服务端将选择随机可用端口
address string
// 一些其它的东西 .etc..
handler *Handler
sessionMgr *SessionMgr
pool *ants.Pool
}
func newServer(opts ...Option) *Server {
s := &Server{
Name: "",
protocol: "",
address: "",
handler: nil,
sessionMgr: nil,
}
for _, opt := range opts {
opt(s)
}
s.pool, _ = ants.NewPool(0)
return s
}
func (s *Server) listenAndServe() {
listener, err := net.Listen(s.protocol, s.address)
if err != nil {
panic(err)
}
// 监听成功,服务已启动
// TODO log
defer listener.Close()
go func() {
for {
conn, err := listener.Accept()
if err != nil {
if errors.Is(err, net.ErrClosed) {
// 服务端网络错误
// TODO print log
return
}
// TODO log
continue
}
// TODO 最大连接限制
//if s.ConnMgr.Len() >= utils.GlobalObject.MaxConn {
// conn.Close()
// continue
//}
r := newRequest(s, conn)
s.pool.Submit(func() {
s.handler.handle(r)
})
}
}()
}
func (s *Server) listenAndServeWS() {
upgrade := websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: nil,
EnableCompression: false,
}
http.HandleFunc(fmt.Sprintf("/%s/", "path"), func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrade.Upgrade(w, r, nil)
if err != nil {
// TODO upgrade failure, uri=r.requestURI err=err.Error()
return
}
// TODO s.handler.handleWS(conn)
})
if err := http.ListenAndServe(s.address, nil); err != nil {
panic(err)
}
}
func (s *Server) listenAndServeWSTLS() {
upgrade := websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: nil,
EnableCompression: false,
}
http.HandleFunc(fmt.Sprintf("/%s/", "path"), func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrade.Upgrade(w, r, nil)
if err != nil {
// TODO upgrade failure, uri=r.requestURI err=err.Error()
return
}
// TODO s.handler.handleWS(conn)
})
if err := http.ListenAndServeTLS(s.address, "", "", nil); err != nil {
panic(err)
}
}

@ -1,40 +0,0 @@
package net
import (
"git.noahlan.cn/northlan/nnet/interfaces"
"sync"
)
type SessionMgr struct {
sync.RWMutex
sessions map[int64]interfaces.ISession
}
func (m *SessionMgr) storeSession(s interfaces.ISession) {
m.Lock()
defer m.Unlock()
m.sessions[s.ID()] = s
}
func (m *SessionMgr) findSession(sid int64) interfaces.ISession {
m.RLock()
defer m.RUnlock()
return m.sessions[sid]
}
func (m *SessionMgr) findOrCreateSession(sid int64) interfaces.ISession {
m.RLock()
s, ok := m.sessions[sid]
m.RUnlock()
if !ok {
s = newSession()
m.Lock()
m.sessions[s.ID()] = s
m.Unlock()
}
return s
}

@ -1,4 +1,4 @@
package interfaces
package nface
// IRouter 路由接口
type IRouter interface {

@ -1,4 +1,4 @@
package interfaces
package nface
// ISessionAttribute Session数据接口
type ISessionAttribute interface {

@ -1,37 +1,29 @@
package net
package nnet
import (
"fmt"
"git.noahlan.cn/northlan/nnet/component"
"github.com/panjf2000/ants/v2"
"git.noahlan.cn/northlan/nnet/pipeline"
)
type Handler struct {
server *Server // Server 引用
pipeline pipeline.Pipeline // 通道
allServices map[string]*component.Service // 所有注册的Service
allHandlers map[string]*component.Handler // 所有注册的Handler
workerSize int64 // 业务工作Worker数量
taskQueue []chan *Request // 工作协程的消息队列
}
func NewHandler() *Handler {
return &Handler{
allServices: make(map[string]*component.Service),
allHandlers: make(map[string]*component.Handler),
// TODO 读取配置
workerSize: 10,
taskQueue: make([]chan *Request, 10),
}
}
func (h *Handler) register(comp component.Component, opts []component.Option) error {
s := component.NewService(comp, opts)
p, _ := ants.NewPool()
p.Submit(func() {
})
if _, ok := h.allServices[s.Name]; ok {
return fmt.Errorf("handler: service already defined: %s", s.Name)
}
@ -52,15 +44,6 @@ func (h *Handler) register(comp component.Component, opts []component.Option) er
return nil
}
// DoWorker 将工作交给worker处理
func (h *Handler) DoWorker(request *Request) {
// 根据sessionID平均分配worker处理
workerId := request.Session.ID() % h.workerSize
fmt.Printf("sessionID %d to workerID %d\n", request.Session.ID(), workerId)
// 入队
h.taskQueue[workerId] <- request
}
func (h *Handler) handle(request *Request) {
}

@ -0,0 +1,57 @@
package nnet
import (
"git.noahlan.cn/northlan/nnet/component"
"git.noahlan.cn/northlan/nnet/log"
"git.noahlan.cn/northlan/nnet/pipeline"
"time"
)
type Option func(options *Options)
type WSOption func(opts *WSOptions)
func WithLogger(logger log.Logger) Option {
return func(_ *Options) {
log.SetLogger(logger)
}
}
func WithPipeline(pipeline pipeline.Pipeline) Option {
return func(options *Options) {
options.Pipeline = pipeline
}
}
func WithComponents(components *component.Components) Option {
return func(options *Options) {
options.Components = components
}
}
func WithHeartbeatInterval(d time.Duration) Option {
return func(options *Options) {
options.HeartbeatInterval = d
}
}
func WithWebsocket(wsOpts ...WSOption) Option {
return func(options *Options) {
for _, opt := range wsOpts {
opt(&options.WS)
}
options.WS.IsWebsocket = true
}
}
func WithWSPath(path string) WSOption {
return func(opts *WSOptions) {
opts.WebsocketPath = path
}
}
func WithWSTLS(certificate, key string) WSOption {
return func(opts *WSOptions) {
opts.TLSCertificate = certificate
opts.TLSKey = key
}
}

@ -0,0 +1,27 @@
package nnet
import "github.com/panjf2000/ants/v2"
var pool *Pool
type Pool struct {
connPool *ants.Pool
workerPool *ants.Pool
}
func initPool(size int) {
p := &Pool{}
p.connPool, _ = ants.NewPool(size, ants.WithNonblocking(true))
p.workerPool, _ = ants.NewPool(size*2, ants.WithNonblocking(true))
pool = p
}
func (p *Pool) SubmitConn(h func()) error {
return p.connPool.Submit(h)
}
func (p *Pool) SubmitWorker(h func()) error {
return p.workerPool.Submit(h)
}

@ -0,0 +1,63 @@
package nnet
import (
"git.noahlan.cn/northlan/nnet/nface"
"git.noahlan.cn/northlan/nnet/pipeline"
"git.noahlan.cn/northlan/nnet/session"
"github.com/gorilla/websocket"
"net"
"time"
)
type Request struct {
session nface.ISession // Session
conn net.Conn // low-level conn fd
status Status // 连接状态
lastMid uint64 // 最近一次消息ID
lastHeartbeatAt int64 // 最近一次心跳时间
chDie chan struct{} // 停止通道
chSend chan []byte // 消息发送通道
pipeline pipeline.Pipeline // 消息管道
}
func newRequest(conn net.Conn, pipeline pipeline.Pipeline) *Request {
r := &Request{
conn: conn,
status: StatusStart,
lastHeartbeatAt: time.Now().Unix(),
chDie: make(chan struct{}),
chSend: make(chan []byte),
pipeline: pipeline,
}
// binding session
r.session = session.New()
return r
}
func newRequestWS(conn *websocket.Conn, pipeline pipeline.Pipeline) *Request {
c, err := newWSConn(conn)
if err != nil {
// TODO panic ?
panic(err)
}
return newRequest(c, pipeline)
}
func (r *Request) Status() Status {
return r.status
}
func (r *Request) ID() int64 {
return r.session.ID()
}
func (r *Request) Session() nface.ISession {
return r.session
}

@ -0,0 +1,201 @@
package nnet
import (
"errors"
"fmt"
"git.noahlan.cn/northlan/nnet/component"
"git.noahlan.cn/northlan/nnet/log"
"git.noahlan.cn/northlan/nnet/pipeline"
"git.noahlan.cn/northlan/nnet/session"
"github.com/gorilla/websocket"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"
)
type (
Options struct {
Name string // 服务端名默认为n-net
Pipeline pipeline.Pipeline // 消息管道
RetryInterval time.Duration // 消息重试间隔时长
Components *component.Components // 组件库
HeartbeatInterval time.Duration // 心跳间隔0表示不进行心跳
WS WSOptions // websocket
}
WSOptions struct {
IsWebsocket bool // 是否为websocket服务端
WebsocketPath string // ws地址(ws://127.0.0.1/WebsocketPath)
TLSCertificate string // TLS 证书地址 (websocket)
TLSKey string // TLS 证书key地址 (websocket)
}
)
// Server TCP-Server
type Server struct {
// Options 参数列表
Options
// protocol 协议名
// "tcp", "tcp4", "tcp6", "unix" or "unixpacket"
// 若只想开启IPv4, 使用tcp4即可
protocol string
// address 服务地址
// 地址可直接使用hostname,但强烈不建议这样做,可能会同时监听多个本地IP
// 如果端口号不填或端口号为0例如"127.0.0.1:" 或 ":0",服务端将选择随机可用端口
address string
// handler 消息处理器
handler *Handler
// sessionMgr session管理器
sessionMgr *session.Manager
}
func NewServer(protocol, addr string, opts ...Option) *Server {
options := Options{
Components: &component.Components{},
WS: WSOptions{},
}
s := &Server{
Options: options,
protocol: protocol,
address: addr,
}
for _, opt := range opts {
opt(&s.Options)
}
s.handler = NewHandler()
s.sessionMgr = session.NewManager()
initPool(0)
return s
}
func (s *Server) Serve() {
components := s.Components.List()
for _, c := range components {
err := s.handler.register(c.Comp, c.Opts)
if err != nil {
// TODO Log and return
return
}
}
// Initialize components
for _, c := range components {
c.Comp.OnInit()
}
go func() {
if s.WS.IsWebsocket {
if len(s.WS.TLSCertificate) != 0 && len(s.WS.TLSKey) != 0 {
s.listenAndServeWSTLS()
} else {
s.listenAndServeWS()
}
} else {
s.listenAndServe()
}
}()
sg := make(chan os.Signal)
signal.Notify(sg, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGTERM)
select {
//case <-env.Die:
// log.Println("The app will shutdown in a few seconds")
case s := <-sg:
log.Infof("server got signal", s)
}
log.Infof("server is stopping...")
s.Shutdown()
// TODO close
}
func (s *Server) Shutdown() {
components := s.Components.List()
compLen := len(components)
for i := compLen - 1; i >= 0; i-- {
components[i].Comp.OnShutdown()
}
}
func (s *Server) listenAndServe() {
listener, err := net.Listen(s.protocol, s.address)
if err != nil {
panic(err)
}
// 监听成功,服务已启动
// TODO log
defer listener.Close()
go func() {
for {
conn, err := listener.Accept()
if err != nil {
if errors.Is(err, net.ErrClosed) {
log.Error("服务器网络错误", err)
return
}
log.Errorf("监听错误 %v", err)
continue
}
err = pool.SubmitConn(func() {
r := newRequest(conn, s.Pipeline)
s.handler.handle(r)
})
if err != nil {
// TODO Log
continue
}
}
}()
}
func (s *Server) listenAndServeWS() {
upgrade := websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: nil,
EnableCompression: false,
}
http.HandleFunc(fmt.Sprintf("/%s/", "path"), func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrade.Upgrade(w, r, nil)
if err != nil {
// TODO upgrade failure, uri=r.requestURI err=err.Error()
return
}
// TODO s.handler.handleWS(conn)
})
if err := http.ListenAndServe(s.address, nil); err != nil {
panic(err)
}
}
func (s *Server) listenAndServeWSTLS() {
upgrade := websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: nil,
EnableCompression: false,
}
http.HandleFunc(fmt.Sprintf("/%s/", "path"), func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrade.Upgrade(w, r, nil)
if err != nil {
// TODO upgrade failure, uri=r.requestURI err=err.Error()
return
}
// TODO s.handler.handleWS(conn)
})
if err := http.ListenAndServeTLS(s.address, "", "", nil); err != nil {
panic(err)
}
}

@ -1,4 +1,4 @@
package net
package nnet
type Status uint8

@ -0,0 +1,114 @@
package nnet
import (
"github.com/gorilla/websocket"
"io"
"net"
"time"
)
// wsConn 封装 websocket.Conn 并实现所有 net.Conn 接口
// 兼容所有使用 net.Conn 的方法
type wsConn struct {
conn *websocket.Conn
typ int // message type
reader io.Reader
}
// newWSConn 新建wsConn
func newWSConn(conn *websocket.Conn) (*wsConn, error) {
c := &wsConn{conn: conn}
t, r, err := conn.NextReader()
if err != nil {
return nil, err
}
c.typ = t
c.reader = r
return c, nil
}
// Read reads data from the connection.
// Read can be made to time out and return an Error with Timeout() == true
// after a fixed time limit; see SetDeadline and SetReadDeadline.
func (c *wsConn) Read(b []byte) (int, error) {
n, err := c.reader.Read(b)
if err != nil && err != io.EOF {
return n, err
} else if err == io.EOF {
_, r, err := c.conn.NextReader()
if err != nil {
return 0, err
}
c.reader = r
}
return n, nil
}
// Write writes data to the connection.
// Write can be made to time out and return an Error with Timeout() == true
// after a fixed time limit; see SetDeadline and SetWriteDeadline.
func (c *wsConn) Write(b []byte) (int, error) {
err := c.conn.WriteMessage(websocket.BinaryMessage, b)
if err != nil {
return 0, err
}
return len(b), nil
}
// Close closes the connection.
// Any blocked Read or Write operations will be unblocked and return errors.
func (c *wsConn) Close() error {
return c.conn.Close()
}
// LocalAddr returns the local network address.
func (c *wsConn) LocalAddr() net.Addr {
return c.conn.LocalAddr()
}
// RemoteAddr returns the remote network address.
func (c *wsConn) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}
// SetDeadline sets the read and write deadlines associated
// with the connection. It is equivalent to calling both
// SetReadDeadline and SetWriteDeadline.
//
// A deadline is an absolute time after which I/O operations
// fail with a timeout (see type Error) instead of
// blocking. The deadline applies to all future and pending
// I/O, not just the immediately following call to Read or
// Write. After a deadline has been exceeded, the connection
// can be refreshed by setting a deadline in the future.
//
// An idle timeout can be implemented by repeatedly extending
// the deadline after successful Read or Write calls.
//
// A zero value for t means I/O operations will not time out.
func (c *wsConn) SetDeadline(t time.Time) error {
if err := c.conn.SetReadDeadline(t); err != nil {
return err
}
return c.conn.SetWriteDeadline(t)
}
// SetReadDeadline sets the deadline for future Read calls
// and any currently-blocked Read call.
// A zero value for t means Read will not time out.
func (c *wsConn) SetReadDeadline(t time.Time) error {
return c.conn.SetReadDeadline(t)
}
// SetWriteDeadline sets the deadline for future Write calls
// and any currently-blocked Write call.
// Even if write times out, it may return n > 0, indicating that
// some data was successfully written.
// A zero value for t means Write will not time out.
func (c *wsConn) SetWriteDeadline(t time.Time) error {
return c.conn.SetWriteDeadline(t)
}

@ -0,0 +1,48 @@
package packet
import (
"fmt"
)
// Type 数据帧类型,如:握手,心跳,数据等
type Type byte
const (
// Default 默认,暂无意义
Default Type = iota
// Handshake 握手数据(服务端主动发起)
Handshake = 0x01
// HandshakeAck 握手回复(客户端回复)
HandshakeAck = 0x02
// Heartbeat 心跳(服务端发起)
Heartbeat = 0x03
// Data 数据传输
Data = 0x04
// Kick 服务端主动断开连接
Kick = 0x05
)
type Packet struct {
Type Type // 数据帧 类型
HeaderLen uint32 // 数据帧头 长度
HeaderRaw []byte // 头原始数据
DataLen uint32 // 数据长度
DataRaw []byte // 原始数据
}
func New() *Packet {
return &Packet{
Type: Default,
}
}
func (p *Packet) String() string {
return fmt.Sprintf("Type: %d, HeaderLen: %d, DataLen: %d, Header: %s, Data: %s", p.Type, p.HeaderLen, p.DataLen, string(p.HeaderRaw), string(p.DataRaw))
}

@ -0,0 +1,83 @@
package pipeline
import (
"sync"
)
type (
Func func(request *nnet.Request) error
// Pipeline 消息管道
Pipeline interface {
Outbound() Channel
Inbound() Channel
}
pipeline struct {
outbound, inbound *pipelineChannel
}
Channel interface {
PushFront(h Func)
PushBack(h Func)
Process(request *nnet.Request) error
}
pipelineChannel struct {
mu sync.RWMutex
handlers []Func
}
)
func New() Pipeline {
return &pipeline{
outbound: &pipelineChannel{},
inbound: &pipelineChannel{},
}
}
func (p *pipeline) Outbound() Channel {
return p.outbound
}
func (p *pipeline) Inbound() Channel {
return p.inbound
}
// PushFront 将func压入slice首位
func (p *pipelineChannel) PushFront(h Func) {
p.mu.Lock()
defer p.mu.Unlock()
handlers := make([]Func, len(p.handlers)+1)
handlers[0] = h
copy(handlers[1:], p.handlers)
p.handlers = handlers
}
// PushBack 将func压入slice末位
func (p *pipelineChannel) PushBack(h Func) {
p.mu.Lock()
defer p.mu.Unlock()
p.handlers = append(p.handlers, h)
}
// Process 处理所有的pipeline方法
func (p *pipelineChannel) Process(request *nnet.Request) error {
p.mu.RLock()
defer p.mu.RUnlock()
if len(p.handlers) < 1 {
return nil
}
for _, handler := range p.handlers {
err := handler(request)
if err != nil {
return err
}
}
return nil
}

@ -1,12 +1,11 @@
package net
package session
import (
"git.noahlan.cn/northlan/nnet/interfaces"
"sync"
"sync/atomic"
)
var _ interfaces.ISession = (*Session)(nil)
var _ nface.ISession = (*Session)(nil)
type Session struct {
sync.RWMutex // 数据锁
@ -16,7 +15,7 @@ type Session struct {
data map[string]interface{} // session数据存储内存
}
func newSession() interfaces.ISession {
func New() nface.ISession {
return &Session{
id: sessionIDMgrInstance.SessionID(),
uid: "",

@ -0,0 +1,47 @@
package session
import (
"git.noahlan.cn/northlan/nnet/nface"
"sync"
)
type Manager struct {
sync.RWMutex
sessions map[int64]nface.ISession
}
func NewManager() *Manager {
return &Manager{
RWMutex: sync.RWMutex{},
sessions: make(map[int64]nface.ISession),
}
}
func (m *Manager) storeSession(s nface.ISession) {
m.Lock()
defer m.Unlock()
m.sessions[s.ID()] = s
}
func (m *Manager) findSession(sid int64) nface.ISession {
m.RLock()
defer m.RUnlock()
return m.sessions[sid]
}
func (m *Manager) findOrCreateSession(sid int64) nface.ISession {
m.RLock()
s, ok := m.sessions[sid]
m.RUnlock()
if !ok {
s = New()
m.Lock()
m.sessions[s.ID()] = s
m.Unlock()
}
return s
}
Loading…
Cancel
Save