package msg_transfer // //import ( // "dcg/config" // "dcg/game/manager" // pbCommon "dcg/game/pb/common" // pbMq "dcg/game/pb/mq" // "dcg/game/svc" // kfk "dcg/pkg/kafka" // "git.noahlan.cn/northlan/ntools-go/kafka" // "git.noahlan.cn/northlan/ntools-go/logger" // "github.com/Shopify/sarama" // "google.golang.org/protobuf/proto" //) // //type RewardTransferHandler struct { // svcCtx *svc.ServiceContext // msgHandle map[string]kafkaMsgHandlerFunc // ConsumerGroup *kafka.ConsumerGroup //} // //func (h *RewardTransferHandler) Init(svcCtx *svc.ServiceContext) { // h.svcCtx = svcCtx // cfg := config.Config.Kafka.RewardPool // h.msgHandle = make(map[string]kafkaMsgHandlerFunc) // h.msgHandle[cfg.Topic] = h.handleTopic // // var err error // h.ConsumerGroup, err = kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{ // KafkaVersion: sarama.V3_1_0_0, // OffsetsInitial: sarama.OffsetNewest, // IsReturnErr: false, // UnMarshaler: kfk.ProtobufMarshaler, // }, cfg.Addr, []string{cfg.Topic}, cfg.ConsumerGroup) // // if err != nil { // logger.SLog.Error(err) // } //} // //func (h *RewardTransferHandler) handleTopic(data []byte, _ string) { // // msg proto // var msgFromMq pbMq.MqRewardPool // err := proto.Unmarshal(data, &msgFromMq) // if err != nil { // logger.SLog.Error("unmarshal msg err", err) // return // } // room, err := manager.GameManager.RoomByBattleId(msgFromMq.BattleId) // if err != nil { // return // } // room.Broadcast("game.rewardPool", &pbCommon.RewardPoolMsg{ // WelfarePool: msgFromMq.WelfarePool, // BattleId: msgFromMq.BattleId, // InitReward: msgFromMq.InitReward, // GiftReward: msgFromMq.GiftReward, // BattleReward: msgFromMq.BattleReward, // OtherReward: msgFromMq.OtherReward, // AllRewards: msgFromMq.AllRewards, // }) //} // //func (RewardTransferHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } //func (RewardTransferHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } //func (h *RewardTransferHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // for msg := range claim.Messages() { // //logger.SLog.Infow("kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) // if hFunc, ok := h.msgHandle[msg.Topic]; ok { // hFunc(msg.Value, string(msg.Key)) // } // sess.MarkMessage(msg, "") // } // return nil //}