package danmaku import ( "dcg/config" "dcg/game/command" "dcg/game/pb" "dcg/game/svc" "dcg/pkg/kafka" "dcg/pkg/logger" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" "github.com/spf13/cast" "regexp" ) type MsgToPushHandler struct { ctx *svc.ServiceContext msgHandle map[string]msgHandlerFunc regex *regexp.Regexp commandManager *command.Manager ConsumerGroup *kafka.ConsumerGroup } func (h *MsgToPushHandler) Init(ctx *svc.ServiceContext) { h.ctx = ctx cfg := config.Config.Kafka.Danmaku h.msgHandle = make(map[string]msgHandlerFunc) h.msgHandle["danmaku"] = h.handleDanmaku h.commandManager = command.NewManager() h.commandManager.Register("j", h.handleJoinGame) h.commandManager.Register("c", h.handleCreateUnit) h.commandManager.Register("s", h.handleOutbreak) h.commandManager.Register("m", h.handleMove) h.commandManager.Register("w", h.handleWai) var err error h.regex, err = regexp.Compile(config.Config.Command.Regex) if err != nil { logger.SLog.Error(err) } h.ConsumerGroup, err = kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{ KafkaVersion: sarama.V3_1_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, }, cfg.Addr, []string{cfg.Topic}, config.Config.ConsumerGroupId.MsgToPush) if err != nil { logger.SLog.Error(err) } } func (h *MsgToPushHandler) handleJoinGame(_ string, user *pb.User) { h.ctx.RoomManager.Broadcast("game.join", &pb.JoinGame{User: user}) } func (h *MsgToPushHandler) handleOutbreak(_ string, user *pb.User) { h.ctx.RoomManager.Broadcast("game.outbreak", &pb.Outbreak{User: user}) } func (h *MsgToPushHandler) handleCreateUnit(cmd string, user *pb.User) { if len(cmd) < 2 { return } unit := cmd[1] h.ctx.RoomManager.Broadcast("game.createUnit", &pb.CreateUnit{ User: user, Unit: cast.ToString(unit), }) } func (h *MsgToPushHandler) handleMove(cmd string, user *pb.User) { if len(cmd) < 2 { return } line := cmd[1] h.ctx.RoomManager.Broadcast("game.move", &pb.Move{ User: user, Line: cast.ToString(line), }) } func (h *MsgToPushHandler) handleWai(cmd string, user *pb.User) { h.ctx.RoomManager.Broadcast("game.wai", &pb.Wai{User: user}) } func (h *MsgToPushHandler) handleDanmaku(data []byte, msgKey string) { // danmaku msg proto var msgFromMq pb.Danmaku err := proto.Unmarshal(data, &msgFromMq) if err != nil { logger.SLog.Error("unmarshal msg err", err) return } cmdArr := h.parseCommands(msgFromMq.Content) for _, cmd := range cmdArr { h.commandManager.Handle(cmd, &pb.User{ UId: msgFromMq.Uid, Uname: msgFromMq.Uname, }) } } func (h *MsgToPushHandler) parseCommands(msg string) []string { if h.regex == nil { logger.SLog.Error("regex is null") return []string{} } return h.regex.FindAllString(msg, -1) } func (MsgToPushHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (MsgToPushHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (h *MsgToPushHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { //logger.SLog.Infow("kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) if hFunc, ok := h.msgHandle[msg.Topic]; ok { hFunc(msg.Value, string(msg.Key)) } sess.MarkMessage(msg, "") } return nil }