You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
nnet/connection/group.go

246 lines
4.7 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package connection
import (
"errors"
"git.noahlan.cn/noahlan/ntools-go/core/nlog"
"sync"
"sync/atomic"
)
const groupKey = "NNET_GROUP#"
const DefaultGroupName = "DEFAULT_GROUP"
const (
groupStatusWorking = 0
groupStatusClosed = 1
)
var (
ErrCloseClosedGroup = errors.New("close closed group")
ErrClosedGroup = errors.New("group closed")
DeleteDefaultGroupNotAllow = errors.New("delete default group not allow")
)
type Group struct {
mu sync.RWMutex
status int32 // group current status
name string // group name
conns map[int64]*Connection
}
func NewGroup(name string) *Group {
return &Group{
mu: sync.RWMutex{},
status: groupStatusWorking,
name: name,
conns: make(map[int64]*Connection),
}
}
// Member returns connection by specified uid
func (g *Group) Member(uid string) (*Connection, bool) {
g.mu.RLock()
defer g.mu.RUnlock()
for _, e := range g.conns {
if e.Session().UID() == uid {
return e, true
}
}
return nil, false
}
// MemberBySID returns specified sId's connection
func (g *Group) MemberBySID(id int64) (*Connection, bool) {
g.mu.RLock()
defer g.mu.RUnlock()
e, ok := g.conns[id]
return e, ok
}
func (g *Group) Members() []*Connection {
var resp []*Connection
g.PeekMembers(func(_ int64, c *Connection) bool {
resp = append(resp, c)
return false
})
return resp
}
// PeekMembers returns all members in current group
// fn 返回true跳过循环反之一直循环
func (g *Group) PeekMembers(fn func(sId int64, c *Connection) bool) {
g.mu.RLock()
defer g.mu.RUnlock()
for sId, c := range g.conns {
if fn(sId, c) {
break
}
}
}
// Contains check whether a UID is contained in current group or not
func (g *Group) Contains(uid string) bool {
_, ok := g.Member(uid)
return ok
}
// Add session to group
func (g *Group) Add(c *Connection) error {
if g.isClosed() {
return ErrClosedGroup
}
g.mu.Lock()
defer g.mu.Unlock()
sess := c.Session()
id := sess.ID()
// group attribute
if sess.Exists(groupKey) {
groups, ok := sess.Attribute(groupKey).([]string)
if !ok {
groups = make([]string, 0)
sess.SetAttribute(groupKey, groups)
}
contains := false
for _, group := range groups {
if group == g.name {
contains = true
break
}
}
if !contains {
groups = append(groups, g.name)
sess.SetAttribute(groupKey, groups)
}
} else {
sess.SetAttribute(groupKey, []string{g.name})
}
if _, ok := g.conns[id]; !ok {
g.conns[id] = c
}
nlog.Debugf("Add connection to group %s, ID=%d, UID=%s", g.name, sess.ID(), sess.UID())
return nil
}
// Leave remove specified UID related session from group
func (g *Group) Leave(c *Connection) error {
if g.isClosed() {
return ErrClosedGroup
}
if c == nil {
return nil
}
sess := c.Session()
nlog.Debugf("Remove connection from group %s, UID=%s", g.name, sess.UID())
g.mu.Lock()
defer g.mu.Unlock()
if sess.Exists(groupKey) {
groups, ok := sess.Attribute(groupKey).([]string)
if !ok {
groups = make([]string, 0)
sess.SetAttribute(groupKey, groups)
}
groups = g.removeGroupAttr(groups)
if len(groups) == 0 {
sess.RemoveAttribute(groupKey)
} else {
sess.SetAttribute(groupKey, groups)
}
}
delete(g.conns, sess.ID())
return nil
}
func (g *Group) LeaveByUID(uid string) error {
if g.isClosed() {
return ErrClosedGroup
}
member, _ := g.Member(uid)
return g.Leave(member)
}
// LeaveAll clear all sessions in the group
func (g *Group) LeaveAll() error {
if g.isClosed() {
return ErrClosedGroup
}
g.mu.Lock()
defer g.mu.Unlock()
for _, e := range g.conns {
sess := e.Session()
groups, ok := sess.Attribute(groupKey).([]string)
if !ok {
groups = make([]string, 0)
sess.SetAttribute(groupKey, groups)
}
groups = g.removeGroupAttr(groups)
if len(groups) == 0 {
sess.RemoveAttribute(groupKey)
} else {
sess.SetAttribute(groupKey, groups)
}
}
g.conns = make(map[int64]*Connection)
return nil
}
// 使用移位法移除group中与name匹配的元素
func (g *Group) removeGroupAttr(groups []string) []string {
j := 0
for _, v := range groups {
if v != g.name {
groups[j] = v
j++
}
}
return groups[:j]
}
// Count get current member amount in the group
func (g *Group) Count() int {
g.mu.RLock()
defer g.mu.RUnlock()
return len(g.conns)
}
func (g *Group) isClosed() bool {
if atomic.LoadInt32(&g.status) == groupStatusClosed {
return true
}
return false
}
// Close destroy group, which will release all resource in the group
func (g *Group) Close() error {
if g.isClosed() {
return ErrCloseClosedGroup
}
if g.name == DefaultGroupName {
// 默认分组不允许删除
return DeleteDefaultGroupNotAllow
}
_ = g.LeaveAll()
atomic.StoreInt32(&g.status, groupStatusClosed)
return nil
}