You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

154 lines
4.4 KiB
Go

package danmaku
import (
"context"
"dcg/app/user_center/pb"
"dcg/config"
"dcg/game/command"
pbMq "dcg/game/pb/mq"
pushPb "dcg/game/pb/push"
"dcg/game/svc"
kfk "dcg/pkg/kafka"
"git.noahlan.cn/northlan/ntools-go/kafka"
"git.noahlan.cn/northlan/ntools-go/logger"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
"strconv"
)
type MsgToPushHandler struct {
svc *svc.ServiceContext
msgHandle map[string]msgHandlerFunc
cmdParser *Parser
commandManager *command.Manager
ConsumerGroup *kafka.ConsumerGroup
}
func (h *MsgToPushHandler) Init(svc *svc.ServiceContext) {
h.svc = svc
cfg := config.Config.Kafka.Danmaku
h.msgHandle = make(map[string]msgHandlerFunc)
h.msgHandle["danmaku"] = h.handleDanmaku
h.cmdParser = NewCMDParser(config.Config.Command.Keys)
//
h.commandManager = command.NewManager()
h.commandManager.Register(h.handleJoinGame, "j", "加入", "加入游戏")
h.commandManager.Register(h.handleOutbreak, "s")
h.commandManager.Register(h.handleWai, "w", "我在哪")
h.commandManager.Register(h.handleCreateUnit, "c1", "c2", "c3", "c4")
h.commandManager.Register(h.handleMove, "m1", "m2", "m3")
h.commandManager.Register(h.handleMode, "r1", "r2", "r3")
var err error
h.ConsumerGroup, err = kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
KafkaVersion: sarama.V3_1_0_0,
OffsetsInitial: sarama.OffsetNewest,
IsReturnErr: false,
UnMarshaler: kfk.ProtobufMarshaler,
}, cfg.Addr, []string{cfg.Topic}, config.Config.ConsumerGroupId.MsgToPush)
if err != nil {
logger.SLog.Error(err)
}
}
func (h *MsgToPushHandler) handleJoinGame(roomId int64, _ string, user *pushPb.PushUser) {
h.svc.RoomManager.PushToRoom(roomId, "game.join", &pushPb.JoinGame{User: user})
}
func (h *MsgToPushHandler) handleOutbreak(roomId int64, _ string, user *pushPb.PushUser) {
h.svc.RoomManager.PushToRoom(roomId, "game.outbreak", &pushPb.Outbreak{User: user})
}
func (h *MsgToPushHandler) handleCreateUnit(roomId int64, cmd string, user *pushPb.PushUser) {
if len(cmd) < 2 {
return
}
unit := cmd[1]
h.svc.RoomManager.PushToRoom(roomId, "game.createUnit", &pushPb.CreateUnit{
User: user,
Unit: string(unit),
})
}
func (h *MsgToPushHandler) handleMove(roomId int64, cmd string, user *pushPb.PushUser) {
if len(cmd) < 2 {
return
}
line := cmd[1]
h.svc.RoomManager.PushToRoom(roomId, "game.move", &pushPb.Move{
User: user,
Line: string(line),
})
}
func (h *MsgToPushHandler) handleWai(roomId int64, _ string, user *pushPb.PushUser) {
h.svc.RoomManager.PushToRoom(roomId, "game.wai", &pushPb.Wai{User: user})
}
func (h *MsgToPushHandler) handleMode(roomId int64, cmd string, user *pushPb.PushUser) {
if len(cmd) < 2 {
return
}
line := cmd[1]
h.svc.RoomManager.PushToRoom(roomId, "game.mode", &pushPb.BuildingMode{
User: user,
Mode: string(line),
})
}
func (h *MsgToPushHandler) handleDanmaku(data []byte, msgKey string) {
// danmaku msg proto
var msgFromMq pbMq.MqDanmaku
if err := proto.Unmarshal(data, &msgFromMq); err != nil {
logger.SLog.Error("unmarshal msg err", err)
return
}
// rpc创建或获取用户数据
pbUser := &pushPb.PushUser{
UId: msgFromMq.Uid,
Uname: msgFromMq.Uname,
}
rpcUser, err := h.svc.UserCenterRpc.RetrievePlatformUser(context.Background(), &pb.PlatformUserReq{
Platform: msgFromMq.Platform,
PUid: strconv.FormatInt(msgFromMq.Uid, 10),
})
if err == nil {
pbUser.UId = rpcUser.User.Id
pbUser.Avatar = rpcUser.User.PAvatar
} else {
logger.SLog.Info("rpc获取用户信息失败")
}
cmdStruct := h.cmdParser.Parse(msgFromMq.Content)
if cmdStruct.IsCMD {
for _, cmd := range cmdStruct.Arr {
h.commandManager.Handle(msgFromMq.LiveRoomId, cmd, pbUser)
}
} else {
// 发送正常的非命令弹幕消息
h.svc.RoomManager.PushToRoom(msgFromMq.LiveRoomId, "live.danmaku", &pushPb.DanmakuMsg{
User: pbUser,
Content: msgFromMq.Content,
})
}
}
func (MsgToPushHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (MsgToPushHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h *MsgToPushHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
//logger.SLog.Infow("kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
if hFunc, ok := h.msgHandle[msg.Topic]; ok {
hFunc(msg.Value, string(msg.Key))
}
sess.MarkMessage(msg, "")
}
return nil
}