From 1a724603444cd699475260e59e120071e09eea96 Mon Sep 17 00:00:00 2001 From: NorthLan <6995syu@163.com> Date: Fri, 22 Apr 2022 20:50:36 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E4=BF=AE=E6=94=B9=E9=87=8D?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bilibili/live.go | 10 +- bilibili/msg_handler/danmaku.go | 2 +- bilibili/msg_handler/send_gift.go | 27 ++- config.yml | 9 +- config/config.go | 12 +- main.go | 7 + pb/danmaku.pb.go | 179 ------------------ pb/danmaku.proto | 13 -- pb/mq.pb.go | 299 ++++++++++++++++++++++++++++++ pb/mq.proto | 23 +++ pkg/logger/logger.go | 45 +---- ws/connection.go | 2 +- 12 files changed, 373 insertions(+), 255 deletions(-) delete mode 100644 pb/danmaku.pb.go delete mode 100644 pb/danmaku.proto create mode 100644 pb/mq.pb.go create mode 100644 pb/mq.proto diff --git a/bilibili/live.go b/bilibili/live.go index 74a2da8..b7a7103 100644 --- a/bilibili/live.go +++ b/bilibili/live.go @@ -49,6 +49,7 @@ func NewLiveBilibili() *live.Live { // 内置处理器 bl.AddMessageHandler( msg_handler.NewDanmakuHandler(), + msg_handler.NewSendGiftHandler(), //&msg_handler.InterActWordHandler{}, //&msg_handler.SendGiftHandler{}, ) @@ -100,13 +101,14 @@ func (l *LiveBilibili) PreConnect() (url string, err error) { return "", err } l.RoomInfo = getRoomInfoResp.Data - //log.Debugf("获取房间数据: %+v\n", l.RoomInfo) + logger.SLog.Infof("获取房间数据: %+v\n", l.RoomInfo) //log.With("获取房间数据", l.RoomInfo) return } func (l *LiveBilibili) Init(conn *ws.NWebsocket) (err error) { if err = l.joinRoom(conn); err != nil { + logger.SLog.Error(err) return } @@ -144,7 +146,7 @@ func (l *LiveBilibili) HandlerMessage(v interface{}) { } handler, ok := l.msgHandlerMapper[cmd.CMD] if !ok { - logger.SLog.Warnf("未发现 %s 处理器", cmd.CMD) + logger.SLog.Debugf("未发现 %s 处理器", cmd.CMD) return } handler.HandlerMessage(entry.data) @@ -159,7 +161,6 @@ func (l *LiveBilibili) joinRoom(conn *ws.NWebsocket) error { RoomId: l.RoomInfo.RoomId, Uid: l.UserId, Type: 2, - Key: "", } body, err := json.Marshal(msg) if err != nil { @@ -172,7 +173,6 @@ func (l *LiveBilibili) joinRoom(conn *ws.NWebsocket) error { sequenceId: WsHeaderDefaultSequence, data: body, } - if err = conn.SendBinaryMessage(data); err != nil { return err } @@ -181,7 +181,7 @@ func (l *LiveBilibili) joinRoom(conn *ws.NWebsocket) error { func (l *LiveBilibili) heartbeat(conn *ws.NWebsocket, t time.Duration) { hb := func(conn *ws.NWebsocket) { - logger.SLog.Debug("heartbeat !!!") + //logger.SLog.Info("heartbeat !!!") data := &WsEntry{ protoVer: 0, operation: WsOpHeartbeat, diff --git a/bilibili/msg_handler/danmaku.go b/bilibili/msg_handler/danmaku.go index 9f39954..82b0f4b 100644 --- a/bilibili/msg_handler/danmaku.go +++ b/bilibili/msg_handler/danmaku.go @@ -110,7 +110,7 @@ func (d *DanmakuHandler) HandlerMessage(data []byte) { } logger.SLog.Debugf("%s 说: %s", dm.Uname, dm.Content) - dmMsg := &pb.Danmaku{ + dmMsg := &pbMq.MqDanmaku{ Platform: "bilibili", Uid: dm.UID, Uname: dm.Uname, diff --git a/bilibili/msg_handler/send_gift.go b/bilibili/msg_handler/send_gift.go index 9c7fb10..b66a07c 100644 --- a/bilibili/msg_handler/send_gift.go +++ b/bilibili/msg_handler/send_gift.go @@ -2,7 +2,10 @@ package msg_handler import ( "encoding/json" - "live-gateway/pkg/logger" + "live-gateway/config" + "live-gateway/pb" + "live-gateway/pkg/kafka" + "strconv" ) type SendGift struct { @@ -57,6 +60,14 @@ type SendGift struct { } type SendGiftHandler struct { + producer *kafka.Producer +} + +func NewSendGiftHandler() *SendGiftHandler { + cfg := config.Config.Kafka.Gift + return &SendGiftHandler{ + producer: kafka.NewKafkaProducer(cfg.Addr, cfg.Topic), + } } func (h *SendGiftHandler) CMD() string { @@ -72,5 +83,17 @@ func (h *SendGiftHandler) HandlerMessage(data []byte) { return } - logger.SLog.Debugf("%s %s礼物 %s x%d", baseMsg.Data.Uname, baseMsg.Data.Action, baseMsg.Data.GiftName, baseMsg.Data.Num) + //logger.SLog.Infof("%s %s礼物 %s x%d", baseMsg.Data.Uname, baseMsg.Data.Action, baseMsg.Data.GiftName, baseMsg.Data.Num) + + dmMsg := &pbMq.MqGift{ + Platform: "bilibili", + Uid: int64(baseMsg.Data.Uid), + Uname: baseMsg.Data.Uname, + GiftId: int32(baseMsg.Data.GiftId), + GiftName: baseMsg.Data.GiftName, + TotalCoin: int64(baseMsg.Data.TotalCoin), + SendTime: int64(baseMsg.Data.Timestamp), + } + + _ = h.producer.SendMessageAsync(dmMsg, strconv.Itoa(int(dmMsg.Uid))) } diff --git a/config.yml b/config.yml index 0a45a32..0ad7f75 100644 --- a/config.yml +++ b/config.yml @@ -2,11 +2,11 @@ Bilibili: Url: wss://broadcastlv.chat.bilibili.com:2245/sub GetRoomUrl: https://api.live.bilibili.com/room/v1/Room/room_init?id= HeartbeatInterval: 30 - UserId: 7041246 - RoomId: 8722013 + UserId: 111222 + RoomId: 6925399 Log: Console: - Level: debug + Level: info Format: console File: Enabled: false @@ -22,3 +22,6 @@ Kafka: Danmaku: Addr: ["127.0.0.1:9093"] Topic: "danmaku" + Gift: + Addr: [ "127.0.0.1:9093" ] + Topic: "gift" diff --git a/config/config.go b/config/config.go index 837f65e..841d39a 100644 --- a/config/config.go +++ b/config/config.go @@ -5,16 +5,9 @@ import ( c "github.com/gookit/config/v2" "github.com/gookit/config/v2/yaml" "live-gateway/pkg/logger" - "path/filepath" - "runtime" "time" ) -var ( - _, b, _, _ = runtime.Caller(0) - Root = filepath.Join(filepath.Dir(b), "../") -) - var Config config type ( @@ -38,15 +31,16 @@ type ( // Kafka 队列配置 Kafka struct { Danmaku Kafka + Gift Kafka } } ) -func init() { +func Init(filepath string) { var err error c.AddDriver(yaml.Driver) - err = c.LoadFiles(Root + "/config.yml") + err = c.LoadFiles(filepath) if err != nil { panic(err) } diff --git a/main.go b/main.go index a7b9a6c..760c5c0 100644 --- a/main.go +++ b/main.go @@ -1,13 +1,20 @@ package main import ( + "flag" "live-gateway/bilibili" "live-gateway/config" "live-gateway/pkg/logger" "sync" ) +var configFile = flag.String("f", "./config.yml", "the config file") + func main() { + flag.Parse() + + config.Init(*configFile) + _ = logger.InitLogger(&config.Config.Log.File, &config.Config.Log.Console) defer logger.Sync() diff --git a/pb/danmaku.pb.go b/pb/danmaku.pb.go deleted file mode 100644 index d8887c4..0000000 --- a/pb/danmaku.pb.go +++ /dev/null @@ -1,179 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.28.0 -// protoc v3.19.4 -// source: danmaku.proto - -package pb - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type Danmaku struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Platform string `protobuf:"bytes,1,opt,name=platform,proto3" json:"platform,omitempty"` - Uid int64 `protobuf:"varint,2,opt,name=uid,proto3" json:"uid,omitempty"` - Uname string `protobuf:"bytes,3,opt,name=uname,proto3" json:"uname,omitempty"` - Content string `protobuf:"bytes,4,opt,name=content,proto3" json:"content,omitempty"` - SendTime int64 `protobuf:"varint,5,opt,name=sendTime,proto3" json:"sendTime,omitempty"` -} - -func (x *Danmaku) Reset() { - *x = Danmaku{} - if protoimpl.UnsafeEnabled { - mi := &file_danmaku_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Danmaku) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Danmaku) ProtoMessage() {} - -func (x *Danmaku) ProtoReflect() protoreflect.Message { - mi := &file_danmaku_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Danmaku.ProtoReflect.Descriptor instead. -func (*Danmaku) Descriptor() ([]byte, []int) { - return file_danmaku_proto_rawDescGZIP(), []int{0} -} - -func (x *Danmaku) GetPlatform() string { - if x != nil { - return x.Platform - } - return "" -} - -func (x *Danmaku) GetUid() int64 { - if x != nil { - return x.Uid - } - return 0 -} - -func (x *Danmaku) GetUname() string { - if x != nil { - return x.Uname - } - return "" -} - -func (x *Danmaku) GetContent() string { - if x != nil { - return x.Content - } - return "" -} - -func (x *Danmaku) GetSendTime() int64 { - if x != nil { - return x.SendTime - } - return 0 -} - -var File_danmaku_proto protoreflect.FileDescriptor - -var file_danmaku_proto_rawDesc = []byte{ - 0x0a, 0x0d, 0x64, 0x61, 0x6e, 0x6d, 0x61, 0x6b, 0x75, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, - 0x02, 0x70, 0x62, 0x22, 0x83, 0x01, 0x0a, 0x07, 0x44, 0x61, 0x6e, 0x6d, 0x61, 0x6b, 0x75, 0x12, - 0x1a, 0x0a, 0x08, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x08, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x10, 0x0a, 0x03, 0x75, - 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x14, 0x0a, - 0x05, 0x75, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x75, 0x6e, - 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x12, 0x1a, 0x0a, - 0x08, 0x73, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, - 0x08, 0x73, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x05, 0x5a, 0x03, 0x2f, 0x70, 0x62, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_danmaku_proto_rawDescOnce sync.Once - file_danmaku_proto_rawDescData = file_danmaku_proto_rawDesc -) - -func file_danmaku_proto_rawDescGZIP() []byte { - file_danmaku_proto_rawDescOnce.Do(func() { - file_danmaku_proto_rawDescData = protoimpl.X.CompressGZIP(file_danmaku_proto_rawDescData) - }) - return file_danmaku_proto_rawDescData -} - -var file_danmaku_proto_msgTypes = make([]protoimpl.MessageInfo, 1) -var file_danmaku_proto_goTypes = []interface{}{ - (*Danmaku)(nil), // 0: pb.Danmaku -} -var file_danmaku_proto_depIdxs = []int32{ - 0, // [0:0] is the sub-list for method output_type - 0, // [0:0] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name -} - -func init() { file_danmaku_proto_init() } -func file_danmaku_proto_init() { - if File_danmaku_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_danmaku_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Danmaku); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_danmaku_proto_rawDesc, - NumEnums: 0, - NumMessages: 1, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_danmaku_proto_goTypes, - DependencyIndexes: file_danmaku_proto_depIdxs, - MessageInfos: file_danmaku_proto_msgTypes, - }.Build() - File_danmaku_proto = out.File - file_danmaku_proto_rawDesc = nil - file_danmaku_proto_goTypes = nil - file_danmaku_proto_depIdxs = nil -} diff --git a/pb/danmaku.proto b/pb/danmaku.proto deleted file mode 100644 index 0ad4ec5..0000000 --- a/pb/danmaku.proto +++ /dev/null @@ -1,13 +0,0 @@ -syntax = "proto3"; - -package pb; - -option go_package = "/pb"; - -message Danmaku { - string platform = 1; - int64 uid = 2; - string uname = 3; - string content = 4; - int64 sendTime = 5; -} \ No newline at end of file diff --git a/pb/mq.pb.go b/pb/mq.pb.go new file mode 100644 index 0000000..20332ed --- /dev/null +++ b/pb/mq.pb.go @@ -0,0 +1,299 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.0 +// protoc v3.19.4 +// source: mq.proto + +package pbMq + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type MqDanmaku struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Platform string `protobuf:"bytes,1,opt,name=platform,proto3" json:"platform,omitempty"` + Uid int64 `protobuf:"varint,2,opt,name=uid,proto3" json:"uid,omitempty"` + Uname string `protobuf:"bytes,3,opt,name=uname,proto3" json:"uname,omitempty"` + Content string `protobuf:"bytes,4,opt,name=content,proto3" json:"content,omitempty"` + SendTime int64 `protobuf:"varint,5,opt,name=sendTime,proto3" json:"sendTime,omitempty"` +} + +func (x *MqDanmaku) Reset() { + *x = MqDanmaku{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MqDanmaku) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MqDanmaku) ProtoMessage() {} + +func (x *MqDanmaku) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MqDanmaku.ProtoReflect.Descriptor instead. +func (*MqDanmaku) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{0} +} + +func (x *MqDanmaku) GetPlatform() string { + if x != nil { + return x.Platform + } + return "" +} + +func (x *MqDanmaku) GetUid() int64 { + if x != nil { + return x.Uid + } + return 0 +} + +func (x *MqDanmaku) GetUname() string { + if x != nil { + return x.Uname + } + return "" +} + +func (x *MqDanmaku) GetContent() string { + if x != nil { + return x.Content + } + return "" +} + +func (x *MqDanmaku) GetSendTime() int64 { + if x != nil { + return x.SendTime + } + return 0 +} + +type MqGift struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Platform string `protobuf:"bytes,1,opt,name=platform,proto3" json:"platform,omitempty"` + Uid int64 `protobuf:"varint,2,opt,name=uid,proto3" json:"uid,omitempty"` + Uname string `protobuf:"bytes,3,opt,name=uname,proto3" json:"uname,omitempty"` + GiftId int32 `protobuf:"varint,4,opt,name=giftId,proto3" json:"giftId,omitempty"` + GiftName string `protobuf:"bytes,5,opt,name=giftName,proto3" json:"giftName,omitempty"` + TotalCoin int64 `protobuf:"varint,6,opt,name=totalCoin,proto3" json:"totalCoin,omitempty"` + SendTime int64 `protobuf:"varint,7,opt,name=sendTime,proto3" json:"sendTime,omitempty"` +} + +func (x *MqGift) Reset() { + *x = MqGift{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MqGift) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MqGift) ProtoMessage() {} + +func (x *MqGift) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MqGift.ProtoReflect.Descriptor instead. +func (*MqGift) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{1} +} + +func (x *MqGift) GetPlatform() string { + if x != nil { + return x.Platform + } + return "" +} + +func (x *MqGift) GetUid() int64 { + if x != nil { + return x.Uid + } + return 0 +} + +func (x *MqGift) GetUname() string { + if x != nil { + return x.Uname + } + return "" +} + +func (x *MqGift) GetGiftId() int32 { + if x != nil { + return x.GiftId + } + return 0 +} + +func (x *MqGift) GetGiftName() string { + if x != nil { + return x.GiftName + } + return "" +} + +func (x *MqGift) GetTotalCoin() int64 { + if x != nil { + return x.TotalCoin + } + return 0 +} + +func (x *MqGift) GetSendTime() int64 { + if x != nil { + return x.SendTime + } + return 0 +} + +var File_mq_proto protoreflect.FileDescriptor + +var file_mq_proto_rawDesc = []byte{ + 0x0a, 0x08, 0x6d, 0x71, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0x85, + 0x01, 0x0a, 0x09, 0x4d, 0x71, 0x44, 0x61, 0x6e, 0x6d, 0x61, 0x6b, 0x75, 0x12, 0x1a, 0x0a, 0x08, + 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, + 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x75, 0x6e, + 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x75, 0x6e, 0x61, 0x6d, 0x65, + 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, + 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, + 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x22, 0xba, 0x01, 0x0a, 0x06, 0x4d, 0x71, 0x47, 0x69, 0x66, + 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x10, 0x0a, + 0x03, 0x75, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, + 0x14, 0x0a, 0x05, 0x75, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x75, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x67, 0x69, 0x66, 0x74, 0x49, 0x64, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x67, 0x69, 0x66, 0x74, 0x49, 0x64, 0x12, 0x1a, 0x0a, + 0x08, 0x67, 0x69, 0x66, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x08, 0x67, 0x69, 0x66, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x6f, 0x74, + 0x61, 0x6c, 0x43, 0x6f, 0x69, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x6f, + 0x74, 0x61, 0x6c, 0x43, 0x6f, 0x69, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x6e, 0x64, 0x54, + 0x69, 0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x6e, 0x64, 0x54, + 0x69, 0x6d, 0x65, 0x42, 0x07, 0x5a, 0x05, 0x2f, 0x70, 0x62, 0x4d, 0x71, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_mq_proto_rawDescOnce sync.Once + file_mq_proto_rawDescData = file_mq_proto_rawDesc +) + +func file_mq_proto_rawDescGZIP() []byte { + file_mq_proto_rawDescOnce.Do(func() { + file_mq_proto_rawDescData = protoimpl.X.CompressGZIP(file_mq_proto_rawDescData) + }) + return file_mq_proto_rawDescData +} + +var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_mq_proto_goTypes = []interface{}{ + (*MqDanmaku)(nil), // 0: pb.MqDanmaku + (*MqGift)(nil), // 1: pb.MqGift +} +var file_mq_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_mq_proto_init() } +func file_mq_proto_init() { + if File_mq_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_mq_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MqDanmaku); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mq_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MqGift); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_mq_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_mq_proto_goTypes, + DependencyIndexes: file_mq_proto_depIdxs, + MessageInfos: file_mq_proto_msgTypes, + }.Build() + File_mq_proto = out.File + file_mq_proto_rawDesc = nil + file_mq_proto_goTypes = nil + file_mq_proto_depIdxs = nil +} diff --git a/pb/mq.proto b/pb/mq.proto new file mode 100644 index 0000000..769c0b6 --- /dev/null +++ b/pb/mq.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; + +package pb; + +option go_package = "/pbMq"; + +message MqDanmaku { + string platform = 1; + int64 uid = 2; + string uname = 3; + string content = 4; + int64 sendTime = 5; +} + +message MqGift { + string platform = 1; + int64 uid = 2; + string uname = 3; + int32 giftId = 4; + string giftName = 5; + int64 totalCoin = 6; + int64 sendTime = 7; +} \ No newline at end of file diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index 5988bd3..2cee6cf 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -5,16 +5,13 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zapcore" "os" - "path" "path/filepath" - "runtime" - "strings" ) var Log *zap.Logger var SLog *zap.SugaredLogger -const DefaultLogPath = "/logs" +const DefaultLogPath = "./logs" type ( // FileConfig 日志文件配置 @@ -92,11 +89,11 @@ func getLogLevel(level string) zapcore.Level { func getLogWriter(conf *FileConfig) (zapcore.WriteSyncer, error) { // 判断日志路径是否存在,如果不存在就创建 if conf.Path == "" { - conf.Path = getCurrentAbPath() + DefaultLogPath + conf.Path = DefaultLogPath } if exist := isExist(conf.Path); !exist { if err := os.MkdirAll(conf.Path, os.ModePerm); err != nil { - conf.Path = getCurrentAbPath() + DefaultLogPath + conf.Path = DefaultLogPath if err := os.MkdirAll(conf.Path, os.ModePerm); err != nil { return nil, err } @@ -142,39 +139,3 @@ func isExist(path string) bool { _, err := os.Stat(path) return err == nil || os.IsExist(err) } - -// 最终方案-全兼容 -func getCurrentAbPath() string { - dir := getCurrentAbPathByExecutable() - if strings.Contains(dir, getTmpDir()) { - return getCurrentAbPathByCaller() - } - return dir -} - -// 获取系统临时目录,兼容go run -func getTmpDir() string { - dir := os.Getenv("TEMP") - if dir == "" { - dir = os.Getenv("TMP") - } - res, _ := filepath.EvalSymlinks(dir) - return res -} - -// 获取当前执行文件绝对路径 -func getCurrentAbPathByExecutable() string { - exePath, _ := os.Executable() - res, _ := filepath.EvalSymlinks(filepath.Dir(exePath)) - return res -} - -// 获取当前执行文件绝对路径(go run) -func getCurrentAbPathByCaller() string { - var abPath string - _, filename, _, ok := runtime.Caller(0) - if ok { - abPath = path.Dir(filename) - } - return abPath -} diff --git a/ws/connection.go b/ws/connection.go index e1ea349..ec9b345 100644 --- a/ws/connection.go +++ b/ws/connection.go @@ -101,7 +101,7 @@ func applyOpts(opts ...ConnectionOption) *ConnectionOptions { SendBufferSize: 512, ReadLimit: 8 << 10 << 10, WriteDeadline: 5 * time.Second, - ReadDeadline: 5 * time.Second, + ReadDeadline: 60 * time.Second, }, } for _, opt := range opts {