You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

364 lines
10 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package rank
import (
"context"
"git.noahlan.cn/northlan/ntools-go/uuid"
lru "github.com/hashicorp/golang-lru"
zset "github.com/longzhiri/gozset"
"github.com/pkg/errors"
"github.com/robfig/cron/v3"
"github.com/zeromicro/go-zero/core/logx"
"live-service/app/user_center/model"
"live-service/app/user_center/rpc/internal/svc"
"live-service/app/user_center/rpc/pb"
"time"
)
var Service *Job
type (
CachedUserInfo struct {
UserId int64
Username string
Avatar string
}
Job struct {
ctx context.Context
svcCtx *svc.ServiceContext
// 实时排行榜(定期读取,半实时)
damageRank *zset.ZSetInt
deDamageRank *zset.ZSetInt
generalRank *zset.ZSetInt
deGeneralRank *zset.ZSetInt
killUnitRank *zset.ZSetInt
deKillUnitRank *zset.ZSetInt
killPlayerRank *zset.ZSetInt
deKillPlayerRank *zset.ZSetInt
winRank *zset.ZSetInt
lostRank *zset.ZSetInt
firstBloodRank *zset.ZSetInt
deFirstBloodRank *zset.ZSetInt
// 用户数据表内存缓存
userCache *lru.Cache
}
)
func InitRankJob(svcCtx *svc.ServiceContext) {
lessFunc := func(l, r int32) bool {
return l > r
}
uc, _ := lru.New(4*model.MaxRankN + 1000)
Service = &Job{
ctx: context.Background(),
svcCtx: svcCtx,
damageRank: zset.NewZSetInt(lessFunc, model.MaxRankN),
deDamageRank: zset.NewZSetInt(lessFunc, model.MaxRankN),
generalRank: zset.NewZSetInt(lessFunc, model.MaxRankN),
deGeneralRank: zset.NewZSetInt(lessFunc, model.MaxRankN),
killUnitRank: zset.NewZSetInt(lessFunc, model.MaxRankN),
deKillUnitRank: zset.NewZSetInt(lessFunc, model.MaxRankN),
killPlayerRank: zset.NewZSetInt(lessFunc, model.MaxRankN),
deKillPlayerRank: zset.NewZSetInt(lessFunc, model.MaxRankN),
winRank: zset.NewZSetInt(lessFunc, model.MaxRankN),
lostRank: zset.NewZSetInt(lessFunc, model.MaxRankN),
firstBloodRank: zset.NewZSetInt(lessFunc, model.MaxRankN),
deFirstBloodRank: zset.NewZSetInt(lessFunc, model.MaxRankN),
userCache: uc,
}
if !svcCtx.Config.Rank.Enabled {
return
}
Service.initJob()
}
// InitJob 初始化RankJob,启动时加载
// 1. 读取落地的排行榜数据并Add到榜内读取 用户数据)
// 2. 开启任务 定时读取 分数大于排行榜最后一位的数据存在则Update
// 3. 开启任务 定时落库
func (j *Job) initJob() {
logx.Info("开启排行榜服务...")
j.initByType(model.RankTypeDamage)
j.initByType(model.RankTypeDeDamage)
j.initByType(model.RankTypeGeneral)
j.initByType(model.RankTypeDeGeneral)
j.initByType(model.RankTypeKillUnit)
j.initByType(model.RankTypeDeKillUnit)
j.initByType(model.RankTypeKillPlayer)
j.initByType(model.RankTypeDeKillPlayer)
j.initByType(model.RankTypeWin)
j.initByType(model.RankTypeLost)
j.initByType(model.RankTypeFirstBlood)
j.initByType(model.RankTypeDeFirstBlood)
cfg := j.svcCtx.Config.Rank
// job read and update
c := cron.New()
_, _ = c.AddFunc(cfg.Cron.Update, func() {
j.readAndUpdate(model.RankTypeDamage)
})
_, _ = c.AddFunc(cfg.Cron.Update, func() {
j.readAndUpdate(model.RankTypeDeDamage)
})
_, _ = c.AddFunc(cfg.Cron.Update, func() {
j.readAndUpdate(model.RankTypeGeneral)
})
_, _ = c.AddFunc(cfg.Cron.Update, func() {
j.readAndUpdate(model.RankTypeDeGeneral)
})
_, _ = c.AddFunc(cfg.Cron.Update, func() {
j.readAndUpdate(model.RankTypeKillUnit)
})
_, _ = c.AddFunc(cfg.Cron.Update, func() {
j.readAndUpdate(model.RankTypeDeKillUnit)
})
_, _ = c.AddFunc(cfg.Cron.Update, func() {
j.readAndUpdate(model.RankTypeKillPlayer)
})
_, _ = c.AddFunc(cfg.Cron.Update, func() {
j.readAndUpdate(model.RankTypeDeKillPlayer)
})
_, _ = c.AddFunc(cfg.Cron.Update, func() {
j.readAndUpdate(model.RankTypeWin)
})
_, _ = c.AddFunc(cfg.Cron.Update, func() {
j.readAndUpdate(model.RankTypeLost)
})
_, _ = c.AddFunc(cfg.Cron.Update, func() {
j.readAndUpdate(model.RankTypeFirstBlood)
})
_, _ = c.AddFunc(cfg.Cron.Update, func() {
j.readAndUpdate(model.RankTypeDeFirstBlood)
})
// persistence
_, _ = c.AddFunc(cfg.Cron.Persistence, func() {
j.persistence(model.RankTypeDamage)
})
_, _ = c.AddFunc(cfg.Cron.Persistence, func() {
j.persistence(model.RankTypeDeDamage)
})
_, _ = c.AddFunc(cfg.Cron.Persistence, func() {
j.persistence(model.RankTypeGeneral)
})
_, _ = c.AddFunc(cfg.Cron.Persistence, func() {
j.persistence(model.RankTypeDeGeneral)
})
_, _ = c.AddFunc(cfg.Cron.Persistence, func() {
j.persistence(model.RankTypeKillUnit)
})
_, _ = c.AddFunc(cfg.Cron.Persistence, func() {
j.persistence(model.RankTypeDeKillUnit)
})
_, _ = c.AddFunc(cfg.Cron.Persistence, func() {
j.persistence(model.RankTypeKillPlayer)
})
_, _ = c.AddFunc(cfg.Cron.Persistence, func() {
j.persistence(model.RankTypeDeKillPlayer)
})
_, _ = c.AddFunc(cfg.Cron.Persistence, func() {
j.persistence(model.RankTypeWin)
})
_, _ = c.AddFunc(cfg.Cron.Persistence, func() {
j.persistence(model.RankTypeLost)
})
_, _ = c.AddFunc(cfg.Cron.Persistence, func() {
j.persistence(model.RankTypeFirstBlood)
})
_, _ = c.AddFunc(cfg.Cron.Persistence, func() {
j.persistence(model.RankTypeDeFirstBlood)
})
c.Start()
}
func (j *Job) RangeRankByType(rankType, topN int32) *pb.RankPvpResp {
result := &pb.RankPvpResp{
Type: rankType,
}
rankZSet, _, err := j.getRankInstanceAndScoreType(rankType)
if err != nil {
return result
}
if topN > model.MaxRankN {
topN = model.MaxRankN
}
rank := rankZSet.RangeByRank(1, uint32(topN))
// 这里make 减少扩容次数
result.Items = make([]*pb.RankPvpResp_Item, 0, len(rank))
for _, r := range rank {
uid := r[0]
score := r[1]
var item pb.RankPvpResp_Item
if c, ok := j.userCache.Get(uid); ok {
cached := c.(CachedUserInfo)
item = pb.RankPvpResp_Item{
Uid: cached.UserId,
Uname: cached.Username,
Score: score,
Avatar: cached.Avatar,
}
} else {
dbUser, err := j.svcCtx.UserPlatformModel.FindOneForRankByUserId(j.ctx, uid)
if err != nil {
item = pb.RankPvpResp_Item{
Uid: uid,
Score: score,
}
} else {
item = pb.RankPvpResp_Item{
Uid: uid,
Uname: dbUser.PUname,
Score: score,
Avatar: dbUser.PAvatar,
}
}
}
result.Items = append(result.Items, &item)
}
return result
}
func (j *Job) initByType(rankType int32) {
list, err := j.svcCtx.RankPvpModel.RankListByType(j.ctx, rankType, model.MaxRankN)
if err != nil {
return
}
rankZSet, _, err := j.getRankInstanceAndScoreType(rankType)
if err != nil {
return
}
for _, user := range list {
// 缓存用户信息
j.userCache.Add(user.UserId, CachedUserInfo{
UserId: user.UserId,
Username: user.PUname,
Avatar: user.PAvatar,
})
rankZSet.Add(user.UserId, int32(user.Score))
}
}
func (j *Job) readAndUpdate(rankType int32) {
rankZSet, scoreType, err := j.getRankInstanceAndScoreType(rankType)
if err != nil {
return
}
rank := rankZSet.RangeByRank(1, model.MaxRankN)
rankLen := len(rank)
var score int64
if rankLen == 0 {
score = 0
} else {
// 取当前榜最后一名分数
score = rank[rankLen-1][1]
}
// 若榜内数量不够,则直接取 max - len 数量的人 score排序一下
limit := model.MaxRankN
if rankLen < model.MaxRankN {
limit = model.MaxRankN - rankLen + 1 // +1是避免取到自己后少取一位
}
// 末位 score
byScore, err := j.svcCtx.StatisticsPvpModel.FindGreaterByScore(j.ctx, score, scoreType, limit)
if err != nil {
return
}
for _, s := range byScore {
// 缓存用户信息
if ok := j.userCache.Contains(s.UserId); !ok {
if dbUser, err := j.svcCtx.UserPlatformModel.FindOneForRankByUserId(j.ctx, s.UserId); err == nil {
j.userCache.Add(dbUser.UserId, CachedUserInfo{
UserId: dbUser.UserId,
Username: dbUser.PUname,
Avatar: dbUser.PAvatar,
})
}
}
if _, ok := rankZSet.Score(s.UserId); ok {
rankZSet.Update(s.UserId, int32(s.Score))
} else {
rankZSet.Add(s.UserId, int32(s.Score))
}
}
}
func (j *Job) persistence(rankType int32) {
rankZSet, _, err := j.getRankInstanceAndScoreType(rankType)
if err != nil {
return
}
rank := rankZSet.RangeByRank(1, model.MaxRankN)
dbModel := make([]model.RankPvp, 0, len(rank))
for _, r := range rank {
uid := r[0]
score := r[1]
dbModel = append(dbModel, model.RankPvp{
Id: uuid.NextId(),
UserId: uid,
RankType: int64(rankType),
Score: score,
})
}
if len(dbModel) > 0 {
// 简单避免死锁
time.Sleep(1 * time.Second)
if err = j.svcCtx.RankPvpModel.UpdateRank(j.ctx, rankType, dbModel); err != nil {
logx.Error("更新排行榜错误", err)
return
}
}
}
func (j *Job) getRankInstanceAndScoreType(rankType int32) (*zset.ZSetInt, model.ScoreType, error) {
var rankZSet *zset.ZSetInt
scoreType := model.ScoreTypeDamage
switch rankType {
case model.RankTypeDamage:
rankZSet = j.damageRank
scoreType = model.ScoreTypeDamage
case model.RankTypeDeDamage:
rankZSet = j.deDamageRank
scoreType = model.ScoreTypeDeDamage
case model.RankTypeGeneral:
rankZSet = j.generalRank
scoreType = model.ScoreTypeGeneral
case model.RankTypeDeGeneral:
rankZSet = j.deGeneralRank
scoreType = model.ScoreTypeDeGeneral
case model.RankTypeKillUnit:
rankZSet = j.killUnitRank
scoreType = model.ScoreTypeKillUnit
case model.RankTypeDeKillUnit:
rankZSet = j.deKillUnitRank
scoreType = model.ScoreTypeDeKillUnit
case model.RankTypeKillPlayer:
rankZSet = j.killPlayerRank
scoreType = model.ScoreTypeKillPlayer
case model.RankTypeDeKillPlayer:
rankZSet = j.deKillPlayerRank
scoreType = model.ScoreTypeDeKillPlayer
case model.RankTypeWin:
rankZSet = j.winRank
scoreType = model.ScoreTypeWin
case model.RankTypeLost:
rankZSet = j.lostRank
scoreType = model.ScoreTypeLost
case model.RankTypeFirstBlood:
rankZSet = j.firstBloodRank
scoreType = model.ScoreTypeFirstBlood
case model.RankTypeDeFirstBlood:
rankZSet = j.deFirstBloodRank
scoreType = model.ScoreTypeDeFirstBlood
}
if rankZSet == nil {
return nil, scoreType, errors.Errorf("没有此类型 [%d] 的排行榜", rankType)
}
return rankZSet, scoreType, nil
}