package msg_transfer import ( "dcg/config" "dcg/game/manager" pbCommon "dcg/game/pb/common" pbMq "dcg/game/pb/mq" "dcg/game/svc" kfk "dcg/pkg/kafka" "git.noahlan.cn/northlan/ntools-go/kafka" "git.noahlan.cn/northlan/ntools-go/logger" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" ) type RewardTransferHandler struct { svcCtx *svc.ServiceContext msgHandle map[string]kafkaMsgHandlerFunc ConsumerGroup *kafka.ConsumerGroup } func (h *RewardTransferHandler) Init(svcCtx *svc.ServiceContext) { h.svcCtx = svcCtx cfg := config.Config.Kafka.RewardPool h.msgHandle = make(map[string]kafkaMsgHandlerFunc) h.msgHandle[cfg.Topic] = h.handleTopic var err error h.ConsumerGroup, err = kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{ KafkaVersion: sarama.V3_1_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, UnMarshaler: kfk.ProtobufMarshaler, }, cfg.Addr, []string{cfg.Topic}, cfg.ConsumerGroup) if err != nil { logger.SLog.Error(err) } } func (h *RewardTransferHandler) handleTopic(data []byte, _ string) { // msg proto var msgFromMq pbMq.MqRewardPool err := proto.Unmarshal(data, &msgFromMq) if err != nil { logger.SLog.Error("unmarshal msg err", err) return } roomId := manager.GameManager.LiveRoomIdByBattleId(msgFromMq.BattleId) room, err := manager.GameManager.RoomManager.RoomByLiveRoomId(roomId) if err != nil { return } room.PushToLiveRoom(roomId, "game.rewardPool", &pbCommon.RewardPoolMsg{ WelfarePool: msgFromMq.WelfarePool, BattleId: msgFromMq.BattleId, InitReward: msgFromMq.InitReward, GiftReward: msgFromMq.GiftReward, BattleReward: msgFromMq.BattleReward, OtherReward: msgFromMq.OtherReward, AllRewards: msgFromMq.AllRewards, }) } func (RewardTransferHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (RewardTransferHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (h *RewardTransferHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { //logger.SLog.Infow("kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) if hFunc, ok := h.msgHandle[msg.Topic]; ok { hFunc(msg.Value, string(msg.Key)) } sess.MarkMessage(msg, "") } return nil }