// Package rank keeps in-memory PvP leaderboards (one sorted set per rank
// type), refreshes them on a cron schedule from the PvP statistics model and
// periodically writes the current standings back through the rank model.
package rank

import (
	"context"
	"time"

	"git.noahlan.cn/northlan/ntools-go/uuid"
	lru "github.com/hashicorp/golang-lru"
	zset "github.com/longzhiri/gozset"
	"github.com/pkg/errors"
	"github.com/robfig/cron/v3"
	"github.com/zeromicro/go-zero/core/logx"

	"live-service/app/user_center/model"
	"live-service/app/user_center/rpc/internal/svc"
	"live-service/app/user_center/rpc/pb"
)

// Service is the package-wide leaderboard job. It is nil until InitRankJob
// has been called.
var Service *Job

type (
	// CachedUserInfo is the user display data stored in Job.userCache.
	CachedUserInfo struct {
		UserId   int64
		Username string
		Avatar   string
	}

	// Job owns the per-type leaderboards and the user display cache.
	//
	// NOTE(review): the sorted sets are mutated by cron goroutines
	// (readAndUpdate) and read by RangeRankByType / RankByScore / persistence
	// without any locking visible in this file. Confirm whether gozset is safe
	// for concurrent use; if not, this needs a mutex.
	Job struct {
		ctx    context.Context
		svcCtx *svc.ServiceContext

		// Semi-real-time leaderboards, refreshed periodically by readAndUpdate.
		rankByTypeMap map[pb.RankType]*zset.ZSetInt

		// In-memory cache of user display data, keyed by user id (int64).
		userCache *lru.Cache
	}
)

// InitRankJob builds the global Service with one empty leaderboard per rank
// type and a user cache, then starts the cron jobs when
// svcCtx.Config.Rank.Enabled is true.
//
// It panics if the user cache cannot be created; with golang-lru that only
// happens for a non-positive size, i.e. a misconfigured model.MaxRankN.
func InitRankJob(svcCtx *svc.ServiceContext) {
	rankTypes := []pb.RankType{
		pb.RankType_Damage,
		pb.RankType_DeDamage,
		pb.RankType_General,
		pb.RankType_DeGeneral,
		pb.RankType_KillUnit,
		pb.RankType_DeKillUnit,
		pb.RankType_KillPlayer,
		pb.RankType_DeKillPlayer,
		pb.RankType_Win,
		pb.RankType_Lost,
		pb.RankType_FirstBlood,
		pb.RankType_DeFirstBlood,
	}

	// Comparator handed to every sorted set: l orders before r when l > r.
	lessFunc := func(l, r int32) bool {
		return l > r
	}

	uc, err := lru.New(4*model.MaxRankN + 1000)
	if err != nil {
		// Fail fast at startup instead of nil-dereferencing the cache on the
		// first lookup.
		panic(errors.Wrap(err, "create rank user cache"))
	}

	rankByType := make(map[pb.RankType]*zset.ZSetInt, len(rankTypes))
	for _, rankType := range rankTypes {
		rankByType[rankType] = zset.NewZSetInt(lessFunc, model.MaxRankN)
	}

	Service = &Job{
		ctx:           context.Background(),
		svcCtx:        svcCtx,
		rankByTypeMap: rankByType,
		userCache:     uc,
	}

	if !svcCtx.Config.Rank.Enabled {
		return
	}
	Service.initJob()
}

// initJob starts the leaderboard cron jobs:
//  1. (disabled for now) load the persisted leaderboard into memory;
//  2. periodically pull fresh scores and add/update them (readAndUpdate);
//  3. periodically write the standings to the database (persistence).
//
// A cron spec that fails to parse is logged; the other job is still
// registered and the scheduler is still started.
func (j *Job) initJob() {
	logx.Info("开启排行榜服务...")

	// TODO: loading historical data is temporarily removed. To restore it,
	// call j.initByType for every rank type here.

	cfg := j.svcCtx.Config.Rank
	c := cron.New()

	// Job: read and update.
	if _, err := c.AddFunc(cfg.Cron.Update, func() {
		j.forEachRankType(j.readAndUpdate)
	}); err != nil {
		logx.Errorf("register rank update cron %q: %v", cfg.Cron.Update, err)
	}

	// Job: persistence.
	if _, err := c.AddFunc(cfg.Cron.Persistence, func() {
		j.forEachRankType(j.persistence)
	}); err != nil {
		logx.Errorf("register rank persistence cron %q: %v", cfg.Cron.Persistence, err)
	}

	c.Start()
}

