package ngs import ( "fmt" "git.noahlan.cn/northlan/ngs/internal/env" "git.noahlan.cn/northlan/ngs/internal/log" "git.noahlan.cn/northlan/ngs/internal/message" "git.noahlan.cn/northlan/ngs/session" "sync" "sync/atomic" ) const ( groupStatusWorking = 0 groupStatusClosed = 1 ) // SessionFilter represents a filter which was used to filter session when Multicast, // the session will receive the message while filter returns true. type SessionFilter func(*session.Session) bool // Group represents a session group which used to manage a number of // sessions, data send to the group will send to all session in it. type Group struct { mu sync.RWMutex status int32 // channel current status name string // channel name sessions map[int64]*session.Session // session id map to session instance } // NewGroup returns a new group instance func NewGroup(n string) *Group { return &Group{ status: groupStatusWorking, name: n, sessions: make(map[int64]*session.Session), } } // Member returns specified UID's session func (c *Group) Member(uid int64) (*session.Session, error) { c.mu.RLock() defer c.mu.RUnlock() for _, s := range c.sessions { if s.UID() == uid { return s, nil } } return nil, ErrMemberNotFound } // Members returns all members in current group func (c *Group) Members() map[int64]*session.Session { c.mu.RLock() defer c.mu.RUnlock() return c.sessions } // MemberIDs returns all member's UID in current group func (c *Group) MemberIDs() []int64 { c.mu.RLock() defer c.mu.RUnlock() var members []int64 for _, s := range c.sessions { members = append(members, s.UID()) } return members } // Multicast push the message to the filtered clients func (c *Group) Multicast(route string, v interface{}, filter SessionFilter) error { if c.isClosed() { return ErrClosedGroup } data, err := message.Serialize(v) if err != nil { return err } if env.Debug { log.Println(fmt.Sprintf("Multicast %s, Data=%+v", route, v)) } c.mu.RLock() defer c.mu.RUnlock() for _, s := range c.sessions { if !filter(s) { continue } if err = s.Push(route, data); err != nil { log.Println(err.Error()) } } return nil } // Broadcast push the message(s) to all members func (c *Group) Broadcast(route string, v interface{}) error { if c.isClosed() { return ErrClosedGroup } data, err := message.Serialize(v) if err != nil { return err } if env.Debug { log.Println(fmt.Sprintf("Broadcast %s, Data=%+v", route, v)) } c.mu.RLock() defer c.mu.RUnlock() for _, s := range c.sessions { if err = s.Push(route, data); err != nil { log.Println(fmt.Sprintf("Session push message error, ID=%d, UID=%d, Error=%s", s.ID(), s.UID(), err.Error())) } } return err } // Contains check whether a UID is contained in current group or not func (c *Group) Contains(uid int64) bool { _, err := c.Member(uid) return err == nil } // Add add session to group func (c *Group) Add(session *session.Session) error { if c.isClosed() { return ErrClosedGroup } if env.Debug { log.Println(fmt.Sprintf("Add session to group %s, ID=%d, UID=%d", c.name, session.ID(), session.UID())) } c.mu.Lock() defer c.mu.Unlock() id := session.ID() _, ok := c.sessions[session.ID()] if ok { return ErrSessionDuplication } c.sessions[id] = session return nil } // Leave remove specified UID related session from group func (c *Group) Leave(s *session.Session) error { if c.isClosed() { return ErrClosedGroup } if env.Debug { log.Println(fmt.Sprintf("Remove session from group %s, UID=%d", c.name, s.UID())) } c.mu.Lock() defer c.mu.Unlock() delete(c.sessions, s.ID()) return nil } // LeaveAll clear all sessions in the group func (c *Group) LeaveAll() error { if c.isClosed() { return ErrClosedGroup } c.mu.Lock() defer c.mu.Unlock() c.sessions = make(map[int64]*session.Session) return nil } // Count get current member amount in the group func (c *Group) Count() int { c.mu.RLock() defer c.mu.RUnlock() return len(c.sessions) } func (c *Group) isClosed() bool { if atomic.LoadInt32(&c.status) == groupStatusClosed { return true } return false } // Close destroy group, which will release all resource in the group func (c *Group) Close() error { if c.isClosed() { return ErrCloseClosedGroup } atomic.StoreInt32(&c.status, groupStatusClosed) // release all reference c.sessions = make(map[int64]*session.Session) return nil }