You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

247 lines
6.2 KiB
Go

package rank
import (
"context"
"git.noahlan.cn/northlan/ntools-go/uuid"
lru "github.com/hashicorp/golang-lru"
zset "github.com/longzhiri/gozset"
"github.com/pkg/errors"
"github.com/robfig/cron/v3"
"github.com/zeromicro/go-zero/core/logx"
"live-service/app/user_center/model"
"live-service/app/user_center/rpc/internal/svc"
"live-service/app/user_center/rpc/pb"
)
var Service *Job
type (
CachedUserInfo struct {
UserId int64
Username string
Avatar string
}
Job struct {
ctx context.Context
svcCtx *svc.ServiceContext
// 实时排行榜(定期读取,半实时)
damageRank *zset.ZSetInt
generalRank *zset.ZSetInt
killUnitRank *zset.ZSetInt
killPlayerRank *zset.ZSetInt
// 用户数据表内存缓存
userCache *lru.Cache
}
)
func InitRankJob(svcCtx *svc.ServiceContext) {
lessFunc := func(l, r int32) bool {
return l > r
}
uc, _ := lru.New(4*model.MaxRankN + 1000)
Service = &Job{
ctx: context.Background(),
svcCtx: svcCtx,
damageRank: zset.NewZSetInt(lessFunc, model.MaxRankN),
generalRank: zset.NewZSetInt(lessFunc, model.MaxRankN),
killUnitRank: zset.NewZSetInt(lessFunc, model.MaxRankN),
killPlayerRank: zset.NewZSetInt(lessFunc, model.MaxRankN),
userCache: uc,
}
Service.initJob()
}
// InitJob 初始化RankJob,启动时加载
// 1. 读取落地的排行榜数据并Add到榜内读取 用户数据)
// 2. 开启任务 定时读取 分数大于排行榜最后一位的数据存在则Update
// 3. 开启任务 定时落库
func (j *Job) initJob() {
logx.Info("开启排行榜服务...")
j.initByType(model.RankTypeDamage)
j.initByType(model.RankTypeGeneral)
j.initByType(model.RankTypeKillUnit)
j.initByType(model.RankTypeKillPlayer)
// job read and update
c1 := cron.New()
_, _ = c1.AddFunc("@every 1s", func() {
go j.readAndUpdate(model.RankTypeDamage)
go j.readAndUpdate(model.RankTypeGeneral)
go j.readAndUpdate(model.RankTypeKillUnit)
go j.readAndUpdate(model.RankTypeKillPlayer)
})
c2 := cron.New()
_, _ = c2.AddFunc("@every 10min", func() {
go j.persistence(model.RankTypeDamage)
go j.persistence(model.RankTypeGeneral)
go j.persistence(model.RankTypeKillUnit)
go j.persistence(model.RankTypeKillPlayer)
})
c1.Start()
c2.Start()
}
func (j *Job) RangeRankByType(rankType, topN int32) *pb.RankPvpResp {
result := &pb.RankPvpResp{
Type: rankType,
}
rankZSet, _, err := j.getRankInstanceAndScoreType(rankType)
if err != nil {
return result
}
if topN > model.MaxRankN {
topN = model.MaxRankN
}
rank := rankZSet.RangeByRank(1, uint32(topN))
// 这里make 减少扩容次数
result.Items = make([]*pb.RankPvpResp_Item, 0, len(rank))
for _, r := range rank {
uid := r[0]
score := r[1]
var item pb.RankPvpResp_Item
if c, ok := j.userCache.Get(uid); ok {
cached := c.(CachedUserInfo)
item = pb.RankPvpResp_Item{
Uid: cached.UserId,
Uname: cached.Username,
Score: score,
Avatar: cached.Avatar,
}
} else {
dbUser, err := j.svcCtx.UserPlatformModel.FindOneForRankByUserId(j.ctx, uid)
if err != nil {
item = pb.RankPvpResp_Item{
Uid: uid,
Score: score,
}
} else {
item = pb.RankPvpResp_Item{
Uid: uid,
Uname: dbUser.PUname,
Score: score,
Avatar: dbUser.PAvatar,
}
}
}
result.Items = append(result.Items, &item)
}
return result
}
func (j *Job) initByType(rankType int32) {
list, err := j.svcCtx.RankPvpModel.RankListByType(j.ctx, rankType, model.MaxRankN)
if err != nil {
return
}
for _, user := range list {
// 缓存用户信息
j.userCache.Add(user.UserId, CachedUserInfo{
UserId: user.UserId,
Username: user.PUname,
Avatar: user.PAvatar,
})
switch rankType {
case model.RankTypeDamage:
j.damageRank.Add(user.UserId, int32(user.Score))
case model.RankTypeGeneral:
j.generalRank.Add(user.UserId, int32(user.Score))
case model.RankTypeKillUnit:
j.killUnitRank.Add(user.UserId, int32(user.Score))
case model.RankTypeKillPlayer:
j.killPlayerRank.Add(user.UserId, int32(user.Score))
}
}
}
func (j *Job) readAndUpdate(rankType int32) {
rankZSet, scoreType, err := j.getRankInstanceAndScoreType(rankType)
if err != nil {
return
}
rank := rankZSet.RangeByRank(1, model.MaxRankN)
var score int64
if len(rank) == 0 {
score = 0
} else {
last := rank[len(rank)-1]
score = last[1]
}
// 末位 score
byScore, err := j.svcCtx.StatisticsPvpModel.FindGreaterByScore(j.ctx, score, scoreType)
if err != nil {
return
}
for _, s := range byScore {
// 缓存用户信息
if ok := j.userCache.Contains(s.UserId); !ok {
if dbUser, err := j.svcCtx.UserPlatformModel.FindOneForRankByUserId(j.ctx, s.UserId); err == nil {
j.userCache.Add(dbUser.UserId, CachedUserInfo{
UserId: dbUser.UserId,
Username: dbUser.PUname,
Avatar: dbUser.PAvatar,
})
}
}
if _, ok := rankZSet.Score(s.UserId); ok {
rankZSet.Update(s.UserId, int32(s.Score))
} else {
rankZSet.Add(s.UserId, int32(s.Score))
}
}
}
func (j *Job) persistence(rankType int32) {
rankZSet, _, err := j.getRankInstanceAndScoreType(rankType)
if err != nil {
return
}
rank := rankZSet.RangeByRank(1, model.MaxRankN)
dbModel := make([]model.RankPvp, 0, len(rank))
for _, r := range rank {
uid := r[0]
score := r[1]
dbModel = append(dbModel, model.RankPvp{
Id: uuid.NextId(),
UserId: uid,
RankType: int64(rankType),
Score: score,
})
}
if len(dbModel) > 0 {
if err = j.svcCtx.RankPvpModel.UpdateRank(j.ctx, rankType, dbModel); err != nil {
logx.Error("更新排行榜错误", err)
return
}
}
}
func (j *Job) getRankInstanceAndScoreType(rankType int32) (*zset.ZSetInt, model.ScoreType, error) {
var rankZSet *zset.ZSetInt
scoreType := model.ScoreTypeDamage
switch rankType {
case model.RankTypeDamage:
rankZSet = j.damageRank
scoreType = model.ScoreTypeDamage
case model.RankTypeGeneral:
rankZSet = j.generalRank
scoreType = model.ScoreTypeGeneral
case model.RankTypeKillUnit:
rankZSet = j.killUnitRank
scoreType = model.ScoreTypeKillUnit
case model.RankTypeKillPlayer:
rankZSet = j.killPlayerRank
scoreType = model.ScoreTypeKillPlayer
}
if rankZSet == nil {
return nil, scoreType, errors.Errorf("没有此类型 [%d] 的排行榜", rankType)
}
return rankZSet, scoreType, nil
}