"""Public entry points for the message-queue module.

Re-exports the client, registry, runner, message, backend, and error names
listed in ``__all__`` and provides two helpers that attach the MQ components
to an application object through ``app.state``.
"""

from __future__ import annotations

from typing import Any

from .backend import KafkaBackend
from .client import MQClient, MQSender, mq_client
from .errors import MQConfigError, MQError, MQPublishError
from .message import MQMessage
from .registry import (
    MQConsumerDefinition,
    MQProducerDefinition,
    MQRegistry,
    mq_consumer,
    mq_registry,
)
from .runner import MQConsumerRunner


def get_mq_registry(app) -> MQRegistry:
    """Return the registry attached to ``app.state``, attaching one if absent.

    When ``app.state.iti_mq_registry`` is missing or ``None``, the
    module-level ``mq_registry`` is stored there and returned.

    Args:
        app: Application object exposing a ``state`` attribute namespace.

    Returns:
        The registry stored on ``app.state.iti_mq_registry``.
    """
    registry = getattr(app.state, "iti_mq_registry", None)
    if registry is None:
        registry = mq_registry
        app.state.iti_mq_registry = registry
    return registry


def init_mq(
    app,
    config: dict[str, Any] | None = None,
    *,
    registry: MQRegistry | None = None,
    producer_factory: Any | None = None,
    consumer_factory: Any | None = None,
) -> None:
    """Build the MQ backend, client, and consumer runner and store them on ``app.state``.

    Args:
        app: Application object exposing a ``state`` attribute namespace.
        config: Optional settings mapping. It is shallow-copied before use.
            Keys read here: ``backend`` (default ``"kafka"``), ``group_id``,
            ``failure_backoff_seconds`` (default ``1.0``), and
            ``poll_timeout_seconds`` (default ``1.0``). The full copy is also
            passed to ``KafkaBackend``.
        registry: Registry to use. When ``None``, the one returned by
            ``get_mq_registry(app)`` is used.
        producer_factory: Forwarded unchanged to ``KafkaBackend``.
        consumer_factory: Forwarded unchanged to ``KafkaBackend``.

    Raises:
        MQConfigError: If ``config["backend"]`` is anything other than
            ``"kafka"``.
    """
    config = dict(config or {})

    # Compare against None explicitly instead of relying on truthiness: a
    # caller-supplied registry must be honoured even if the registry object
    # happens to evaluate as falsy (e.g. an empty container-like registry).
    if registry is None:
        registry = get_mq_registry(app)

    backend_name = str(config.get("backend", "kafka"))
    if backend_name != "kafka":
        raise MQConfigError(f"unsupported mq backend: {backend_name}")

    backend = KafkaBackend(
        config,
        producer_factory=producer_factory,
        consumer_factory=consumer_factory,
    )
    # The producer is created here, at init time, and handed to the client.
    client = MQClient(backend.create_producer(), registry)
    runner = MQConsumerRunner(
        app,
        backend,
        registry,
        group_id=config.get("group_id"),
        failure_backoff_seconds=float(config.get("failure_backoff_seconds", 1.0)),
        poll_timeout_seconds=float(config.get("poll_timeout_seconds", 1.0)),
    )

    # Publish every component on app.state so other code can look them up.
    app.state.iti_mq_registry = registry
    app.state.iti_mq_backend = backend
    app.state.iti_mq_client = client
    app.state.iti_mq_runner = runner


__all__ = [
    "KafkaBackend",
    "MQClient",
    "MQConfigError",
    "MQConsumerDefinition",
    "MQConsumerRunner",
    "MQError",
    "MQMessage",
    "MQProducerDefinition",
    "MQPublishError",
    "MQRegistry",
    "MQSender",
    "get_mq_registry",
    "init_mq",
    "mq_client",
    "mq_consumer",
    "mq_registry",
]