You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

309 lines
8.2 KiB
Go

package statistics
import (
"context"
"git.noahlan.cn/northlan/ntools-go/uuid"
lru "github.com/hashicorp/golang-lru"
zset "github.com/longzhiri/gozset"
"github.com/pkg/errors"
"github.com/robfig/cron/v3"
"github.com/zeromicro/go-zero/core/logx"
pbVars "live-service/app/pb/vars"
"live-service/app/user_center/model"
"live-service/app/user_center/rpc/internal/svc"
"sync"
"time"
)
// Service is the package-level leaderboard Job. It is nil until InitRankJob
// assigns it.
var Service *Job
type (
// CachedUserInfo is the value stored in Job.userCache: the display fields
// of a user (id, name, avatar) copied from a user-platform record.
CachedUserInfo struct {
UserId int64
Username string
Avatar string
}
// RangeItem is one leaderboard entry returned to callers: the user's
// display fields plus the score read from the sorted set.
RangeItem struct {
UserId int64
Username string
Avatar string
Score int64
}
// Range is the result of a leaderboard query: the rank type that was
// queried and its entries in the order the sorted set returned them.
Range struct {
Type pbVars.RankType
Items []RangeItem
}
// ZSetWithLock pairs a sorted set with an RWMutex. The set is not locked
// automatically; the methods in this file call Lock/Unlock around every
// access to the embedded ZSetInt.
ZSetWithLock struct {
*zset.ZSetInt
sync.RWMutex
}
// Job holds the state of the leaderboard service.
Job struct {
// ctx is the context passed to every model call made by the job.
ctx context.Context
// svcCtx provides the game config and the data models the job uses.
svcCtx *svc.ServiceContext
// Live leaderboards, one sorted set per rank type (refreshed
// periodically, so only near-real-time).
rankByTypeMap map[pbVars.RankType]*ZSetWithLock
// In-memory cache of user table data, keyed by user id and holding
// CachedUserInfo values.
userCache *lru.Cache
}
)
// newZSetWithLock wraps a freshly created zset.ZSetInt, built from the given
// score comparator and rankN, in a ZSetWithLock. The embedded RWMutex is left
// at its zero value.
func newZSetWithLock(scoreLessThan func(l, r int32) bool, rankN int) *ZSetWithLock {
	locked := new(ZSetWithLock)
	locked.ZSetInt = zset.NewZSetInt(scoreLessThan, rankN)
	return locked
}
// InitRankJob builds the package-level Service: one empty leaderboard per rank
// type plus an LRU user cache. The scheduled jobs are started only when
// GameConfig.Rank.Enabled is set; Service is assigned either way.
func InitRankJob(svcCtx *svc.ServiceContext) {
	// The comparator reports l > r, i.e. the larger score is ordered first.
	higherFirst := func(l, r int32) bool { return l > r }

	rankTypes := []pbVars.RankType{
		pbVars.RankType_Damage,
		pbVars.RankType_DeDamage,
		pbVars.RankType_General,
		pbVars.RankType_DeGeneral,
		pbVars.RankType_KillUnit,
		pbVars.RankType_DeKillUnit,
		pbVars.RankType_KillPlayer,
		pbVars.RankType_DeKillPlayer,
		pbVars.RankType_Win,
		pbVars.RankType_Lost,
		pbVars.RankType_FirstBlood,
		pbVars.RankType_DeFirstBlood,
	}
	boards := make(map[pbVars.RankType]*ZSetWithLock, len(rankTypes))
	for _, rt := range rankTypes {
		boards[rt] = newZSetWithLock(higherFirst, model.MaxRankN)
	}

	// NOTE(review): the error from lru.New is discarded. Presumably it can only
	// fail for a non-positive size, which this expression never produces —
	// confirm against the golang-lru documentation.
	userCache, _ := lru.New(4*model.MaxRankN + 1000)

	Service = &Job{
		ctx:           context.Background(),
		svcCtx:        svcCtx,
		rankByTypeMap: boards,
		userCache:     userCache,
	}

	if svcCtx.GameConfig.Rank.Enabled {
		Service.initJob()
	}
}
// initJob registers the periodic leaderboard jobs and starts the scheduler.
//
// Two cron entries are registered, using the specs from GameConfig.Rank.Cron:
//  1. Update: runs readAndUpdate for every rank type, pulling scores from the
//     statistics table into the in-memory leaderboards.
//  2. Persistence: runs persistence for every rank type, writing the
//     in-memory leaderboards to the database.
//
// Each rank type is handled in its own goroutine. If cron rejects a spec, the
// error is logged and that job is not scheduled; the scheduler is still
// started so that the other job keeps working.
//
// NOTE(review): the original comment also listed "load the persisted
// leaderboard at startup" as a first step; this function does not do that.
func (j *Job) initJob() {
	logx.Info("开启排行榜服务...")
	cfg := j.svcCtx.GameConfig.Rank

	rankTypes := []pbVars.RankType{
		pbVars.RankType_Damage,
		pbVars.RankType_DeDamage,
		pbVars.RankType_General,
		pbVars.RankType_DeGeneral,
		pbVars.RankType_KillUnit,
		pbVars.RankType_DeKillUnit,
		pbVars.RankType_KillPlayer,
		pbVars.RankType_DeKillPlayer,
		pbVars.RankType_Win,
		pbVars.RankType_Lost,
		pbVars.RankType_FirstBlood,
		pbVars.RankType_DeFirstBlood,
	}

	c := cron.New()

	// Job: read fresh scores and update the in-memory leaderboards.
	refreshAll := func() {
		for _, rt := range rankTypes {
			go j.readAndUpdate(rt)
		}
	}
	if _, err := c.AddFunc(cfg.Cron.Update, refreshAll); err != nil {
		logx.Errorf("rank: cannot schedule update job with spec %q: %v", cfg.Cron.Update, err)
	}

	// Job: persist the in-memory leaderboards.
	persistAll := func() {
		for _, rt := range rankTypes {
			go j.persistence(rt)
		}
	}
	if _, err := c.AddFunc(cfg.Cron.Persistence, persistAll); err != nil {
		logx.Errorf("rank: cannot schedule persistence job with spec %q: %v", cfg.Cron.Persistence, err)
	}

	c.Start()
}
// CleanByType empties the in-memory leaderboard for rankType. It returns false
// when no leaderboard exists for that type and true once the board is cleared.
func (j *Job) CleanByType(rankType pbVars.RankType) bool {
	board, _, lookupErr := j.getRankInstanceAndScoreType(rankType)
	found := lookupErr == nil
	if found {
		board.Lock()
		defer board.Unlock()
		board.Clear()
	}
	return found
}
// RankByScore looks up the entries whose score equals score on the rankType
// leaderboard and returns the rank of the first one. It returns 0 when the
// rank type is unknown or no entry has that score.
func (j *Job) RankByScore(rankType pbVars.RankType, score int32) int32 {
	board, _, err := j.getRankInstanceAndScoreType(rankType)
	if err != nil {
		return 0
	}

	board.Lock()
	defer board.Unlock()

	matches := board.RangeByScore(score, score)
	if len(matches) == 0 {
		return 0
	}
	return int32(board.Rank(matches[0]))
}
// RangeRankByType returns the top topN entries of the rankType leaderboard,
// each decorated with the user's display name and avatar.
//
// topN is capped at model.MaxRankN. A non-positive topN yields an empty list;
// without that guard a negative value would wrap around in the uint32
// conversion and request the whole board. An unknown rankType yields a Range
// with only Type set.
//
// Display info comes from the user cache. When the cached entry is missing or
// incomplete (empty name or avatar) the user is looked up in the database and
// the cache is refreshed. If that lookup fails, the entry is still returned
// with whatever cached display info is available (possibly none).
func (j *Job) RangeRankByType(rankType pbVars.RankType, topN int32) Range {
	result := Range{Type: rankType}
	rankZSet, _, err := j.getRankInstanceAndScoreType(rankType)
	if err != nil {
		return result
	}
	if topN <= 0 {
		result.Items = []RangeItem{}
		return result
	}
	if topN > model.MaxRankN {
		topN = model.MaxRankN
	}

	// Hold the lock only while copying the ranking out; the user lookups
	// below may hit the database.
	rankZSet.Lock()
	rank := rankZSet.RangeByRank(1, uint32(topN))
	rankZSet.Unlock()

	// Pre-size to avoid repeated growth.
	result.Items = make([]RangeItem, 0, len(rank))
	for _, r := range rank {
		uid, score := r[0], r[1]
		item := RangeItem{UserId: uid, Score: score}

		complete := false
		if c, ok := j.userCache.Get(uid); ok {
			if cached, ok := c.(CachedUserInfo); ok {
				item.Username = cached.Username
				item.Avatar = cached.Avatar
				if cached.Username != "" && cached.Avatar != "" {
					item.UserId = cached.UserId
					complete = true
				}
			}
		}

		if !complete {
			// Best effort: on failure keep the partial info gathered above.
			if dbUser, err := j.svcCtx.UserPlatformModel.FindDisplayOneByUserId(j.ctx, uid); err == nil {
				j.userCache.Add(dbUser.UserId, CachedUserInfo{
					UserId:   dbUser.UserId,
					Username: dbUser.PUname,
					Avatar:   dbUser.PAvatar,
				})
				item.Username = dbUser.PUname
				item.Avatar = dbUser.PAvatar
			}
		}

		result.Items = append(result.Items, item)
	}
	return result
}
// readAndUpdate refreshes the in-memory leaderboard for rankType from the
// statistics table.
//
// It takes the score of the current last-placed entry (0 when the board is
// empty) as a threshold, asks StatisticsPvpModel.FindGreaterByScore for rows
// relative to that threshold, and adds or updates each returned user on the
// board. Afterwards it warms the user cache for the returned users.
//
// The leaderboard lock is held from reading the threshold until the rows are
// applied, so the refresh is atomic with respect to other board operations.
// The per-user display lookups run after the lock is released because they
// only touch the user cache and would otherwise block readers for the
// duration of many database calls.
//
// An unknown rankType is ignored. A failed score query is logged and the
// refresh is abandoned until the next run.
func (j *Job) readAndUpdate(rankType pbVars.RankType) {
	rankZSet, scoreType, err := j.getRankInstanceAndScoreType(rankType)
	if err != nil {
		return
	}

	rankZSet.Lock()

	rank := rankZSet.RangeByRank(1, model.MaxRankN)
	rankLen := len(rank)
	var score int64
	if rankLen > 0 {
		// Score of the entry currently in last place.
		score = rank[rankLen-1][1]
	}
	// When the board is not full, fetch only as many rows as there are free
	// slots.
	limit := model.MaxRankN
	if rankLen < model.MaxRankN {
		// Original note on the +1: guards against coming up one short when
		// the query returns the last-placed user itself.
		limit = model.MaxRankN - rankLen + 1
	}

	byScore, err := j.svcCtx.StatisticsPvpModel.FindGreaterByScore(j.ctx, score, scoreType, limit)
	if err != nil {
		rankZSet.Unlock()
		logx.Errorf("rank: load scores for rank type [%d]: %v", rankType, err)
		return
	}

	for _, s := range byScore {
		// NOTE(review): the board stores int32 scores; a source score outside
		// the int32 range would be truncated by this conversion — confirm the
		// value range of s.Score.
		if _, ok := rankZSet.Score(s.UserId); ok {
			rankZSet.Update(s.UserId, int32(s.Score))
		} else {
			rankZSet.Add(s.UserId, int32(s.Score))
		}
	}

	rankZSet.Unlock()

	// Warm the user cache (best effort; a failed lookup is simply skipped).
	for _, s := range byScore {
		if j.userCache.Contains(s.UserId) {
			continue
		}
		dbUser, err := j.svcCtx.UserPlatformModel.FindDisplayOneByUserId(j.ctx, s.UserId)
		if err != nil {
			continue
		}
		j.userCache.Add(dbUser.UserId, CachedUserInfo{
			UserId:   dbUser.UserId,
			Username: dbUser.PUname,
			Avatar:   dbUser.PAvatar,
		})
	}
}
// persistence writes the current in-memory leaderboard for rankType to the
// database through RankPvpModel.UpdateRank. Every entry becomes a RankPvp row
// with a freshly generated id. Nothing is written when the rank type is
// unknown or the board is empty; a failed write is logged.
func (j *Job) persistence(rankType pbVars.RankType) {
	board, _, err := j.getRankInstanceAndScoreType(rankType)
	if err != nil {
		return
	}

	// Copy the ranking out under the lock, then work on the copy.
	board.Lock()
	snapshot := board.RangeByRank(1, model.MaxRankN)
	board.Unlock()

	if len(snapshot) == 0 {
		return
	}

	rows := make([]model.RankPvp, len(snapshot))
	for i := range snapshot {
		rows[i] = model.RankPvp{
			Id:       uuid.NextId(),
			UserId:   snapshot[i][0],
			RankType: int64(rankType),
			Score:    snapshot[i][1],
		}
	}

	// Original note: a simple way to avoid deadlock.
	// NOTE(review): presumably this refers to database lock contention
	// between the concurrent per-type writers — confirm.
	time.Sleep(1 * time.Second)

	if err := j.svcCtx.RankPvpModel.UpdateRank(j.ctx, int64(rankType), rows); err != nil {
		logx.Error("更新排行榜错误", err)
	}
}
// getRankInstanceAndScoreType resolves rankType to its in-memory leaderboard
// and to the score type given by model.ScoreTypeByRankType. The score type is
// returned in every case; the leaderboard is nil and the error non-nil when
// no leaderboard is registered for rankType.
func (j *Job) getRankInstanceAndScoreType(rankType pbVars.RankType) (*ZSetWithLock, model.ScoreType, error) {
	scoreType := model.ScoreTypeByRankType(rankType)
	if board := j.rankByTypeMap[rankType]; board != nil {
		return board, scoreType, nil
	}
	return nil, scoreType, errors.Errorf("没有此类型 [%d] 的排行榜", rankType)
}