// forEachRankType runs fn in its own goroutine for every registered rank
// type and returns without waiting for them. rankByTypeMap is only written in
// InitRankJob, so ranging over it here is a read-only access.
func (j *Job) forEachRankType(fn func(pb.RankType)) {
	for rankType := range j.rankByTypeMap {
		go fn(rankType)
	}
}

// RankByScore returns the rank of the first member whose score equals score
// on the rankType leaderboard. It returns 0 when the rank type is unknown or
// no member has exactly that score.
func (j *Job) RankByScore(rankType pb.RankType, score int32) int32 {
	rankZSet, _, err := j.getRankInstanceAndScoreType(rankType)
	if err != nil {
		return 0
	}

	members := rankZSet.RangeByScore(score, score)
	if len(members) == 0 {
		return 0
	}
	return int32(rankZSet.Rank(members[0]))
}

// RangeRankByType returns the top topN entries of the rankType leaderboard,
// enriched with user name and avatar where they can be resolved.
//
// topN is capped at model.MaxRankN. A non-positive topN, or an unknown rank
// type, yields a response with no items. Entries whose user cannot be loaded
// are still returned, carrying only Uid and Score.
func (j *Job) RangeRankByType(rankType pb.RankType, topN int32) *pb.RankPvpResp {
	result := &pb.RankPvpResp{
		Type: int32(rankType),
	}

	rankZSet, _, err := j.getRankInstanceAndScoreType(rankType)
	if err != nil {
		return result
	}

	// A negative topN would wrap to a huge uint32 below and slip past the
	// MaxRankN cap, so reject non-positive values up front.
	if topN <= 0 {
		return result
	}
	if topN > model.MaxRankN {
		topN = model.MaxRankN
	}

	rank := rankZSet.RangeByRank(1, uint32(topN))

	// Pre-size to avoid repeated growth.
	result.Items = make([]*pb.RankPvpResp_Item, 0, len(rank))
	for _, r := range rank {
		uid, score := r[0], r[1]

		item := &pb.RankPvpResp_Item{
			Uid:   uid,
			Score: score,
		}
		if info, ok := j.userInfo(uid); ok {
			item.Uname = info.Username
			item.Avatar = info.Avatar
		}
		result.Items = append(result.Items, item)
	}
	return result
}

// userInfo resolves display data for userID from the cache, falling back to
// the database (and filling the cache) on a miss. The bool is false when the
// user could not be loaded.
func (j *Job) userInfo(userID int64) (CachedUserInfo, bool) {
	if cached, ok := j.userCache.Get(userID); ok {
		if info, ok := cached.(CachedUserInfo); ok {
			return info, true
		}
	}

	info, err := j.fetchAndCacheUser(userID)
	if err != nil {
		return CachedUserInfo{}, false
	}
	return info, true
}

// fetchAndCacheUser loads display data for userID from the user platform
// model and stores it in the cache under userID.
func (j *Job) fetchAndCacheUser(userID int64) (CachedUserInfo, error) {
	dbUser, err := j.svcCtx.UserPlatformModel.FindDisplayOneByUserId(j.ctx, userID)
	if err != nil {
		return CachedUserInfo{}, err
	}

	info := CachedUserInfo{
		UserId:   dbUser.UserId,
		Username: dbUser.PUname,
		Avatar:   dbUser.PAvatar,
	}
	j.userCache.Add(userID, info)
	return info, nil
}

// initByType loads up to model.MaxRankN persisted entries for rankType into
// the in-memory leaderboard and caches each user's display data. It is
// currently not called (see the TODO in initJob).
func (j *Job) initByType(rankType pb.RankType) {
	list, err := j.svcCtx.RankPvpModel.RankListByType(j.ctx, int64(rankType), model.MaxRankN)
	if err != nil {
		logx.Errorf("load persisted rank list, type %d: %v", rankType, err)
		return
	}

	rankZSet, _, err := j.getRankInstanceAndScoreType(rankType)
	if err != nil {
		return
	}

	for _, user := range list {
		// Cache user display data.
		j.userCache.Add(user.UserId, CachedUserInfo{
			UserId:   user.UserId,
			Username: user.PUname,
			Avatar:   user.PAvatar,
		})
		rankZSet.Add(user.UserId, int32(user.Score))
	}
}

