package mq

import (
	"dcg/app/user_center/usercenter"
	"dcg/game/live_logic"
	pbCommon "dcg/game/pb/common"
	pbMq "dcg/game/pb/mq"
	"dcg/game/svc"
	kfk "dcg/pkg/kafka"
	"git.noahlan.cn/northlan/ntools-go/kafka"
	"git.noahlan.cn/northlan/ntools-go/logger"
	"github.com/Shopify/sarama"
	"google.golang.org/protobuf/proto"
	"strconv"
)

// MsgToPushHandler consumes messages from Kafka and dispatches each one to the
// handler registered for its topic.
type MsgToPushHandler struct {
	svcCtx *svc.ServiceContext
	// ConsumerGroup is the Kafka consumer group created by Init.
	ConsumerGroup *kafka.ConsumerGroup
	// msgHandle maps a topic name to the function that processes its messages.
	msgHandle map[string]kafkaMsgHandlerFunc
}

// Init stores the service context, registers handleDanmaku as the handler for
// the configured danmaku topic, and creates the consumer group for that topic.
//
// A failure to create the consumer group is logged and not returned;
// ConsumerGroup then holds whatever kafka.NewConsumerGroup returned alongside
// the error.
func (h *MsgToPushHandler) Init(svcCtx *svc.ServiceContext) {
	h.svcCtx = svcCtx
	cfg := svcCtx.Config.Kafka.Danmaku

	h.msgHandle = make(map[string]kafkaMsgHandlerFunc)
	h.msgHandle[cfg.Topic] = h.handleDanmaku

	var err error
	h.ConsumerGroup, err = kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
		KafkaVersion:   sarama.V3_1_0_0,
		OffsetsInitial: sarama.OffsetNewest,
		IsReturnErr:    false,
		UnMarshaler:    kfk.ProtobufMarshaler,
	}, cfg.Addr, []string{cfg.Topic}, cfg.ConsumerGroup)
	if err != nil {
		logger.SLog.Error(err)
	}
}

// handleDanmaku decodes one danmaku message, resolves the sender through the
// user-center RPC, and hands the result to the live game logic.
//
// data is the raw protobuf-encoded pbMq.MqDanmaku payload. The second argument
// (the Kafka message key, as passed by ConsumeClaim) is unused. Messages that
// cannot be decoded, or whose user cannot be resolved, are logged and dropped.
func (h *MsgToPushHandler) handleDanmaku(data []byte, _ string) {
	// Decode the danmaku protobuf message.
	var msgFromMq pbMq.MqDanmaku
	if err := proto.Unmarshal(data, &msgFromMq); err != nil {
		logger.SLog.Error("unmarshal msg err", err)
		return
	}

	// Create or fetch the platform user via RPC.
	rpcUser, err := h.svcCtx.UserCenterRpc.RetrievePlatformUser(h.svcCtx.Ctx, &usercenter.PlatformUserReq{
		Platform: msgFromMq.Platform,
		PUid:     strconv.FormatInt(msgFromMq.Uid, 10),
		PUname:   msgFromMq.Uname,
		PAvatar:  msgFromMq.Avatar,
	})
	if err != nil {
		// Include the underlying error so a dropped message can be diagnosed.
		logger.SLog.Error("rpc获取用户信息失败,本条消息丢弃...", err)
		return
	}

	pbUser := &pbCommon.PbUser{
		UserId:                 rpcUser.Id,
		Username:               rpcUser.PUname,
		Avatar:                 rpcUser.PAvatar,
		Platform:               rpcUser.Platform,
		FansMedalWearingStatus: msgFromMq.FansMedalWearingStatus,
		FansMedalName:          msgFromMq.FansMedalName,
		FansMedalLevel:         msgFromMq.FansMedalLevel,
	}
	logger.SLog.Debugf("用户 [%s] 发送弹幕 [%s]", pbUser.Username, msgFromMq.Msg)

	// Run the game command logic for this danmaku.
	live_logic.LiveManager.HandleDanmaku(pbUser, &msgFromMq)
}

// Setup is a no-op session hook; it always returns nil.
func (MsgToPushHandler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// Cleanup is a no-op session hook; it always returns nil.
func (MsgToPushHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim processes the messages of one claim. Each message is passed to
// the handler registered for its topic, if any, and is then marked as consumed
// whether or not a handler was found.
//
// It returns nil when the claim's message channel is closed or when the
// session context is done.
func (h *MsgToPushHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				// The message channel was closed; nothing more to consume.
				return nil
			}
			//logger.SLog.Infow("kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
			if hFunc, found := h.msgHandle[msg.Topic]; found {
				hFunc(msg.Value, string(msg.Key))
			}
			sess.MarkMessage(msg, "")
		case <-sess.Context().Done():
			// Stop as soon as the session ends (rebalance or shutdown), as
			// sarama's consumer group example recommends, instead of waiting
			// for the message channel to be closed.
			return nil
		}
	}
}