You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
nnet/connection/group.go

246 lines
4.7 KiB
Go

package connection
1 year ago
import (
"errors"
"git.noahlan.cn/noahlan/ntool/nlog"
1 year ago
"sync"
"sync/atomic"
)
const groupKey = "NNET_GROUP#"
const DefaultGroupName = "DEFAULT_GROUP"
const (
groupStatusWorking = 0
groupStatusClosed = 1
)
var (
ErrCloseClosedGroup = errors.New("close closed group")
ErrClosedGroup = errors.New("group closed")
DeleteDefaultGroupNotAllow = errors.New("delete default group not allow")
)
type Group struct {
mu sync.RWMutex
status int32 // group current status
name string // group name
conns map[int64]*Connection
1 year ago
}
func NewGroup(name string) *Group {
return &Group{
mu: sync.RWMutex{},
status: groupStatusWorking,
name: name,
conns: make(map[int64]*Connection),
1 year ago
}
}
// Member returns connection by specified uid
func (g *Group) Member(uid string) (*Connection, bool) {
g.mu.RLock()
defer g.mu.RUnlock()
1 year ago
for _, e := range g.conns {
1 year ago
if e.Session().UID() == uid {
return e, true
}
}
return nil, false
}
// MemberBySID returns specified sId's connection
func (g *Group) MemberBySID(id int64) (*Connection, bool) {
g.mu.RLock()
defer g.mu.RUnlock()
1 year ago
e, ok := g.conns[id]
1 year ago
return e, ok
}
func (g *Group) Members() []*Connection {
var resp []*Connection
g.PeekMembers(func(_ int64, c *Connection) bool {
resp = append(resp, c)
1 year ago
return false
})
return resp
}
// PeekMembers returns all members in current group
// fn 返回true跳过循环反之一直循环
func (g *Group) PeekMembers(fn func(sId int64, c *Connection) bool) {
g.mu.RLock()
defer g.mu.RUnlock()
1 year ago
for sId, c := range g.conns {
if fn(sId, c) {
1 year ago
break
}
}
}
// Contains check whether a UID is contained in current group or not
func (g *Group) Contains(uid string) bool {
_, ok := g.Member(uid)
1 year ago
return ok
}
// Add session to group
func (g *Group) Add(c *Connection) error {
if g.isClosed() {
1 year ago
return ErrClosedGroup
}
g.mu.Lock()
defer g.mu.Unlock()
1 year ago
sess := c.Session()
1 year ago
id := sess.ID()
// group attribute
if sess.Exists(groupKey) {
groups, ok := sess.Attribute(groupKey).([]string)
if !ok {
groups = make([]string, 0)
sess.SetAttribute(groupKey, groups)
}
contains := false
for _, group := range groups {
if group == g.name {
1 year ago
contains = true
break
}
}
if !contains {
groups = append(groups, g.name)
1 year ago
sess.SetAttribute(groupKey, groups)
}
} else {
sess.SetAttribute(groupKey, []string{g.name})
1 year ago
}
if _, ok := g.conns[id]; !ok {
g.conns[id] = c
1 year ago
}
nlog.Debugf("Add connection to group %s, ID=%d, UID=%s", g.name, sess.ID(), sess.UID())
1 year ago
return nil
}
// Leave remove specified UID related session from group
func (g *Group) Leave(c *Connection) error {
if g.isClosed() {
1 year ago
return ErrClosedGroup
}
if c == nil {
1 year ago
return nil
}
sess := c.Session()
nlog.Debugf("Remove connection from group %s, UID=%s", g.name, sess.UID())
1 year ago
g.mu.Lock()
defer g.mu.Unlock()
1 year ago
if sess.Exists(groupKey) {
groups, ok := sess.Attribute(groupKey).([]string)
if !ok {
groups = make([]string, 0)
sess.SetAttribute(groupKey, groups)
}
groups = g.removeGroupAttr(groups)
1 year ago
if len(groups) == 0 {
sess.RemoveAttribute(groupKey)
} else {
sess.SetAttribute(groupKey, groups)
}
}
delete(g.conns, sess.ID())
1 year ago
return nil
}
func (g *Group) LeaveByUID(uid string) error {
if g.isClosed() {
1 year ago
return ErrClosedGroup
}
member, _ := g.Member(uid)
return g.Leave(member)
1 year ago
}
// LeaveAll clear all sessions in the group
func (g *Group) LeaveAll() error {
if g.isClosed() {
1 year ago
return ErrClosedGroup
}
g.mu.Lock()
defer g.mu.Unlock()
1 year ago
for _, e := range g.conns {
1 year ago
sess := e.Session()
groups, ok := sess.Attribute(groupKey).([]string)
if !ok {
groups = make([]string, 0)
sess.SetAttribute(groupKey, groups)
}
groups = g.removeGroupAttr(groups)
1 year ago
if len(groups) == 0 {
sess.RemoveAttribute(groupKey)
} else {
sess.SetAttribute(groupKey, groups)
}
}
g.conns = make(map[int64]*Connection)
1 year ago
return nil
}
// 使用移位法移除group中与name匹配的元素
func (g *Group) removeGroupAttr(groups []string) []string {
1 year ago
j := 0
for _, v := range groups {
if v != g.name {
groups[j] = v
1 year ago
j++
}
}
return groups[:j]
1 year ago
}
// Count get current member amount in the group
func (g *Group) Count() int {
g.mu.RLock()
defer g.mu.RUnlock()
1 year ago
return len(g.conns)
1 year ago
}
func (g *Group) isClosed() bool {
if atomic.LoadInt32(&g.status) == groupStatusClosed {
1 year ago
return true
}
return false
}
// Close destroy group, which will release all resource in the group
func (g *Group) Close() error {
if g.isClosed() {
1 year ago
return ErrCloseClosedGroup
}
if g.name == DefaultGroupName {
1 year ago
// 默认分组不允许删除
return DeleteDefaultGroupNotAllow
}
_ = g.LeaveAll()
1 year ago
atomic.StoreInt32(&g.status, groupStatusClosed)
1 year ago
return nil
}