package danmaku import ( "context" "dcg/app/user_center/usercenter" "dcg/config" "dcg/game/command" "dcg/game/logic" pbMq "dcg/game/pb/mq" pushPb "dcg/game/pb/push" "dcg/game/svc" kfk "dcg/pkg/kafka" "git.noahlan.cn/northlan/ntools-go/kafka" "git.noahlan.cn/northlan/ntools-go/logger" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" "strconv" ) type MsgToPushHandler struct { svc *svc.ServiceContext msgHandle map[string]msgHandlerFunc cmdParser *Parser commandManager *command.Manager ConsumerGroup *kafka.ConsumerGroup } func (h *MsgToPushHandler) Init(svc *svc.ServiceContext) { h.svc = svc cfg := config.Config.Kafka.Danmaku h.msgHandle = make(map[string]msgHandlerFunc) h.msgHandle["danmaku"] = h.handleDanmaku h.cmdParser = NewCMDParser(config.Config.Command.Keys) // h.commandManager = command.NewManager() h.commandManager.Register(h.handleJoinGame, "j", "加入", "加入游戏") h.commandManager.Register(h.handleOutbreak, "s") h.commandManager.Register(h.handleWai, "w", "我在哪") h.commandManager.Register(h.handleCreateUnit, "c1", "c2", "c3", "c4") h.commandManager.Register(h.handleMove, "m1", "m2", "m3") h.commandManager.Register(h.handleMode, "r1", "r2", "r3") var err error h.ConsumerGroup, err = kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{ KafkaVersion: sarama.V3_1_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, UnMarshaler: kfk.ProtobufMarshaler, }, cfg.Addr, []string{cfg.Topic}, config.Config.ConsumerGroupId.MsgToPush) if err != nil { logger.SLog.Error(err) } } func (h *MsgToPushHandler) handleJoinGame(roomId int64, _ string, user *pushPb.PushUser) { logic.GameLogic.RoomManager.PushToRoom(roomId, "game.join", &pushPb.JoinGame{User: user}) } func (h *MsgToPushHandler) handleOutbreak(roomId int64, _ string, user *pushPb.PushUser) { logic.GameLogic.RoomManager.PushToRoom(roomId, "game.outbreak", &pushPb.Outbreak{User: user}) } func (h *MsgToPushHandler) handleCreateUnit(roomId int64, cmd string, user *pushPb.PushUser) { if len(cmd) < 2 { return } unit := cmd[1] logic.GameLogic.RoomManager.PushToRoom(roomId, "game.createUnit", &pushPb.CreateUnit{ User: user, Unit: string(unit), }) } func (h *MsgToPushHandler) handleMove(roomId int64, cmd string, user *pushPb.PushUser) { if len(cmd) < 2 { return } line := cmd[1] logic.GameLogic.RoomManager.PushToRoom(roomId, "game.move", &pushPb.Move{ User: user, Line: string(line), }) } func (h *MsgToPushHandler) handleWai(roomId int64, _ string, user *pushPb.PushUser) { logic.GameLogic.RoomManager.PushToRoom(roomId, "game.wai", &pushPb.Wai{User: user}) } func (h *MsgToPushHandler) handleMode(roomId int64, cmd string, user *pushPb.PushUser) { if len(cmd) < 2 { return } line := cmd[1] logic.GameLogic.RoomManager.PushToRoom(roomId, "game.mode", &pushPb.BuildingMode{ User: user, Mode: string(line), }) } func (h *MsgToPushHandler) handleDanmaku(data []byte, _ string) { // danmaku msg proto var msgFromMq pbMq.MqDanmaku if err := proto.Unmarshal(data, &msgFromMq); err != nil { logger.SLog.Error("unmarshal msg err", err) return } // rpc创建或获取用户数据 pbUser := &pushPb.PushUser{ UId: msgFromMq.Uid, Uname: msgFromMq.Uname, } rpcUser, err := h.svc.UserCenterRpc.RetrievePlatformUser(context.Background(), &usercenter.PlatformUserReq{ Platform: msgFromMq.Platform, PUid: strconv.FormatInt(msgFromMq.Uid, 10), }) if err == nil { pbUser.UId = rpcUser.User.Id pbUser.Avatar = rpcUser.User.PAvatar } else { logger.SLog.Info("rpc获取用户信息失败") } cmdStruct := h.cmdParser.Parse(msgFromMq.Content) if cmdStruct.IsCMD { for _, cmd := range cmdStruct.Arr { h.commandManager.Handle(msgFromMq.LiveRoomId, cmd, pbUser) } } else { // 发送正常的非命令弹幕消息 logic.GameLogic.RoomManager.PushToRoom(msgFromMq.LiveRoomId, "live.danmaku", &pushPb.DanmakuMsg{ User: pbUser, Content: msgFromMq.Content, }) } } func (MsgToPushHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (MsgToPushHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (h *MsgToPushHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { //logger.SLog.Infow("kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) if hFunc, ok := h.msgHandle[msg.Topic]; ok { hFunc(msg.Value, string(msg.Key)) } sess.MarkMessage(msg, "") } return nil }