package mq import ( "github.com/Shopify/sarama" "live-gateway/logger" ) type Producer struct { producer sarama.SyncProducer asyncProducer sarama.AsyncProducer } func NewProducer() *Producer { result := &Producer{} config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有follower都回复ack,确保kafka不丢消息 config.Producer.Return.Successes = true config.Producer.Partitioner = sarama.NewHashPartitioner // 对key进行hash,同样的key落到同样的partition,保证个人消息有序性 var err error client, err := sarama.NewClient([]string{"127.0.0.1:9093"}, config) if err != nil { return nil } // 暂时使用同步producer result.producer, err = sarama.NewSyncProducerFromClient(client) if err != nil { return nil } result.asyncProducer, err = sarama.NewAsyncProducerFromClient(client) if err != nil { return nil } return result } func (p *Producer) SendDanmaku(uid, msg string) { p.SendMessageSync("danmaku", uid, msg) } func (p *Producer) SendMessageAsync(topic, uid string, msg string) { p.asyncProducer.Input() } func (p *Producer) SendMessageSync(topic, uid string, msg string) { partition, offset, err := p.producer.SendMessage(&sarama.ProducerMessage{ Topic: topic, Key: sarama.StringEncoder(uid), Value: sarama.StringEncoder(msg), }) if err != nil { logger.SLog.Error("err", err) return } logger.SLog.Debug("success partition:", partition, "offset", offset) }