You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

296 lines
7.8 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package rank
import (
"context"
"git.noahlan.cn/northlan/ntools-go/uuid"
lru "github.com/hashicorp/golang-lru"
zset "github.com/longzhiri/gozset"
"github.com/pkg/errors"
"github.com/robfig/cron/v3"
"github.com/zeromicro/go-zero/core/logx"
pbVars "live-service/app/pb/vars"
"live-service/app/user_center/model"
"live-service/app/user_center/rpc/internal/svc"
"sync"
"time"
)
// Service is the package-level rank job instance. It is assigned by InitRankJob
// and is nil until that function has been called.
var Service *Job
// Types used by the in-memory leaderboard (rank) job.
type (
// CachedUserInfo is the user display data stored as a value in Job.userCache.
CachedUserInfo struct {
UserId int64
Username string
Avatar string
}
// RangeItem is one leaderboard entry returned to callers: the user's
// display data together with the score held on the board.
RangeItem struct {
UserId int64
Username string
Avatar string
Score int64
}
// Range is the result of RangeRankByType: the requested rank type and the
// entries read from that board.
Range struct {
Type pbVars.RankType
Items []RangeItem
}
// ZSetWithLock bundles a sorted set with the mutex that guards it.
// The methods in this file take the write lock around every access to the
// embedded set.
ZSetWithLock struct {
*zset.ZSetInt
sync.RWMutex
}
// Job holds the state of the rank service.
Job struct {
// ctx is the context handed to the model (database) calls made by the job.
ctx context.Context
// svcCtx gives access to the configuration and the models used by the job.
svcCtx *svc.ServiceContext
// Live leaderboards, one per rank type. They are refreshed periodically,
// so they are only near-real-time.
rankByTypeMap map[pbVars.RankType]*ZSetWithLock
// In-memory cache of user table data (values are CachedUserInfo, keyed by user id).
userCache *lru.Cache
}
)
// newZSetWithLock creates a sorted set with the given score comparison and
// rankN setting, wrapped together with a zero-value (unlocked) RWMutex.
func newZSetWithLock(scoreLessThan func(l, r int32) bool, rankN int) *ZSetWithLock {
	guarded := new(ZSetWithLock)
	guarded.ZSetInt = zset.NewZSetInt(scoreLessThan, rankN)
	return guarded
}
// InitRankJob creates the package-level Service with one empty leaderboard per
// supported rank type and an LRU cache for user display data.
//
// The Service is always created. The cron jobs that fill and persist the
// boards are only started when svcCtx.GameConfig.Rank.Enabled is true.
//
// It panics if the user cache cannot be created, instead of continuing with a
// nil cache that would fail on first use.
func InitRankJob(svcCtx *svc.ServiceContext) {
	// The set's "less than" is defined as "has the greater score".
	// NOTE(review): presumably this makes the highest score rank first — confirm
	// against the gozset documentation.
	lessFunc := func(l, r int32) bool {
		return l > r
	}

	uc, err := lru.New(4*model.MaxRankN + 1000)
	if err != nil {
		// golang-lru rejects only a non-positive size, so reaching this means
		// model.MaxRankN is misconfigured; fail fast at startup.
		panic("rank: create user cache: " + err.Error())
	}

	// Every rank type that gets its own leaderboard.
	rankTypes := []pbVars.RankType{
		pbVars.RankType_Damage,
		pbVars.RankType_DeDamage,
		pbVars.RankType_General,
		pbVars.RankType_DeGeneral,
		pbVars.RankType_KillUnit,
		pbVars.RankType_DeKillUnit,
		pbVars.RankType_KillPlayer,
		pbVars.RankType_DeKillPlayer,
		pbVars.RankType_Win,
		pbVars.RankType_Lost,
		pbVars.RankType_FirstBlood,
		pbVars.RankType_DeFirstBlood,
	}
	rankByType := make(map[pbVars.RankType]*ZSetWithLock, len(rankTypes))
	for _, rankType := range rankTypes {
		rankByType[rankType] = newZSetWithLock(lessFunc, model.MaxRankN)
	}

	Service = &Job{
		ctx:           context.Background(),
		svcCtx:        svcCtx,
		rankByTypeMap: rankByType,
		userCache:     uc,
	}

	if !svcCtx.GameConfig.Rank.Enabled {
		return
	}
	Service.initJob()
}
// initJob starts the cron-driven background work of the rank service.
//
// Two schedules are registered, both read from GameConfig.Rank.Cron:
//   - Update: refresh every leaderboard from the statistics data (readAndUpdate).
//   - Persistence: write every leaderboard to the database (persistence).
//
// On each tick one goroutine is launched per leaderboard in rankByTypeMap.
// A schedule whose cron spec is rejected is logged and skipped; the other
// schedule still runs.
//
// NOTE(review): the original comment also listed "load the persisted
// leaderboard data on startup" as a first step. Nothing here does that; the
// boards start empty and are filled by the first Update tick.
func (j *Job) initJob() {
	logx.Info("开启排行榜服务...")
	cfg := j.svcCtx.GameConfig.Rank

	c := cron.New()

	// Periodic read-and-update of every board.
	_, err := c.AddFunc(cfg.Cron.Update, func() {
		// rankByTypeMap is never modified after construction, so ranging over
		// it from the cron goroutine needs no lock.
		for rankType := range j.rankByTypeMap {
			go j.readAndUpdate(rankType)
		}
	})
	if err != nil {
		logx.Errorf("rank: register update job with cron spec %q: %v", cfg.Cron.Update, err)
	}

	// Periodic persistence of every board.
	_, err = c.AddFunc(cfg.Cron.Persistence, func() {
		for rankType := range j.rankByTypeMap {
			go j.persistence(rankType)
		}
	})
	if err != nil {
		logx.Errorf("rank: register persistence job with cron spec %q: %v", cfg.Cron.Persistence, err)
	}

	c.Start()
}
// CleanByType empties the leaderboard of the given rank type.
// It returns false when no leaderboard exists for rankType and true once the
// board has been cleared.
func (j *Job) CleanByType(rankType pbVars.RankType) bool {
	board, _, lookupErr := j.getRankInstanceAndScoreType(rankType)
	found := lookupErr == nil
	if found {
		board.Lock()
		defer board.Unlock()
		board.Clear()
	}
	return found
}
// RankByScore looks up the entries on the rankType board whose score equals
// score and returns the rank of the first one.
// It returns 0 when the board does not exist or no entry has that score.
func (j *Job) RankByScore(rankType pbVars.RankType, score int32) int32 {
	board, _, lookupErr := j.getRankInstanceAndScoreType(rankType)
	if lookupErr != nil {
		return 0
	}

	board.Lock()
	defer board.Unlock()

	matches := board.RangeByScore(score, score)
	if len(matches) == 0 {
		return 0
	}
	return int32(board.Rank(matches[0]))
}
// RangeRankByType returns the first topN entries of the rankType leaderboard,
// each enriched with the user's name and avatar.
//
// topN is capped at model.MaxRankN. A non-positive topN yields an empty item
// list; previously a negative value wrapped around in the uint32 conversion.
// When no leaderboard exists for rankType the result has nil Items.
//
// User display data is taken from the in-memory cache; on a miss it is loaded
// from the database and cached so later calls do not query again. If that
// lookup fails the entry is still returned, with only UserId and Score set.
func (j *Job) RangeRankByType(rankType pbVars.RankType, topN int32) Range {
	result := Range{Type: rankType}
	rankZSet, _, err := j.getRankInstanceAndScoreType(rankType)
	if err != nil {
		return result
	}
	if topN <= 0 {
		result.Items = make([]RangeItem, 0)
		return result
	}
	if topN > model.MaxRankN {
		topN = model.MaxRankN
	}

	// Hold the lock only while reading the set; user lookups happen afterwards.
	rankZSet.Lock()
	rank := rankZSet.RangeByRank(1, uint32(topN))
	rankZSet.Unlock()

	// Pre-size to avoid repeated growth while appending.
	result.Items = make([]RangeItem, 0, len(rank))
	for _, r := range rank {
		uid := r[0]
		score := r[1]

		if c, ok := j.userCache.Get(uid); ok {
			if cached, ok := c.(CachedUserInfo); ok {
				result.Items = append(result.Items, RangeItem{
					UserId:   cached.UserId,
					Username: cached.Username,
					Score:    score,
					Avatar:   cached.Avatar,
				})
				continue
			}
		}

		item := RangeItem{
			UserId: uid,
			Score:  score,
		}
		if dbUser, err := j.svcCtx.UserPlatformModel.FindDisplayOneByUserId(j.ctx, uid); err == nil {
			item.Username = dbUser.PUname
			item.Avatar = dbUser.PAvatar
			// Remember the user so the next call is served from the cache.
			j.userCache.Add(uid, CachedUserInfo{
				UserId:   uid,
				Username: dbUser.PUname,
				Avatar:   dbUser.PAvatar,
			})
		}
		result.Items = append(result.Items, item)
	}
	return result
}
// readAndUpdate refreshes the rankType leaderboard from the statistics data.
//
// It reads the score of the entry currently in last place (0 for an empty
// board), asks StatisticsPvpModel.FindGreaterByScore for rows relative to that
// score, makes sure the display data of every returned user is cached, and
// then adds or updates those users on the board.
//
// The board's lock is held only for the in-memory steps (reading the current
// ranking and applying the changes), not during database queries, so readers
// of the board are not blocked by database latency. A failed score query is
// logged and the refresh is skipped until the next tick.
func (j *Job) readAndUpdate(rankType pbVars.RankType) {
	rankZSet, scoreType, err := j.getRankInstanceAndScoreType(rankType)
	if err != nil {
		return
	}

	// Snapshot the current ranking, then release the lock before any DB I/O.
	rankZSet.Lock()
	rank := rankZSet.RangeByRank(1, model.MaxRankN)
	rankZSet.Unlock()

	rankLen := len(rank)
	var score int64
	if rankLen > 0 {
		// Score of the entry currently in last place.
		score = rank[rankLen-1][1]
	}
	// When the board is not full, only fetch as many rows as there are free
	// slots. The +1 keeps the fetch from coming up one short if the query
	// returns the current last-place entry itself.
	// NOTE(review): the query still uses the last-place score as its threshold
	// even when the board is not full — confirm that users scoring below it
	// can ever fill the free slots.
	limit := model.MaxRankN
	if rankLen < model.MaxRankN {
		limit = model.MaxRankN - rankLen + 1
	}

	byScore, err := j.svcCtx.StatisticsPvpModel.FindGreaterByScore(j.ctx, score, scoreType, limit)
	if err != nil {
		logx.Errorf("rank: query scores for rank type %d: %v", rankType, err)
		return
	}
	if len(byScore) == 0 {
		return
	}

	// Cache user display data for users not seen yet. A failed lookup is
	// skipped; the user is still ranked.
	for _, s := range byScore {
		if j.userCache.Contains(s.UserId) {
			continue
		}
		dbUser, err := j.svcCtx.UserPlatformModel.FindDisplayOneByUserId(j.ctx, s.UserId)
		if err != nil {
			continue
		}
		j.userCache.Add(dbUser.UserId, CachedUserInfo{
			UserId:   dbUser.UserId,
			Username: dbUser.PUname,
			Avatar:   dbUser.PAvatar,
		})
	}

	// Apply all score changes in one short critical section.
	rankZSet.Lock()
	defer rankZSet.Unlock()
	for _, s := range byScore {
		// NOTE(review): the set stores int32 scores; this conversion truncates
		// if s.Score can exceed the int32 range — confirm the value range.
		if _, ok := rankZSet.Score(s.UserId); ok {
			rankZSet.Update(s.UserId, int32(s.Score))
		} else {
			rankZSet.Add(s.UserId, int32(s.Score))
		}
	}
}
// persistence writes the current state of the rankType leaderboard to the
// database through RankPvpModel.UpdateRank.
//
// The board is read under its lock and the lock is released before any
// database work. An empty board is not persisted. A failed update is logged.
func (j *Job) persistence(rankType pbVars.RankType) {
	board, _, err := j.getRankInstanceAndScoreType(rankType)
	if err != nil {
		return
	}

	board.Lock()
	snapshot := board.RangeByRank(1, model.MaxRankN)
	board.Unlock()

	if len(snapshot) == 0 {
		return
	}

	// One row per board entry, in rank order, each with a freshly generated id.
	rows := make([]model.RankPvp, len(snapshot))
	for i := range snapshot {
		rows[i] = model.RankPvp{
			Id:       uuid.NextId(),
			UserId:   snapshot[i][0],
			RankType: int64(rankType),
			Score:    snapshot[i][1],
		}
	}

	// Crude way to avoid deadlocks.
	// NOTE(review): presumably database deadlocks between the per-type
	// persistence goroutines started on the same tick — confirm.
	time.Sleep(time.Second)

	if err = j.svcCtx.RankPvpModel.UpdateRank(j.ctx, int64(rankType), rows); err != nil {
		logx.Error("更新排行榜错误", err)
	}
}
// getRankInstanceAndScoreType returns the leaderboard registered for rankType
// together with the score type that model.ScoreTypeByRankType maps it to.
// When no leaderboard is registered it returns a nil board, the score type,
// and an error.
func (j *Job) getRankInstanceAndScoreType(rankType pbVars.RankType) (*ZSetWithLock, model.ScoreType, error) {
	scoreType := model.ScoreTypeByRankType(rankType)
	if board := j.rankByTypeMap[rankType]; board != nil {
		return board, scoreType, nil
	}
	return nil, scoreType, errors.Errorf("没有此类型 [%d] 的排行榜", rankType)
}