package bilibili

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"strconv"
	"strings"
	"time"

	"git.noahlan.cn/northlan/ntools-go/logger"
	"github.com/go-ping/ping"
	"github.com/pkg/errors"

	"live-gateway/bilibili/msg_handler"
	"live-gateway/config"
	"live-gateway/live"
	"live-gateway/ws"
)

// MsgHandler processes the raw payload of one kind of websocket message.
// Handlers are looked up by the string returned from CMD.
type MsgHandler interface {
	// HandlerMessage receives the raw message bytes for this handler's CMD.
	HandlerMessage(data []byte)
	// CMD returns the command name this handler is registered under.
	CMD() string
}

// Compile-time check that LiveBilibili implements live.Handler.
var _ live.Handler = (*LiveBilibili)(nil)

// LiveBilibili is the Bilibili implementation built on top of live.Live.
type LiveBilibili struct {
	*live.Live

	sequenceId uint32 // sequence number stamped on heartbeat packets

	roomInfo      *RoomInfo          // room information, fetched once and cached
	webSocketInfo *WebsocketInfoResp // websocket connection information (official API only)

	msgHandlerMapper map[string]MsgHandler // CMD name -> handler

	entered chan struct{} // signalled when an enter-room / auth reply is received
}

// NewLiveBilibili builds a LiveBilibili, wires its pre-connect hook, init hook
// and message handler into a new live.Live, and returns it.
func NewLiveBilibili() *LiveBilibili {
	bl := &LiveBilibili{
		msgHandlerMapper: make(map[string]MsgHandler, 6),
		entered:          make(chan struct{}),
	}
	l := live.NewLive(
		live.WithWsOptions(
			ws.WithPacker(NewPackBilibili()),
		),
	)
	l.PreConnect(bl.preConnect)
	l.Init(bl.Init)
	l.Handler(bl)

	bl.Live = l
	return bl
}

// registerMessageHandler stores each handler under its CMD name, replacing any
// handler previously registered for the same CMD.
func (l *LiveBilibili) registerMessageHandler(h ...MsgHandler) {
	for _, handler := range h {
		l.msgHandlerMapper[handler.CMD()] = handler
	}
}

// preConnect resolves the websocket URL to connect to and registers the
// message handlers that match the configured connection type.
//
// For the official type it fetches the websocket info from the open API, pings
// every returned host and picks the one with the lowest average RTT. For the
// custom type it uses the URL from the configuration.
func (l *LiveBilibili) preConnect() (url string, err error) {
	cfg := config.Config.Bilibili

	if err = l.initRoomInfo(); err != nil {
		return "", err
	}

	switch cfg.Type {
	case config.BilibiliTypeOfficial:
		err = l.initWebsocketInfo()
		if err != nil {
			logger.SLog.Errorf("获取wss信息失败, err:%+v", err)
			return "", err
		}
		// Host is what gets indexed below, so it must be validated as well;
		// otherwise an empty Host list panics on Host[minIdx].
		if len(l.webSocketInfo.Ip) == 0 ||
			len(l.webSocketInfo.WssPort) == 0 ||
			len(l.webSocketInfo.Host) == 0 {
			return "", errors.New("获取到的wss信息中地址或端口为空")
		}

		logger.SLog.Info("正在寻找连接最快的API地址...")
		// Find the fastest host. When no host answers, index 0 is used.
		var (
			minDuration = 1 * time.Hour
			minIdx      = 0
			reachable   = false
		)
		for i, host := range l.webSocketInfo.Host {
			pinger, pingErr := ping.NewPinger(host)
			if pingErr != nil {
				continue
			}
			pinger.SetPrivileged(true)
			pinger.Count = 3
			// NOTE(review): 100ns between packets is effectively "send back to
			// back"; confirm whether 100ms was intended.
			pinger.Interval = 100 * time.Nanosecond
			pinger.Timeout = 200 * time.Millisecond
			if pingErr = pinger.Run(); pingErr != nil {
				continue
			}

			stat := pinger.Statistics()
			// With no replies AvgRtt is zero, which would otherwise make an
			// unreachable host look like the fastest one.
			if stat.PacketsRecv == 0 {
				continue
			}
			reachable = true
			if stat.AvgRtt < minDuration {
				minDuration = stat.AvgRtt
				minIdx = i
			}
		}

		url = fmt.Sprintf("wss://%s:%d/sub", l.webSocketInfo.Host[minIdx], l.webSocketInfo.WssPort[0])
		if reachable {
			logger.SLog.Info("找到当前最快的弹幕服务器地址 ", url, " 延迟:", minDuration)
		} else {
			logger.SLog.Warn("所有弹幕服务器地址均未响应ping, 使用默认地址 ", url)
		}

		l.registerMessageHandler(
			msg_handler.NewDanmakuOfficialHandler(l.roomInfo.RoomId),
			msg_handler.NewGuardBuyOfficialHandler(l.roomInfo.RoomId),
			msg_handler.NewGiftOfficialHandler(l.roomInfo.RoomId),
		)
	case config.BilibiliTypeCustom:
		url = cfg.Custom.Url

		l.registerMessageHandler(
			msg_handler.NewDanmakuHandler(l.roomInfo.RoomId),
			msg_handler.NewSendGiftHandler(l.roomInfo.RoomId),
			msg_handler.NewGuardBuyHandler(l.roomInfo.RoomId),
			msg_handler.NewRedPocketHandler(l.roomInfo.RoomId),
		)
	}
	return
}

