refactor: lifetime重构为实例而非全局变量,添加连接丢失退出所有分组逻辑。

main v0.6.0
NoahLan 2 years ago
parent 6a9a0288ec
commit 8edefc75c4

@ -35,7 +35,22 @@ func (m *Manager) Store(groupName string, s entity.NetworkEntity) error {
return group.Add(s) return group.Add(s)
} }
func (m *Manager) Remove(groupName string, s entity.NetworkEntity) error { func (m *Manager) Remove(s entity.NetworkEntity) error {
m.Lock()
defer m.Unlock()
delete(m.conns, s.Session().ID())
// 从所有group中移除
for _, group := range m.groups {
err := group.Leave(s)
if err != nil {
return err
}
}
return nil
}
func (m *Manager) RemoveFromGroup(groupName string, s entity.NetworkEntity) error {
m.Lock() m.Lock()
delete(m.conns, s.Session().ID()) delete(m.conns, s.Session().ID())
m.Unlock() m.Unlock()

@ -203,15 +203,15 @@ func (c *Group) LeaveAll() error {
} }
// 使用移位法移除group中与name匹配的元素 // 使用移位法移除group中与name匹配的元素
func (c *Group) removeGroupAttr(group []string) []string { func (c *Group) removeGroupAttr(groups []string) []string {
j := 0 j := 0
for _, v := range group { for _, v := range groups {
if v != c.name { if v != c.name {
group[j] = v groups[j] = v
j++ j++
} }
} }
return group[:j] return groups[:j]
} }
// Count get current member amount in the group // Count get current member amount in the group

@ -3,7 +3,6 @@ package core
import ( import (
"errors" "errors"
"fmt" "fmt"
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/entity" "git.noahlan.cn/noahlan/nnet/entity"
"git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/nnet/scheduler" "git.noahlan.cn/noahlan/nnet/scheduler"
@ -267,9 +266,9 @@ func (r *connection) Close() error {
case <-r.chDie: case <-r.chDie:
default: default:
close(r.chDie) close(r.chDie)
scheduler.PushTask(func() { Lifetime.Close(r) }) scheduler.PushTask(func() { r.ngin.lifetime.Close(r) })
} }
_ = r.ngin.connManager.Remove(conn.DefaultGroupName, r) _ = r.ngin.connManager.Remove(r)
r.session.Close() r.session.Close()
return r.conn.Close() return r.conn.Close()

@ -6,6 +6,7 @@ import (
"git.noahlan.cn/noahlan/nnet/config" "git.noahlan.cn/noahlan/nnet/config"
conn2 "git.noahlan.cn/noahlan/nnet/conn" conn2 "git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/entity" "git.noahlan.cn/noahlan/nnet/entity"
"git.noahlan.cn/noahlan/nnet/lifetime"
"git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/nnet/pipeline" "git.noahlan.cn/noahlan/nnet/pipeline"
"git.noahlan.cn/noahlan/nnet/scheduler" "git.noahlan.cn/noahlan/nnet/scheduler"
@ -43,6 +44,7 @@ type (
dieChan chan struct{} dieChan chan struct{}
pipeline pipeline.Pipeline // 消息管道 pipeline pipeline.Pipeline // 消息管道
lifetime *lifetime.Mgr // 连接的生命周期管理器
packerFn packet.NewPackerFunc // 封包、拆包器 packerFn packet.NewPackerFunc // 封包、拆包器
serializer serialize.Serializer // 消息 序列化/反序列化 serializer serialize.Serializer // 消息 序列化/反序列化
@ -72,6 +74,7 @@ func newEngine(conf config.EngineConf) *engine {
taskTimerPrecision: conf.TaskTimerPrecision, taskTimerPrecision: conf.TaskTimerPrecision,
connManager: conn2.NewManager(), connManager: conn2.NewManager(),
sessIdMgr: newSessionIDMgr(), sessIdMgr: newSessionIDMgr(),
lifetime: lifetime.NewLifetime(),
} }
pool.InitPool(conf.Pool) pool.InitPool(conf.Pool)
@ -134,7 +137,7 @@ func (ng *engine) dial(addr string, router Router) (entity.NetworkEntity, error)
c := newConnection(ng, conn) c := newConnection(ng, conn)
c.serve() c.serve()
// hook // hook
Lifetime.Open(c) ng.lifetime.Open(c)
// connection manager // connection manager
err = ng.connManager.Store(conn2.DefaultGroupName, c) err = ng.connManager.Store(conn2.DefaultGroupName, c)
nlog.Must(err) nlog.Must(err)
@ -268,7 +271,7 @@ func (ng *engine) handle(conn net.Conn) {
c.serve() c.serve()
// hook // hook
Lifetime.Open(c) ng.lifetime.Open(c)
} }
func (ng *engine) notFoundHandler(next Handler) Handler { func (ng *engine) notFoundHandler(next Handler) Handler {

@ -1,42 +0,0 @@
package core
import "git.noahlan.cn/noahlan/nnet/entity"
type (
LifetimeHandler func(entity entity.NetworkEntity)
lifetime struct {
onOpen []LifetimeHandler
onClosed []LifetimeHandler
}
)
var Lifetime = &lifetime{}
func (lt *lifetime) OnClosed(h LifetimeHandler) {
lt.onClosed = append(lt.onClosed, h)
}
func (lt *lifetime) OnOpen(h LifetimeHandler) {
lt.onOpen = append(lt.onOpen, h)
}
func (lt *lifetime) Open(entity entity.NetworkEntity) {
if len(lt.onOpen) <= 0 {
return
}
for _, handler := range lt.onOpen {
handler(entity)
}
}
func (lt *lifetime) Close(entity entity.NetworkEntity) {
if len(lt.onClosed) <= 0 {
return
}
for _, handler := range lt.onClosed {
handler(entity)
}
}

@ -4,6 +4,7 @@ import (
"git.noahlan.cn/noahlan/nnet/config" "git.noahlan.cn/noahlan/nnet/config"
"git.noahlan.cn/noahlan/nnet/conn" "git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/entity" "git.noahlan.cn/noahlan/nnet/entity"
"git.noahlan.cn/noahlan/nnet/lifetime"
"git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/nnet/pipeline" "git.noahlan.cn/noahlan/nnet/pipeline"
"git.noahlan.cn/noahlan/nnet/serialize" "git.noahlan.cn/noahlan/nnet/serialize"
@ -110,6 +111,11 @@ func (s *NNet) Pipeline() pipeline.Pipeline {
return s.ngin.pipeline return s.ngin.pipeline
} }
// Lifetime returns lifetime interface.
func (s *NNet) Lifetime() lifetime.Lifetime {
return s.ngin.lifetime
}
// ConnManager returns connection manager // ConnManager returns connection manager
func (s *NNet) ConnManager() *conn.Manager { func (s *NNet) ConnManager() *conn.Manager {
return s.ngin.connManager return s.ngin.connManager

@ -0,0 +1,52 @@
package lifetime
import "git.noahlan.cn/noahlan/nnet/entity"
type (
Handler func(entity entity.NetworkEntity)
Lifetime interface {
OnClosed(h Handler)
OnOpen(h Handler)
}
Mgr struct {
onOpen []Handler
onClosed []Handler
}
)
func NewLifetime() *Mgr {
return &Mgr{
onOpen: make([]Handler, 0),
onClosed: make([]Handler, 0),
}
}
func (lt *Mgr) OnClosed(h Handler) {
lt.onClosed = append(lt.onClosed, h)
}
func (lt *Mgr) OnOpen(h Handler) {
lt.onOpen = append(lt.onOpen, h)
}
func (lt *Mgr) Open(entity entity.NetworkEntity) {
if len(lt.onOpen) <= 0 {
return
}
for _, handler := range lt.onOpen {
handler(entity)
}
}
func (lt *Mgr) Close(entity entity.NetworkEntity) {
if len(lt.onClosed) <= 0 {
return
}
for _, handler := range lt.onClosed {
handler(entity)
}
}

@ -25,9 +25,10 @@ func WithHeartbeat(interval time.Duration, hbdFn func(entity entity.NetworkEntit
nlog.Error("dataFn must not be nil") nlog.Error("dataFn must not be nil")
panic("dataFn must not be nil") panic("dataFn must not be nil")
} }
core.Lifetime.OnOpen(m.start)
return func(server *core.NNet) { return func(server *core.NNet) {
server.Lifetime().OnOpen(m.start)
server.Use(func(next core.HandlerFunc) core.HandlerFunc { server.Use(func(next core.HandlerFunc) core.HandlerFunc {
return func(entity entity.NetworkEntity, pkg packet.IPacket) { return func(entity entity.NetworkEntity, pkg packet.IPacket) {
m.handle(entity, pkg) m.handle(entity, pkg)

Loading…
Cancel
Save