package rank import ( "context" "git.noahlan.cn/northlan/ntools-go/uuid" lru "github.com/hashicorp/golang-lru" zset "github.com/longzhiri/gozset" "github.com/pkg/errors" "github.com/robfig/cron/v3" "github.com/zeromicro/go-zero/core/logx" "live-service/app/user_center/model" "live-service/app/user_center/rpc/internal/svc" "live-service/app/user_center/rpc/pb" "time" ) var Service *Job type ( CachedUserInfo struct { UserId int64 Username string Avatar string } Job struct { ctx context.Context svcCtx *svc.ServiceContext // 实时排行榜(定期读取,半实时) damageRank *zset.ZSetInt deDamageRank *zset.ZSetInt generalRank *zset.ZSetInt deGeneralRank *zset.ZSetInt killUnitRank *zset.ZSetInt deKillUnitRank *zset.ZSetInt killPlayerRank *zset.ZSetInt deKillPlayerRank *zset.ZSetInt winRank *zset.ZSetInt lostRank *zset.ZSetInt firstBloodRank *zset.ZSetInt deFirstBloodRank *zset.ZSetInt // 用户数据表内存缓存 userCache *lru.Cache } ) func InitRankJob(svcCtx *svc.ServiceContext) { lessFunc := func(l, r int32) bool { return l > r } uc, _ := lru.New(4*model.MaxRankN + 1000) Service = &Job{ ctx: context.Background(), svcCtx: svcCtx, damageRank: zset.NewZSetInt(lessFunc, model.MaxRankN), deDamageRank: zset.NewZSetInt(lessFunc, model.MaxRankN), generalRank: zset.NewZSetInt(lessFunc, model.MaxRankN), deGeneralRank: zset.NewZSetInt(lessFunc, model.MaxRankN), killUnitRank: zset.NewZSetInt(lessFunc, model.MaxRankN), deKillUnitRank: zset.NewZSetInt(lessFunc, model.MaxRankN), killPlayerRank: zset.NewZSetInt(lessFunc, model.MaxRankN), deKillPlayerRank: zset.NewZSetInt(lessFunc, model.MaxRankN), winRank: zset.NewZSetInt(lessFunc, model.MaxRankN), lostRank: zset.NewZSetInt(lessFunc, model.MaxRankN), firstBloodRank: zset.NewZSetInt(lessFunc, model.MaxRankN), deFirstBloodRank: zset.NewZSetInt(lessFunc, model.MaxRankN), userCache: uc, } Service.initJob() } // InitJob 初始化RankJob,启动时加载 // 1. 读取落地的排行榜数据,并Add到榜内(读取 用户数据) // 2. 开启任务 定时读取 分数大于排行榜最后一位的数据,存在则Update // 3. 开启任务 定时落库 func (j *Job) initJob() { logx.Info("开启排行榜服务...") j.initByType(model.RankTypeDamage) j.initByType(model.RankTypeDeDamage) j.initByType(model.RankTypeGeneral) j.initByType(model.RankTypeDeGeneral) j.initByType(model.RankTypeKillUnit) j.initByType(model.RankTypeDeKillUnit) j.initByType(model.RankTypeKillPlayer) j.initByType(model.RankTypeDeKillPlayer) j.initByType(model.RankTypeWin) j.initByType(model.RankTypeLost) j.initByType(model.RankTypeFirstBlood) j.initByType(model.RankTypeDeFirstBlood) // job read and update c := cron.New() _, _ = c.AddFunc("@every 10s", func() { j.readAndUpdate(model.RankTypeDamage) }) _, _ = c.AddFunc("@every 10s", func() { j.readAndUpdate(model.RankTypeDeDamage) }) _, _ = c.AddFunc("@every 10s", func() { j.readAndUpdate(model.RankTypeGeneral) }) _, _ = c.AddFunc("@every 10s", func() { j.readAndUpdate(model.RankTypeDeGeneral) }) _, _ = c.AddFunc("@every 10s", func() { j.readAndUpdate(model.RankTypeKillUnit) }) _, _ = c.AddFunc("@every 10s", func() { j.readAndUpdate(model.RankTypeDeKillUnit) }) _, _ = c.AddFunc("@every 10s", func() { j.readAndUpdate(model.RankTypeKillPlayer) }) _, _ = c.AddFunc("@every 10s", func() { j.readAndUpdate(model.RankTypeDeKillPlayer) }) _, _ = c.AddFunc("@every 10s", func() { j.readAndUpdate(model.RankTypeWin) }) _, _ = c.AddFunc("@every 10s", func() { j.readAndUpdate(model.RankTypeLost) }) _, _ = c.AddFunc("@every 10s", func() { j.readAndUpdate(model.RankTypeFirstBlood) }) _, _ = c.AddFunc("@every 10s", func() { j.readAndUpdate(model.RankTypeDeFirstBlood) }) // persistence _, _ = c.AddFunc("@every 10m", func() { j.persistence(model.RankTypeDamage) }) _, _ = c.AddFunc("@every 10m", func() { j.persistence(model.RankTypeDeDamage) }) _, _ = c.AddFunc("@every 10m", func() { j.persistence(model.RankTypeGeneral) }) _, _ = c.AddFunc("@every 10m", func() { j.persistence(model.RankTypeDeGeneral) }) _, _ = c.AddFunc("@every 10m", func() { j.persistence(model.RankTypeKillUnit) }) _, _ = c.AddFunc("@every 10m", func() { j.persistence(model.RankTypeDeKillUnit) }) _, _ = c.AddFunc("@every 10m", func() { j.persistence(model.RankTypeKillPlayer) }) _, _ = c.AddFunc("@every 10m", func() { j.persistence(model.RankTypeDeKillPlayer) }) _, _ = c.AddFunc("@every 10m", func() { j.persistence(model.RankTypeWin) }) _, _ = c.AddFunc("@every 10m", func() { j.persistence(model.RankTypeLost) }) _, _ = c.AddFunc("@every 10m", func() { j.persistence(model.RankTypeFirstBlood) }) _, _ = c.AddFunc("@every 10m", func() { j.persistence(model.RankTypeDeFirstBlood) }) c.Start() } func (j *Job) RangeRankByType(rankType, topN int32) *pb.RankPvpResp { result := &pb.RankPvpResp{ Type: rankType, } rankZSet, _, err := j.getRankInstanceAndScoreType(rankType) if err != nil { return result } if topN > model.MaxRankN { topN = model.MaxRankN } rank := rankZSet.RangeByRank(1, uint32(topN)) // 这里make 减少扩容次数 result.Items = make([]*pb.RankPvpResp_Item, 0, len(rank)) for _, r := range rank { uid := r[0] score := r[1] var item pb.RankPvpResp_Item if c, ok := j.userCache.Get(uid); ok { cached := c.(CachedUserInfo) item = pb.RankPvpResp_Item{ Uid: cached.UserId, Uname: cached.Username, Score: score, Avatar: cached.Avatar, } } else { dbUser, err := j.svcCtx.UserPlatformModel.FindOneForRankByUserId(j.ctx, uid) if err != nil { item = pb.RankPvpResp_Item{ Uid: uid, Score: score, } } else { item = pb.RankPvpResp_Item{ Uid: uid, Uname: dbUser.PUname, Score: score, Avatar: dbUser.PAvatar, } } } result.Items = append(result.Items, &item) } return result } func (j *Job) initByType(rankType int32) { list, err := j.svcCtx.RankPvpModel.RankListByType(j.ctx, rankType, model.MaxRankN) if err != nil { return } rankZSet, _, err := j.getRankInstanceAndScoreType(rankType) if err != nil { return } for _, user := range list { // 缓存用户信息 j.userCache.Add(user.UserId, CachedUserInfo{ UserId: user.UserId, Username: user.PUname, Avatar: user.PAvatar, }) rankZSet.Add(user.UserId, int32(user.Score)) } } func (j *Job) readAndUpdate(rankType int32) { rankZSet, scoreType, err := j.getRankInstanceAndScoreType(rankType) if err != nil { return } rank := rankZSet.RangeByRank(1, model.MaxRankN) var score int64 if len(rank) == 0 { score = 0 } else { last := rank[len(rank)-1] score = last[1] } // 若榜内数量不够,则直接取 max - current 数量的人 score排序一下 limit := model.MaxRankN if len(rank) < model.MaxRankN { limit = model.MaxRankN - len(rank) score = 0 } // 末位 score byScore, err := j.svcCtx.StatisticsPvpModel.FindGreaterByScore(j.ctx, score, scoreType, limit) if err != nil { return } for _, s := range byScore { // 缓存用户信息 if ok := j.userCache.Contains(s.UserId); !ok { if dbUser, err := j.svcCtx.UserPlatformModel.FindOneForRankByUserId(j.ctx, s.UserId); err == nil { j.userCache.Add(dbUser.UserId, CachedUserInfo{ UserId: dbUser.UserId, Username: dbUser.PUname, Avatar: dbUser.PAvatar, }) } } if _, ok := rankZSet.Score(s.UserId); ok { rankZSet.Update(s.UserId, int32(s.Score)) } else { rankZSet.Add(s.UserId, int32(s.Score)) } } } func (j *Job) persistence(rankType int32) { rankZSet, _, err := j.getRankInstanceAndScoreType(rankType) if err != nil { return } rank := rankZSet.RangeByRank(1, model.MaxRankN) dbModel := make([]model.RankPvp, 0, len(rank)) for _, r := range rank { uid := r[0] score := r[1] dbModel = append(dbModel, model.RankPvp{ Id: uuid.NextId(), UserId: uid, RankType: int64(rankType), Score: score, }) } if len(dbModel) > 0 { // 简单避免死锁 time.Sleep(1 * time.Second) if err = j.svcCtx.RankPvpModel.UpdateRank(j.ctx, rankType, dbModel); err != nil { logx.Error("更新排行榜错误", err) return } } } func (j *Job) getRankInstanceAndScoreType(rankType int32) (*zset.ZSetInt, model.ScoreType, error) { var rankZSet *zset.ZSetInt scoreType := model.ScoreTypeDamage switch rankType { case model.RankTypeDamage: rankZSet = j.damageRank scoreType = model.ScoreTypeDamage case model.RankTypeDeDamage: rankZSet = j.deDamageRank scoreType = model.ScoreTypeDeDamage case model.RankTypeGeneral: rankZSet = j.generalRank scoreType = model.ScoreTypeGeneral case model.RankTypeDeGeneral: rankZSet = j.deGeneralRank scoreType = model.ScoreTypeDeGeneral case model.RankTypeKillUnit: rankZSet = j.killUnitRank scoreType = model.ScoreTypeKillUnit case model.RankTypeDeKillUnit: rankZSet = j.deKillUnitRank scoreType = model.ScoreTypeDeKillUnit case model.RankTypeKillPlayer: rankZSet = j.killPlayerRank scoreType = model.ScoreTypeKillPlayer case model.RankTypeDeKillPlayer: rankZSet = j.deKillPlayerRank scoreType = model.ScoreTypeDeKillPlayer case model.RankTypeWin: rankZSet = j.winRank scoreType = model.ScoreTypeWin case model.RankTypeLost: rankZSet = j.lostRank scoreType = model.ScoreTypeLost case model.RankTypeFirstBlood: rankZSet = j.firstBloodRank scoreType = model.ScoreTypeFirstBlood case model.RankTypeDeFirstBlood: rankZSet = j.deFirstBloodRank scoreType = model.ScoreTypeDeFirstBlood } if rankZSet == nil { return nil, scoreType, errors.Errorf("没有此类型 [%d] 的排行榜", rankType) } return rankZSet, scoreType, nil }