package msg_handler import ( "encoding/json" "git.noahlan.cn/northlan/ntools-go/kafka" "git.noahlan.cn/northlan/ntools-go/logger" "live-gateway/config" pbMq "live-gateway/pb/mq" pbVars "live-gateway/pb/vars" kfk "live-gateway/pkg/kafka" "strconv" ) //var _ bilibili.MsgHandler = (*DanmakuHandler)(nil) // DanmakuOfficial 弹幕结构 官方 type DanmakuOfficial struct { Uname string `json:"uname"` // 用户昵称 UID int64 `json:"uid"` // 用户ID UFace string `json:"uface"` // 用户头像 Timestamp int64 `json:"timestamp"` // 弹幕发送时间秒级时间戳 RoomId int64 `json:"room_id"` // 用户模式 Msg string `json:"msg"` // 弹幕内容 MsgId string `json:"msg_id"` // 消息唯一ID GuardLevel int64 `json:"guard_level"` // 对应房间大航海等级 FansMedalWearingStatus bool `json:"fans_medal_wearing_status"` // 当前佩戴粉丝勋章佩戴状态 FansMedalName string `json:"fans_medal_name"` // 粉丝勋章名 FansMedalLevel int64 `json:"fans_medal_level"` // 粉丝勋章等级 } type DanmakuOfficialHandler struct { producer *kafka.Producer liveRoomId int64 } func NewDanmakuOfficialHandler(liveRoomId int64) *DanmakuOfficialHandler { cfg := config.Config.Kafka.Danmaku return &DanmakuOfficialHandler{ producer: kafka.NewKafkaProducer(kfk.DefaultProducerConfig, cfg.Addr, cfg.Topic), liveRoomId: liveRoomId, } } func (d *DanmakuOfficialHandler) CMD() string { return "LIVE_OPEN_PLATFORM_DM" } func (d *DanmakuOfficialHandler) HandlerMessage(data []byte) { var baseMsg struct { CMD string `json:"cmd"` Data *DanmakuOfficial `json:"data"` } if err := json.Unmarshal(data, &baseMsg); err != nil { return } dm := baseMsg.Data logger.SLog.Debugf("%s 说: %s", dm.Uname, dm.Msg) logger.SLog.Debugf("%+v", dm) dmMsg := &pbMq.MqDanmaku{ Platform: pbVars.Platform_name[int32(pbVars.Platform_Bilibili)], LiveRoomId: d.liveRoomId, Uid: dm.UID, Uname: dm.Uname, Avatar: dm.UFace, Msg: dm.Msg, MsgId: dm.MsgId, Timestamp: dm.Timestamp, NobilityLevel: dm.GuardLevel, FansMedalWearingStatus: dm.FansMedalWearingStatus, FansMedalName: dm.FansMedalName, FansMedalLevel: dm.FansMedalLevel, } _ = d.producer.SendMessageAsync(dmMsg, strconv.FormatInt(dm.UID, 10)) }