fix: 修复用户积分并发问题,添加乐观锁以及重试机制。

main
NorthLan 3 years ago
parent 1b1a415a34
commit de788a6fd8

@ -2,12 +2,10 @@ package model
import ( import (
"context" "context"
"database/sql"
"fmt"
"git.noahlan.cn/northlan/ntools-go/gorm-zero/gormc" "git.noahlan.cn/northlan/ntools-go/gorm-zero/gormc"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/plugin/optimisticlock"
"live-service/common/nerr" "live-service/common/nerr"
) )
@ -20,8 +18,8 @@ type (
userIntegralModel userIntegralModel
Transact(ctx context.Context, tx *gorm.DB, fn func(tx *gorm.DB) error) error Transact(ctx context.Context, tx *gorm.DB, fn func(tx *gorm.DB) error) error
InsertTx(ctx context.Context, tx *gorm.DB, data *UserIntegral) error InsertTx(ctx context.Context, tx *gorm.DB, data *UserIntegral) error
FindIntegral(ctx context.Context, tx *gorm.DB, userId int64) (int64, error) FindOneTx(ctx context.Context, tx *gorm.DB, userId int64) (*UserIntegral, error)
UpdateIntegralTx(ctx context.Context, tx *gorm.DB, userId, addon int64) error UpdateTx(ctx context.Context, tx *gorm.DB, integral *UserIntegral) error
// ChangeIntegral 用户积分变动 // ChangeIntegral 用户积分变动
ChangeIntegral(ctx context.Context, tx *gorm.DB, userId int64, change int64) (int64, error) ChangeIntegral(ctx context.Context, tx *gorm.DB, userId int64, change int64) (int64, error)
} }
@ -47,73 +45,80 @@ func (m *customUserIntegralModel) InsertTx(ctx context.Context, tx *gorm.DB, dat
return err return err
} }
func (m *customUserIntegralModel) UpdateIntegralTx(ctx context.Context, tx *gorm.DB, userId, integral int64) error { func (m *customUserIntegralModel) UpdateTx(ctx context.Context, tx *gorm.DB, integral *UserIntegral) error {
if integral < 0 { if integral.Integral < 0 {
return errors.New("无法将积分更新至负数") return errors.New("无法将积分更新至负数")
} }
db := withTx(ctx, m.conn, tx) db := withTx(ctx, m.conn, tx)
result := db.Table(m.table). result := db.Model(&integral).Updates(&UserIntegral{Integral: integral.Integral, Version: optimisticlock.Version{Int64: 1}})
Where("`user_id` = ?", userId).
Update("`integral`", integral)
if result.Error != nil { if result.Error != nil {
return result.Error return result.Error
} }
// TODO 这里得处理一下
if result.RowsAffected == 0 { if result.RowsAffected == 0 {
logx.Statf("更新积分影响行数为0, user_id: %d, integral: %d", userId, integral) return ErrRowsAffectedZero
return nil
} }
return nil return nil
} }
func (m *customUserIntegralModel) FindIntegral(ctx context.Context, tx *gorm.DB, userId int64) (int64, error) { func (m *customUserIntegralModel) FindOneTx(ctx context.Context, tx *gorm.DB, userId int64) (*UserIntegral, error) {
var resp int64 var resp UserIntegral
err := withTx(ctx, m.conn, tx).Table(m.table). err := withTx(ctx, m.conn, tx).Model(&UserIntegral{}).
Select(fmt.Sprintf("%s.integral", m.table)).
Where("`user_id` = ?", userId).Take(&resp).Error Where("`user_id` = ?", userId).Take(&resp).Error
switch err { switch err {
case nil: case nil:
return resp, nil return &resp, nil
case gormc.ErrNotFound: case gormc.ErrNotFound:
return 0, ErrNotFound return nil, ErrNotFound
default: default:
return 0, err return nil, err
} }
} }
func (m *customUserIntegralModel) ChangeIntegral(ctx context.Context, tx *gorm.DB, userId int64, change int64) (int64, error) { func (m *customUserIntegralModel) ChangeIntegral(ctx context.Context, tx *gorm.DB, userId int64, change int64) (int64, error) {
resp := change resp := change
err := withTx(ctx, m.conn, tx).Transaction(func(tx *gorm.DB) error { var err error
integral, err := m.FindIntegral(ctx, tx, userId) for i := VersionRetryCount; i > 0; i-- {
if err != nil { err = withTx(ctx, m.conn, tx).Transaction(func(tx *gorm.DB) error {
if errors.Is(err, ErrNotFound) { data, err := m.FindOneTx(ctx, tx, userId)
if change < 0 { if err != nil {
return nerr.NewWithCode(nerr.UserIntegralNotEnoughError) if errors.Is(err, ErrNotFound) {
if change < 0 {
return nerr.NewWithCode(nerr.UserIntegralNotEnoughError)
}
// 用户积分记录不存在,进行插入
if err = m.InsertTx(ctx, tx, &UserIntegral{
UserId: userId,
Integral: change,
}); err != nil {
return errors.Wrap(err, "插入用户积分失败")
}
return nil
} else {
return errors.Wrap(err, "获取当前用户积分失败")
} }
// 用户积分记录不存在,进行插入 }
if err = m.InsertTx(ctx, tx, &UserIntegral{ if data.Integral+change < 0 {
UserId: userId, return errors.New("用户积分不足")
Integral: change, }
}); err != nil { data.Integral += change
return errors.Wrap(err, "插入用户积分失败") if err = m.UpdateTx(ctx, tx, data); err != nil {
if errors.Is(err, ErrRowsAffectedZero) {
return err
} }
return nil return errors.Wrap(err, "更新用户积分失败")
} else {
return errors.Wrap(err, "获取当前用户积分失败")
} }
resp = data.Integral
return nil
})
if err != nil && errors.Is(err, ErrRowsAffectedZero) {
// 未能正确更新,直接重试
continue
} else {
// 其它错误退出循环
break
} }
if integral+change < 0 { }
return errors.New("用户积分不足")
}
if err = m.UpdateIntegralTx(ctx, tx, userId, integral+change); err != nil {
return errors.Wrap(err, "更新用户积分失败")
}
resp = integral + change
return nil
}, &sql.TxOptions{
Isolation: sql.LevelReadCommitted,
ReadOnly: false,
})
return resp, err return resp, err
} }

@ -5,6 +5,7 @@ package model
import ( import (
"context" "context"
"git.noahlan.cn/northlan/ntools-go/gorm-zero/gormc" "git.noahlan.cn/northlan/ntools-go/gorm-zero/gormc"
"gorm.io/plugin/optimisticlock"
"time" "time"
"gorm.io/gorm" "gorm.io/gorm"
@ -24,10 +25,11 @@ type (
} }
UserIntegral struct { UserIntegral struct {
UserId int64 `gorm:"column:user_id;primaryKey"` // 用户ID UserId int64 `gorm:"column:user_id;primaryKey"` // 用户ID
Integral int64 `gorm:"column:integral"` // 用户积分1 RMB1000 Integral int64 `gorm:"column:integral"` // 用户积分1 RMB1000
CreateTime time.Time `gorm:"column:create_time;default:null"` // 创建时间 Version optimisticlock.Version `gorm:"column:version"` // 乐观锁
UpdateTime time.Time `gorm:"column:update_time;default:null"` // 更新时间 CreateTime time.Time `gorm:"column:create_time;default:null"` // 创建时间
UpdateTime time.Time `gorm:"column:update_time;default:null"` // 更新时间
} }
) )

