|
|
|
|
package connection
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"errors"
|
|
|
|
|
"git.noahlan.cn/noahlan/ntool/nlog"
|
|
|
|
|
"sync"
|
|
|
|
|
"sync/atomic"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Names shared by every Group.
const (
	// groupKey is the session attribute key under which the names of the
	// groups a session has joined are stored as a []string.
	groupKey = "NNET_GROUP#"

	// DefaultGroupName is the name of the default group. Close refuses to
	// close a group that carries this name.
	DefaultGroupName = "DEFAULT_GROUP"
)
|
|
|
|
|
|
|
|
|
|
// Lifecycle states kept in Group.status.
const (
	groupStatusWorking = iota // 0: the group accepts membership changes
	groupStatusClosed         // 1: the group has been closed
)
|
|
|
|
|
|
|
|
|
|
// ErrCloseClosedGroup is returned by Close when the group is already closed.
var ErrCloseClosedGroup = errors.New("close closed group")

// ErrClosedGroup is returned by membership operations (Add, Leave,
// LeaveByUID, LeaveAll) when the group is already closed.
var ErrClosedGroup = errors.New("group closed")

// DeleteDefaultGroupNotAllow is returned by Close when the group is named
// DefaultGroupName.
var DeleteDefaultGroupNotAllow = errors.New("delete default group not allow")
|
|
|
|
|
|
|
|
|
|
type Group struct {
|
|
|
|
|
mu sync.RWMutex
|
|
|
|
|
|
|
|
|
|
status int32 // group current status
|
|
|
|
|
name string // group name
|
|
|
|
|
conns map[int64]*Connection
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewGroup(name string) *Group {
|
|
|
|
|
return &Group{
|
|
|
|
|
mu: sync.RWMutex{},
|
|
|
|
|
status: groupStatusWorking,
|
|
|
|
|
name: name,
|
|
|
|
|
conns: make(map[int64]*Connection),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Member returns connection by specified uid
|
|
|
|
|
func (g *Group) Member(uid string) (*Connection, bool) {
|
|
|
|
|
g.mu.RLock()
|
|
|
|
|
defer g.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
for _, e := range g.conns {
|
|
|
|
|
if e.Session().UID() == uid {
|
|
|
|
|
return e, true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil, false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// MemberBySID returns specified sId's connection
|
|
|
|
|
func (g *Group) MemberBySID(id int64) (*Connection, bool) {
|
|
|
|
|
g.mu.RLock()
|
|
|
|
|
defer g.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
e, ok := g.conns[id]
|
|
|
|
|
return e, ok
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (g *Group) Members() []*Connection {
|
|
|
|
|
var resp []*Connection
|
|
|
|
|
g.PeekMembers(func(_ int64, c *Connection) bool {
|
|
|
|
|
resp = append(resp, c)
|
|
|
|
|
return false
|
|
|
|
|
})
|
|
|
|
|
return resp
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// PeekMembers returns all members in current group
|
|
|
|
|
// fn 返回true跳过循环,反之一直循环
|
|
|
|
|
func (g *Group) PeekMembers(fn func(sId int64, c *Connection) bool) {
|
|
|
|
|
g.mu.RLock()
|
|
|
|
|
defer g.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
for sId, c := range g.conns {
|
|
|
|
|
if fn(sId, c) {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Contains check whether a UID is contained in current group or not
|
|
|
|
|
func (g *Group) Contains(uid string) bool {
|
|
|
|
|
_, ok := g.Member(uid)
|
|
|
|
|
return ok
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Add session to group
|
|
|
|
|
func (g *Group) Add(c *Connection) error {
|
|
|
|
|
if g.isClosed() {
|
|
|
|
|
return ErrClosedGroup
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
g.mu.Lock()
|
|
|
|
|
defer g.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
sess := c.Session()
|
|
|
|
|
id := sess.ID()
|
|
|
|
|
|
|
|
|
|
// group attribute
|
|
|
|
|
if sess.Exists(groupKey) {
|
|
|
|
|
groups, ok := sess.Attribute(groupKey).([]string)
|
|
|
|
|
if !ok {
|
|
|
|
|
groups = make([]string, 0)
|
|
|
|
|
sess.SetAttribute(groupKey, groups)
|
|
|
|
|
}
|
|
|
|
|
contains := false
|
|
|
|
|
for _, group := range groups {
|
|
|
|
|
if group == g.name {
|
|
|
|
|
contains = true
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if !contains {
|
|
|
|
|
groups = append(groups, g.name)
|
|
|
|
|
sess.SetAttribute(groupKey, groups)
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
sess.SetAttribute(groupKey, []string{g.name})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if _, ok := g.conns[id]; !ok {
|
|
|
|
|
g.conns[id] = c
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
nlog.Debugf("Add connection to group %s, ID=%d, UID=%s", g.name, sess.ID(), sess.UID())
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Leave remove specified UID related session from group
|
|
|
|
|
func (g *Group) Leave(c *Connection) error {
|
|
|
|
|
if g.isClosed() {
|
|
|
|
|
return ErrClosedGroup
|
|
|
|
|
}
|
|
|
|
|
if c == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
sess := c.Session()
|
|
|
|
|
nlog.Debugf("Remove connection from group %s, UID=%s", g.name, sess.UID())
|
|
|
|
|
|
|
|
|
|
g.mu.Lock()
|
|
|
|
|
defer g.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
if sess.Exists(groupKey) {
|
|
|
|
|
groups, ok := sess.Attribute(groupKey).([]string)
|
|
|
|
|
if !ok {
|
|
|
|
|
groups = make([]string, 0)
|
|
|
|
|
sess.SetAttribute(groupKey, groups)
|
|
|
|
|
}
|
|
|
|
|
groups = g.removeGroupAttr(groups)
|
|
|
|
|
|
|
|
|
|
if len(groups) == 0 {
|
|
|
|
|
sess.RemoveAttribute(groupKey)
|
|
|
|
|
} else {
|
|
|
|
|
sess.SetAttribute(groupKey, groups)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
delete(g.conns, sess.ID())
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (g *Group) LeaveByUID(uid string) error {
|
|
|
|
|
if g.isClosed() {
|
|
|
|
|
return ErrClosedGroup
|
|
|
|
|
}
|
|
|
|
|
member, _ := g.Member(uid)
|
|
|
|
|
return g.Leave(member)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// LeaveAll clear all sessions in the group
|
|
|
|
|
func (g *Group) LeaveAll() error {
|
|
|
|
|
if g.isClosed() {
|
|
|
|
|
return ErrClosedGroup
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
g.mu.Lock()
|
|
|
|
|
defer g.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
for _, e := range g.conns {
|
|
|
|
|
sess := e.Session()
|
|
|
|
|
|
|
|
|
|
groups, ok := sess.Attribute(groupKey).([]string)
|
|
|
|
|
if !ok {
|
|
|
|
|
groups = make([]string, 0)
|
|
|
|
|
sess.SetAttribute(groupKey, groups)
|
|
|
|
|
}
|
|
|
|
|
groups = g.removeGroupAttr(groups)
|
|
|
|
|
|
|
|
|
|
if len(groups) == 0 {
|
|
|
|
|
sess.RemoveAttribute(groupKey)
|
|
|
|
|
} else {
|
|
|
|
|
sess.SetAttribute(groupKey, groups)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
g.conns = make(map[int64]*Connection)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 使用移位法移除group中与name匹配的元素
|
|
|
|
|
func (g *Group) removeGroupAttr(groups []string) []string {
|
|
|
|
|
j := 0
|
|
|
|
|
for _, v := range groups {
|
|
|
|
|
if v != g.name {
|
|
|
|
|
groups[j] = v
|
|
|
|
|
j++
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return groups[:j]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Count get current member amount in the group
|
|
|
|
|
func (g *Group) Count() int {
|
|
|
|
|
g.mu.RLock()
|
|
|
|
|
defer g.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
return len(g.conns)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (g *Group) isClosed() bool {
|
|
|
|
|
if atomic.LoadInt32(&g.status) == groupStatusClosed {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Close destroy group, which will release all resource in the group
|
|
|
|
|
func (g *Group) Close() error {
|
|
|
|
|
if g.isClosed() {
|
|
|
|
|
return ErrCloseClosedGroup
|
|
|
|
|
}
|
|
|
|
|
if g.name == DefaultGroupName {
|
|
|
|
|
// 默认分组不允许删除
|
|
|
|
|
return DeleteDefaultGroupNotAllow
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_ = g.LeaveAll()
|
|
|
|
|
|
|
|
|
|
atomic.StoreInt32(&g.status, groupStatusClosed)
|
|
|
|
|
return nil
|
|
|
|
|
}
|