You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
242 lines
4.9 KiB
Go
242 lines
4.9 KiB
Go
package ngs
|
|
|
|
import (
|
|
"fmt"
|
|
"git.noahlan.cn/northlan/ngs/internal/env"
|
|
"git.noahlan.cn/northlan/ngs/internal/log"
|
|
"git.noahlan.cn/northlan/ngs/internal/message"
|
|
"git.noahlan.cn/northlan/ngs/session"
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
const (
|
|
groupStatusWorking = 0
|
|
groupStatusClosed = 1
|
|
)
|
|
|
|
// SessionFilter represents a filter which was used to filter session when Multicast,
|
|
// the session will receive the message while filter returns true.
|
|
type SessionFilter func(*session.Session) bool
|
|
|
|
// Group represents a session group which used to manage a number of
|
|
// sessions, data send to the group will send to all session in it.
|
|
type Group struct {
|
|
mu sync.RWMutex
|
|
status int32 // channel current status
|
|
name string // channel name
|
|
sessions map[int64]*session.Session // session id map to session instance
|
|
}
|
|
|
|
// NewGroup returns a new group instance
|
|
func NewGroup(n string) *Group {
|
|
return &Group{
|
|
status: groupStatusWorking,
|
|
name: n,
|
|
sessions: make(map[int64]*session.Session),
|
|
}
|
|
}
|
|
|
|
// Member returns specified UID's session
|
|
func (c *Group) Member(uid int64) (*session.Session, error) {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
for _, s := range c.sessions {
|
|
if s.UID() == uid {
|
|
return s, nil
|
|
}
|
|
}
|
|
|
|
return nil, ErrMemberNotFound
|
|
}
|
|
|
|
// MemberBySID returns specified sId's session
|
|
func (c *Group) MemberBySID(id int64) (*session.Session, error) {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
for sId, s := range c.sessions {
|
|
if sId == id {
|
|
return s, nil
|
|
}
|
|
}
|
|
|
|
return nil, ErrMemberNotFound
|
|
}
|
|
|
|
func (c *Group) Members() []*session.Session {
|
|
var resp []*session.Session
|
|
c.PeekMembers(func(_ int64, s *session.Session) bool {
|
|
resp = append(resp, s)
|
|
return true
|
|
})
|
|
return resp
|
|
}
|
|
|
|
// PeekMembers returns all members in current group
|
|
func (c *Group) PeekMembers(fn func(sId int64, s *session.Session) bool) {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
for sId, s := range c.sessions {
|
|
if !fn(sId, s) {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// MemberUIDs returns all member's UID in current group
|
|
func (c *Group) MemberUIDs() []int64 {
|
|
var members []int64
|
|
c.PeekMembers(func(_ int64, s *session.Session) bool {
|
|
members = append(members, s.UID())
|
|
return true
|
|
})
|
|
return members
|
|
}
|
|
|
|
// Multicast push the message to the filtered clients
|
|
func (c *Group) Multicast(route string, v interface{}, filter SessionFilter) error {
|
|
if c.isClosed() {
|
|
return ErrClosedGroup
|
|
}
|
|
|
|
data, err := message.Serialize(v)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if env.Debug {
|
|
log.Println(fmt.Sprintf("Multicast %s, Data=%+v", route, v))
|
|
}
|
|
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
for _, s := range c.sessions {
|
|
if !filter(s) {
|
|
continue
|
|
}
|
|
if err = s.Push(route, data); err != nil {
|
|
log.Println(err.Error())
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Broadcast push the message(s) to all members
|
|
func (c *Group) Broadcast(route string, v interface{}) error {
|
|
if c.isClosed() {
|
|
return ErrClosedGroup
|
|
}
|
|
|
|
data, err := message.Serialize(v)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if env.Debug {
|
|
log.Println(fmt.Sprintf("Broadcast %s, Data=%+v", route, v))
|
|
}
|
|
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
for _, s := range c.sessions {
|
|
if err = s.Push(route, data); err != nil {
|
|
log.Println(fmt.Sprintf("Session push message error, ID=%d, UID=%d, Error=%s", s.ID(), s.UID(), err.Error()))
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// Contains check whether a UID is contained in current group or not
|
|
func (c *Group) Contains(uid int64) bool {
|
|
_, err := c.Member(uid)
|
|
return err == nil
|
|
}
|
|
|
|
// Add add session to group
|
|
func (c *Group) Add(session *session.Session) error {
|
|
if c.isClosed() {
|
|
return ErrClosedGroup
|
|
}
|
|
|
|
if env.Debug {
|
|
log.Println(fmt.Sprintf("Add session to group %s, ID=%d, UID=%d", c.name, session.ID(), session.UID()))
|
|
}
|
|
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
id := session.ID()
|
|
_, ok := c.sessions[session.ID()]
|
|
if ok {
|
|
return ErrSessionDuplication
|
|
}
|
|
|
|
c.sessions[id] = session
|
|
return nil
|
|
}
|
|
|
|
// Leave remove specified UID related session from group
|
|
func (c *Group) Leave(s *session.Session) error {
|
|
if c.isClosed() {
|
|
return ErrClosedGroup
|
|
}
|
|
|
|
if env.Debug {
|
|
log.Println(fmt.Sprintf("Remove session from group %s, UID=%d", c.name, s.UID()))
|
|
}
|
|
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
delete(c.sessions, s.ID())
|
|
return nil
|
|
}
|
|
|
|
// LeaveAll clear all sessions in the group
|
|
func (c *Group) LeaveAll() error {
|
|
if c.isClosed() {
|
|
return ErrClosedGroup
|
|
}
|
|
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
c.sessions = make(map[int64]*session.Session)
|
|
return nil
|
|
}
|
|
|
|
// Count get current member amount in the group
|
|
func (c *Group) Count() int {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
return len(c.sessions)
|
|
}
|
|
|
|
func (c *Group) isClosed() bool {
|
|
if atomic.LoadInt32(&c.status) == groupStatusClosed {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// Close destroy group, which will release all resource in the group
|
|
func (c *Group) Close() error {
|
|
if c.isClosed() {
|
|
return ErrCloseClosedGroup
|
|
}
|
|
|
|
atomic.StoreInt32(&c.status, groupStatusClosed)
|
|
|
|
// release all reference
|
|
c.sessions = make(map[int64]*session.Session)
|
|
return nil
|
|
}
|