|
|
|
package msg_transfer
|
|
|
|
|
|
|
|
//
|
|
|
|
//import (
|
|
|
|
// "dcg/config"
|
|
|
|
// "dcg/game/manager"
|
|
|
|
// pbCommon "dcg/game/pb/common"
|
|
|
|
// pbMq "dcg/game/pb/mq"
|
|
|
|
// "dcg/game/svc"
|
|
|
|
// kfk "dcg/pkg/kafka"
|
|
|
|
// "git.noahlan.cn/northlan/ntools-go/kafka"
|
|
|
|
// "git.noahlan.cn/northlan/ntools-go/logger"
|
|
|
|
// "github.com/Shopify/sarama"
|
|
|
|
// "google.golang.org/protobuf/proto"
|
|
|
|
//)
|
|
|
|
//
|
|
|
|
//type RewardTransferHandler struct {
|
|
|
|
// svcCtx *svc.ServiceContext
|
|
|
|
// msgHandle map[string]kafkaMsgHandlerFunc
|
|
|
|
// ConsumerGroup *kafka.ConsumerGroup
|
|
|
|
//}
|
|
|
|
//
|
|
|
|
//func (h *RewardTransferHandler) Init(svcCtx *svc.ServiceContext) {
|
|
|
|
// h.svcCtx = svcCtx
|
|
|
|
// cfg := config.Config.Kafka.RewardPool
|
|
|
|
// h.msgHandle = make(map[string]kafkaMsgHandlerFunc)
|
|
|
|
// h.msgHandle[cfg.Topic] = h.handleTopic
|
|
|
|
//
|
|
|
|
// var err error
|
|
|
|
// h.ConsumerGroup, err = kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
|
|
|
|
// KafkaVersion: sarama.V3_1_0_0,
|
|
|
|
// OffsetsInitial: sarama.OffsetNewest,
|
|
|
|
// IsReturnErr: false,
|
|
|
|
// UnMarshaler: kfk.ProtobufMarshaler,
|
|
|
|
// }, cfg.Addr, []string{cfg.Topic}, cfg.ConsumerGroup)
|
|
|
|
//
|
|
|
|
// if err != nil {
|
|
|
|
// logger.SLog.Error(err)
|
|
|
|
// }
|
|
|
|
//}
|
|
|
|
//
|
|
|
|
//func (h *RewardTransferHandler) handleTopic(data []byte, _ string) {
|
|
|
|
// // msg proto
|
|
|
|
// var msgFromMq pbMq.MqRewardPool
|
|
|
|
// err := proto.Unmarshal(data, &msgFromMq)
|
|
|
|
// if err != nil {
|
|
|
|
// logger.SLog.Error("unmarshal msg err", err)
|
|
|
|
// return
|
|
|
|
// }
|
|
|
|
// room, err := manager.GameManager.RoomByBattleId(msgFromMq.BattleId)
|
|
|
|
// if err != nil {
|
|
|
|
// return
|
|
|
|
// }
|
|
|
|
// room.Broadcast("game.rewardPool", &pbCommon.RewardPoolMsg{
|
|
|
|
// WelfarePool: msgFromMq.WelfarePool,
|
|
|
|
// BattleId: msgFromMq.BattleId,
|
|
|
|
// InitReward: msgFromMq.InitReward,
|
|
|
|
// GiftReward: msgFromMq.GiftReward,
|
|
|
|
// BattleReward: msgFromMq.BattleReward,
|
|
|
|
// OtherReward: msgFromMq.OtherReward,
|
|
|
|
// AllRewards: msgFromMq.AllRewards,
|
|
|
|
// })
|
|
|
|
//}
|
|
|
|
//
|
|
|
|
//func (RewardTransferHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
|
|
|
//func (RewardTransferHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
|
|
|
//func (h *RewardTransferHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
|
|
|
// for msg := range claim.Messages() {
|
|
|
|
// //logger.SLog.Infow("kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
|
|
|
|
// if hFunc, ok := h.msgHandle[msg.Topic]; ok {
|
|
|
|
// hFunc(msg.Value, string(msg.Key))
|
|
|
|
// }
|
|
|
|
// sess.MarkMessage(msg, "")
|
|
|
|
// }
|
|
|
|
// return nil
|
|
|
|
//}
|