From 68461c4d133351e5441b61ff8b01bdd83d90cef2 Mon Sep 17 00:00:00 2001 From: NorthLan <6995syu@163.com> Date: Thu, 21 Apr 2022 17:57:01 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0mq,=E5=B0=86=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E5=8F=91=E9=80=81=E8=87=B3mq?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bilibili/config.go | 11 -- bilibili/live.go | 16 ++- bilibili/msg_handler/danmaku.go | 21 ++- bilibili/msg_handler/interact_word.go | 2 +- bilibili/msg_handler/send_gift.go | 2 +- config.yml | 4 + config/config.go | 60 +++++++-- go.mod | 1 + go.sum | 5 + live/live.go | 2 +- main.go | 32 +---- mq/consumer.go | 53 -------- mq/producer.go | 61 --------- pb/danmaku.pb.go | 179 ++++++++++++++++++++++++++ pb/danmaku.proto | 13 ++ pb/gen.bat | 1 + pkg/kafka/consumer.go | 41 ++++++ pkg/kafka/consumer_group.go | 47 +++++++ pkg/kafka/producer.go | 62 +++++++++ {logger => pkg/logger}/logger.go | 0 20 files changed, 436 insertions(+), 177 deletions(-) delete mode 100644 bilibili/config.go delete mode 100644 mq/consumer.go delete mode 100644 mq/producer.go create mode 100644 pb/danmaku.pb.go create mode 100644 pb/danmaku.proto create mode 100644 pb/gen.bat create mode 100644 pkg/kafka/consumer.go create mode 100644 pkg/kafka/consumer_group.go create mode 100644 pkg/kafka/producer.go rename {logger => pkg/logger}/logger.go (100%) diff --git a/bilibili/config.go b/bilibili/config.go deleted file mode 100644 index 8fbd0f4..0000000 --- a/bilibili/config.go +++ /dev/null @@ -1,11 +0,0 @@ -package bilibili - -import "time" - -type Config struct { - Url string // 弹幕服务器url - GetRoomUrl string // 获取房间信息url - RoomId int64 // 待连接roomId - UserId int64 // 用于连接的userId,0则随机生成 - HeartbeatInterval time.Duration // 心跳间隔 单位s -} diff --git a/bilibili/live.go b/bilibili/live.go index 3f328a4..74a2da8 100644 --- a/bilibili/live.go +++ b/bilibili/live.go @@ -5,8 +5,9 @@ import ( "fmt" "io/ioutil" "live-gateway/bilibili/msg_handler" + "live-gateway/config" "live-gateway/live" - "live-gateway/logger" + "live-gateway/pkg/logger" "live-gateway/ws" "net/http" "time" @@ -34,13 +35,14 @@ type LiveBilibili struct { entered chan struct{} // 进入房间 } -func NewLiveBilibili(config *Config) *live.Live { +func NewLiveBilibili() *live.Live { + cfg := &config.Config.Bilibili bl := &LiveBilibili{ - Url: config.Url, - GetRoomUrl: config.GetRoomUrl, - RoomId: config.RoomId, - UserId: config.UserId, - HeartbeatInterval: config.HeartbeatInterval * time.Second, + Url: cfg.Url, + GetRoomUrl: cfg.GetRoomUrl, + RoomId: cfg.RoomId, + UserId: cfg.UserId, + HeartbeatInterval: cfg.HeartbeatInterval * time.Second, msgHandlerMapper: make(map[string]MsgHandler, 6), entered: make(chan struct{}), } diff --git a/bilibili/msg_handler/danmaku.go b/bilibili/msg_handler/danmaku.go index 78a8d92..9f39954 100644 --- a/bilibili/msg_handler/danmaku.go +++ b/bilibili/msg_handler/danmaku.go @@ -2,8 +2,10 @@ package msg_handler import ( "encoding/json" - "live-gateway/logger" - "live-gateway/mq" + "live-gateway/config" + "live-gateway/pb" + "live-gateway/pkg/kafka" + "live-gateway/pkg/logger" "strconv" ) @@ -37,12 +39,13 @@ type Danmaku struct { } type DanmakuHandler struct { - producer *mq.Producer + producer *kafka.Producer } func NewDanmakuHandler() *DanmakuHandler { + cfg := config.Config.Kafka.Danmaku return &DanmakuHandler{ - producer: mq.NewProducer(), + producer: kafka.NewKafkaProducer(cfg.Addr, cfg.Topic), } } @@ -107,7 +110,15 @@ func (d *DanmakuHandler) HandlerMessage(data []byte) { } logger.SLog.Debugf("%s 说: %s", dm.Uname, dm.Content) - d.producer.SendDanmaku(strconv.Itoa(int(dm.UID)), dm.Content) + dmMsg := &pb.Danmaku{ + Platform: "bilibili", + Uid: dm.UID, + Uname: dm.Uname, + Content: dm.Content, + SendTime: dm.SendTime, + } + + _ = d.producer.SendMessageAsync(dmMsg, strconv.Itoa(int(dm.UID))) } func floatToBool(v float64) bool { diff --git a/bilibili/msg_handler/interact_word.go b/bilibili/msg_handler/interact_word.go index 0a4c5e7..d59c1a7 100644 --- a/bilibili/msg_handler/interact_word.go +++ b/bilibili/msg_handler/interact_word.go @@ -2,7 +2,7 @@ package msg_handler import ( "encoding/json" - "live-gateway/logger" + "live-gateway/pkg/logger" ) type InterActWord struct { diff --git a/bilibili/msg_handler/send_gift.go b/bilibili/msg_handler/send_gift.go index 2e0d068..9c7fb10 100644 --- a/bilibili/msg_handler/send_gift.go +++ b/bilibili/msg_handler/send_gift.go @@ -2,7 +2,7 @@ package msg_handler import ( "encoding/json" - "live-gateway/logger" + "live-gateway/pkg/logger" ) type SendGift struct { diff --git a/config.yml b/config.yml index 0f3b617..0a45a32 100644 --- a/config.yml +++ b/config.yml @@ -18,3 +18,7 @@ Log: FileMaxBackups: 30 # MaxAge: 7 # 保留7天 Compress: true # 压缩日志 +Kafka: + Danmaku: + Addr: ["127.0.0.1:9093"] + Topic: "danmaku" diff --git a/config/config.go b/config/config.go index 4207a36..837f65e 100644 --- a/config/config.go +++ b/config/config.go @@ -1,15 +1,59 @@ package config import ( - "live-gateway/bilibili" - "live-gateway/logger" + "fmt" + c "github.com/gookit/config/v2" + "github.com/gookit/config/v2/yaml" + "live-gateway/pkg/logger" + "path/filepath" + "runtime" + "time" ) -type Config struct { - Bilibili bilibili.Config - // Log 日志配置 - Log struct { - File logger.FileConfig - Console logger.ConsoleConfig +var ( + _, b, _, _ = runtime.Caller(0) + Root = filepath.Join(filepath.Dir(b), "../") +) + +var Config config + +type ( + Kafka struct { + Addr []string + Topic string + } + config struct { + Bilibili struct { + Url string // 弹幕服务器url + GetRoomUrl string // 获取房间信息url + RoomId int64 // 待连接roomId + UserId int64 // 用于连接的userId,0则随机生成 + HeartbeatInterval time.Duration // 心跳间隔 单位s + } + // Log 日志配置 + Log struct { + File logger.FileConfig + Console logger.ConsoleConfig + } + // Kafka 队列配置 + Kafka struct { + Danmaku Kafka + } + } +) + +func init() { + var err error + c.AddDriver(yaml.Driver) + + err = c.LoadFiles(Root + "/config.yml") + if err != nil { + panic(err) } + err = c.BindStruct("", &Config) + if err != nil { + panic(err) + } + + fmt.Printf("%+v\n", Config) } diff --git a/go.mod b/go.mod index dc27259..141cbb5 100644 --- a/go.mod +++ b/go.mod @@ -38,5 +38,6 @@ require ( golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect golang.org/x/net v0.0.0-20220418201149-a630d4f3e7a2 // indirect golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect + google.golang.org/protobuf v1.28.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index e0e5603..61262f8 100644 --- a/go.sum +++ b/go.sum @@ -27,9 +27,11 @@ github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3a github.com/golang/protobuf v1.1.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.4/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/gookit/color v1.5.0 h1:1Opow3+BWDwqor78DcJkJCIwnkviFi+rrOANki9BUFw= @@ -193,6 +195,9 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= +google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/live/live.go b/live/live.go index d7ecdc5..5071f56 100644 --- a/live/live.go +++ b/live/live.go @@ -2,7 +2,7 @@ package live import ( "github.com/pkg/errors" - "live-gateway/logger" + "live-gateway/pkg/logger" "live-gateway/ws" ) diff --git a/main.go b/main.go index e1cd0b3..a7b9a6c 100644 --- a/main.go +++ b/main.go @@ -1,40 +1,19 @@ package main import ( - "fmt" - c "github.com/gookit/config/v2" - "github.com/gookit/config/v2/yaml" "live-gateway/bilibili" "live-gateway/config" - "live-gateway/logger" - "live-gateway/mq" + "live-gateway/pkg/logger" "sync" ) -var Config config.Config - -func init() { - var err error - - c.AddDriver(yaml.Driver) - err = c.LoadFiles("config.yml") - if err != nil { - return - } - err = c.BindStruct("", &Config) - if err != nil { - return - } - fmt.Printf("%+v\n", Config) -} - func main() { - _ = logger.InitLogger(&Config.Log.File, &Config.Log.Console) + _ = logger.InitLogger(&config.Config.Log.File, &config.Config.Log.Console) defer logger.Sync() var wg sync.WaitGroup - bLive := bilibili.NewLiveBilibili(&Config.Bilibili) + bLive := bilibili.NewLiveBilibili() wg.Add(1) go func() { if err := bLive.Serve(); err != nil { @@ -43,10 +22,5 @@ func main() { } }() - wg.Add(1) - go func() { - go mq.NewConsumer() - }() - wg.Wait() } diff --git a/mq/consumer.go b/mq/consumer.go deleted file mode 100644 index fcdc898..0000000 --- a/mq/consumer.go +++ /dev/null @@ -1,53 +0,0 @@ -package mq - -import ( - "context" - "github.com/Shopify/sarama" - "live-gateway/logger" -) - -type msgConsumerGroup struct{} - -func (msgConsumerGroup) Setup(_ sarama.ConsumerGroupSession) error { - return nil -} -func (msgConsumerGroup) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } - -func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { - for msg := range claim.Messages() { - logger.SLog.Debugf("Message topic:%q partition:%d offset:%d value:%s", msg.Topic, msg.Partition, msg.Offset, string(msg.Value)) - - // 标记,sarama会自动进行提交,默认间隔1秒 - sess.MarkMessage(msg, "") - } - return nil -} - -func consumerGroup() sarama.ConsumerGroupHandler { - return &msgConsumerGroup{} -} - -func NewConsumer() { - config := sarama.NewConfig() - config.Version = sarama.V3_1_0_0 - config.Consumer.Return.Errors = false - config.Consumer.Offsets.Initial = sarama.OffsetNewest - - var err error - client, err := sarama.NewClient([]string{"127.0.0.1:9093"}, config) - - cGroup, err := sarama.NewConsumerGroupFromClient("test", client) - if err != nil { - return - } - - defer cGroup.Close() - - for { - err := cGroup.Consume(context.Background(), []string{"danmaku"}, consumerGroup()) - if err != nil { - logger.SLog.Error(err.Error()) - continue - } - } -} diff --git a/mq/producer.go b/mq/producer.go deleted file mode 100644 index b9507d1..0000000 --- a/mq/producer.go +++ /dev/null @@ -1,61 +0,0 @@ -package mq - -import ( - "github.com/Shopify/sarama" - "live-gateway/logger" -) - -type Producer struct { - producer sarama.SyncProducer - asyncProducer sarama.AsyncProducer -} - -func NewProducer() *Producer { - result := &Producer{} - - config := sarama.NewConfig() - config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有follower都回复ack,确保kafka不丢消息 - config.Producer.Return.Successes = true - config.Producer.Partitioner = sarama.NewHashPartitioner // 对key进行hash,同样的key落到同样的partition,保证个人消息有序性 - - var err error - client, err := sarama.NewClient([]string{"127.0.0.1:9093"}, config) - if err != nil { - return nil - } - - // 暂时使用同步producer - - result.producer, err = sarama.NewSyncProducerFromClient(client) - if err != nil { - return nil - } - - result.asyncProducer, err = sarama.NewAsyncProducerFromClient(client) - if err != nil { - return nil - } - - return result -} - -func (p *Producer) SendDanmaku(uid, msg string) { - p.SendMessageSync("danmaku", uid, msg) -} - -func (p *Producer) SendMessageAsync(topic, uid string, msg string) { - p.asyncProducer.Input() -} - -func (p *Producer) SendMessageSync(topic, uid string, msg string) { - partition, offset, err := p.producer.SendMessage(&sarama.ProducerMessage{ - Topic: topic, - Key: sarama.StringEncoder(uid), - Value: sarama.StringEncoder(msg), - }) - if err != nil { - logger.SLog.Error("err", err) - return - } - logger.SLog.Debug("success partition:", partition, "offset", offset) -} diff --git a/pb/danmaku.pb.go b/pb/danmaku.pb.go new file mode 100644 index 0000000..d8887c4 --- /dev/null +++ b/pb/danmaku.pb.go @@ -0,0 +1,179 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.0 +// protoc v3.19.4 +// source: danmaku.proto + +package pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Danmaku struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Platform string `protobuf:"bytes,1,opt,name=platform,proto3" json:"platform,omitempty"` + Uid int64 `protobuf:"varint,2,opt,name=uid,proto3" json:"uid,omitempty"` + Uname string `protobuf:"bytes,3,opt,name=uname,proto3" json:"uname,omitempty"` + Content string `protobuf:"bytes,4,opt,name=content,proto3" json:"content,omitempty"` + SendTime int64 `protobuf:"varint,5,opt,name=sendTime,proto3" json:"sendTime,omitempty"` +} + +func (x *Danmaku) Reset() { + *x = Danmaku{} + if protoimpl.UnsafeEnabled { + mi := &file_danmaku_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Danmaku) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Danmaku) ProtoMessage() {} + +func (x *Danmaku) ProtoReflect() protoreflect.Message { + mi := &file_danmaku_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Danmaku.ProtoReflect.Descriptor instead. +func (*Danmaku) Descriptor() ([]byte, []int) { + return file_danmaku_proto_rawDescGZIP(), []int{0} +} + +func (x *Danmaku) GetPlatform() string { + if x != nil { + return x.Platform + } + return "" +} + +func (x *Danmaku) GetUid() int64 { + if x != nil { + return x.Uid + } + return 0 +} + +func (x *Danmaku) GetUname() string { + if x != nil { + return x.Uname + } + return "" +} + +func (x *Danmaku) GetContent() string { + if x != nil { + return x.Content + } + return "" +} + +func (x *Danmaku) GetSendTime() int64 { + if x != nil { + return x.SendTime + } + return 0 +} + +var File_danmaku_proto protoreflect.FileDescriptor + +var file_danmaku_proto_rawDesc = []byte{ + 0x0a, 0x0d, 0x64, 0x61, 0x6e, 0x6d, 0x61, 0x6b, 0x75, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x02, 0x70, 0x62, 0x22, 0x83, 0x01, 0x0a, 0x07, 0x44, 0x61, 0x6e, 0x6d, 0x61, 0x6b, 0x75, 0x12, + 0x1a, 0x0a, 0x08, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x08, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x10, 0x0a, 0x03, 0x75, + 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x14, 0x0a, + 0x05, 0x75, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x75, 0x6e, + 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x12, 0x1a, 0x0a, + 0x08, 0x73, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, + 0x08, 0x73, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x05, 0x5a, 0x03, 0x2f, 0x70, 0x62, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_danmaku_proto_rawDescOnce sync.Once + file_danmaku_proto_rawDescData = file_danmaku_proto_rawDesc +) + +func file_danmaku_proto_rawDescGZIP() []byte { + file_danmaku_proto_rawDescOnce.Do(func() { + file_danmaku_proto_rawDescData = protoimpl.X.CompressGZIP(file_danmaku_proto_rawDescData) + }) + return file_danmaku_proto_rawDescData +} + +var file_danmaku_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_danmaku_proto_goTypes = []interface{}{ + (*Danmaku)(nil), // 0: pb.Danmaku +} +var file_danmaku_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_danmaku_proto_init() } +func file_danmaku_proto_init() { + if File_danmaku_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_danmaku_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Danmaku); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_danmaku_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_danmaku_proto_goTypes, + DependencyIndexes: file_danmaku_proto_depIdxs, + MessageInfos: file_danmaku_proto_msgTypes, + }.Build() + File_danmaku_proto = out.File + file_danmaku_proto_rawDesc = nil + file_danmaku_proto_goTypes = nil + file_danmaku_proto_depIdxs = nil +} diff --git a/pb/danmaku.proto b/pb/danmaku.proto new file mode 100644 index 0000000..0ad4ec5 --- /dev/null +++ b/pb/danmaku.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; + +package pb; + +option go_package = "/pb"; + +message Danmaku { + string platform = 1; + int64 uid = 2; + string uname = 3; + string content = 4; + int64 sendTime = 5; +} \ No newline at end of file diff --git a/pb/gen.bat b/pb/gen.bat new file mode 100644 index 0000000..a9eb2ba --- /dev/null +++ b/pb/gen.bat @@ -0,0 +1 @@ +protoc --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative --go-grpc_opt=require_unimplemented_servers=false --go_out=. --go-grpc_out=. --proto_path=. *.proto \ No newline at end of file diff --git a/pkg/kafka/consumer.go b/pkg/kafka/consumer.go new file mode 100644 index 0000000..d108021 --- /dev/null +++ b/pkg/kafka/consumer.go @@ -0,0 +1,41 @@ +package kafka + +import ( + "github.com/Shopify/sarama" + "live-gateway/pkg/logger" +) + +type Consumer struct { + client sarama.Client + topic string + consumer sarama.Consumer + partitions []int32 +} + +func NewKafkaConsumer(addr []string, topic string) (*Consumer, error) { + p := Consumer{} + p.topic = topic + + config := sarama.NewConfig() + config.Version = sarama.V3_1_0_0 + config.Consumer.Offsets.Initial = sarama.OffsetNewest + + var err error + p.client, err = sarama.NewClient(addr, config) + if err != nil { + logger.SLog.Error("new kafka client err:", err) + return nil, err + } + + p.consumer, err = sarama.NewConsumerFromClient(p.client) + if err != nil { + logger.SLog.Error("new kafka consumer err:", err) + return nil, err + } + p.partitions, err = p.consumer.Partitions(topic) + if err != nil { + logger.SLog.Errorf("get partitions for topic %s err", topic) + return nil, err + } + return &p, nil +} diff --git a/pkg/kafka/consumer_group.go b/pkg/kafka/consumer_group.go new file mode 100644 index 0000000..7a4f4e1 --- /dev/null +++ b/pkg/kafka/consumer_group.go @@ -0,0 +1,47 @@ +package kafka + +import ( + "context" + "github.com/Shopify/sarama" + "live-gateway/pkg/logger" +) + +type ConsumerGroup struct { + sarama.ConsumerGroup + groupId string + topics []string +} + +type ConsumerGroupConfig struct { + KafkaVersion sarama.KafkaVersion + OffsetsInitial int64 + IsReturnErr bool +} + +func NewConsumerGroup(config *ConsumerGroupConfig, addr, topics []string, groupId string) (*ConsumerGroup, error) { + c := sarama.NewConfig() + c.Version = config.KafkaVersion + c.Consumer.Offsets.Initial = config.OffsetsInitial + c.Consumer.Return.Errors = config.IsReturnErr + + client, err := sarama.NewClient(addr, c) + if err != nil { + return nil, err + } + + consumerGroup, err := sarama.NewConsumerGroupFromClient(groupId, client) + if err != nil { + return nil, err + } + return &ConsumerGroup{consumerGroup, groupId, topics}, nil +} + +func (cg *ConsumerGroup) RegisterHandlerAndConsumer(handler sarama.ConsumerGroupHandler) { + ctx := context.Background() + for { + err := cg.ConsumerGroup.Consume(ctx, cg.topics, handler) + if err != nil { + logger.SLog.Error("RegisterHandlerAndConsumer error: ", err) + } + } +} diff --git a/pkg/kafka/producer.go b/pkg/kafka/producer.go new file mode 100644 index 0000000..08b9c54 --- /dev/null +++ b/pkg/kafka/producer.go @@ -0,0 +1,62 @@ +package kafka + +import ( + "github.com/Shopify/sarama" + "google.golang.org/protobuf/proto" + "live-gateway/pkg/logger" +) + +type Producer struct { + topic string + client sarama.Client + producer sarama.AsyncProducer +} + +func NewKafkaProducer(addr []string, topic string) *Producer { + p := Producer{} + + config := sarama.NewConfig() //Instantiate a sarama Config + config.Producer.Return.Successes = true //Whether to enable the successes channel to be notified after the message is sent successfully + config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all + config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly + + p.topic = topic + + var err error + p.client, err = sarama.NewClient(addr, config) + if err != nil { + logger.SLog.Error("new kafka client err:", err) + return &p + } + p.producer, err = sarama.NewAsyncProducerFromClient(p.client) + if err != nil { + logger.SLog.Error("new kafka producer err:", err) + return &p + } + + go func() { + for range p.producer.Successes() { + } + }() + + return &p +} + +func (p *Producer) SendMessageAsync(m proto.Message, key ...string) error { + kMsg := &sarama.ProducerMessage{} + kMsg.Topic = p.topic + if len(key) > 0 { + kMsg.Key = sarama.StringEncoder(key[0]) + } + bMsg, err := proto.Marshal(m) + if err != nil { + logger.SLog.Error("proto marshal err:", err) + return err + } + kMsg.Value = sarama.ByteEncoder(bMsg) + + select { + case p.producer.Input() <- kMsg: + } + return nil +} diff --git a/logger/logger.go b/pkg/logger/logger.go similarity index 100% rename from logger/logger.go rename to pkg/logger/logger.go