package kafka import ( "github.com/Shopify/sarama" ) type Consumer struct { client sarama.Client topic string consumer sarama.Consumer partitions []int32 } func NewKafkaConsumer(addr []string, topic string) (*Consumer, error) { p := Consumer{} p.topic = topic config := sarama.NewConfig() config.Version = sarama.V3_1_0_0 config.Consumer.Offsets.Initial = sarama.OffsetNewest var err error p.client, err = sarama.NewClient(addr, config) if err != nil { return nil, err } p.consumer, err = sarama.NewConsumerFromClient(p.client) if err != nil { return nil, err } p.partitions, err = p.consumer.Partitions(topic) if err != nil { return nil, err } return &p, nil }