You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

208 lines
4.4 KiB
Go

package bilibili
import (
"encoding/json"
"fmt"
"git.noahlan.cn/northlan/ntools-go/logger"
"io/ioutil"
"live-gateway/bilibili/msg_handler"
"live-gateway/config"
"live-gateway/live"
"live-gateway/ws"
"net/http"
"time"
)
// MsgHandler handles one kind of command message. LiveBilibili registers
// handlers keyed by CMD() and dispatches plain-protocol messages whose decoded
// command name matches that key.
type MsgHandler interface {
// HandlerMessage receives the raw payload of a matching message.
HandlerMessage(data []byte)
// CMD returns the command name this handler is registered under.
CMD() string
}
// Compile-time check that *LiveBilibili implements the live.Handler interface.
var _ live.Handler = (*LiveBilibili)(nil)
// LiveBilibili holds the state for one Bilibili live-room connection: the
// configured endpoints and ids, the room info fetched by PreConnect, and the
// registered message handlers.
type LiveBilibili struct {
// Url is the address PreConnect returns for the connection.
Url string
// GetRoomUrl is the prefix the room id is appended to when fetching room info.
GetRoomUrl string
// RoomId is the configured room id; PreConnect compares it with both
// RoomInfo.RoomId and RoomInfo.ShortId.
RoomId int64
// UserId is sent as Uid in the join-room request.
UserId int64
HeartbeatInterval time.Duration // heartbeat interval
// RoomInfo is filled in by PreConnect; nil until the first successful fetch.
RoomInfo *RoomInfo
// Live is the live.Live instance created alongside this value in NewLiveBilibili.
Live *live.Live
// msgHandlerMapper maps a command name (MsgHandler.CMD()) to its handler.
msgHandlerMapper map[string]MsgHandler
entered chan struct{} // signalled when the room has been entered
}
// NewLiveBilibili creates a LiveBilibili populated from config.Config.Bilibili,
// wires it into a new live.Live (as Init callback, PreConnect callback and
// message handler) and returns that live.Live.
func NewLiveBilibili() *live.Live {
	settings := &config.Config.Bilibili

	bili := new(LiveBilibili)
	bili.Url = settings.Url
	bili.GetRoomUrl = settings.GetRoomUrl
	bili.RoomId = settings.RoomId
	bili.UserId = settings.UserId
	bili.HeartbeatInterval = settings.HeartbeatInterval * time.Second
	bili.msgHandlerMapper = make(map[string]MsgHandler, 6)
	bili.entered = make(chan struct{})

	// The websocket layer frames messages with the Bilibili packer.
	packerOpt := ws.WithPacker(NewPackBilibili())
	instance := live.NewLive(live.WithWsOptions(packerOpt))

	instance.Init(bili.Init)
	instance.PreConnect(bili.PreConnect)
	instance.Handler(bili)

	bili.Live = instance
	return instance
}
// AddMessageHandler registers each given handler under the command name it
// reports via CMD(). A later handler with the same command name replaces an
// earlier one.
func (l *LiveBilibili) AddMessageHandler(h ...MsgHandler) {
	for i := range h {
		l.msgHandlerMapper[h[i].CMD()] = h[i]
	}
}
// PreConnect returns the URL to connect to and makes sure room info is loaded.
//
// When RoomInfo is already present and matches the configured RoomId (either
// as the real room id or the short id), no request is made. Otherwise the room
// info is fetched from GetRoomUrl+RoomId, stored in l.RoomInfo, and the
// message handlers that need the real room id are registered.
//
// It returns an empty URL and a non-nil error when the request fails, the body
// cannot be read or decoded, or the response carries no room data.
func (l *LiveBilibili) PreConnect() (url string, err error) {
	// Avoid hitting the room-info endpoint again when the cached data matches.
	if l.RoomInfo != nil &&
		(l.RoomId == l.RoomInfo.RoomId || l.RoomId == l.RoomInfo.ShortId) {
		return l.Url, nil
	}

	fetchUrl := fmt.Sprintf(`%s%d`, l.GetRoomUrl, l.RoomId)
	response, err := http.Get(fetchUrl)
	if err != nil {
		return "", fmt.Errorf("fetching room info from %s: %w", fetchUrl, err)
	}
	// The body must be closed so the underlying connection is released.
	defer response.Body.Close()

	res, err := ioutil.ReadAll(response.Body)
	if err != nil {
		return "", fmt.Errorf("reading room info response: %w", err)
	}

	var getRoomInfoResp struct {
		Code    int       `json:"code"`
		Msg     string    `json:"msg"`
		Message string    `json:"message"`
		Data    *RoomInfo `json:"data"`
	}
	if err = json.Unmarshal(res, &getRoomInfoResp); err != nil {
		return "", fmt.Errorf("decoding room info response: %w", err)
	}

	// Without data there is no room id to join; storing nil here would make
	// the handler registration below dereference a nil pointer.
	if getRoomInfoResp.Data == nil {
		return "", fmt.Errorf(
			"room info response has no data (http status %d, code %d, msg %q, message %q)",
			response.StatusCode, getRoomInfoResp.Code, getRoomInfoResp.Msg, getRoomInfoResp.Message,
		)
	}

	l.RoomInfo = getRoomInfoResp.Data
	logger.SLog.Infof("获取房间数据: %+v\n", l.RoomInfo)

	// Handlers are registered only after the room info is known because they
	// need the real room id.
	l.AddMessageHandler(
		msg_handler.NewDanmakuHandler(l.RoomInfo.RoomId),
		msg_handler.NewSendGiftHandler(l.RoomInfo.RoomId),
		msg_handler.NewGuardBuyHandler(l.RoomInfo.RoomId),
		//&msg_handler.InterActWordHandler{},
	)
	return l.Url, nil
}
// Init sends the join-room request on conn, blocks until the enter-room
// confirmation is signalled on l.entered, and then starts the heartbeat
// goroutine. A join failure is logged and returned.
func (l *LiveBilibili) Init(conn *ws.NWebsocket) (err error) {
	err = l.joinRoom(conn)
	if err != nil {
		logger.SLog.Error(err)
		return err
	}

	// Wait for HandlerMessage to report that the room was entered.
	<-l.entered

	go l.heartbeat(conn, l.HeartbeatInterval)
	return nil
}
// HandlerMessage processes one decoded websocket entry.
//
// v must be a *WsEntry; anything else is logged and dropped. An enter-room
// confirmation is forwarded to l.entered, heartbeat replies are ignored, and
// plain-protocol messages are dispatched to the handler registered for their
// command name.
func (l *LiveBilibili) HandlerMessage(v interface{}) {
	entry, isEntry := v.(*WsEntry)
	if !isEntry {
		logger.SLog.Warn("读取消息错误, 数据类型不匹配 WsEntry")
		return
	}

	switch entry.operation {
	case WsOpEnterRoomSuccess:
		// Signal from a separate goroutine so this call never blocks on the
		// unbuffered channel.
		go func() {
			l.entered <- struct{}{}
		}()

	case WsOpHeartbeatReply:
		// TODO: heartbeat reply (room viewer count)

	case WsOpMessage:
		// Only plain-protocol payloads are decoded here.
		if entry.protoVer != WsVerPlain {
			return
		}

		var parsed CMD
		if decodeErr := json.Unmarshal(entry.data, &parsed); decodeErr != nil {
			logger.SLog.Error("读取消息CMD错误")
			return
		}

		target, found := l.msgHandlerMapper[parsed.CMD]
		if !found {
			logger.SLog.Debugf("未发现 %s 处理器", parsed.CMD)
			return
		}
		target.HandlerMessage(entry.data)
	}
}
// joinRoom marshals a JoinRoomReq for the fetched room id and the configured
// user id, and sends it on conn as an enter-room entry. It returns the marshal
// or send error, if any.
func (l *LiveBilibili) joinRoom(conn *ws.NWebsocket) error {
	payload, err := json.Marshal(JoinRoomReq{
		Platform: "web",
		ProtoVer: 1,
		RoomId:   l.RoomInfo.RoomId,
		Uid:      l.UserId,
		Type:     2,
	})
	if err != nil {
		return err
	}

	return conn.SendBinaryMessage(&WsEntry{
		protoVer:   0,
		operation:  WsOpEnterRoom,
		sequenceId: WsHeaderDefaultSequence,
		data:       payload,
	})
}
// heartbeat sends one heartbeat entry on conn immediately and then one every
// t until a send fails.
//
// A failed send is logged and ends the loop, so the goroutine running this
// method terminates instead of ticking forever on a connection that can no
// longer be written to. A non-positive t is logged and no periodic heartbeat
// is started, because time.NewTicker panics for such values.
func (l *LiveBilibili) heartbeat(conn *ws.NWebsocket, t time.Duration) {
	send := func() error {
		return conn.SendBinaryMessage(&WsEntry{
			protoVer:   0,
			operation:  WsOpHeartbeat,
			sequenceId: WsHeaderDefaultSequence,
			data:       nil,
		})
	}

	if err := send(); err != nil {
		logger.SLog.Error(fmt.Errorf("sending initial heartbeat: %w", err))
		return
	}

	if t <= 0 {
		logger.SLog.Error(fmt.Errorf("heartbeat interval %v is not positive, periodic heartbeat disabled", t))
		return
	}

	ticker := time.NewTicker(t)
	defer ticker.Stop()

	for range ticker.C {
		if err := send(); err != nil {
			logger.SLog.Error(fmt.Errorf("sending heartbeat: %w", err))
			return
		}
	}
}