// Package kafka wraps sarama's asynchronous producer for publishing to a
// single Kafka topic.
package kafka

import (
	"fmt"

	"git.noahlan.cn/noahlan/ntool/ndef"
	"github.com/Shopify/sarama"
)

// Producer publishes messages to one fixed topic through a sarama
// AsyncProducer. Create it with NewKafkaProducer.
type Producer struct {
	topic     string
	client    sarama.Client
	producer  sarama.AsyncProducer
	marshaler ndef.Marshaler
	// initErr records why construction failed; it is reported by
	// SendMessageAsync because NewKafkaProducer has no error return.
	initErr error
}

// ProducerConfig carries the options NewKafkaProducer copies into the sarama
// configuration.
type ProducerConfig struct {
	// RequiredAcks is copied to sarama's Producer.RequiredAcks.
	RequiredAcks sarama.RequiredAcks
	// Partitioner is copied to sarama's Producer.Partitioner. When nil,
	// sarama's default partitioner is kept.
	Partitioner sarama.PartitionerConstructor
	// IsReturnSuccess is copied to sarama's Producer.Return.Successes.
	IsReturnSuccess bool
	// Marshaler encodes non-string payloads passed to SendMessageAsync.
	Marshaler ndef.Marshaler
}

// NewKafkaProducer builds a Producer for topic using the brokers in addr.
//
// A nil cfg leaves sarama's defaults untouched. The returned pointer is never
// nil: if the client or the async producer cannot be created, the failure is
// stored on the Producer and returned by every SendMessageAsync call instead
// of causing a nil-pointer panic there.
func NewKafkaProducer(cfg *ProducerConfig, addr []string, topic string) *Producer {
	p := &Producer{topic: topic}

	config := sarama.NewConfig()
	if cfg != nil {
		config.Producer.Return.Successes = cfg.IsReturnSuccess
		config.Producer.RequiredAcks = cfg.RequiredAcks
		// sarama rejects a nil partitioner during config validation, so only
		// override the default when one was actually supplied.
		if cfg.Partitioner != nil {
			config.Producer.Partitioner = cfg.Partitioner
		}
		p.marshaler = cfg.Marshaler
	}

	client, err := sarama.NewClient(addr, config)
	if err != nil {
		p.initErr = err
		return p
	}
	p.client = client

	producer, err := sarama.NewAsyncProducerFromClient(client)
	if err != nil {
		p.initErr = err
		// The client is useless without a producer; release its connections.
		_ = client.Close()
		return p
	}
	p.producer = producer

	// Both result channels must be consumed, otherwise the async producer
	// eventually blocks. Each loop ends when sarama closes its channel.
	go func() {
		for range producer.Successes() {
		}
	}()
	go func() {
		for perr := range producer.Errors() {
			sarama.Logger.Printf("kafka producer: topic %q: delivery failed: %v", topic, perr)
		}
	}()

	return p
}

// SendMessageAsync queues v for delivery to the producer's topic.
//
// If key is given, its first element is used as the message key. A string v is
// sent as-is; any other value is encoded with the configured Marshaler. The
// call blocks until the producer's input channel accepts the message.
//
// It returns an error when the producer was not initialized successfully, when
// v is not a string and no Marshaler is configured, or when marshaling fails.
// A nil return only means the message was queued, not that it was delivered.
func (p *Producer) SendMessageAsync(v interface{}, key ...string) error {
	if p == nil {
		return fmt.Errorf("kafka producer is nil")
	}
	if p.producer == nil {
		if p.initErr != nil {
			return fmt.Errorf("kafka producer for topic %q is unavailable: %w", p.topic, p.initErr)
		}
		return fmt.Errorf("kafka producer for topic %q is not initialized", p.topic)
	}

	kMsg := &sarama.ProducerMessage{Topic: p.topic}
	if len(key) > 0 {
		kMsg.Key = sarama.StringEncoder(key[0])
	}

	switch {
	case isString(v):
		kMsg.Value = sarama.StringEncoder(v.(string))
	case p.marshaler != nil:
		data, err := p.marshaler.Marshal(v)
		if err != nil {
			return err
		}
		kMsg.Value = sarama.ByteEncoder(data)
	default:
		// Without a marshaler the payload would be dropped and an empty
		// message sent; surface that instead.
		return fmt.Errorf("kafka producer for topic %q: no marshaler configured for value of type %T", p.topic, v)
	}

	p.producer.Input() <- kMsg
	return nil
}

// isString reports whether v holds a string.
func isString(v interface{}) bool {
	_, ok := v.(string)
	return ok
}