package msg_transfer import ( "dcg/app/user_center/usercenter" "dcg/config" "dcg/game/live" "dcg/game/logic" pbCommon "dcg/game/pb/common" pbMq "dcg/game/pb/mq" "dcg/game/svc" kfk "dcg/pkg/kafka" "git.noahlan.cn/northlan/ntools-go/kafka" "git.noahlan.cn/northlan/ntools-go/logger" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" "strconv" ) type GiftToPushHandler struct { svcCtx *svc.ServiceContext msgHandle map[string]kafkaMsgHandlerFunc ConsumerGroup *kafka.ConsumerGroup } func (h *GiftToPushHandler) Init(svcCtx *svc.ServiceContext) { h.svcCtx = svcCtx cfg := config.Config.Kafka.Gift h.msgHandle = make(map[string]kafkaMsgHandlerFunc) h.msgHandle[cfg.Topic] = h.handleGift var err error h.ConsumerGroup, err = kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{ KafkaVersion: sarama.V3_1_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, UnMarshaler: kfk.ProtobufMarshaler, }, cfg.Addr, []string{cfg.Topic}, cfg.ConsumerGroup) if err != nil { logger.SLog.Error(err) } } func (h *GiftToPushHandler) handleGift(data []byte, msgKey string) { // msg proto var msgFromMq pbMq.MqGift err := proto.Unmarshal(data, &msgFromMq) if err != nil { logger.SLog.Error("unmarshal msg err", err) return } // rpc记录送礼 logger.SLog.Debugf("队列礼物消息: %s 投喂 %s 价值 %d", msgFromMq.Uname, msgFromMq.GiftName, msgFromMq.Price*msgFromMq.Num) req := &usercenter.UserSendGiftReq{ Platform: msgFromMq.Platform, PUid: strconv.FormatInt(msgFromMq.Uid, 10), RoomId: strconv.FormatInt(msgFromMq.LiveRoomId, 10), GiftId: msgFromMq.GiftId, GiftName: msgFromMq.GiftName, Num: msgFromMq.Num, Price: msgFromMq.Price, IsPaid: msgFromMq.IsPaid, } giftResp, err := h.svcCtx.UserCenterRpc.UserSendGift(h.svcCtx.Ctx, req) if err != nil { logger.SLog.Info("rpc 用户送礼记录失败,本条消息丢弃...") return } rpcUser := giftResp.User pbUser := &pbCommon.PbUser{ UId: rpcUser.Id, Uname: rpcUser.PUname, Avatar: rpcUser.PAvatar, NobilityLevel: rpcUser.NobilityLevel, Integral: rpcUser.Integral, } // 用户积分变更,通用消息 if room, err := logic.GameLogic.RoomManager.RoomByLiveRoomId(msgFromMq.LiveRoomId); err == nil { room.PushToLiveRoom(msgFromMq.LiveRoomId, "user.integral", &pbCommon.UserIntegralChanged{ User: pbUser, Change: giftResp.Integral.Change, Integral: giftResp.Integral.Integral, }) } // 游戏逻辑处理 live.LiveManager.HandleGift(msgFromMq.LiveRoomId, pbUser, &msgFromMq) } func (GiftToPushHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (GiftToPushHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (h *GiftToPushHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { //logger.SLog.Infow("kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) if hFunc, ok := h.msgHandle[msg.Topic]; ok { hFunc(msg.Value, string(msg.Key)) } sess.MarkMessage(msg, "") } return nil }