// Package kafka provides a thin wrapper around sarama's consumer group API.
package kafka

import (
	"context"
	"errors"
	"time"

	"git.noahlan.cn/noahlan/ntool/ndef"
	"github.com/Shopify/sarama"
)

// ConsumerGroup wraps a sarama.ConsumerGroup together with the group id and
// topic list it was created for. The embedded sarama.ConsumerGroup exposes
// Close, Errors and the rest of the sarama API directly on this type.
type ConsumerGroup struct {
	sarama.ConsumerGroup
	groupId     string
	topics      []string
	unMarshaler ndef.Unmarshaler
}

// ConsumerGroupConfig holds the settings NewConsumerGroup copies into the
// sarama configuration.
type ConsumerGroupConfig struct {
	// KafkaVersion is assigned to sarama's Config.Version.
	KafkaVersion sarama.KafkaVersion
	// OffsetsInitial is assigned to Consumer.Offsets.Initial
	// (for example sarama.OffsetNewest or sarama.OffsetOldest).
	OffsetsInitial int64
	// IsReturnErr is assigned to Consumer.Return.Errors.
	IsReturnErr bool
	// UnMarshaler is stored on the resulting ConsumerGroup.
	UnMarshaler ndef.Unmarshaler
}

// NewConsumerGroup connects to the brokers in addr and joins the consumer
// group groupId. The returned ConsumerGroup remembers topics for later use by
// RegisterHandlerAndConsumer.
//
// It returns an error when config is nil or when sarama fails to create the
// consumer group.
func NewConsumerGroup(config *ConsumerGroupConfig, addr, topics []string, groupId string) (*ConsumerGroup, error) {
	if config == nil {
		return nil, errors.New("kafka: nil consumer group config")
	}

	c := sarama.NewConfig()
	c.Version = config.KafkaVersion
	c.Consumer.Offsets.Initial = config.OffsetsInitial
	c.Consumer.Return.Errors = config.IsReturnErr

	// sarama.NewConsumerGroup owns the client it creates: the client is
	// closed if group creation fails and again when the group itself is
	// closed. Building the group from a separately created client would leave
	// that client open in both situations.
	consumerGroup, err := sarama.NewConsumerGroup(addr, groupId, c)
	if err != nil {
		return nil, err
	}

	return &ConsumerGroup{
		ConsumerGroup: consumerGroup,
		groupId:       groupId,
		topics:        topics,
		unMarshaler:   config.UnMarshaler,
	}, nil
}

// RegisterHandlerAndConsumer runs handler against the configured topics and
// blocks the calling goroutine. Consume is called in a loop so that a new
// session is started every time the previous one ends.
//
// The method returns once the consumer group has been closed. Any other error
// from Consume is reported through sarama.Logger and retried after a short
// pause.
func (cg *ConsumerGroup) RegisterHandlerAndConsumer(handler sarama.ConsumerGroupHandler) {
	// Pause between failed attempts so a persistent error does not turn the
	// loop into a busy spin.
	const retryBackoff = time.Second

	ctx := context.Background()
	for {
		err := cg.ConsumerGroup.Consume(ctx, cg.topics, handler)
		if err == nil {
			continue
		}
		if errors.Is(err, sarama.ErrClosedConsumerGroup) {
			// The group was closed; every further Consume call would fail
			// the same way, so stop instead of looping forever.
			return
		}
		sarama.Logger.Printf("kafka: consumer group %q consume error: %v", cg.groupId, err)
		time.Sleep(retryBackoff)
	}
}