|
|
|
|
package mq
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"dcg/app/user_center/usercenter"
|
|
|
|
|
"dcg/game/live_logic"
|
|
|
|
|
pbCommon "dcg/game/pb/common"
|
|
|
|
|
pbMq "dcg/game/pb/mq"
|
|
|
|
|
"dcg/game/svc"
|
|
|
|
|
kfk "dcg/pkg/kafka"
|
|
|
|
|
"git.noahlan.cn/northlan/ntools-go/kafka"
|
|
|
|
|
"git.noahlan.cn/northlan/ntools-go/logger"
|
|
|
|
|
"github.com/Shopify/sarama"
|
|
|
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
|
"strconv"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type MsgToPushHandler struct {
|
|
|
|
|
svcCtx *svc.ServiceContext
|
|
|
|
|
ConsumerGroup *kafka.ConsumerGroup // 消费组
|
|
|
|
|
|
|
|
|
|
// msgHandle 消息处理 topic:handler
|
|
|
|
|
msgHandle map[string]kafkaMsgHandlerFunc
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h *MsgToPushHandler) Init(svcCtx *svc.ServiceContext) {
|
|
|
|
|
h.svcCtx = svcCtx
|
|
|
|
|
cfg := svcCtx.Config.Kafka.Danmaku
|
|
|
|
|
h.msgHandle = make(map[string]kafkaMsgHandlerFunc)
|
|
|
|
|
h.msgHandle[cfg.Topic] = h.handleDanmaku
|
|
|
|
|
|
|
|
|
|
var err error
|
|
|
|
|
h.ConsumerGroup, err = kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
|
|
|
|
|
KafkaVersion: sarama.V3_1_0_0,
|
|
|
|
|
OffsetsInitial: sarama.OffsetNewest,
|
|
|
|
|
IsReturnErr: false,
|
|
|
|
|
UnMarshaler: kfk.ProtobufMarshaler,
|
|
|
|
|
}, cfg.Addr, []string{cfg.Topic}, cfg.ConsumerGroup)
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.SLog.Error(err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h *MsgToPushHandler) handleDanmaku(data []byte, _ string) {
|
|
|
|
|
// danmaku msg proto
|
|
|
|
|
var msgFromMq pbMq.MqDanmaku
|
|
|
|
|
if err := proto.Unmarshal(data, &msgFromMq); err != nil {
|
|
|
|
|
logger.SLog.Error("unmarshal msg err", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
// rpc创建或获取用户数据
|
|
|
|
|
rpcUser, err := h.svcCtx.UserCenterRpc.RetrievePlatformUser(h.svcCtx.Ctx, &usercenter.PlatformUserReq{
|
|
|
|
|
Platform: msgFromMq.Platform,
|
|
|
|
|
PUid: strconv.FormatInt(msgFromMq.Uid, 10),
|
|
|
|
|
PUname: msgFromMq.Uname,
|
|
|
|
|
PAvatar: msgFromMq.Avatar,
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.SLog.Info("rpc获取用户信息失败,本条消息丢弃...")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
pbUser := &pbCommon.PbUser{
|
|
|
|
|
UserId: rpcUser.Id,
|
|
|
|
|
Username: rpcUser.PUname,
|
|
|
|
|
Avatar: rpcUser.PAvatar,
|
|
|
|
|
Platform: rpcUser.Platform,
|
|
|
|
|
FansMedalWearingStatus: msgFromMq.FansMedalWearingStatus,
|
|
|
|
|
FansMedalName: msgFromMq.FansMedalName,
|
|
|
|
|
FansMedalLevel: msgFromMq.FansMedalLevel,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.SLog.Debugf("用户 [%s] 发送弹幕 [%s]", pbUser.Username, msgFromMq.Msg)
|
|
|
|
|
|
|
|
|
|
// 游戏命令逻辑处理
|
|
|
|
|
live_logic.LiveManager.HandleDanmaku(pbUser, &msgFromMq)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (MsgToPushHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
|
|
|
|
func (MsgToPushHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
|
|
|
|
func (h *MsgToPushHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
|
|
|
|
for msg := range claim.Messages() {
|
|
|
|
|
//logger.SLog.Infow("kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
|
|
|
|
|
if hFunc, ok := h.msgHandle[msg.Topic]; ok {
|
|
|
|
|
hFunc(msg.Value, string(msg.Key))
|
|
|
|
|
}
|
|
|
|
|
sess.MarkMessage(msg, "")
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|