@ -9,6 +9,8 @@ import (
var ErrNotFound = gorm.ErrRecordNotFound var ErrNotFound = gorm.ErrRecordNotFound
var ErrRowsAffectedZero = errors.New("RowsAffected zero") var ErrRowsAffectedZero = errors.New("RowsAffected zero")
const VersionRetryCount = 5 // 乐观锁重试次数
// BitBool is an implementation of a bool for the MySQL type BIT(1). // BitBool is an implementation of a bool for the MySQL type BIT(1).
// This type allows you to avoid wasting an entire byte for MySQL's boolean type TINYINT. // This type allows you to avoid wasting an entire byte for MySQL's boolean type TINYINT.
type BitBool bool type BitBool bool

@ -65,9 +65,6 @@ func (l *UserSendGiftLogic) UserSendGift(in *pb.UserSendGiftReq) (*pb.UserSendGi
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err != nil {
return nil, err
}
resp.Integral = &pb.ChangeIntegralResp{ resp.Integral = &pb.ChangeIntegralResp{
UserId: in.UserId, UserId: in.UserId,
Change: addonIntegral, Change: addonIntegral,

@ -28,13 +28,13 @@ func NewGetUserIntegralLogic(ctx context.Context, svcCtx *svc.ServiceContext) *G
// GetUserIntegral 获取用户积分 // GetUserIntegral 获取用户积分
func (l *GetUserIntegralLogic) GetUserIntegral(in *pb.UserIdReq) (*pb.UserIntegralResp, error) { func (l *GetUserIntegralLogic) GetUserIntegral(in *pb.UserIdReq) (*pb.UserIntegralResp, error) {
// 查询当前用户积分 // 查询当前用户积分
integral, err := l.svcCtx.UserIntegralModel.FindIntegral(l.ctx, nil, in.UserId) integral, err := l.svcCtx.UserIntegralModel.FindOneTx(l.ctx, nil, in.UserId)
if err != nil { if err != nil {
return nil, errors.Wrapf(nerr.NewWithCode(nerr.DBError), "查询用户积分失败, err:%+v", err) return nil, errors.Wrapf(nerr.NewWithCode(nerr.DBError), "查询用户积分失败, err:%+v", err)
} }
return &pb.UserIntegralResp{ return &pb.UserIntegralResp{
UserId: in.UserId, UserId: in.UserId,
Integral: integral, Integral: integral.Integral,
}, nil }, nil
} }

@ -67,14 +67,14 @@ func (l *StatPvpReportLogic) StatPvpReport(in *pb.StatPvPReportReq) (*pb.StatPvP
battleReportCfg := l.svcCtx.Config.Integral.BattleReport battleReportCfg := l.svcCtx.Config.Integral.BattleReport
// 名将积分 // 名将积分
if in.General.Uid > 0 { if in.General.Uid > 0 {
integral, err := l.svcCtx.UserIntegralModel.ChangeIntegral(l.ctx, nil, in.General.Uid, battleReportCfg.GeneralIntegral) _, err := l.svcCtx.UserIntegralModel.ChangeIntegral(l.ctx, nil, in.General.Uid, battleReportCfg.GeneralIntegral)
if err != nil { if err != nil {
l.Logger.Errorf("名将积分更新失败, err:%v", err) l.Logger.Errorf("名将积分更新失败, err:%v", err)
} }
resp.General = &pb.StatPvPReportResp_Item{ resp.General = &pb.StatPvPReportResp_Item{
Uid: in.General.Uid, Uid: in.General.Uid,
Uname: in.General.Uname, Uname: in.General.Uname,
AddonIntegral: integral, AddonIntegral: battleReportCfg.GeneralIntegral,
} }
} }
winItemResp := make([]*pb.StatPvPReportResp_Item, 0, len(in.WinItems)) winItemResp := make([]*pb.StatPvPReportResp_Item, 0, len(in.WinItems))

@ -29,12 +29,18 @@ type ServiceContext struct {
} }
func NewServiceContext(c config.Config) *ServiceContext { func NewServiceContext(c config.Config) *ServiceContext {
var logLevel logger.LogLevel
if c.Log.Mode == "console" {
logLevel = logger.Info
} else {
logLevel = logger.Warn
}
gormDb, err := gorm.Open(mysql.Open(c.DB.DataSource), &gorm.Config{ gormDb, err := gorm.Open(mysql.Open(c.DB.DataSource), &gorm.Config{
Logger: logger.New( Logger: logger.New(
log.New(os.Stdout, "\r\n", log.LstdFlags), log.New(os.Stdout, "\r\n", log.LstdFlags),
logger.Config{ logger.Config{
SlowThreshold: 5 * time.Second, SlowThreshold: 5 * time.Second,
LogLevel: logger.Warn, LogLevel: logLevel,
IgnoreRecordNotFoundError: true, IgnoreRecordNotFoundError: true,
Colorful: true, Colorful: true,
}, },

@ -12,7 +12,7 @@ require (
google.golang.org/grpc v1.45.0 google.golang.org/grpc v1.45.0
google.golang.org/protobuf v1.27.1 google.golang.org/protobuf v1.27.1
gorm.io/driver/mysql v1.3.3 gorm.io/driver/mysql v1.3.3
gorm.io/gorm v1.23.4 gorm.io/gorm v1.23.5
) )
require ( require (
@ -87,6 +87,7 @@ require (
google.golang.org/genproto v0.0.0-20220228195345-15d65a4533f7 // indirect google.golang.org/genproto v0.0.0-20220228195345-15d65a4533f7 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect
gorm.io/plugin/optimisticlock v1.0.7 // indirect
k8s.io/api v0.20.12 // indirect k8s.io/api v0.20.12 // indirect
k8s.io/apimachinery v0.20.12 // indirect k8s.io/apimachinery v0.20.12 // indirect
k8s.io/client-go v0.20.12 // indirect k8s.io/client-go v0.20.12 // indirect

@ -852,6 +852,10 @@ gorm.io/driver/mysql v1.3.3/go.mod h1:ChK6AHbHgDCFZyJp0F+BmVGb06PSIoh9uVYKAlRbb2
gorm.io/gorm v1.23.1/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk= gorm.io/gorm v1.23.1/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk=
gorm.io/gorm v1.23.4 h1:1BKWM67O6CflSLcwGQR7ccfmC4ebOxQrTfOQGRE9wjg= gorm.io/gorm v1.23.4 h1:1BKWM67O6CflSLcwGQR7ccfmC4ebOxQrTfOQGRE9wjg=
gorm.io/gorm v1.23.4/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk= gorm.io/gorm v1.23.4/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk=
gorm.io/gorm v1.23.5 h1:TnlF26wScKSvknUC/Rn8t0NLLM22fypYBlvj1+aH6dM=
gorm.io/gorm v1.23.5/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk=
gorm.io/plugin/optimisticlock v1.0.7 h1:H+UltfbM3twsgMj4WrRLB2YYVdAcVFegj6DdmIuiA7M=
gorm.io/plugin/optimisticlock v1.0.7/go.mod h1:NTvR8qJnB/+O3yMdVdFPRCOjmzJjIRowhFvQ8HIlODs=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

Loading…
Cancel
Save