You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

77 lines
2.4 KiB
Go

package msg_transfer
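// NOTE: everything below this line is commented out. As written, this file
// contributes only the package clause; the RewardTransferHandler Kafka
// consumer shown here is disabled and is not compiled.
//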
//import (
// "dcg/config"
// "dcg/game/manager"
// pbCommon "dcg/game/pb/common"
// pbMq "dcg/game/pb/mq"
// "dcg/game/svc"
// kfk "dcg/pkg/kafka"
// "git.noahlan.cn/northlan/ntools-go/kafka"
// "git.noahlan.cn/northlan/ntools-go/logger"
// "github.com/Shopify/sarama"
// "google.golang.org/protobuf/proto"
//)
//
//type RewardTransferHandler struct {
// svcCtx *svc.ServiceContext
// msgHandle map[string]kafkaMsgHandlerFunc
// ConsumerGroup *kafka.ConsumerGroup
//}
//
//func (h *RewardTransferHandler) Init(svcCtx *svc.ServiceContext) {
// h.svcCtx = svcCtx
// cfg := config.Config.Kafka.RewardPool
// h.msgHandle = make(map[string]kafkaMsgHandlerFunc)
// h.msgHandle[cfg.Topic] = h.handleTopic
//
// var err error
// h.ConsumerGroup, err = kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
// KafkaVersion: sarama.V3_1_0_0,
// OffsetsInitial: sarama.OffsetNewest,
// IsReturnErr: false,
// UnMarshaler: kfk.ProtobufMarshaler,
// }, cfg.Addr, []string{cfg.Topic}, cfg.ConsumerGroup)
//
// if err != nil {
// logger.SLog.Error(err)
// }
//}
//
//func (h *RewardTransferHandler) handleTopic(data []byte, _ string) {
// // Decode the raw Kafka payload as a protobuf MqRewardPool message.
// var msgFromMq pbMq.MqRewardPool
// err := proto.Unmarshal(data, &msgFromMq)
// if err != nil {
// logger.SLog.Error("unmarshal msg err", err)
// return
// }
// room, err := manager.GameManager.RoomByBattleId(msgFromMq.BattleId)
// if err != nil {
// return
// }
// room.Broadcast("game.rewardPool", &pbCommon.RewardPoolMsg{
// WelfarePool: msgFromMq.WelfarePool,
// BattleId: msgFromMq.BattleId,
// InitReward: msgFromMq.InitReward,
// GiftReward: msgFromMq.GiftReward,
// BattleReward: msgFromMq.BattleReward,
// OtherReward: msgFromMq.OtherReward,
// AllRewards: msgFromMq.AllRewards,
// })
//}
//
//func (RewardTransferHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
//func (RewardTransferHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
//func (h *RewardTransferHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// for msg := range claim.Messages() {
// //logger.SLog.Infow("kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
// if hFunc, ok := h.msgHandle[msg.Topic]; ok {
// hFunc(msg.Value, string(msg.Key))
// }
// sess.MarkMessage(msg, "")
// }
// return nil
//}