You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ngs/examples/singleon/chat/main.go

162 lines
3.9 KiB
Go

package main
import (
"fmt"
"git.noahlan.cn/northlan/ngs"
"git.noahlan.cn/northlan/ngs/component"
"git.noahlan.cn/northlan/ngs/pipeline"
"git.noahlan.cn/northlan/ngs/scheduler"
"git.noahlan.cn/northlan/ngs/serialize/json"
"git.noahlan.cn/northlan/ngs/session"
"log"
"net/http"
"strings"
"time"
)
type (
Room struct {
group *ngs.Group
}
// RoomManager represents a component that contains a bundle of room
RoomManager struct {
component.Base
timer *scheduler.Timer
rooms map[int]*Room
}
// UserMessage represents a message that user sent
UserMessage struct {
Name string `json:"name"`
Content string `json:"content"`
}
// NewUser message will be received when new user join room
NewUser struct {
Content string `json:"content"`
}
// AllMembers contains all members uid
AllMembers struct {
Members []int64 `json:"members"`
}
// JoinResponse represents the result of joining room
JoinResponse struct {
Code int `json:"code"`
Result string `json:"result"`
}
stats struct {
component.Base
timer *scheduler.Timer
outboundBytes int
inboundBytes int
}
)
func (stats *stats) outbound(s *session.Session, msg *pipeline.Message) error {
stats.outboundBytes += len(msg.Data)
return nil
}
func (stats *stats) inbound(s *session.Session, msg *pipeline.Message) error {
stats.inboundBytes += len(msg.Data)
return nil
}
func (stats *stats) AfterInit() {
stats.timer = scheduler.NewTimer(time.Minute, func() {
println("OutboundBytes", stats.outboundBytes)
println("InboundBytes", stats.outboundBytes)
})
}
const (
testRoomID = 1
roomIDKey = "ROOM_ID"
)
func NewRoomManager() *RoomManager {
return &RoomManager{
rooms: map[int]*Room{},
}
}
// AfterInit component lifetime callback
func (mgr *RoomManager) AfterInit() {
session.Lifetime.OnClosed(func(s *session.Session) {
if !s.HasKey(roomIDKey) {
return
}
room := s.Value(roomIDKey).(*Room)
room.group.Leave(s)
})
mgr.timer = scheduler.NewTimer(time.Minute, func() {
for roomId, room := range mgr.rooms {
println(fmt.Sprintf("UserCount: RoomID=%d, Time=%s, Count=%d",
roomId, time.Now().String(), room.group.Count()))
}
})
}
// Join room
func (mgr *RoomManager) Join(s *session.Session, msg []byte) error {
// NOTE: join test room only in demo
room, found := mgr.rooms[testRoomID]
if !found {
room = &Room{
group: ngs.NewGroup(fmt.Sprintf("room-%d", testRoomID)),
}
mgr.rooms[testRoomID] = room
}
fakeUID := s.ID() //just use s.ID as uid !!!
s.Bind(fakeUID) // binding session uids.Set(roomIDKey, room)
s.Set(roomIDKey, room)
s.Push("onMembers", &AllMembers{Members: room.group.MemberIDs()})
// notify others
room.group.Broadcast("onNewUser", &NewUser{Content: fmt.Sprintf("New user: %d", s.ID())})
// new user join group
room.group.Add(s) // add session to group
return s.Response(&JoinResponse{Result: "success"})
}
// Message sync last message to all members
func (mgr *RoomManager) Message(s *session.Session, msg *UserMessage) error {
if !s.HasKey(roomIDKey) {
return fmt.Errorf("not join room yet")
}
room := s.Value(roomIDKey).(*Room)
return room.group.Broadcast("onMessage", msg)
}
func main() {
components := &component.Components{}
components.Register(
NewRoomManager(),
component.WithName("room"), // rewrite component and handler name
component.WithNameFunc(strings.ToLower),
)
// traffic stats
pip := pipeline.New()
var stats = &stats{}
pip.Outbound().PushBack(stats.outbound)
pip.Inbound().PushBack(stats.inbound)
log.SetFlags(log.LstdFlags | log.Llongfile)
http.Handle("/web/", http.StripPrefix("/web/", http.FileServer(http.Dir("web"))))
ngs.Listen(":3250",
ngs.WithIsWebsocket(true),
ngs.WithPipeline(pip),
ngs.WithCheckOriginFunc(func(_ *http.Request) bool { return true }),
ngs.WithWSPath("/ngs"),
ngs.WithDebugMode(),
ngs.WithSerializer(json.NewSerializer()), // override default serializer
ngs.WithComponents(components),
)
}