feat: add MQ, send messages to MQ

main
NorthLan 3 years ago
parent 309bacad83
commit 68461c4d13

@ -1,11 +0,0 @@
package bilibili

import "time"

type Config struct {
    Url               string        // danmaku server URL
    GetRoomUrl        string        // URL for fetching room info
    RoomId            int64         // room id to connect to
    UserId            int64         // user id used for the connection; 0 means a random one is generated
    HeartbeatInterval time.Duration // heartbeat interval, in seconds
}

@ -5,8 +5,9 @@ import (
     "fmt"
     "io/ioutil"
     "live-gateway/bilibili/msg_handler"
+    "live-gateway/config"
     "live-gateway/live"
-    "live-gateway/logger"
+    "live-gateway/pkg/logger"
     "live-gateway/ws"
     "net/http"
     "time"
@ -34,13 +35,14 @@ type LiveBilibili struct {
     entered chan struct{} // signals that the room has been entered
 }
 
-func NewLiveBilibili(config *Config) *live.Live {
+func NewLiveBilibili() *live.Live {
+    cfg := &config.Config.Bilibili
     bl := &LiveBilibili{
-        Url:               config.Url,
-        GetRoomUrl:        config.GetRoomUrl,
-        RoomId:            config.RoomId,
-        UserId:            config.UserId,
-        HeartbeatInterval: config.HeartbeatInterval * time.Second,
+        Url:               cfg.Url,
+        GetRoomUrl:        cfg.GetRoomUrl,
+        RoomId:            cfg.RoomId,
+        UserId:            cfg.UserId,
+        HeartbeatInterval: cfg.HeartbeatInterval * time.Second,
         msgHandlerMapper:  make(map[string]MsgHandler, 6),
         entered:           make(chan struct{}),
     }

@ -2,8 +2,10 @@ package msg_handler
 
 import (
     "encoding/json"
-    "live-gateway/logger"
-    "live-gateway/mq"
+    "live-gateway/config"
+    "live-gateway/pb"
+    "live-gateway/pkg/kafka"
+    "live-gateway/pkg/logger"
     "strconv"
 )
@ -37,12 +39,13 @@ type Danmaku struct {
 }
 
 type DanmakuHandler struct {
-    producer *mq.Producer
+    producer *kafka.Producer
 }
 
 func NewDanmakuHandler() *DanmakuHandler {
+    cfg := config.Config.Kafka.Danmaku
     return &DanmakuHandler{
-        producer: mq.NewProducer(),
+        producer: kafka.NewKafkaProducer(cfg.Addr, cfg.Topic),
     }
 }
@ -107,7 +110,15 @@ func (d *DanmakuHandler) HandlerMessage(data []byte) {
     }
     logger.SLog.Debugf("%s 说: %s", dm.Uname, dm.Content)
-    d.producer.SendDanmaku(strconv.Itoa(int(dm.UID)), dm.Content)
+    dmMsg := &pb.Danmaku{
+        Platform: "bilibili",
+        Uid:      dm.UID,
+        Uname:    dm.Uname,
+        Content:  dm.Content,
+        SendTime: dm.SendTime,
+    }
+    _ = d.producer.SendMessageAsync(dmMsg, strconv.Itoa(int(dm.UID)))
 }
 
 func floatToBool(v float64) bool {

@ -2,7 +2,7 @@ package msg_handler
 
 import (
     "encoding/json"
-    "live-gateway/logger"
+    "live-gateway/pkg/logger"
 )
 
 type InterActWord struct {

@ -2,7 +2,7 @@ package msg_handler
 
 import (
     "encoding/json"
-    "live-gateway/logger"
+    "live-gateway/pkg/logger"
 )
 
 type SendGift struct {

@ -18,3 +18,7 @@ Log:
   FileMaxBackups: 30 #
   MaxAge: 7          # keep logs for 7 days
   Compress: true     # compress rotated logs
+Kafka:
+  Danmaku:
+    Addr: ["127.0.0.1:9093"]
+    Topic: "danmaku"

@ -1,15 +1,59 @@
 package config
 
 import (
-    "live-gateway/bilibili"
-    "live-gateway/logger"
+    "fmt"
+    c "github.com/gookit/config/v2"
+    "github.com/gookit/config/v2/yaml"
+    "live-gateway/pkg/logger"
+    "path/filepath"
+    "runtime"
+    "time"
 )
 
-type Config struct {
-    Bilibili bilibili.Config
+var (
+    _, b, _, _ = runtime.Caller(0)
+    Root       = filepath.Join(filepath.Dir(b), "../")
+)
+
+var Config config
+
+type (
+    Kafka struct {
+        Addr  []string
+        Topic string
+    }
+
+    config struct {
+        Bilibili struct {
+            Url               string        // danmaku server URL
+            GetRoomUrl        string        // URL for fetching room info
+            RoomId            int64         // room id to connect to
+            UserId            int64         // user id used for the connection; 0 means a random one is generated
+            HeartbeatInterval time.Duration // heartbeat interval, in seconds
+        }
         // Log logging configuration
         Log struct {
             File    logger.FileConfig
             Console logger.ConsoleConfig
         }
+        // Kafka queue configuration
+        Kafka struct {
+            Danmaku Kafka
+        }
+    }
+)
+
+func init() {
+    var err error
+    c.AddDriver(yaml.Driver)
+    err = c.LoadFiles(Root + "/config.yml")
+    if err != nil {
+        panic(err)
+    }
+    err = c.BindStruct("", &Config)
+    if err != nil {
+        panic(err)
+    }
+    fmt.Printf("%+v\n", Config)
 }
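
Because the config package now loads and binds everything in its init(), other packages just import it and read the exported Config value; a minimal sketch (the printed values assume the config.yml shown above):

package main

import (
    "fmt"

    "live-gateway/config"
)

func main() {
    kcfg := config.Config.Kafka.Danmaku // bound from config.yml by the package's init()
    fmt.Println(kcfg.Addr, kcfg.Topic)  // e.g. [127.0.0.1:9093] danmaku
}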

@ -38,5 +38,6 @@ require (
 	golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
 	golang.org/x/net v0.0.0-20220418201149-a630d4f3e7a2 // indirect
 	golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect
+	google.golang.org/protobuf v1.28.0 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 )

@ -27,9 +27,11 @@ github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3a
 github.com/golang/protobuf v1.1.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/protobuf v1.3.4/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
 github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
 github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/gookit/color v1.5.0 h1:1Opow3+BWDwqor78DcJkJCIwnkviFi+rrOANki9BUFw=
@ -193,6 +195,9 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
 google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
+google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

@ -2,7 +2,7 @@ package live
 
 import (
     "github.com/pkg/errors"
-    "live-gateway/logger"
+    "live-gateway/pkg/logger"
     "live-gateway/ws"
 )

@ -1,40 +1,19 @@
 package main
 
 import (
-    "fmt"
-    c "github.com/gookit/config/v2"
-    "github.com/gookit/config/v2/yaml"
     "live-gateway/bilibili"
     "live-gateway/config"
-    "live-gateway/logger"
-    "live-gateway/mq"
+    "live-gateway/pkg/logger"
     "sync"
 )
 
-var Config config.Config
-
-func init() {
-    var err error
-    c.AddDriver(yaml.Driver)
-    err = c.LoadFiles("config.yml")
-    if err != nil {
-        return
-    }
-    err = c.BindStruct("", &Config)
-    if err != nil {
-        return
-    }
-    fmt.Printf("%+v\n", Config)
-}
-
 func main() {
-    _ = logger.InitLogger(&Config.Log.File, &Config.Log.Console)
+    _ = logger.InitLogger(&config.Config.Log.File, &config.Config.Log.Console)
     defer logger.Sync()
     var wg sync.WaitGroup
-    bLive := bilibili.NewLiveBilibili(&Config.Bilibili)
+    bLive := bilibili.NewLiveBilibili()
     wg.Add(1)
     go func() {
         if err := bLive.Serve(); err != nil {
@ -43,10 +22,5 @@ func main() {
         }
     }()
-    wg.Add(1)
-    go func() {
-        go mq.NewConsumer()
-    }()
     wg.Wait()
 }
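
The consumer goroutine dropped here could later be re-attached through the new pkg/kafka wrapper; a rough sketch of what that wiring might look like inside main (the group id "live-gateway" and the handler type are illustrative, not part of this commit):

kcfg := config.Config.Kafka.Danmaku
group, err := kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
    KafkaVersion:   sarama.V3_1_0_0,
    OffsetsInitial: sarama.OffsetNewest,
    IsReturnErr:    false,
}, kcfg.Addr, []string{kcfg.Topic}, "live-gateway") // hypothetical group id
if err != nil {
    panic(err)
}
go group.RegisterHandlerAndConsumer(danmakuGroupHandler{}) // any sarama.ConsumerGroupHandler implementation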

@ -1,53 +0,0 @@
package mq

import (
    "context"
    "github.com/Shopify/sarama"
    "live-gateway/logger"
)

type msgConsumerGroup struct{}

func (msgConsumerGroup) Setup(_ sarama.ConsumerGroupSession) error {
    return nil
}

func (msgConsumerGroup) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        logger.SLog.Debugf("Message topic:%q partition:%d offset:%d value:%s", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
        // mark the message; sarama commits the marked offsets automatically (default interval 1s)
        sess.MarkMessage(msg, "")
    }
    return nil
}

func consumerGroup() sarama.ConsumerGroupHandler {
    return &msgConsumerGroup{}
}

func NewConsumer() {
    config := sarama.NewConfig()
    config.Version = sarama.V3_1_0_0
    config.Consumer.Return.Errors = false
    config.Consumer.Offsets.Initial = sarama.OffsetNewest

    var err error
    client, err := sarama.NewClient([]string{"127.0.0.1:9093"}, config)
    cGroup, err := sarama.NewConsumerGroupFromClient("test", client)
    if err != nil {
        return
    }
    defer cGroup.Close()

    for {
        err := cGroup.Consume(context.Background(), []string{"danmaku"}, consumerGroup())
        if err != nil {
            logger.SLog.Error(err.Error())
            continue
        }
    }
}

@ -1,61 +0,0 @@
package mq

import (
    "github.com/Shopify/sarama"
    "live-gateway/logger"
)

type Producer struct {
    producer      sarama.SyncProducer
    asyncProducer sarama.AsyncProducer
}

func NewProducer() *Producer {
    result := &Producer{}
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll        // wait until all replicas ack, so kafka does not lose messages
    config.Producer.Return.Successes = true
    config.Producer.Partitioner = sarama.NewHashPartitioner // hash the key so the same key lands on the same partition, keeping per-user ordering

    var err error
    client, err := sarama.NewClient([]string{"127.0.0.1:9093"}, config)
    if err != nil {
        return nil
    }
    // use the synchronous producer for now
    result.producer, err = sarama.NewSyncProducerFromClient(client)
    if err != nil {
        return nil
    }
    result.asyncProducer, err = sarama.NewAsyncProducerFromClient(client)
    if err != nil {
        return nil
    }
    return result
}

func (p *Producer) SendDanmaku(uid, msg string) {
    p.SendMessageSync("danmaku", uid, msg)
}

func (p *Producer) SendMessageAsync(topic, uid string, msg string) {
    p.asyncProducer.Input()
}

func (p *Producer) SendMessageSync(topic, uid string, msg string) {
    partition, offset, err := p.producer.SendMessage(&sarama.ProducerMessage{
        Topic: topic,
        Key:   sarama.StringEncoder(uid),
        Value: sarama.StringEncoder(msg),
    })
    if err != nil {
        logger.SLog.Error("err", err)
        return
    }
    logger.SLog.Debug("success partition:", partition, "offset", offset)
}

@ -0,0 +1,179 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.0
// protoc v3.19.4
// source: danmaku.proto
package pb
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type Danmaku struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Platform string `protobuf:"bytes,1,opt,name=platform,proto3" json:"platform,omitempty"`
Uid int64 `protobuf:"varint,2,opt,name=uid,proto3" json:"uid,omitempty"`
Uname string `protobuf:"bytes,3,opt,name=uname,proto3" json:"uname,omitempty"`
Content string `protobuf:"bytes,4,opt,name=content,proto3" json:"content,omitempty"`
SendTime int64 `protobuf:"varint,5,opt,name=sendTime,proto3" json:"sendTime,omitempty"`
}
func (x *Danmaku) Reset() {
*x = Danmaku{}
if protoimpl.UnsafeEnabled {
mi := &file_danmaku_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Danmaku) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Danmaku) ProtoMessage() {}
func (x *Danmaku) ProtoReflect() protoreflect.Message {
mi := &file_danmaku_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Danmaku.ProtoReflect.Descriptor instead.
func (*Danmaku) Descriptor() ([]byte, []int) {
return file_danmaku_proto_rawDescGZIP(), []int{0}
}
func (x *Danmaku) GetPlatform() string {
if x != nil {
return x.Platform
}
return ""
}
func (x *Danmaku) GetUid() int64 {
if x != nil {
return x.Uid
}
return 0
}
func (x *Danmaku) GetUname() string {
if x != nil {
return x.Uname
}
return ""
}
func (x *Danmaku) GetContent() string {
if x != nil {
return x.Content
}
return ""
}
func (x *Danmaku) GetSendTime() int64 {
if x != nil {
return x.SendTime
}
return 0
}
var File_danmaku_proto protoreflect.FileDescriptor
var file_danmaku_proto_rawDesc = []byte{
0x0a, 0x0d, 0x64, 0x61, 0x6e, 0x6d, 0x61, 0x6b, 0x75, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12,
0x02, 0x70, 0x62, 0x22, 0x83, 0x01, 0x0a, 0x07, 0x44, 0x61, 0x6e, 0x6d, 0x61, 0x6b, 0x75, 0x12,
0x1a, 0x0a, 0x08, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x08, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x10, 0x0a, 0x03, 0x75,
0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x14, 0x0a,
0x05, 0x75, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x75, 0x6e,
0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x18, 0x04,
0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x12, 0x1a, 0x0a,
0x08, 0x73, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52,
0x08, 0x73, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x05, 0x5a, 0x03, 0x2f, 0x70, 0x62,
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_danmaku_proto_rawDescOnce sync.Once
file_danmaku_proto_rawDescData = file_danmaku_proto_rawDesc
)
func file_danmaku_proto_rawDescGZIP() []byte {
file_danmaku_proto_rawDescOnce.Do(func() {
file_danmaku_proto_rawDescData = protoimpl.X.CompressGZIP(file_danmaku_proto_rawDescData)
})
return file_danmaku_proto_rawDescData
}
var file_danmaku_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_danmaku_proto_goTypes = []interface{}{
(*Danmaku)(nil), // 0: pb.Danmaku
}
var file_danmaku_proto_depIdxs = []int32{
0, // [0:0] is the sub-list for method output_type
0, // [0:0] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_danmaku_proto_init() }
func file_danmaku_proto_init() {
if File_danmaku_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_danmaku_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Danmaku); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_danmaku_proto_rawDesc,
NumEnums: 0,
NumMessages: 1,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_danmaku_proto_goTypes,
DependencyIndexes: file_danmaku_proto_depIdxs,
MessageInfos: file_danmaku_proto_msgTypes,
}.Build()
File_danmaku_proto = out.File
file_danmaku_proto_rawDesc = nil
file_danmaku_proto_goTypes = nil
file_danmaku_proto_depIdxs = nil
}

@ -0,0 +1,13 @@
syntax = "proto3";
package pb;
option go_package = "/pb";
message Danmaku {
  string platform = 1;
  int64 uid = 2;
  string uname = 3;
  string content = 4;
  int64 sendTime = 5;
}

@ -0,0 +1 @@
protoc --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative --go-grpc_opt=require_unimplemented_servers=false --go_out=. --go-grpc_out=. --proto_path=. *.proto
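
For reference, a minimal round trip through the generated type would look roughly like this; proto.Marshal produces exactly the bytes that end up as the Kafka message value. A sketch using google.golang.org/protobuf/proto, which the module already requires; the field values are placeholders:

package main

import (
    "fmt"

    "google.golang.org/protobuf/proto"

    "live-gateway/pb"
)

func main() {
    in := &pb.Danmaku{Platform: "bilibili", Uid: 42, Uname: "someone", Content: "hello", SendTime: 1650000000}
    raw, err := proto.Marshal(in) // the Kafka message value
    if err != nil {
        panic(err)
    }
    out := &pb.Danmaku{}
    if err := proto.Unmarshal(raw, out); err != nil {
        panic(err)
    }
    fmt.Println(out.GetUname(), out.GetContent())
}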

@ -0,0 +1,41 @@
package kafka

import (
    "github.com/Shopify/sarama"
    "live-gateway/pkg/logger"
)

type Consumer struct {
    client     sarama.Client
    topic      string
    consumer   sarama.Consumer
    partitions []int32
}

func NewKafkaConsumer(addr []string, topic string) (*Consumer, error) {
    p := Consumer{}
    p.topic = topic

    config := sarama.NewConfig()
    config.Version = sarama.V3_1_0_0
    config.Consumer.Offsets.Initial = sarama.OffsetNewest

    var err error
    p.client, err = sarama.NewClient(addr, config)
    if err != nil {
        logger.SLog.Error("new kafka client err:", err)
        return nil, err
    }
    p.consumer, err = sarama.NewConsumerFromClient(p.client)
    if err != nil {
        logger.SLog.Error("new kafka consumer err:", err)
        return nil, err
    }
    p.partitions, err = p.consumer.Partitions(topic)
    if err != nil {
        logger.SLog.Errorf("get partitions for topic %s err", topic)
        return nil, err
    }
    return &p, nil
}
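
NewKafkaConsumer only discovers the topic's partitions; reading messages would still go through sarama's partition consumers. A sketch of a method the kafka package could add for that (hypothetical, not part of this commit):

// ConsumeAll drains every partition of the topic and hands each message to handle.
func (p *Consumer) ConsumeAll(handle func(*sarama.ConsumerMessage)) error {
    for _, partition := range p.partitions {
        pc, err := p.consumer.ConsumePartition(p.topic, partition, sarama.OffsetNewest)
        if err != nil {
            return err
        }
        go func() {
            for msg := range pc.Messages() {
                handle(msg)
            }
        }()
    }
    return nil
}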

@ -0,0 +1,47 @@
package kafka

import (
    "context"
    "github.com/Shopify/sarama"
    "live-gateway/pkg/logger"
)

type ConsumerGroup struct {
    sarama.ConsumerGroup
    groupId string
    topics  []string
}

type ConsumerGroupConfig struct {
    KafkaVersion   sarama.KafkaVersion
    OffsetsInitial int64
    IsReturnErr    bool
}

func NewConsumerGroup(config *ConsumerGroupConfig, addr, topics []string, groupId string) (*ConsumerGroup, error) {
    c := sarama.NewConfig()
    c.Version = config.KafkaVersion
    c.Consumer.Offsets.Initial = config.OffsetsInitial
    c.Consumer.Return.Errors = config.IsReturnErr

    client, err := sarama.NewClient(addr, c)
    if err != nil {
        return nil, err
    }
    consumerGroup, err := sarama.NewConsumerGroupFromClient(groupId, client)
    if err != nil {
        return nil, err
    }
    return &ConsumerGroup{consumerGroup, groupId, topics}, nil
}

func (cg *ConsumerGroup) RegisterHandlerAndConsumer(handler sarama.ConsumerGroupHandler) {
    ctx := context.Background()
    for {
        err := cg.ConsumerGroup.Consume(ctx, cg.topics, handler)
        if err != nil {
            logger.SLog.Error("RegisterHandlerAndConsumer error: ", err)
        }
    }
}
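
RegisterHandlerAndConsumer takes any sarama.ConsumerGroupHandler; a minimal handler that decodes the protobuf danmaku payload could look like this (a sketch mirroring the consumer removed from the mq package; imports of sarama, proto, pb and logger are assumed):

type danmakuGroupHandler struct{}

func (danmakuGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (danmakuGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

func (danmakuGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        dm := &pb.Danmaku{}
        if err := proto.Unmarshal(msg.Value, dm); err != nil {
            logger.SLog.Error("unmarshal danmaku err:", err)
            continue
        }
        logger.SLog.Debugf("%s: %s", dm.Uname, dm.Content)
        sess.MarkMessage(msg, "") // sarama commits marked offsets periodically
    }
    return nil
}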

@ -0,0 +1,62 @@
package kafka

import (
    "github.com/Shopify/sarama"
    "google.golang.org/protobuf/proto"
    "live-gateway/pkg/logger"
)

type Producer struct {
    topic    string
    client   sarama.Client
    producer sarama.AsyncProducer
}

func NewKafkaProducer(addr []string, topic string) *Producer {
    p := Producer{}
    config := sarama.NewConfig()                             // instantiate a sarama config
    config.Producer.Return.Successes = true                  // deliver a notification on the Successes channel after a message is sent
    config.Producer.RequiredAcks = sarama.WaitForAll         // producer ack level (0, 1 or all); wait for all in-sync replicas
    config.Producer.Partitioner = sarama.NewHashPartitioner  // partition by hash of the message key; a message without a key gets a random partition
    p.topic = topic

    var err error
    p.client, err = sarama.NewClient(addr, config)
    if err != nil {
        logger.SLog.Error("new kafka client err:", err)
        return &p
    }
    p.producer, err = sarama.NewAsyncProducerFromClient(p.client)
    if err != nil {
        logger.SLog.Error("new kafka producer err:", err)
        return &p
    }
    go func() {
        for range p.producer.Successes() {
        }
    }()
    return &p
}

func (p *Producer) SendMessageAsync(m proto.Message, key ...string) error {
    kMsg := &sarama.ProducerMessage{}
    kMsg.Topic = p.topic
    if len(key) > 0 {
        kMsg.Key = sarama.StringEncoder(key[0])
    }
    bMsg, err := proto.Marshal(m)
    if err != nil {
        logger.SLog.Error("proto marshal err:", err)
        return err
    }
    kMsg.Value = sarama.ByteEncoder(bMsg)

    select {
    case p.producer.Input() <- kMsg:
    }
    return nil
}
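
Putting the pieces together, the danmaku handler above uses this producer roughly as follows (a usage sketch; the literal field values are placeholders):

kcfg := config.Config.Kafka.Danmaku // Addr: ["127.0.0.1:9093"], Topic: "danmaku"
producer := kafka.NewKafkaProducer(kcfg.Addr, kcfg.Topic)
msg := &pb.Danmaku{Platform: "bilibili", Uid: 42, Uname: "someone", Content: "hello"}
_ = producer.SendMessageAsync(msg, strconv.Itoa(int(msg.Uid))) // key = uid, so the hash partitioner keeps per-user ordering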