// readAndUpdate refreshes the rankType leaderboard: it takes the score of the
// current last place as a threshold, asks the statistics model for rows above
// it, and adds or updates each returned user in the sorted set.
//
// NOTE(review): when the board is empty the threshold is 0, and when it is
// partially filled the threshold is still the last place's score. Whether
// lower-scored users can fill the remaining slots depends on
// FindGreaterByScore, which is not visible here — confirm.
func (j *Job) readAndUpdate(rankType pb.RankType) {
	rankZSet, scoreType, err := j.getRankInstanceAndScoreType(rankType)
	if err != nil {
		return
	}

	rank := rankZSet.RangeByRank(1, model.MaxRankN)
	rankLen := len(rank)

	var score int64
	if rankLen > 0 {
		// Score of the current last place on the board.
		score = rank[rankLen-1][1]
	}

	// If the board is not full, only fetch as many rows as there are free
	// slots.
	limit := model.MaxRankN
	if rankLen < model.MaxRankN {
		// +1 so that getting the last-place user back does not leave us one
		// row short.
		limit = model.MaxRankN - rankLen + 1
	}

	byScore, err := j.svcCtx.StatisticsPvpModel.FindGreaterByScore(j.ctx, score, scoreType, limit)
	if err != nil {
		logx.Errorf("query scores for rank type %d: %v", rankType, err)
		return
	}

	for _, s := range byScore {
		// Cache user display data. Best effort: a failed lookup must not keep
		// the user off the board, so the error is deliberately dropped.
		if !j.userCache.Contains(s.UserId) {
			_, _ = j.fetchAndCacheUser(s.UserId)
		}

		if _, ok := rankZSet.Score(s.UserId); ok {
			rankZSet.Update(s.UserId, int32(s.Score))
		} else {
			rankZSet.Add(s.UserId, int32(s.Score))
		}
	}
}

// persistence writes the current rankType leaderboard (up to model.MaxRankN
// entries) to the database through RankPvpModel.UpdateRank. Nothing is
// written when the board is empty.
func (j *Job) persistence(rankType pb.RankType) {
	rankZSet, _, err := j.getRankInstanceAndScoreType(rankType)
	if err != nil {
		return
	}

	rank := rankZSet.RangeByRank(1, model.MaxRankN)
	if len(rank) == 0 {
		return
	}

	dbModel := make([]model.RankPvp, 0, len(rank))
	for _, r := range rank {
		dbModel = append(dbModel, model.RankPvp{
			Id:       uuid.NextId(),
			UserId:   r[0],
			RankType: int64(rankType),
			Score:    r[1],
		})
	}

	// Crude deadlock avoidance: delay the write by one second.
	// NOTE(review): presumably this keeps the write from colliding with a
	// readAndUpdate run fired on the same cron tick — confirm. All rank types
	// sleep for the same duration, so it does not stagger them against each
	// other.
	time.Sleep(1 * time.Second)

	if err = j.svcCtx.RankPvpModel.UpdateRank(j.ctx, int64(rankType), dbModel); err != nil {
		logx.Error("更新排行榜错误", err)
	}
}

// getRankInstanceAndScoreType returns the sorted set registered for rankType
// together with the score type model.ScoreTypeByRankType maps it to. The
// error is non-nil when no leaderboard is registered for rankType; the score
// type is returned in that case as well.
func (j *Job) getRankInstanceAndScoreType(rankType pb.RankType) (*zset.ZSetInt, model.ScoreType, error) {
	scoreType := model.ScoreTypeByRankType(rankType)

	rankZSet, ok := j.rankByTypeMap[rankType]
	if !ok || rankZSet == nil {
		return nil, scoreType, errors.Errorf("没有此类型 [%d] 的排行榜", rankType)
	}
	return rankZSet, scoreType, nil
}