package statistics import ( "context" "git.noahlan.cn/northlan/ntools-go/uuid" lru "github.com/hashicorp/golang-lru" zset "github.com/longzhiri/gozset" "github.com/pkg/errors" "github.com/robfig/cron/v3" "github.com/zeromicro/go-zero/core/logx" pbVars "live-service/app/pb/vars" "live-service/app/user_center/model" "live-service/app/user_center/rpc/internal/svc" "sync" "time" ) var Service *Job type ( CachedUserInfo struct { UserId int64 Username string Avatar string } RangeItem struct { UserId int64 Username string Avatar string Score int64 } Range struct { Type pbVars.RankType Items []RangeItem } ZSetWithLock struct { *zset.ZSetInt sync.RWMutex } Job struct { ctx context.Context svcCtx *svc.ServiceContext // 实时排行榜(定期读取,半实时) rankByTypeMap map[pbVars.RankType]*ZSetWithLock // 用户数据表内存缓存 userCache *lru.Cache } ) func newZSetWithLock(scoreLessThan func(l, r int32) bool, rankN int) *ZSetWithLock { return &ZSetWithLock{ ZSetInt: zset.NewZSetInt(scoreLessThan, rankN), } } func InitRankJob(svcCtx *svc.ServiceContext) { lessFunc := func(l, r int32) bool { return l > r } uc, _ := lru.New(4*model.MaxRankN + 1000) Service = &Job{ ctx: context.Background(), svcCtx: svcCtx, rankByTypeMap: map[pbVars.RankType]*ZSetWithLock{ pbVars.RankType_Damage: newZSetWithLock(lessFunc, model.MaxRankN), pbVars.RankType_DeDamage: newZSetWithLock(lessFunc, model.MaxRankN), pbVars.RankType_General: newZSetWithLock(lessFunc, model.MaxRankN), pbVars.RankType_DeGeneral: newZSetWithLock(lessFunc, model.MaxRankN), pbVars.RankType_KillUnit: newZSetWithLock(lessFunc, model.MaxRankN), pbVars.RankType_DeKillUnit: newZSetWithLock(lessFunc, model.MaxRankN), pbVars.RankType_KillPlayer: newZSetWithLock(lessFunc, model.MaxRankN), pbVars.RankType_DeKillPlayer: newZSetWithLock(lessFunc, model.MaxRankN), pbVars.RankType_Win: newZSetWithLock(lessFunc, model.MaxRankN), pbVars.RankType_Lost: newZSetWithLock(lessFunc, model.MaxRankN), pbVars.RankType_FirstBlood: newZSetWithLock(lessFunc, model.MaxRankN), pbVars.RankType_DeFirstBlood: newZSetWithLock(lessFunc, model.MaxRankN), }, userCache: uc, } if !svcCtx.GameConfig.Rank.Enabled { return } Service.initJob() } // InitJob 初始化RankJob,启动时加载 // 1. 读取落地的排行榜数据,并Add到榜内(读取 用户数据) // 2. 开启任务 定时读取 分数大于排行榜最后一位的数据,存在则Update // 3. 开启任务 定时落库 func (j *Job) initJob() { logx.Info("开启排行榜服务...") cfg := j.svcCtx.GameConfig.Rank // job read and update c := cron.New() _, _ = c.AddFunc(cfg.Cron.Update, func() { go j.readAndUpdate(pbVars.RankType_Damage) go j.readAndUpdate(pbVars.RankType_DeDamage) go j.readAndUpdate(pbVars.RankType_General) go j.readAndUpdate(pbVars.RankType_DeGeneral) go j.readAndUpdate(pbVars.RankType_KillUnit) go j.readAndUpdate(pbVars.RankType_DeKillUnit) go j.readAndUpdate(pbVars.RankType_KillPlayer) go j.readAndUpdate(pbVars.RankType_DeKillPlayer) go j.readAndUpdate(pbVars.RankType_Win) go j.readAndUpdate(pbVars.RankType_Lost) go j.readAndUpdate(pbVars.RankType_FirstBlood) go j.readAndUpdate(pbVars.RankType_DeFirstBlood) }) // persistence _, _ = c.AddFunc(cfg.Cron.Persistence, func() { go j.persistence(pbVars.RankType_Damage) go j.persistence(pbVars.RankType_DeDamage) go j.persistence(pbVars.RankType_General) go j.persistence(pbVars.RankType_DeGeneral) go j.persistence(pbVars.RankType_KillUnit) go j.persistence(pbVars.RankType_DeKillUnit) go j.persistence(pbVars.RankType_KillPlayer) go j.persistence(pbVars.RankType_DeKillPlayer) go j.persistence(pbVars.RankType_Win) go j.persistence(pbVars.RankType_Lost) go j.persistence(pbVars.RankType_FirstBlood) go j.persistence(pbVars.RankType_DeFirstBlood) }) c.Start() } func (j *Job) CleanByType(rankType pbVars.RankType) bool { rankZSet, _, err := j.getRankInstanceAndScoreType(rankType) if err != nil { return false } rankZSet.Lock() defer rankZSet.Unlock() rankZSet.Clear() return true } func (j *Job) RankByScore(rankType pbVars.RankType, score int32) int32 { rankZSet, _, err := j.getRankInstanceAndScoreType(rankType) if err != nil { return 0 } rankZSet.Lock() defer rankZSet.Unlock() rank := rankZSet.RangeByScore(score, score) if len(rank) > 0 { return int32(rankZSet.Rank(rank[0])) } return 0 } func (j *Job) RangeRankByType(rankType pbVars.RankType, topN int32) Range { result := Range{Type: rankType} rankZSet, _, err := j.getRankInstanceAndScoreType(rankType) if err != nil { return result } if topN > model.MaxRankN { topN = model.MaxRankN } rankZSet.Lock() rank := rankZSet.RangeByRank(1, uint32(topN)) rankZSet.Unlock() // 这里make 减少扩容次数 result.Items = make([]RangeItem, 0, len(rank)) for _, r := range rank { uid := r[0] score := r[1] var item RangeItem var userInfoNeedUpdate bool if c, ok := j.userCache.Get(uid); ok { cached := c.(CachedUserInfo) if cached.Username == "" || cached.Avatar == "" { userInfoNeedUpdate = true } else { item = RangeItem{ UserId: cached.UserId, Username: cached.Username, Score: score, Avatar: cached.Avatar, } } } else { userInfoNeedUpdate = true } if userInfoNeedUpdate { dbUser, err := j.svcCtx.UserPlatformModel.FindDisplayOneByUserId(j.ctx, uid) if err != nil { item = RangeItem{ UserId: uid, Score: score, } } else { j.userCache.Add(dbUser.UserId, CachedUserInfo{ UserId: dbUser.UserId, Username: dbUser.PUname, Avatar: dbUser.PAvatar, }) item = RangeItem{ UserId: uid, Username: dbUser.PUname, Score: score, Avatar: dbUser.PAvatar, } } } result.Items = append(result.Items, item) } return result } func (j *Job) readAndUpdate(rankType pbVars.RankType) { rankZSet, scoreType, err := j.getRankInstanceAndScoreType(rankType) if err != nil { return } rankZSet.Lock() defer rankZSet.Unlock() rank := rankZSet.RangeByRank(1, model.MaxRankN) rankLen := len(rank) var score int64 if rankLen > 0 { // 取当前榜最后一名分数 score = rank[rankLen-1][1] } // 若榜内数量不够,则直接取 max - len 数量的人 score排序一下 limit := model.MaxRankN if rankLen < model.MaxRankN { limit = model.MaxRankN - rankLen + 1 // +1是避免取到自己 少取一位 } // 末位 score byScore, err := j.svcCtx.StatisticsPvpModel.FindGreaterByScore(j.ctx, score, scoreType, limit) if err != nil { return } for _, s := range byScore { // 缓存用户信息 if ok := j.userCache.Contains(s.UserId); !ok { if dbUser, err := j.svcCtx.UserPlatformModel.FindDisplayOneByUserId(j.ctx, s.UserId); err == nil { j.userCache.Add(dbUser.UserId, CachedUserInfo{ UserId: dbUser.UserId, Username: dbUser.PUname, Avatar: dbUser.PAvatar, }) } } if _, ok := rankZSet.Score(s.UserId); ok { rankZSet.Update(s.UserId, int32(s.Score)) } else { rankZSet.Add(s.UserId, int32(s.Score)) } } } func (j *Job) persistence(rankType pbVars.RankType) { rankZSet, _, err := j.getRankInstanceAndScoreType(rankType) if err != nil { return } rankZSet.Lock() rank := rankZSet.RangeByRank(1, model.MaxRankN) rankZSet.Unlock() dbModel := make([]model.RankPvp, 0, len(rank)) for _, r := range rank { uid := r[0] score := r[1] dbModel = append(dbModel, model.RankPvp{ Id: uuid.NextId(), UserId: uid, RankType: int64(rankType), Score: score, }) } if len(dbModel) > 0 { // 简单避免死锁 time.Sleep(1 * time.Second) if err = j.svcCtx.RankPvpModel.UpdateRank(j.ctx, int64(rankType), dbModel); err != nil { logx.Error("更新排行榜错误", err) return } } } func (j *Job) getRankInstanceAndScoreType(rankType pbVars.RankType) (*ZSetWithLock, model.ScoreType, error) { var rankZSet *ZSetWithLock scoreType := model.ScoreTypeByRankType(rankType) rankZSet, _ = j.rankByTypeMap[rankType] if rankZSet == nil { return nil, scoreType, errors.Errorf("没有此类型 [%d] 的排行榜", rankType) } return rankZSet, scoreType, nil }