// initRoomInfo fetches the room information for the configured room and caches
// it on the receiver. It is a no-op when the information is already cached.
//
// It returns an error when the request fails, the response cannot be decoded,
// or the response carries no data payload.
func (l *LiveBilibili) initRoomInfo() error {
	// Avoid requesting the API more than once.
	if l.roomInfo != nil {
		return nil
	}
	cfg := config.Config.Bilibili

	response, err := http.Get(fmt.Sprintf(`%s%d`, cfg.GetRoomUrl, cfg.RoomId))
	if err != nil {
		return err
	}
	// The body must be closed or the underlying connection is leaked.
	defer response.Body.Close()

	res, err := ioutil.ReadAll(response.Body)
	if err != nil {
		return err
	}

	var getRoomInfoResp struct {
		Code    int       `json:"code"`
		Msg     string    `json:"msg"`
		Message string    `json:"message"`
		Data    *RoomInfo `json:"data"`
	}
	err = json.Unmarshal(res, &getRoomInfoResp)
	if err != nil {
		return err
	}
	// Callers dereference l.roomInfo unconditionally, so a missing payload
	// has to be reported here instead of being cached as nil.
	if getRoomInfoResp.Data == nil {
		return errors.Errorf("获取房间数据失败, code: %d, msg: %s, message: %s",
			getRoomInfoResp.Code, getRoomInfoResp.Msg, getRoomInfoResp.Message)
	}

	l.roomInfo = getRoomInfoResp.Data
	logger.SLog.Infof("获取房间数据: %+v\n", l.roomInfo)
	return nil
}

// initWebsocketInfo calls the official API's websocketInfo endpoint with a
// signed request and stores the decoded result in l.webSocketInfo.
//
// It reads l.roomInfo.RoomId, so initRoomInfo must have succeeded beforehand.
func (l *LiveBilibili) initWebsocketInfo() error {
	cfg := config.Config.Bilibili

	req := &WebsocketInfoReq{
		RoomId: l.roomInfo.RoomId,
	}
	content, err := json.Marshal(req)
	if err != nil {
		return err
	}

	header := &CommonHeader{
		ContentType:       JsonType,
		ContentAcceptType: JsonType,
		Timestamp:         strconv.FormatInt(time.Now().Unix(), 10),
		SignatureMethod:   HmacSha256,
		SignatureVersion:  BiliVersion,
		Authorization:     "",
		Nonce:             strconv.FormatInt(time.Now().UnixNano(), 10),
		AccessKeyId:       cfg.Official.AkId,
		ContentMD5:        Md5(string(content)),
	}
	// The signature is computed over the header with an empty Authorization
	// field and then written back into that field.
	header.Authorization = CreateSignature(header, cfg.Official.AkSecret)

	request, err := http.NewRequest("POST", fmt.Sprintf("%s/v1/common/websocketInfo", cfg.Official.Api), strings.NewReader(string(content)))
	if err != nil {
		return err
	}
	for k, v := range header.ToMap() {
		request.Header.Add(k, v)
	}

	response, err := http.DefaultClient.Do(request)
	if err != nil {
		return err
	}
	// The body must be closed or the underlying connection is leaked.
	defer response.Body.Close()

	res, err := ioutil.ReadAll(response.Body)
	if err != nil {
		return err
	}

	var baseResp BaseResp
	err = json.Unmarshal(res, &baseResp)
	if err != nil {
		return err
	}
	if baseResp.Code != 0 {
		return errors.Errorf("调用API失败, result: %+v", baseResp)
	}

	var infoData WebsocketInfoResp
	err = json.Unmarshal(baseResp.Data, &infoData)
	if err != nil {
		return err
	}
	l.webSocketInfo = &infoData
	logger.SLog.Debugf("获取到wss信息为: %+v", infoData)
	return nil
}

