You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
73 lines
2.6 KiB
Python
73 lines
2.6 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from .errors import MQConfigError
|
|
|
|
|
|
class KafkaBackend:
|
|
def __init__(
|
|
self,
|
|
config: dict[str, Any],
|
|
*,
|
|
producer_factory: Any | None = None,
|
|
consumer_factory: Any | None = None,
|
|
) -> None:
|
|
self.config = config
|
|
self._producer_factory = producer_factory
|
|
self._consumer_factory = consumer_factory
|
|
|
|
def create_producer(self):
|
|
if self._producer_factory is not None:
|
|
return self._producer_factory(self.producer_config())
|
|
return self._confluent_producer()(self.producer_config())
|
|
|
|
def create_consumer(self, group_id: str, config: dict[str, Any] | None = None):
|
|
consumer_config = self.consumer_config(group_id, config)
|
|
if self._consumer_factory is not None:
|
|
return self._consumer_factory(consumer_config)
|
|
return self._confluent_consumer()(consumer_config)
|
|
|
|
def producer_config(self) -> dict[str, Any]:
|
|
base = self._common_config()
|
|
base.update(dict(self.config.get("producer") or {}))
|
|
return base
|
|
|
|
def consumer_config(self, group_id: str, config: dict[str, Any] | None = None) -> dict[str, Any]:
|
|
base = self._common_config()
|
|
base.update(dict(self.config.get("consumer") or {}))
|
|
base.update(dict(config or {}))
|
|
base["group.id"] = group_id
|
|
base["enable.auto.commit"] = False
|
|
base.setdefault("auto.offset.reset", self.config.get("auto_offset_reset", "earliest"))
|
|
return base
|
|
|
|
def _common_config(self) -> dict[str, Any]:
|
|
bootstrap_servers = self.config.get("bootstrap_servers")
|
|
if not bootstrap_servers:
|
|
raise MQConfigError("mq kafka bootstrap_servers is required")
|
|
common = {"bootstrap.servers": bootstrap_servers}
|
|
client_id = self.config.get("client_id")
|
|
if client_id:
|
|
common["client.id"] = client_id
|
|
common.update(dict(self.config.get("common") or {}))
|
|
return common
|
|
|
|
def _confluent_producer(self):
|
|
try:
|
|
from confluent_kafka import Producer
|
|
except ImportError as exc:
|
|
raise MQConfigError(
|
|
"confluent-kafka is required for kafka mq; install iti-flask[mq-kafka]"
|
|
) from exc
|
|
return Producer
|
|
|
|
def _confluent_consumer(self):
|
|
try:
|
|
from confluent_kafka import Consumer
|
|
except ImportError as exc:
|
|
raise MQConfigError(
|
|
"confluent-kafka is required for kafka mq; install iti-flask[mq-kafka]"
|
|
) from exc
|
|
return Consumer
|