package conn import ( "errors" "git.noahlan.cn/noahlan/nnet/entity" "git.noahlan.cn/noahlan/ntools-go/core/nlog" "sync" "sync/atomic" ) const groupKey = "NNET_GROUP#" const DefaultGroupName = "DEFAULT_GROUP" const ( groupStatusWorking = 0 groupStatusClosed = 1 ) var ( ErrCloseClosedGroup = errors.New("close closed group") ErrClosedGroup = errors.New("group closed") DeleteDefaultGroupNotAllow = errors.New("delete default group not allow") ) type Group struct { mu sync.RWMutex status int32 // group current status name string // group name conns map[int64]entity.NetworkEntity } func NewGroup(name string) *Group { return &Group{ mu: sync.RWMutex{}, status: groupStatusWorking, name: name, conns: make(map[int64]entity.NetworkEntity), } } // Member returns connection by specified uid func (c *Group) Member(uid string) (entity.NetworkEntity, bool) { c.mu.RLock() defer c.mu.RUnlock() for _, e := range c.conns { if e.Session().UID() == uid { return e, true } } return nil, false } // MemberBySID returns specified sId's connection func (c *Group) MemberBySID(id int64) (entity.NetworkEntity, bool) { c.mu.RLock() defer c.mu.RUnlock() e, ok := c.conns[id] return e, ok } func (c *Group) Members() []entity.NetworkEntity { var resp []entity.NetworkEntity c.PeekMembers(func(_ int64, e entity.NetworkEntity) bool { resp = append(resp, e) return false }) return resp } // PeekMembers returns all members in current group // fn 返回true跳过循环,反之一直循环 func (c *Group) PeekMembers(fn func(sId int64, e entity.NetworkEntity) bool) { c.mu.RLock() defer c.mu.RUnlock() for sId, e := range c.conns { if fn(sId, e) { break } } } // Contains check whether a UID is contained in current group or not func (c *Group) Contains(uid string) bool { _, ok := c.Member(uid) return ok } // Add session to group func (c *Group) Add(e entity.NetworkEntity) error { if c.isClosed() { return ErrClosedGroup } c.mu.Lock() defer c.mu.Unlock() sess := e.Session() id := sess.ID() // group attribute if sess.Exists(groupKey) { groups, ok := sess.Attribute(groupKey).([]string) if !ok { groups = make([]string, 0) sess.SetAttribute(groupKey, groups) } contains := false for _, g := range groups { if g == c.name { contains = true break } } if !contains { groups = append(groups, c.name) sess.SetAttribute(groupKey, groups) } } else { sess.SetAttribute(groupKey, []string{c.name}) } if _, ok := c.conns[id]; !ok { c.conns[id] = e } nlog.Debugf("Add connection to group %s, ID=%d, UID=%s", c.name, sess.ID(), sess.UID()) return nil } // Leave remove specified UID related session from group func (c *Group) Leave(e entity.NetworkEntity) error { if c.isClosed() { return ErrClosedGroup } if e == nil { return nil } sess := e.Session() nlog.Debugf("Remove connection from group %s, UID=%s", c.name, sess.UID()) c.mu.Lock() defer c.mu.Unlock() if sess.Exists(groupKey) { groups, ok := sess.Attribute(groupKey).([]string) if !ok { groups = make([]string, 0) sess.SetAttribute(groupKey, groups) } groups = c.removeGroupAttr(groups) if len(groups) == 0 { sess.RemoveAttribute(groupKey) } else { sess.SetAttribute(groupKey, groups) } } delete(c.conns, sess.ID()) return nil } func (c *Group) LeaveByUID(uid string) error { if c.isClosed() { return ErrClosedGroup } member, _ := c.Member(uid) return c.Leave(member) } // LeaveAll clear all sessions in the group func (c *Group) LeaveAll() error { if c.isClosed() { return ErrClosedGroup } c.mu.Lock() defer c.mu.Unlock() for _, e := range c.conns { sess := e.Session() groups, ok := sess.Attribute(groupKey).([]string) if !ok { groups = make([]string, 0) sess.SetAttribute(groupKey, groups) } groups = c.removeGroupAttr(groups) if len(groups) == 0 { sess.RemoveAttribute(groupKey) } else { sess.SetAttribute(groupKey, groups) } } c.conns = make(map[int64]entity.NetworkEntity) return nil } // 使用移位法移除group中与name匹配的元素 func (c *Group) removeGroupAttr(groups []string) []string { j := 0 for _, v := range groups { if v != c.name { groups[j] = v j++ } } return groups[:j] } // Count get current member amount in the group func (c *Group) Count() int { c.mu.RLock() defer c.mu.RUnlock() return len(c.conns) } func (c *Group) isClosed() bool { if atomic.LoadInt32(&c.status) == groupStatusClosed { return true } return false } // Close destroy group, which will release all resource in the group func (c *Group) Close() error { if c.isClosed() { return ErrCloseClosedGroup } if c.name == DefaultGroupName { // 默认分组不允许删除 return DeleteDefaultGroupNotAllow } _ = c.LeaveAll() atomic.StoreInt32(&c.status, groupStatusClosed) return nil }