You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

101 lines
3.1 KiB
Go

package msg_transfer
import (
"dcg/app/user_center/usercenter"
"dcg/config"
"dcg/game/live"
"dcg/game/logic"
pbCommon "dcg/game/pb/common"
pbMq "dcg/game/pb/mq"
"dcg/game/svc"
kfk "dcg/pkg/kafka"
"git.noahlan.cn/northlan/ntools-go/kafka"
"git.noahlan.cn/northlan/ntools-go/logger"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
"strconv"
)
type GiftToPushHandler struct {
svcCtx *svc.ServiceContext
msgHandle map[string]kafkaMsgHandlerFunc
ConsumerGroup *kafka.ConsumerGroup
}
func (h *GiftToPushHandler) Init(svcCtx *svc.ServiceContext) {
h.svcCtx = svcCtx
cfg := config.Config.Kafka.Gift
h.msgHandle = make(map[string]kafkaMsgHandlerFunc)
h.msgHandle[cfg.Topic] = h.handleGift
var err error
h.ConsumerGroup, err = kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
KafkaVersion: sarama.V3_1_0_0,
OffsetsInitial: sarama.OffsetNewest,
IsReturnErr: false,
UnMarshaler: kfk.ProtobufMarshaler,
}, cfg.Addr, []string{cfg.Topic}, cfg.ConsumerGroup)
if err != nil {
logger.SLog.Error(err)
}
}
func (h *GiftToPushHandler) handleGift(data []byte, msgKey string) {
// msg proto
var msgFromMq pbMq.MqGift
err := proto.Unmarshal(data, &msgFromMq)
if err != nil {
logger.SLog.Error("unmarshal msg err", err)
return
}
// rpc记录送礼
logger.SLog.Debugf("队列礼物消息: %s 投喂 %s 价值 %d", msgFromMq.Uname, msgFromMq.GiftName, msgFromMq.Price*msgFromMq.Num)
req := &usercenter.UserSendGiftReq{
Platform: msgFromMq.Platform,
PUid: strconv.FormatInt(msgFromMq.Uid, 10),
RoomId: strconv.FormatInt(msgFromMq.LiveRoomId, 10),
GiftId: msgFromMq.GiftId,
GiftName: msgFromMq.GiftName,
Num: msgFromMq.Num,
Price: msgFromMq.Price,
IsPaid: msgFromMq.IsPaid,
}
giftResp, err := h.svcCtx.UserCenterRpc.UserSendGift(h.svcCtx.Ctx, req)
if err != nil {
logger.SLog.Info("rpc 用户送礼记录失败,本条消息丢弃...")
return
}
rpcUser := giftResp.User
pbUser := &pbCommon.PbUser{
UId: rpcUser.Id,
Uname: rpcUser.PUname,
Avatar: rpcUser.PAvatar,
NobilityLevel: rpcUser.NobilityLevel,
Integral: rpcUser.Integral,
}
// 用户积分变更,通用消息
if room, err := logic.GameLogic.RoomManager.RoomByLiveRoomId(msgFromMq.LiveRoomId); err == nil {
room.PushToLiveRoom(msgFromMq.LiveRoomId, "user.integral", &pbCommon.UserIntegralChanged{
User: pbUser,
Change: giftResp.Integral.Change,
Integral: giftResp.Integral.Integral,
})
}
// 游戏逻辑处理
live.LiveManager.HandleGift(msgFromMq.LiveRoomId, pbUser, &msgFromMq)
}
func (GiftToPushHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (GiftToPushHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h *GiftToPushHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
//logger.SLog.Infow("kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
if hFunc, ok := h.msgHandle[msg.Topic]; ok {
hFunc(msg.Value, string(msg.Key))
}
sess.MarkMessage(msg, "")
}
return nil
}