You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

43 lines
1.2 KiB
Go

package mq
import (
"context"
"git.noahlan.cn/northlan/ntools-go/kafka"
"github.com/zeromicro/go-zero/core/logx"
pbMq "live-service/app/pb/mq"
"live-service/app/user_center/model"
"live-service/app/user_center/rpc/internal/config"
kfk "live-service/common/kafka"
)
// UserMq bundles the dependencies used to publish user-related messages.
type UserMq struct {
	// userPlatformModel is queried for a user's display data (name/avatar)
	// when a notification arrives without it.
	userPlatformModel model.UserPlatformModel
	// coinChangedKfk is the producer that coin-changed notifications are
	// sent through.
	coinChangedKfk *kafka.Producer
	// Logger is embedded so its logging methods are available on UserMq.
	logx.Logger
}
// NewUserMq constructs a UserMq.
//
// The coin-changed producer is created with kafka.NewKafkaProducer from
// kfk.DefaultProducerConfig and the address/topic found in
// cfg.UserCoinNotify. The embedded logger is bound to a background context.
func NewUserMq(userPlatformModel model.UserPlatformModel, cfg config.KafkaProducer) *UserMq {
	addr, topic := cfg.UserCoinNotify.Addr, cfg.UserCoinNotify.Topic
	producer := kafka.NewKafkaProducer(kfk.DefaultProducerConfig, addr, topic)

	mq := new(UserMq)
	mq.userPlatformModel = userPlatformModel
	mq.coinChangedKfk = producer
	mq.Logger = logx.WithContext(context.Background())
	return mq
}
// NotifyUserCoinChanged sends req through the coin-changed Kafka producer
// via SendMessageAsync.
//
// When req.Username or req.Avatar is empty, the user's platform record is
// looked up by req.UserId first and, on success, both fields are overwritten
// with the stored PUname/PAvatar. The lookup is best-effort: a failure is
// logged and the message is still sent with whatever req already carries.
//
// A nil req is logged and ignored. Errors are only logged, never returned,
// so callers cannot observe a failed send.
func (m *UserMq) NotifyUserCoinChanged(req *pbMq.MqUserCoinChanged) {
	if req == nil {
		// Guard against a nil-pointer panic on the field reads below.
		m.Logger.Error("发送通知用户金币变更消息失败: req is nil")
		return
	}

	if req.Username == "" || req.Avatar == "" {
		// Display data is incomplete: load it from the database.
		dbModel, err := m.userPlatformModel.FindDisplayOneByUserId(context.Background(), req.UserId)
		if err != nil {
			// Keep going; the notification is still useful without display data.
			m.Logger.Errorf("读取用户数据失败, userId: %v, err: %v", req.UserId, err)
		} else {
			req.Username = dbModel.PUname
			req.Avatar = dbModel.PAvatar
		}
	}

	if err := m.coinChangedKfk.SendMessageAsync(req); err != nil {
		// Include the underlying error so send failures can be diagnosed.
		m.Logger.Errorf("发送通知用户金币变更消息失败: %v", err)
	}
}