// Init is the hook registered through live.Live.Init. It sends the auth packet
// (official type) or the join-room packet (custom type), blocks until
// HandlerMessage signals l.entered, and then starts the heartbeat goroutine.
//
// NOTE(review): the wait on l.entered has no timeout, so Init blocks for as
// long as no enter-room / auth reply arrives.
func (l *LiveBilibili) Init(conn *ws.NWebsocket) (err error) {
	cfg := config.Config.Bilibili

	switch cfg.Type {
	case config.BilibiliTypeOfficial:
		if err = l.auth(conn); err != nil {
			logger.SLog.Error(err)
			return
		}
	case config.BilibiliTypeCustom:
		if err = l.joinRoom(conn); err != nil {
			logger.SLog.Error(err)
			return
		}
	}

	// Wait until the room has been entered or the login succeeded.
	<-l.entered

	go l.heartbeat(conn, cfg.HeartbeatInterval*time.Second)
	return
}

// HandlerMessage dispatches one decoded websocket entry. Values that are not
// *WsEntry are logged and dropped.
func (l *LiveBilibili) HandlerMessage(v interface{}) {
	entry, ok := v.(*WsEntry)
	if !ok {
		logger.SLog.Warn("读取消息错误, 数据类型不匹配 WsEntry")
		return
	}

	switch entry.operation {
	case WsOpEnterRoomOrAuthReply:
		// Signal from a separate goroutine so message handling is never
		// blocked by the unbuffered channel.
		go func() {
			l.entered <- struct{}{}
		}()
	case WsOpHeartbeatReply:
		// TODO: heartbeat reply (room popularity).
	case WsOpMessage:
		// Only plain (uncompressed JSON) payloads are dispatched here.
		if entry.protoVer == WsVerPlain {
			var cmd CMD
			err := json.Unmarshal(entry.data, &cmd)
			if err != nil {
				logger.SLog.Error("读取消息CMD错误")
				return
			}
			// Any CMD containing "DANMU_MSG" is normalised to exactly
			// "DANMU_MSG" before the handler lookup.
			if strings.Contains(cmd.CMD, "DANMU_MSG") {
				cmd.CMD = "DANMU_MSG"
			}
			handler, ok := l.msgHandlerMapper[cmd.CMD]
			if !ok {
				logger.SLog.Debugf("未发现 %s 处理器", cmd.CMD)
				return
			}
			handler.HandlerMessage(entry.data)
		}
	}
}

// auth sends the AuthBody obtained from the official API as an
// enter-room/auth packet. It fails when the websocket info has not been
// fetched or when the packet cannot be sent.
func (l *LiveBilibili) auth(conn *ws.NWebsocket) error {
	if l.webSocketInfo == nil {
		return errors.New("websocket信息未获取成功,不能登录")
	}
	data := &WsEntry{
		operation: WsOpEnterRoomOrAuth,
		data:      []byte(l.webSocketInfo.AuthBody),
	}
	l.sequenceId++

	err := conn.SendBinaryMessage(data)
	if err != nil {
		return errors.Wrap(err, "发送AUTH消息到wss失败")
	}
	return nil
}

// joinRoom sends a JSON-encoded JoinRoomReq for the cached room and the
// configured user as an enter-room/auth packet.
func (l *LiveBilibili) joinRoom(conn *ws.NWebsocket) error {
	cfg := config.Config.Bilibili

	msg := JoinRoomReq{
		Platform: "web",
		ProtoVer: 1,
		RoomId:   l.roomInfo.RoomId,
		Uid:      cfg.UserId,
		Type:     2,
	}
	body, err := json.Marshal(msg)
	if err != nil {
		return err
	}

	data := &WsEntry{
		protoVer:   0,
		operation:  WsOpEnterRoomOrAuth,
		sequenceId: WsHeaderDefaultSequence,
		data:       body,
	}
	if err = conn.SendBinaryMessage(data); err != nil {
		return err
	}
	return nil
}

// heartbeat sends one heartbeat packet immediately and then one every t,
// incrementing l.sequenceId after each packet is built. It never returns.
//
// A failed send is logged and the loop keeps running. A non-positive t falls
// back to 30s, because time.NewTicker panics on such a value.
//
// NOTE(review): l.sequenceId is updated without synchronisation; confirm that
// at most one goroutine touches it at a time.
func (l *LiveBilibili) heartbeat(conn *ws.NWebsocket, t time.Duration) {
	// NOTE(review): 30s is an assumed fallback — confirm against the server's
	// expected heartbeat interval.
	const fallbackInterval = 30 * time.Second
	if t <= 0 {
		logger.SLog.Warn("心跳间隔配置无效, 使用默认值 ", fallbackInterval)
		t = fallbackInterval
	}

	hb := func() {
		data := &WsEntry{
			protoVer:   0,
			operation:  WsOpHeartbeat,
			sequenceId: l.sequenceId,
			data:       nil,
		}
		l.sequenceId++
		if err := conn.SendBinaryMessage(data); err != nil {
			logger.SLog.Errorf("发送心跳失败, err:%+v", err)
		}
	}

	hb()
	ticker := time.NewTicker(t)
	defer ticker.Stop()
	for range ticker.C {
		hb()
	}
}