You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

298 lines
8.4 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package rank
import (
"context"
"git.noahlan.cn/northlan/ntools-go/uuid"
lru "github.com/hashicorp/golang-lru"
zset "github.com/longzhiri/gozset"
"github.com/pkg/errors"
"github.com/robfig/cron/v3"
"github.com/zeromicro/go-zero/core/logx"
"live-service/app/user_center/model"
"live-service/app/user_center/rpc/internal/svc"
"live-service/app/user_center/rpc/pb"
"time"
)
// Service is the package-level rank job instance. It is nil until
// InitRankJob assigns it.
var Service *Job
// CachedUserInfo is the value stored in Job.userCache: the display data of
// one user (id, name and avatar).
type CachedUserInfo struct {
	UserId   int64
	Username string
	Avatar   string
}

// Job owns the in-memory leaderboards and the user cache that the rank
// service works with.
type Job struct {
	ctx    context.Context
	svcCtx *svc.ServiceContext

	// rankByTypeMap holds the live leaderboards, one sorted set per rank
	// type. They are refreshed periodically, so they are only near-real-time.
	//
	// This map stands where a list of individual, commented-out fields used
	// to be: damageRank, deDamageRank, generalRank, deGeneralRank,
	// killUnitRank, deKillUnitRank, killPlayerRank, deKillPlayerRank,
	// winRank, lostRank, firstBloodRank, deFirstBloodRank.
	rankByTypeMap map[pb.RankType]*zset.ZSetInt

	// userCache is an in-memory cache of user table data (CachedUserInfo
	// values keyed by user id).
	userCache *lru.Cache
}
// InitRankJob builds the package-level Service and, when
// svcCtx.Config.Rank.Enabled is set, starts its scheduled jobs.
//
// Service is assigned in every case, with one empty sorted set per rank type
// and a fresh user cache; only the call to initJob depends on the flag.
func InitRankJob(svcCtx *svc.ServiceContext) {
	// All boards share one comparator (l > r); presumably this ranks higher
	// scores first — confirm against the gozset documentation.
	byScore := func(l, r int32) bool { return l > r }

	rankTypes := []pb.RankType{
		pb.RankType_Damage,
		pb.RankType_DeDamage,
		pb.RankType_General,
		pb.RankType_DeGeneral,
		pb.RankType_KillUnit,
		pb.RankType_DeKillUnit,
		pb.RankType_KillPlayer,
		pb.RankType_DeKillPlayer,
		pb.RankType_Win,
		pb.RankType_Lost,
		pb.RankType_FirstBlood,
		pb.RankType_DeFirstBlood,
	}
	boards := make(map[pb.RankType]*zset.ZSetInt, len(rankTypes))
	for _, rt := range rankTypes {
		boards[rt] = zset.NewZSetInt(byScore, model.MaxRankN)
	}

	// The error from lru.New is intentionally discarded.
	// NOTE(review): golang-lru appears to fail only for a non-positive size,
	// which this expression cannot produce for a non-negative MaxRankN — confirm.
	cache, _ := lru.New(4*model.MaxRankN + 1000)

	Service = &Job{
		ctx:           context.Background(),
		svcCtx:        svcCtx,
		rankByTypeMap: boards,
		userCache:     cache,
	}

	if svcCtx.Config.Rank.Enabled {
		Service.initJob()
	}
}
// initJob sets up the scheduled work of the rank job. InitRankJob calls it
// once, and only when the rank feature is enabled.
//
//  1. (currently disabled) load the persisted leaderboard rows, together with
//     their user data, back into the in-memory boards;
//  2. on the Config.Rank.Cron.Update schedule, run readAndUpdate for every
//     rank type;
//  3. on the Config.Rank.Cron.Persistence schedule, run persistence for every
//     rank type.
//
// A schedule that cannot be registered (for example an invalid cron spec) is
// logged rather than silently dropped. The other schedule is still registered
// and the scheduler is still started.
func (j *Job) initJob() {
	logx.Info("开启排行榜服务...")

	// TODO: loading historical data (one initByType call per rank type) is
	// temporarily removed.

	rankTypes := []pb.RankType{
		pb.RankType_Damage,
		pb.RankType_DeDamage,
		pb.RankType_General,
		pb.RankType_DeGeneral,
		pb.RankType_KillUnit,
		pb.RankType_DeKillUnit,
		pb.RankType_KillPlayer,
		pb.RankType_DeKillPlayer,
		pb.RankType_Win,
		pb.RankType_Lost,
		pb.RankType_FirstBlood,
		pb.RankType_DeFirstBlood,
	}

	// forEachType turns a per-type task into a cron callback that launches
	// the task once per rank type, each run in its own goroutine.
	forEachType := func(task func(pb.RankType)) func() {
		return func() {
			for _, rt := range rankTypes {
				// rt is evaluated when the go statement executes, so every
				// goroutine receives its own rank type.
				go task(rt)
			}
		}
	}

	cfg := j.svcCtx.Config.Rank
	c := cron.New()

	// Job: read candidates from the database and update the in-memory boards.
	if _, err := c.AddFunc(cfg.Cron.Update, forEachType(j.readAndUpdate)); err != nil {
		logx.Errorf("rank: registering update job with cron spec %q: %v", cfg.Cron.Update, err)
	}
	// Job: persist the in-memory boards.
	if _, err := c.AddFunc(cfg.Cron.Persistence, forEachType(j.persistence)); err != nil {
		logx.Errorf("rank: registering persistence job with cron spec %q: %v", cfg.Cron.Persistence, err)
	}

	c.Start()
}
// RankByScore returns the rank held on the rankType board by the first member
// found in the score range [score, score].
//
// It returns 0 when rankType has no board or when that range is empty.
func (j *Job) RankByScore(rankType pb.RankType, score int32) int32 {
	board, _, err := j.getRankInstanceAndScoreType(rankType)
	if err != nil {
		return 0
	}

	members := board.RangeByScore(score, score)
	if len(members) == 0 {
		return 0
	}
	return int32(board.Rank(members[0]))
}
// RangeRankByType returns the leading entries of the rankType board as a
// pb.RankPvpResp, reading ranks 1 through topN.
//
// topN is clamped into [0, model.MaxRankN]. A negative value is treated as 0;
// without that clamp it would wrap around in the uint32 conversion and
// bypass the MaxRankN limit.
//
// Each entry's name and avatar come from the user cache when present,
// otherwise from UserPlatformModel. A successful database lookup is stored in
// the cache so later calls do not query again for the same user. If the
// lookup fails, the entry is still returned with only Uid and Score set.
//
// When rankType has no board, the response carries only Type and no items.
func (j *Job) RangeRankByType(rankType pb.RankType, topN int32) *pb.RankPvpResp {
	result := &pb.RankPvpResp{
		Type: int32(rankType),
	}
	rankZSet, _, err := j.getRankInstanceAndScoreType(rankType)
	if err != nil {
		return result
	}

	if topN < 0 {
		topN = 0
	}
	if topN > model.MaxRankN {
		topN = model.MaxRankN
	}

	entries := rankZSet.RangeByRank(1, uint32(topN))
	// Pre-size the slice to avoid repeated growth while appending.
	result.Items = make([]*pb.RankPvpResp_Item, 0, len(entries))
	for _, entry := range entries {
		uid, score := entry[0], entry[1]
		item := &pb.RankPvpResp_Item{
			Uid:   uid,
			Score: score,
		}

		if c, ok := j.userCache.Get(uid); ok {
			cached := c.(CachedUserInfo)
			item.Uid = cached.UserId
			item.Uname = cached.Username
			item.Avatar = cached.Avatar
		} else if dbUser, err := j.svcCtx.UserPlatformModel.FindDisplayOneByUserId(j.ctx, uid); err == nil {
			item.Uname = dbUser.PUname
			item.Avatar = dbUser.PAvatar
			// Remember the result under the same key that Get uses above.
			j.userCache.Add(uid, CachedUserInfo{
				UserId:   uid,
				Username: dbUser.PUname,
				Avatar:   dbUser.PAvatar,
			})
		}

		result.Items = append(result.Items, item)
	}
	return result
}
// initByType loads up to model.MaxRankN persisted rows of rankType from
// RankPvpModel and, for each row, caches the user's display data and adds the
// user with its score to the in-memory board.
//
// It returns without doing anything further if the rows cannot be read or if
// rankType has no board. The row score is converted to int32 before it is
// added.
func (j *Job) initByType(rankType pb.RankType) {
	rows, err := j.svcCtx.RankPvpModel.RankListByType(j.ctx, int64(rankType), model.MaxRankN)
	if err != nil {
		return
	}
	board, _, err := j.getRankInstanceAndScoreType(rankType)
	if err != nil {
		return
	}

	for _, row := range rows {
		// Cache the user's display data under the user id.
		info := CachedUserInfo{
			UserId:   row.UserId,
			Username: row.PUname,
			Avatar:   row.PAvatar,
		}
		j.userCache.Add(row.UserId, info)

		board.Add(row.UserId, int32(row.Score))
	}
}
// readAndUpdate refreshes the in-memory board of rankType from the statistics
// table.
//
// It takes the score of the board's current last entry (0 when the board is
// empty) as a threshold, asks StatisticsPvpModel.FindGreaterByScore for users
// relative to that threshold, and then adds each returned user to the board
// or updates the user's score if already present. Display data of users that
// are not yet in the user cache is loaded and cached along the way.
//
// A failed statistics query is logged and ends the run. It returns silently
// when rankType has no board.
func (j *Job) readAndUpdate(rankType pb.RankType) {
	rankZSet, scoreType, err := j.getRankInstanceAndScoreType(rankType)
	if err != nil {
		return
	}

	rank := rankZSet.RangeByRank(1, model.MaxRankN)
	rankLen := len(rank)
	var score int64
	if rankLen > 0 {
		// Score of the entry currently in last place on the board.
		score = rank[rankLen-1][1]
	}

	// If the board is not full yet, only fetch as many rows as there are free
	// slots; the +1 compensates for the query possibly returning the current
	// last-place member itself, which would leave the fetch one short.
	// NOTE(review): with a partially filled board the threshold is still the
	// last-place score, so users scoring below it may never be fetched to
	// fill the free slots — verify against FindGreaterByScore.
	limit := model.MaxRankN
	if rankLen < model.MaxRankN {
		limit = model.MaxRankN - rankLen + 1
	}

	byScore, err := j.svcCtx.StatisticsPvpModel.FindGreaterByScore(j.ctx, score, scoreType, limit)
	if err != nil {
		logx.Errorf("rank: reading candidates for rank type [%d]: %v", rankType, err)
		return
	}

	for _, s := range byScore {
		// Cache the user's display data. A failed lookup is tolerated: the
		// user is still ranked, just without a cache entry.
		if ok := j.userCache.Contains(s.UserId); !ok {
			if dbUser, err := j.svcCtx.UserPlatformModel.FindDisplayOneByUserId(j.ctx, s.UserId); err == nil {
				j.userCache.Add(dbUser.UserId, CachedUserInfo{
					UserId:   dbUser.UserId,
					Username: dbUser.PUname,
					Avatar:   dbUser.PAvatar,
				})
			}
		}

		if _, ok := rankZSet.Score(s.UserId); ok {
			rankZSet.Update(s.UserId, int32(s.Score))
		} else {
			rankZSet.Add(s.UserId, int32(s.Score))
		}
	}
}
// persistence writes the current in-memory board of rankType to storage via
// RankPvpModel.UpdateRank.
//
// Every board entry becomes one model.RankPvp row with a freshly generated
// id. Nothing is written when rankType has no board or the board is empty.
// A failed write is logged.
func (j *Job) persistence(rankType pb.RankType) {
	board, _, err := j.getRankInstanceAndScoreType(rankType)
	if err != nil {
		return
	}

	entries := board.RangeByRank(1, model.MaxRankN)
	if len(entries) == 0 {
		return
	}

	rows := make([]model.RankPvp, len(entries))
	for i, entry := range entries {
		rows[i] = model.RankPvp{
			Id:       uuid.NextId(),
			UserId:   entry[0],
			RankType: int64(rankType),
			Score:    entry[1],
		}
	}

	// Simple measure against deadlocks: wait a moment before writing.
	// NOTE(review): every rank type waits the same fixed second, so runs that
	// start together still write together — confirm this is sufficient.
	time.Sleep(time.Second)
	if err := j.svcCtx.RankPvpModel.UpdateRank(j.ctx, int64(rankType), rows); err != nil {
		logx.Error("更新排行榜错误", err)
	}
}
// getRankInstanceAndScoreType resolves rankType to its in-memory board and to
// the score type given by model.ScoreTypeByRankType.
//
// The score type is returned in every case. When no board is registered for
// rankType (or the registered one is nil), the board is nil and the error is
// non-nil.
func (j *Job) getRankInstanceAndScoreType(rankType pb.RankType) (*zset.ZSetInt, model.ScoreType, error) {
	scoreType := model.ScoreTypeByRankType(rankType)
	if board := j.rankByTypeMap[rankType]; board != nil {
		return board, scoreType, nil
	}
	return nil, scoreType, errors.Errorf("没有此类型 [%d] 的排行榜", rankType)
}