package connection import ( "errors" "git.noahlan.cn/noahlan/ntools-go/core/nlog" "sync" "sync/atomic" ) const groupKey = "NNET_GROUP#" const DefaultGroupName = "DEFAULT_GROUP" const ( groupStatusWorking = 0 groupStatusClosed = 1 ) var ( ErrCloseClosedGroup = errors.New("close closed group") ErrClosedGroup = errors.New("group closed") DeleteDefaultGroupNotAllow = errors.New("delete default group not allow") ) type Group struct { mu sync.RWMutex status int32 // group current status name string // group name conns map[int64]*Connection } func NewGroup(name string) *Group { return &Group{ mu: sync.RWMutex{}, status: groupStatusWorking, name: name, conns: make(map[int64]*Connection), } } // Member returns connection by specified uid func (g *Group) Member(uid string) (*Connection, bool) { g.mu.RLock() defer g.mu.RUnlock() for _, e := range g.conns { if e.Session().UID() == uid { return e, true } } return nil, false } // MemberBySID returns specified sId's connection func (g *Group) MemberBySID(id int64) (*Connection, bool) { g.mu.RLock() defer g.mu.RUnlock() e, ok := g.conns[id] return e, ok } func (g *Group) Members() []*Connection { var resp []*Connection g.PeekMembers(func(_ int64, c *Connection) bool { resp = append(resp, c) return false }) return resp } // PeekMembers returns all members in current group // fn 返回true跳过循环,反之一直循环 func (g *Group) PeekMembers(fn func(sId int64, c *Connection) bool) { g.mu.RLock() defer g.mu.RUnlock() for sId, c := range g.conns { if fn(sId, c) { break } } } // Contains check whether a UID is contained in current group or not func (g *Group) Contains(uid string) bool { _, ok := g.Member(uid) return ok } // Add session to group func (g *Group) Add(c *Connection) error { if g.isClosed() { return ErrClosedGroup } g.mu.Lock() defer g.mu.Unlock() sess := c.Session() id := sess.ID() // group attribute if sess.Exists(groupKey) { groups, ok := sess.Attribute(groupKey).([]string) if !ok { groups = make([]string, 0) sess.SetAttribute(groupKey, groups) } contains := false for _, group := range groups { if group == g.name { contains = true break } } if !contains { groups = append(groups, g.name) sess.SetAttribute(groupKey, groups) } } else { sess.SetAttribute(groupKey, []string{g.name}) } if _, ok := g.conns[id]; !ok { g.conns[id] = c } nlog.Debugf("Add connection to group %s, ID=%d, UID=%s", g.name, sess.ID(), sess.UID()) return nil } // Leave remove specified UID related session from group func (g *Group) Leave(c *Connection) error { if g.isClosed() { return ErrClosedGroup } if c == nil { return nil } sess := c.Session() nlog.Debugf("Remove connection from group %s, UID=%s", g.name, sess.UID()) g.mu.Lock() defer g.mu.Unlock() if sess.Exists(groupKey) { groups, ok := sess.Attribute(groupKey).([]string) if !ok { groups = make([]string, 0) sess.SetAttribute(groupKey, groups) } groups = g.removeGroupAttr(groups) if len(groups) == 0 { sess.RemoveAttribute(groupKey) } else { sess.SetAttribute(groupKey, groups) } } delete(g.conns, sess.ID()) return nil } func (g *Group) LeaveByUID(uid string) error { if g.isClosed() { return ErrClosedGroup } member, _ := g.Member(uid) return g.Leave(member) } // LeaveAll clear all sessions in the group func (g *Group) LeaveAll() error { if g.isClosed() { return ErrClosedGroup } g.mu.Lock() defer g.mu.Unlock() for _, e := range g.conns { sess := e.Session() groups, ok := sess.Attribute(groupKey).([]string) if !ok { groups = make([]string, 0) sess.SetAttribute(groupKey, groups) } groups = g.removeGroupAttr(groups) if len(groups) == 0 { sess.RemoveAttribute(groupKey) } else { sess.SetAttribute(groupKey, groups) } } g.conns = make(map[int64]*Connection) return nil } // 使用移位法移除group中与name匹配的元素 func (g *Group) removeGroupAttr(groups []string) []string { j := 0 for _, v := range groups { if v != g.name { groups[j] = v j++ } } return groups[:j] } // Count get current member amount in the group func (g *Group) Count() int { g.mu.RLock() defer g.mu.RUnlock() return len(g.conns) } func (g *Group) isClosed() bool { if atomic.LoadInt32(&g.status) == groupStatusClosed { return true } return false } // Close destroy group, which will release all resource in the group func (g *Group) Close() error { if g.isClosed() { return ErrCloseClosedGroup } if g.name == DefaultGroupName { // 默认分组不允许删除 return DeleteDefaultGroupNotAllow } _ = g.LeaveAll() atomic.StoreInt32(&g.status, groupStatusClosed) return nil }