|
|
|
package danmaku
|
|
|
|
|
|
|
|
import (
|
|
|
|
"dcg/config"
|
|
|
|
"dcg/game/command"
|
|
|
|
pbMq "dcg/game/pb/mq"
|
|
|
|
pushPb "dcg/game/pb/push"
|
|
|
|
"dcg/game/svc"
|
|
|
|
kfk "dcg/pkg/kafka"
|
|
|
|
"git.noahlan.cn/northlan/ntools-go/kafka"
|
|
|
|
"git.noahlan.cn/northlan/ntools-go/logger"
|
|
|
|
"github.com/Shopify/sarama"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
|
|
"regexp"
|
|
|
|
)
|
|
|
|
|
|
|
|
type MsgToPushHandler struct {
|
|
|
|
ctx *svc.ServiceContext
|
|
|
|
msgHandle map[string]msgHandlerFunc
|
|
|
|
regex *regexp.Regexp
|
|
|
|
commandManager *command.Manager
|
|
|
|
ConsumerGroup *kafka.ConsumerGroup
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *MsgToPushHandler) Init(ctx *svc.ServiceContext) {
|
|
|
|
h.ctx = ctx
|
|
|
|
|
|
|
|
cfg := config.Config.Kafka.Danmaku
|
|
|
|
h.msgHandle = make(map[string]msgHandlerFunc)
|
|
|
|
h.msgHandle["danmaku"] = h.handleDanmaku
|
|
|
|
|
|
|
|
h.commandManager = command.NewManager()
|
|
|
|
h.commandManager.Register("j", h.handleJoinGame)
|
|
|
|
h.commandManager.Register("c", h.handleCreateUnit)
|
|
|
|
h.commandManager.Register("s", h.handleOutbreak)
|
|
|
|
h.commandManager.Register("m", h.handleMove)
|
|
|
|
h.commandManager.Register("w", h.handleWai)
|
|
|
|
h.commandManager.Register("b", h.handleMode)
|
|
|
|
|
|
|
|
var err error
|
|
|
|
h.regex, err = regexp.Compile(config.Config.Command.Regex)
|
|
|
|
if err != nil {
|
|
|
|
logger.SLog.Error(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
h.ConsumerGroup, err = kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
|
|
|
|
KafkaVersion: sarama.V3_1_0_0,
|
|
|
|
OffsetsInitial: sarama.OffsetNewest,
|
|
|
|
IsReturnErr: false,
|
|
|
|
UnMarshaler: kfk.ProtobufMarshaler,
|
|
|
|
}, cfg.Addr, []string{cfg.Topic}, config.Config.ConsumerGroupId.MsgToPush)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
logger.SLog.Error(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *MsgToPushHandler) handleJoinGame(roomId int64, _ string, user *pushPb.User) {
|
|
|
|
h.ctx.RoomManager.PushToRoom(roomId, "game.join", &pushPb.JoinGame{User: user})
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *MsgToPushHandler) handleOutbreak(roomId int64, _ string, user *pushPb.User) {
|
|
|
|
h.ctx.RoomManager.PushToRoom(roomId, "game.outbreak", &pushPb.Outbreak{User: user})
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *MsgToPushHandler) handleCreateUnit(roomId int64, cmd string, user *pushPb.User) {
|
|
|
|
if len(cmd) < 2 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
unit := cmd[1]
|
|
|
|
h.ctx.RoomManager.PushToRoom(roomId, "game.createUnit", &pushPb.CreateUnit{
|
|
|
|
User: user,
|
|
|
|
Unit: string(unit),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *MsgToPushHandler) handleMove(roomId int64, cmd string, user *pushPb.User) {
|
|
|
|
if len(cmd) < 2 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
line := cmd[1]
|
|
|
|
h.ctx.RoomManager.PushToRoom(roomId, "game.move", &pushPb.Move{
|
|
|
|
User: user,
|
|
|
|
Line: string(line),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *MsgToPushHandler) handleWai(roomId int64, _ string, user *pushPb.User) {
|
|
|
|
h.ctx.RoomManager.PushToRoom(roomId, "game.wai", &pushPb.Wai{User: user})
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *MsgToPushHandler) handleMode(roomId int64, cmd string, user *pushPb.User) {
|
|
|
|
if len(cmd) < 2 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
line := cmd[1]
|
|
|
|
h.ctx.RoomManager.PushToRoom(roomId, "game.mode", &pushPb.BuildingMode{
|
|
|
|
User: user,
|
|
|
|
Mode: string(line),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *MsgToPushHandler) handleDanmaku(data []byte, msgKey string) {
|
|
|
|
// danmaku msg proto
|
|
|
|
var msgFromMq pbMq.MqDanmaku
|
|
|
|
err := proto.Unmarshal(data, &msgFromMq)
|
|
|
|
if err != nil {
|
|
|
|
logger.SLog.Error("unmarshal msg err", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
pbUser := &pushPb.User{
|
|
|
|
UId: msgFromMq.Uid,
|
|
|
|
Uname: msgFromMq.Uname,
|
|
|
|
}
|
|
|
|
cmdArr := h.parseCommands(msgFromMq.Content)
|
|
|
|
for _, cmd := range cmdArr {
|
|
|
|
h.commandManager.Handle(msgFromMq.LiveRoomId, cmd, pbUser)
|
|
|
|
}
|
|
|
|
// 发送正常非命令弹幕消息
|
|
|
|
if len(cmdArr) <= 0 {
|
|
|
|
h.ctx.RoomManager.PushToRoom(msgFromMq.LiveRoomId, "live.danmaku", &pushPb.DanmakuMsg{
|
|
|
|
User: pbUser,
|
|
|
|
Content: msgFromMq.Content,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *MsgToPushHandler) parseCommands(msg string) []string {
|
|
|
|
if h.regex == nil {
|
|
|
|
logger.SLog.Error("regex is null")
|
|
|
|
return []string{}
|
|
|
|
}
|
|
|
|
return h.regex.FindAllString(msg, -1)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Setup is a no-op consumer-group session hook; it always returns nil.
func (MsgToPushHandler) Setup(sarama.ConsumerGroupSession) error {
	return nil
}
|
|
|
|
// Cleanup is a no-op consumer-group session hook; it always returns nil.
func (MsgToPushHandler) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
|
|
|
|
func (h *MsgToPushHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
|
|
|
for msg := range claim.Messages() {
|
|
|
|
//logger.SLog.Infow("kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
|
|
|
|
if hFunc, ok := h.msgHandle[msg.Topic]; ok {
|
|
|
|
hFunc(msg.Value, string(msg.Key))
|
|
|
|
}
|
|
|
|
sess.MarkMessage(msg, "")
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|