"""Kafka backend for the message-queue layer, built on confluent-kafka."""

from __future__ import annotations

from typing import Any

from .errors import MQConfigError


class KafkaBackend:
    """Build Kafka client configuration and create producers/consumers.

    The backend reads a plain ``config`` mapping and translates it into the
    dotted-key configuration dictionaries handed to the client factories.
    Factories may be injected; when they are not, the ``confluent_kafka``
    ``Producer`` / ``Consumer`` classes are imported lazily on first use.
    """

    def __init__(
        self,
        config: dict[str, Any],
        *,
        producer_factory: Any | None = None,
        consumer_factory: Any | None = None,
    ) -> None:
        """Store the configuration and the optional client factories.

        Args:
            config: Backend settings. Keys read by this class are
                ``bootstrap_servers``, ``client_id``, ``common``,
                ``producer``, ``consumer`` and ``auto_offset_reset``.
            producer_factory: Callable invoked with the producer config
                dict instead of the confluent-kafka ``Producer``.
            consumer_factory: Callable invoked with the consumer config
                dict instead of the confluent-kafka ``Consumer``.
        """
        self.config = config
        self._producer_factory = producer_factory
        self._consumer_factory = consumer_factory

    def create_producer(self):
        """Return a producer built from :meth:`producer_config`.

        Uses the injected producer factory when one was given, otherwise
        the lazily imported confluent-kafka ``Producer``.
        """
        # Resolve the factory first so a missing dependency surfaces before
        # the configuration is assembled.
        factory = self._producer_factory
        if factory is None:
            factory = self._confluent_producer()
        return factory(self.producer_config())

    def create_consumer(self, group_id: str, config: dict[str, Any] | None = None):
        """Return a consumer for ``group_id``.

        Args:
            group_id: Value stored under ``group.id`` in the consumer config.
            config: Optional per-call overrides, see :meth:`consumer_config`.
        """
        # The configuration is assembled before the factory is resolved, so
        # configuration errors are raised first.
        settings = self.consumer_config(group_id, config)
        factory = self._consumer_factory
        if factory is None:
            factory = self._confluent_consumer()
        return factory(settings)

    def producer_config(self) -> dict[str, Any]:
        """Return the common settings overlaid with the ``producer`` section."""
        return {
            **self._common_config(),
            **dict(self.config.get("producer") or {}),
        }

    def consumer_config(self, group_id: str, config: dict[str, Any] | None = None) -> dict[str, Any]:
        """Return the configuration dict for a consumer in ``group_id``.

        Layers are applied in this order, later ones winning: the common
        settings, the ``consumer`` section of the backend config, then the
        per-call ``config``. ``group.id`` and ``enable.auto.commit`` are
        always set afterwards and therefore cannot be overridden.

        Args:
            group_id: Value stored under ``group.id``.
            config: Optional per-call overrides.

        Returns:
            A new dict; the stored backend config is not modified.
        """
        merged = {
            **self._common_config(),
            **dict(self.config.get("consumer") or {}),
            **dict(config or {}),
            "group.id": group_id,
            "enable.auto.commit": False,
        }
        # Only fill in the offset-reset policy when no layer provided one.
        if "auto.offset.reset" not in merged:
            merged["auto.offset.reset"] = self.config.get("auto_offset_reset", "earliest")
        return merged

    def _common_config(self) -> dict[str, Any]:
        """Return the settings shared by producers and consumers.

        Contains ``bootstrap.servers``, ``client.id`` when ``client_id`` is
        set to a truthy value, and finally everything from the ``common``
        section, which may override the former two.

        Raises:
            MQConfigError: If ``bootstrap_servers`` is missing or falsy.
        """
        servers = self.config.get("bootstrap_servers")
        if not servers:
            raise MQConfigError("mq kafka bootstrap_servers is required")

        settings: dict[str, Any] = {"bootstrap.servers": servers}

        client_id = self.config.get("client_id")
        if client_id:
            settings["client.id"] = client_id

        settings.update(dict(self.config.get("common") or {}))
        return settings

    def _confluent_producer(self):
        """Import and return the confluent-kafka ``Producer`` class.

        Raises:
            MQConfigError: If ``confluent_kafka`` cannot be imported.
        """
        try:
            from confluent_kafka import Producer
        except ImportError as exc:
            raise MQConfigError(
                "confluent-kafka is required for kafka mq; install iti-flask[mq-kafka]"
            ) from exc
        else:
            return Producer

    def _confluent_consumer(self):
        """Import and return the confluent-kafka ``Consumer`` class.

        Raises:
            MQConfigError: If ``confluent_kafka`` cannot be imported.
        """
        try:
            from confluent_kafka import Consumer
        except ImportError as exc:
            raise MQConfigError(
                "confluent-kafka is required for kafka mq; install iti-flask[mq-kafka]"
            ) from exc
        else:
            return Consumer