You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
ntool-biz/core/mq/kafka/consumer.go

38 lines
706 B
Go

package kafka
import (
"github.com/Shopify/sarama"
)
// Consumer bundles the sarama handles and metadata needed to read from a
// single Kafka topic. Instances are built by NewKafkaConsumer.
type Consumer struct {
// client is the sarama client the consumer is created from.
client sarama.Client
// topic is the topic name passed to NewKafkaConsumer.
topic string
// consumer is the sarama consumer created from client.
consumer sarama.Consumer
// partitions holds the partition IDs that consumer.Partitions reported
// for topic when the Consumer was constructed.
partitions []int32
}
// NewKafkaConsumer creates a sarama client for the brokers in addr, builds a
// consumer on top of it, and looks up the partition IDs of topic.
//
// The sarama configuration uses Version V3_1_0_0 and sets
// Consumer.Offsets.Initial to sarama.OffsetNewest.
//
// On success it returns a Consumer holding the client, the consumer, the
// topic name and the partition list. On failure it returns nil and the error
// from the step that failed; anything opened before the failure is closed so
// that a failed construction does not leak the client or consumer.
func NewKafkaConsumer(addr []string, topic string) (*Consumer, error) {
	config := sarama.NewConfig()
	config.Version = sarama.V3_1_0_0
	config.Consumer.Offsets.Initial = sarama.OffsetNewest

	client, err := sarama.NewClient(addr, config)
	if err != nil {
		return nil, err
	}

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		// Best-effort cleanup: the construction error is the one the caller
		// needs, so a secondary Close error is deliberately dropped.
		_ = client.Close()
		return nil, err
	}

	partitions, err := consumer.Partitions(topic)
	if err != nil {
		// A consumer built from an existing client does not own that client,
		// so both are closed explicitly (consumer first). Close errors are
		// dropped for the same reason as above.
		_ = consumer.Close()
		_ = client.Close()
		return nil, err
	}

	return &Consumer{
		client:     client,
		topic:      topic,
		consumer:   consumer,
		partitions: partitions,
	}, nil
}