package kafka import ( "dcg/pkg/logger" "github.com/Shopify/sarama" "google.golang.org/protobuf/proto" ) type Producer struct { topic string client sarama.Client producer sarama.AsyncProducer } func NewKafkaProducer(addr []string, topic string) *Producer { p := Producer{} config := sarama.NewConfig() //Instantiate a sarama Config config.Producer.Return.Successes = true //Whether to enable the successes channel to be notified after the message is sent successfully config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly p.topic = topic var err error p.client, err = sarama.NewClient(addr, config) if err != nil { logger.SLog.Error("new kafka client err:", err) return &p } p.producer, err = sarama.NewAsyncProducerFromClient(p.client) if err != nil { logger.SLog.Error("new kafka producer err:", err) return &p } go func() { for range p.producer.Successes() { } }() return &p } func (p *Producer) SendMessageAsync(m proto.Message, key ...string) error { kMsg := &sarama.ProducerMessage{} kMsg.Topic = p.topic if len(key) > 0 { kMsg.Key = sarama.StringEncoder(key[0]) } bMsg, err := proto.Marshal(m) if err != nil { logger.SLog.Error("proto marshal err:", err) return err } kMsg.Value = sarama.ByteEncoder(bMsg) select { case p.producer.Input() <- kMsg: } return nil }