You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

338 lines
7.7 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package bilibili
import (
"encoding/json"
"fmt"
"git.noahlan.cn/northlan/ntools-go/logger"
"github.com/go-ping/ping"
"github.com/pkg/errors"
"io/ioutil"
"live-gateway/bilibili/msg_handler"
"live-gateway/config"
"live-gateway/live"
"live-gateway/ws"
"net/http"
"strconv"
"strings"
"time"
)
// MsgHandler processes one kind of command message received from the live room.
// Implementations are stored in LiveBilibili.msgHandlerMapper, keyed by CMD().
type MsgHandler interface {
// HandlerMessage receives the raw payload of a message whose command matched CMD().
HandlerMessage(data []byte)
// CMD returns the command name this handler is registered under.
CMD() string
}
// Compile-time check that *LiveBilibili implements the live.Handler interface.
var _ live.Handler = (*LiveBilibili)(nil)
// LiveBilibili is the Bilibili live-room client. It embeds *live.Live and keeps
// the per-room state needed to connect, authenticate and dispatch messages.
type LiveBilibili struct {
	*live.Live

	// sequenceId is stamped on heartbeat packets and incremented after each
	// heartbeat and each auth attempt.
	sequenceId uint32
	// roomInfo holds the room information loaded by initRoomInfo.
	roomInfo *RoomInfo
	// webSocketInfo holds the websocket information loaded by initWebsocketInfo.
	webSocketInfo *WebsocketInfoResp
	// msgHandlerMapper maps a command name (MsgHandler.CMD) to its handler.
	msgHandlerMapper map[string]MsgHandler
	// entered is signalled by HandlerMessage when the enter-room/auth reply
	// arrives; Init waits on it before starting the heartbeat.
	entered chan struct{}
}
// NewLiveBilibili builds a LiveBilibili client backed by a live.Live that uses
// the Bilibili packet packer, and wires the client's preConnect, Init and
// message-handler hooks into it.
func NewLiveBilibili() *LiveBilibili {
	// Underlying live connection, configured with the Bilibili packer.
	packerOpt := ws.WithPacker(NewPackBilibili())
	core := live.NewLive(live.WithWsOptions(packerOpt))

	client := &LiveBilibili{
		msgHandlerMapper: make(map[string]MsgHandler, 6),
		entered:          make(chan struct{}),
	}

	// Register the hooks first, then attach the live instance to the client.
	core.PreConnect(client.preConnect)
	core.Init(client.Init)
	core.Handler(client)
	client.Live = core

	return client
}
// registerMessageHandler stores every given handler in msgHandlerMapper under
// the command name returned by its CMD method. A handler registered later for
// the same command replaces the earlier one.
func (l *LiveBilibili) registerMessageHandler(h ...MsgHandler) {
	for i := range h {
		l.msgHandlerMapper[h[i].CMD()] = h[i]
	}
}
// preConnect resolves the websocket URL to dial and registers the message
// handlers that match the configured Bilibili connection type.
//
// Room info is loaded first through initRoomInfo. For
// config.BilibiliTypeOfficial the websocket info is fetched through
// initWebsocketInfo, every returned host is pinged, and the URL is built from
// the host with the lowest average round-trip time and the first WSS port.
// For config.BilibiliTypeCustom the URL is taken from cfg.Custom.Url.
// Any other type is reported as an error instead of yielding an empty URL.
func (l *LiveBilibili) preConnect() (url string, err error) {
	cfg := config.Config.Bilibili
	if err = l.initRoomInfo(); err != nil {
		return "", err
	}
	switch cfg.Type {
	case config.BilibiliTypeOfficial:
		if err = l.initWebsocketInfo(); err != nil {
			logger.SLog.Errorf("获取wss信息失败, err:%+v", err)
			return "", err
		}
		info := l.webSocketInfo
		// Host is what gets pinged and indexed below, so it must be non-empty
		// as well; otherwise Host[minIdx] would panic.
		if len(info.Ip) == 0 || len(info.Host) == 0 || len(info.WssPort) == 0 {
			return "", errors.New("获取到的wss信息中地址或端口为空")
		}
		logger.SLog.Info("正在寻找连接最快的API地址...")
		// Pick the host with the lowest average RTT. If no host answers,
		// the first host is used and minDuration keeps its initial value.
		minDuration := 1 * time.Hour
		minIdx := 0
		for i, host := range info.Host {
			pinger, pingErr := ping.NewPinger(host)
			if pingErr != nil {
				continue
			}
			pinger.SetPrivileged(true)
			pinger.Count = 3
			// NOTE(review): a 100ns gap between probes looks unusually small —
			// confirm the intended unit.
			pinger.Interval = 100 * time.Nanosecond
			pinger.Timeout = 200 * time.Millisecond
			if pingErr = pinger.Run(); pingErr != nil {
				continue
			}
			stat := pinger.Statistics()
			// A host that never replied reports AvgRtt == 0, which would
			// otherwise win the comparison; skip it.
			if stat.PacketsRecv == 0 {
				continue
			}
			if stat.AvgRtt < minDuration {
				minDuration = stat.AvgRtt
				minIdx = i
			}
		}
		url = fmt.Sprintf("wss://%s:%d/sub", info.Host[minIdx], info.WssPort[0])
		logger.SLog.Info("找到当前最快的弹幕服务器地址 ", url, " 延迟:", minDuration)
		l.registerMessageHandler(
			msg_handler.NewDanmakuOfficialHandler(l.roomInfo.RoomId),
			msg_handler.NewGuardBuyOfficialHandler(l.roomInfo.RoomId),
			msg_handler.NewGiftOfficialHandler(l.roomInfo.RoomId),
		)
	case config.BilibiliTypeCustom:
		url = cfg.Custom.Url
		l.registerMessageHandler(
			msg_handler.NewDanmakuHandler(l.roomInfo.RoomId),
			msg_handler.NewSendGiftHandler(l.roomInfo.RoomId),
			msg_handler.NewGuardBuyHandler(l.roomInfo.RoomId),
			msg_handler.NewRedPocketHandler(l.roomInfo.RoomId),
		)
	default:
		// Without a known type there is neither a URL nor any handlers.
		return "", errors.Errorf("不支持的bilibili连接类型: %v", cfg.Type)
	}
	return url, nil
}
// initRoomInfo loads the room information from cfg.GetRoomUrl + cfg.RoomId and
// caches it in l.roomInfo. If room info is already cached, no request is made.
//
// It returns an error when the request, the body read or the JSON decoding
// fails, and also when the response carries no data object, so that callers
// never see a nil l.roomInfo after a nil error.
func (l *LiveBilibili) initRoomInfo() error {
	// Avoid hitting the API again once the room info is known.
	if l.roomInfo != nil {
		return nil
	}
	cfg := config.Config.Bilibili
	response, err := http.Get(fmt.Sprintf(`%s%d`, cfg.GetRoomUrl, cfg.RoomId))
	if err != nil {
		return err
	}
	// Release the connection once the body has been consumed.
	defer response.Body.Close()

	res, err := ioutil.ReadAll(response.Body)
	if err != nil {
		return err
	}
	var getRoomInfoResp struct {
		Code    int       `json:"code"`
		Msg     string    `json:"msg"`
		Message string    `json:"message"`
		Data    *RoomInfo `json:"data"`
	}
	if err = json.Unmarshal(res, &getRoomInfoResp); err != nil {
		return err
	}
	// A missing data object would leave l.roomInfo nil and make later
	// l.roomInfo.RoomId accesses panic.
	if getRoomInfoResp.Data == nil {
		return errors.Errorf("获取房间数据失败, code: %d, msg: %s, message: %s",
			getRoomInfoResp.Code, getRoomInfoResp.Msg, getRoomInfoResp.Message)
	}
	l.roomInfo = getRoomInfoResp.Data
	logger.SLog.Infof("获取房间数据: %+v\n", l.roomInfo)
	return nil
}
// initWebsocketInfo requests the websocket connection information for the
// current room from the official API (cfg.Official.Api +
// "/v1/common/websocketInfo") and stores it in l.webSocketInfo.
//
// The request body is the JSON-encoded WebsocketInfoReq; the headers are built
// from a CommonHeader signed with CreateSignature and cfg.Official.AkSecret.
// An error is returned when encoding, the HTTP exchange or decoding fails, or
// when the API answers with a non-zero code.
func (l *LiveBilibili) initWebsocketInfo() error {
	cfg := config.Config.Bilibili
	req := &WebsocketInfoReq{
		RoomId: l.roomInfo.RoomId,
	}
	content, err := json.Marshal(req)
	if err != nil {
		return err
	}
	header := &CommonHeader{
		ContentType:       JsonType,
		ContentAcceptType: JsonType,
		Timestamp:         strconv.FormatInt(time.Now().Unix(), 10),
		SignatureMethod:   HmacSha256,
		SignatureVersion:  BiliVersion,
		Authorization:     "",
		Nonce:             strconv.FormatInt(time.Now().UnixNano(), 10),
		AccessKeyId:       cfg.Official.AkId,
		ContentMD5:        Md5(string(content)),
	}
	// The signature is computed over the header with an empty Authorization
	// field and then stored in that field.
	header.Authorization = CreateSignature(header, cfg.Official.AkSecret)

	request, err := http.NewRequest("POST", fmt.Sprintf("%s/v1/common/websocketInfo", cfg.Official.Api), strings.NewReader(string(content)))
	if err != nil {
		return err
	}
	for k, v := range header.ToMap() {
		request.Header.Add(k, v)
	}
	response, err := http.DefaultClient.Do(request)
	if err != nil {
		return err
	}
	// Release the connection once the body has been consumed.
	defer response.Body.Close()

	res, err := ioutil.ReadAll(response.Body)
	if err != nil {
		return err
	}
	var baseResp BaseResp
	if err = json.Unmarshal(res, &baseResp); err != nil {
		return err
	}
	if baseResp.Code != 0 {
		return errors.Errorf("调用API失败, result: %+v", baseResp)
	}
	var infoData WebsocketInfoResp
	if err = json.Unmarshal(baseResp.Data, &infoData); err != nil {
		return err
	}
	l.webSocketInfo = &infoData
	logger.SLog.Debugf("获取到wss信息为: %+v", infoData)
	return nil
}
// Init performs the post-connect handshake on conn: it sends the auth packet
// (official type) or the join-room packet (custom type), waits until
// HandlerMessage signals the reply on l.entered, and then starts the heartbeat
// goroutine with an interval of cfg.HeartbeatInterval seconds.
//
// It returns the handshake send error, or an error for an unknown connection
// type; in the latter case nothing was sent, so waiting on l.entered would
// never finish.
func (l *LiveBilibili) Init(conn *ws.NWebsocket) (err error) {
	cfg := config.Config.Bilibili
	switch cfg.Type {
	case config.BilibiliTypeOfficial:
		if err = l.auth(conn); err != nil {
			logger.SLog.Error(err)
			return err
		}
	case config.BilibiliTypeCustom:
		if err = l.joinRoom(conn); err != nil {
			logger.SLog.Error(err)
			return err
		}
	default:
		err = errors.Errorf("不支持的bilibili连接类型: %v", cfg.Type)
		logger.SLog.Error(err)
		return err
	}
	// Wait for the enter-room / auth reply.
	<-l.entered
	go l.heartbeat(conn, cfg.HeartbeatInterval*time.Second)
	return nil
}
// HandlerMessage dispatches one decoded websocket packet. v must be a
// *WsEntry; anything else is logged and dropped.
//
//   - WsOpEnterRoomOrAuthReply: signals l.entered from a new goroutine, so this
//     method returns without waiting for the receiver.
//   - WsOpHeartbeatReply: currently ignored.
//   - WsOpMessage: for plain-version packets, decodes the command name and
//     forwards the raw payload to the handler registered for it.
func (l *LiveBilibili) HandlerMessage(v interface{}) {
	packet, isEntry := v.(*WsEntry)
	if !isEntry {
		logger.SLog.Warn("读取消息错误, 数据类型不匹配 WsEntry")
		return
	}

	switch packet.operation {
	case WsOpEnterRoomOrAuthReply:
		go func() {
			l.entered <- struct{}{}
		}()

	case WsOpHeartbeatReply:
		// TODO: heartbeat reply (room viewer count)

	case WsOpMessage:
		// Only plain-version payloads are handled here.
		if packet.protoVer != WsVerPlain {
			return
		}
		var head CMD
		if err := json.Unmarshal(packet.data, &head); err != nil {
			logger.SLog.Error("读取消息CMD错误")
			return
		}
		// Any command name containing "DANMU_MSG" is treated as plain "DANMU_MSG".
		name := head.CMD
		if strings.Contains(name, "DANMU_MSG") {
			name = "DANMU_MSG"
		}
		target, found := l.msgHandlerMapper[name]
		if !found {
			logger.SLog.Debugf("未发现 %s 处理器", name)
			return
		}
		target.HandlerMessage(packet.data)
	}
}
// auth sends the enter-room/auth packet, whose payload is the AuthBody from
// the previously fetched websocket info, over conn. It fails when the
// websocket info has not been loaded or when the send fails. l.sequenceId is
// incremented before the send is attempted.
func (l *LiveBilibili) auth(conn *ws.NWebsocket) error {
	info := l.webSocketInfo
	if info == nil {
		return errors.New("websocket信息未获取成功不能登录")
	}

	packet := &WsEntry{
		operation: WsOpEnterRoomOrAuth,
		data:      []byte(info.AuthBody),
	}
	l.sequenceId++

	if sendErr := conn.SendBinaryMessage(packet); sendErr != nil {
		return errors.Wrap(sendErr, "发送AUTH消息到wss失败")
	}
	return nil
}
// joinRoom sends a JSON-encoded JoinRoomReq for the current room and the
// configured user id over conn as an enter-room/auth packet. It returns the
// encoding error or the send error, if any.
func (l *LiveBilibili) joinRoom(conn *ws.NWebsocket) error {
	payload, err := json.Marshal(JoinRoomReq{
		Platform: "web",
		ProtoVer: 1,
		RoomId:   l.roomInfo.RoomId,
		Uid:      config.Config.Bilibili.UserId,
		Type:     2,
	})
	if err != nil {
		return err
	}

	packet := &WsEntry{
		protoVer:   0,
		operation:  WsOpEnterRoomOrAuth,
		sequenceId: WsHeaderDefaultSequence,
		data:       payload,
	}
	return conn.SendBinaryMessage(packet)
}
// heartbeat sends one heartbeat packet on conn immediately and then one every
// t, forever. Each packet carries the current l.sequenceId, which is
// incremented before the send. Send errors are ignored and do not stop the
// loop.
func (l *LiveBilibili) heartbeat(conn *ws.NWebsocket, t time.Duration) {
	beat := func() {
		packet := &WsEntry{
			protoVer:   0,
			operation:  WsOpHeartbeat,
			sequenceId: l.sequenceId,
			data:       nil,
		}
		l.sequenceId++
		// A failed send is dropped on purpose; the next tick simply tries again.
		_ = conn.SendBinaryMessage(packet)
	}

	beat()

	ticker := time.NewTicker(t)
	defer ticker.Stop()
	for range ticker.C {
		beat()
	}
}