From 7cc0501c27122d8f1d3b81e7894eb5e074592ba7 Mon Sep 17 00:00:00 2001 From: NoahLan <6995syu@163.com> Date: Tue, 16 Jun 2026 13:29:42 +0800 Subject: [PATCH] feat: add mq --- .codex/skills/iti-flask-framework/SKILL.md | 3 + README.md | 2 + .../SKILL.md.jinja | 1 + copier-template/.env.example.jinja | 9 + docs/ARCHITECTURE.md | 12 +- docs/CONFIGURATION.md | 31 ++ docs/MODULES.md | 27 +- docs/MQ.md | 145 +++++++ docs/README.md | 1 + iti/app.py | 12 + iti/config.py | 21 + iti/exchange/base.py | 3 + iti/modules/base.py | 3 + iti/modules/registry.py | 1 + iti/mq/__init__.py | 77 ++++ iti/mq/backend.py | 72 ++++ iti/mq/client.py | 110 +++++ iti/mq/errors.py | 13 + iti/mq/message.py | 19 + iti/mq/registry.py | 116 ++++++ iti/mq/runner.py | 181 +++++++++ iti/mq/serialization.py | 47 +++ pyproject.toml | 2 + tests/test_config.py | 32 ++ tests/test_modules.py | 42 ++ tests/test_mq.py | 383 ++++++++++++++++++ 26 files changed, 1357 insertions(+), 8 deletions(-) create mode 100644 docs/MQ.md create mode 100644 iti/mq/__init__.py create mode 100644 iti/mq/backend.py create mode 100644 iti/mq/client.py create mode 100644 iti/mq/errors.py create mode 100644 iti/mq/message.py create mode 100644 iti/mq/registry.py create mode 100644 iti/mq/runner.py create mode 100644 iti/mq/serialization.py create mode 100644 tests/test_mq.py diff --git a/.codex/skills/iti-flask-framework/SKILL.md b/.codex/skills/iti-flask-framework/SKILL.md index f8f2b93..b930fdf 100644 --- a/.codex/skills/iti-flask-framework/SKILL.md +++ b/.codex/skills/iti-flask-framework/SKILL.md @@ -26,6 +26,7 @@ iTi-Flask 是 FastAPI 后端框架基座。 - `iti/modules/*`:模块协议、权限元数据、菜单 seed 元数据。 - `iti/responses/*`:envelope、raw response 逃逸。 - `iti/service_client/*`:同步 HTTP JSON 客户端和注册表。 +- `iti/mq/*`:通用 MQ 入口,v1 Kafka backend,生产者、消费者和 runner。 - `iti/tasks/*`:单进程任务注册和 runner。 - `iti/audit.py`:审计事件发送器,不拥有系统日志表。 - `iti/storage/*`:存储后端接口和实现。 @@ -40,6 +41,8 @@ iTi-Flask 是 FastAPI 后端框架基座。 - 保留 raw 默认值:`/health`、`/ready`、`/docs`、`/openapi.json`。 - `/docs` 是文档入口,按 `docs_ui_enabled` 展示 Swagger、Scalar、ReDoc 等已启用 UI。 - 模块元数据使用 `ModulePermission` 和 `ModuleMenuSeed`。 +- MQ 默认关闭;Kafka 客户端依赖放在 `mq-kafka` extra,不放核心依赖。 +- MQ 消费成功后提交 offset;handler 失败不提交,seek 回当前消息并按 backoff 重试。 - migration 归生成项目所有。框架不要静默接管业务项目 migration 流。 - 审计保持异步、非阻塞。框架只发事件,接收方在框架外。 - 不为未发生的需求加宽泛兼容层。 diff --git a/README.md b/README.md index 0ba1f29..41b84ec 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ AI 修改框架代码或文档时优先读: - 缓存、限流、事件总线。 - 模块注册、权限元数据、菜单 seed 元数据。 - 同步 HTTP 服务客户端。 +- 通用 MQ 入口,Kafka backend 可选。 - 运行日志和审计事件 sender。 - 单机轻量任务 runner。 - `/health` 和 `/ready` 健康检查。 @@ -135,6 +136,7 @@ iticli release v0.3.0 - [模块协议](docs/MODULES.md) - [模板与导入导出](docs/EXCHANGE.md) - [服务客户端](docs/SERVICE_CLIENT.md) +- [MQ](docs/MQ.md) - [任务运行器](docs/TASKS.md) - [数据库迁移](docs/MIGRATIONS.md) - [种子数据](docs/SEEDS.md) diff --git a/copier-template/.codex/skills/{{ project_slug | lower | replace('_', '-') }}-project/SKILL.md.jinja b/copier-template/.codex/skills/{{ project_slug | lower | replace('_', '-') }}-project/SKILL.md.jinja index f5c05d6..592c9fa 100644 --- a/copier-template/.codex/skills/{{ project_slug | lower | replace('_', '-') }}-project/SKILL.md.jinja +++ b/copier-template/.codex/skills/{{ project_slug | lower | replace('_', '-') }}-project/SKILL.md.jinja @@ -51,6 +51,7 @@ description: "{{ project_name }} 业务项目 skill。用于当前由 iTi-Flask - `register_permissions(app)`:用 `ModulePermission` 声明权限元数据。 - `register_menu_seed(app)`:用 `ModuleMenuSeed` 声明后台菜单 seed 元数据。 - `register_tasks(app)`:按需注册本地任务。 +- `register_mq(app)`:按需注册 MQ producer 和 consumer。 - `init_app(app)`:按需接入配置或服务客户端。 业务模块优先放在 `app/modules//`。 diff --git a/copier-template/.env.example.jinja b/copier-template/.env.example.jinja index f6b512d..174edce 100644 --- a/copier-template/.env.example.jinja +++ b/copier-template/.env.example.jinja @@ -18,3 +18,12 @@ POSTGRES_PASSWORD=change-me SECRET_KEY=change-me JWT_SECRET_KEY=change-me LOG_FILE_ENABLED=true + +MQ_ENABLED=false +MQ_BACKEND=kafka +KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 +KAFKA_GROUP_ID={{ project_slug | lower | replace('-', '_') }} +KAFKA_CLIENT_ID={{ project_slug | lower | replace('-', '_') }} +KAFKA_AUTO_OFFSET_RESET=earliest +MQ_FAILURE_BACKOFF_SECONDS=1 +MQ_POLL_TIMEOUT_SECONDS=1 diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 9700f4e..203affa 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -12,6 +12,7 @@ iTi-Flask 是 FastAPI 框架基座。 - `iti.responses`:自动 envelope、`@raw_response`、响应工具。 - `iti.modules`:模块协议、权限元数据、菜单 seed 元数据。 - `iti.service_client`:同步 HTTP JSON 服务客户端。 +- `iti.mq`:通用 MQ 入口,v1 支持 Kafka backend。 - `iti.tasks`:单进程轻量任务注册和调度。 - `iti.audit`:审计事件、diff、脱敏和异步 HTTP sender,不拥有日志表。 - `iti.logging_config`:运行日志配置。 @@ -29,11 +30,12 @@ iTi-Flask 是 FastAPI 框架基座。 5. 注册中间件、错误处理和运行日志。 6. 初始化服务客户端。 7. 初始化任务 runner。 -8. 初始化审计 sender。 -9. 初始化模块并运行 `init_app`、`register_tasks`。 -10. 注册健康检查。 -11. 注册模块路由、权限元数据和菜单 seed 元数据。 -12. 安装自动 envelope。 +8. 按配置初始化 MQ。 +9. 初始化审计 sender。 +10. 初始化模块并运行 `init_app`、`register_tasks`、`register_mq`。 +11. 注册健康检查。 +12. 注册模块路由、权限元数据和菜单 seed 元数据。 +13. 安装自动 envelope。 ## API 约定 diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index d949d61..54aad44 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -85,6 +85,8 @@ POSTGRES_DB=iti_dev | `services` | 服务客户端配置 | | `service_tokens` | 可信服务 token | | `tasks_enabled` | 是否启动单机任务调度线程 | +| `mq_enabled` | 是否启动 MQ consumer runner,默认关闭 | +| `mq` | MQ 配置,v1 支持 Kafka backend | | `exchange_enabled` | 是否默认自动挂载本地交换模块 | | `exchange_default_storage` | 导入导出默认文件存储类型 | | `exchange_storage` | 导入导出文件存储配置 | @@ -110,3 +112,32 @@ class DevConfig(BaseDevConfig): config = {"dev": DevConfig, "default": DevConfig} ``` + +## MQ + +MQ 默认关闭。 +Kafka backend 需要安装 `iti-flask[mq-kafka]`。 + +可用环境变量: + +```bash +MQ_ENABLED=false +MQ_BACKEND=kafka +KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 +KAFKA_GROUP_ID=my-app +KAFKA_CLIENT_ID=my-app-dev +KAFKA_AUTO_OFFSET_RESET=earliest +MQ_FAILURE_BACKOFF_SECONDS=1 +MQ_POLL_TIMEOUT_SECONDS=1 +``` + +业务项目也可以直接覆盖: + +```python +self.mq_enabled = True +self.mq = { + "backend": "kafka", + "bootstrap_servers": "127.0.0.1:9092", + "group_id": "my-app", +} +``` diff --git a/docs/MODULES.md b/docs/MODULES.md index 7883c56..29d5efe 100644 --- a/docs/MODULES.md +++ b/docs/MODULES.md @@ -32,15 +32,19 @@ class ExampleModule: def register_tasks(self, app): pass + + def register_mq(self, app): + pass ``` 执行顺序: 1. `init_app` 2. `register_tasks` -3. `register_routes` -4. `register_permissions` -5. `register_menu_seed` +3. `register_mq` +4. `register_routes` +5. `register_permissions` +6. `register_menu_seed` ## 路由 @@ -89,3 +93,20 @@ from iti.exchange.module import create_exchange_module app = create_app(modules=[create_exchange_module()]) ``` + +## MQ + +业务模块可以在 `register_mq(app)` 里注册 MQ producer 和 consumer。 + +```python +from iti.mq import mq_registry + + +def register_mq(self, app): + mq_registry.register_consumer( + name="example-events", + topics=["example.events"], + group_id="my-app", + handler=handle_event, + ) +``` diff --git a/docs/MQ.md b/docs/MQ.md new file mode 100644 index 0000000..a046120 --- /dev/null +++ b/docs/MQ.md @@ -0,0 +1,145 @@ +# MQ + +iTi-Flask 提供通用 MQ 入口。 +v1 内置 Kafka backend,底层使用 `confluent-kafka`。 + +## 安装 + +Kafka 客户端是可选依赖: + +```bash +uv add "iti-flask[mq-kafka]" +``` + +未启用 MQ 的项目不需要安装 Kafka 依赖。 + +## 配置 + +MQ 默认关闭。 + +```python +class DevConfig(BaseDevConfig): + def __init__(self) -> None: + super().__init__() + self.mq_enabled = True + self.mq = { + "backend": "kafka", + "bootstrap_servers": "127.0.0.1:9092", + "group_id": "my-app", + "client_id": "my-app-dev", + "auto_offset_reset": "earliest", + "failure_backoff_seconds": 1.0, + } +``` + +也可以使用环境变量: + +```bash +MQ_ENABLED=true +MQ_BACKEND=kafka +KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 +KAFKA_GROUP_ID=my-app +KAFKA_CLIENT_ID=my-app-dev +KAFKA_AUTO_OFFSET_RESET=earliest +MQ_FAILURE_BACKOFF_SECONDS=1 +MQ_POLL_TIMEOUT_SECONDS=1 +``` + +## 发送消息 + +```python +from iti.mq import mq_client + +mq_client(app).send_json("mes.work_order.events", {"id": "MO001"}, key="MO001") +mq_client(app).send("raw.topic", value=b"bytes", key=b"k") +``` + +也可以先注册 producer: + +```python +from iti.mq import mq_registry + +mq_registry.register_producer( + name="work-order-events", + topic="mes.work_order.events", +) + +mq_client(app).sender("work-order-events").send_json({"id": "MO001"}) +``` + +## 监听消息 + +使用装饰器: + +```python +from iti.mq import MQMessage, mq_consumer + + +@mq_consumer("mes.work_order.created", group_id="hsyh-mes") +async def handle_work_order(message: MQMessage) -> None: + payload = message.value +``` + +也可以显式注册: + +```python +from iti.mq import mq_registry + + +def handle_work_order(message): + payload = message.value + + +mq_registry.register_consumer( + name="work-order-created", + topics=["mes.work_order.created"], + group_id="hsyh-mes", + handler=handle_work_order, +) +``` + +业务模块也可以在 `register_mq(app)` 阶段注册 consumer 和 producer。 + +## 消息格式 + +默认 `value_format="json"`。 +框架用 UTF-8 JSON 解码 `message.value`,并保留 `message.raw_value`。 + +`value_format="bytes"` 时,`message.value` 是原始 bytes。 + +`MQMessage` 包含: + +- `app` +- `topic` +- `partition` +- `offset` +- `key` +- `raw_key` +- `value` +- `raw_value` +- `headers` +- `timestamp` +- `raw_message` + +## 消费语义 + +Kafka consumer 强制设置: + +```python +enable.auto.commit = False +``` + +handler 正常结束后,框架同步提交当前消息 offset。 +handler 抛错或 JSON 解码失败时,框架记录日志、不提交 offset、seek 回当前消息,并按 `failure_backoff_seconds` 等待后重试。 + +每个 consumer definition 启动一个线程,单线程顺序处理消息。 + +## 不支持 + +v1 不支持: + +- DLQ。 +- 批量消费。 +- exactly-once 事务。 +- 多 backend。 +- Kafka topic 自动创建。 diff --git a/docs/README.md b/docs/README.md index 9d3cdf5..55b4b73 100644 --- a/docs/README.md +++ b/docs/README.md @@ -10,6 +10,7 @@ AI 修改框架时优先读 `.codex/skills/iti-flask-framework/SKILL.md`。 - [配置](CONFIGURATION.md) - [模块协议](MODULES.md) - [模板与导入导出](EXCHANGE.md) +- [MQ](MQ.md) - [Copier 模板](COPIER_TEMPLATE.md) - [测试与部署](TESTING_DEPLOYMENT.md) diff --git a/iti/app.py b/iti/app.py index 5589115..5ebc0d7 100644 --- a/iti/app.py +++ b/iti/app.py @@ -45,6 +45,7 @@ from iti.logging_config import configure_logging, log_extra from iti.modules import init_modules from iti.exchange import get_exchange_registry from iti.exchange import models as _exchange_models +from iti.mq import get_mq_registry, init_mq from iti.responses.auto import is_envelope_payload, is_raw_response_request from iti.responses import fail from iti.service_client import init_service_clients @@ -72,13 +73,21 @@ def create_app( async def lifespan(app: FastAPI): runner = getattr(app.state, "iti_task_runner", None) audit_dispatcher = getattr(app.state, "audit_dispatcher", None) + mq_runner = getattr(app.state, "iti_mq_runner", None) + mq_client = getattr(app.state, "iti_mq_client", None) if audit_dispatcher: audit_dispatcher.start() + if mq_runner and config.mq_enabled: + mq_runner.start() if runner and config.tasks_enabled: runner.start() yield if runner: runner.stop() + if mq_runner: + mq_runner.stop() + if mq_client: + mq_client.flush() if audit_dispatcher: audit_dispatcher.stop() for client in getattr(app.state, "iti_service_clients", {}).values(): @@ -98,6 +107,7 @@ def create_app( app.state.limiter = SimpleLimiter(enabled=config.ratelimit_enabled) app.state.permission_provider = permission_provider or StaticPermissionProvider() app.state.exchange_enabled = config.exchange_enabled + app.state.iti_mq_registry = get_mq_registry(app) init_middlewares(app) @@ -121,6 +131,8 @@ def create_app( init_error_handlers(app) init_service_clients(app, config.services) init_task_runner(app) + if config.mq_enabled: + init_mq(app, config.mq) get_exchange_registry(app) init_audit(app) module_list = list(modules or []) diff --git a/iti/config.py b/iti/config.py index adfb06b..c010985 100644 --- a/iti/config.py +++ b/iti/config.py @@ -39,6 +39,13 @@ def env_bool(key: str, default: bool = False) -> bool: return value.lower() in {"1", "true", "yes", "on"} +def env_float(key: str, default: float) -> float: + value = os.getenv(key) + if value is None: + return default + return float(value) + + def normalize_database_dialect(dialect: str) -> str: normalized = _DATABASE_DIALECT_ALIASES.get(dialect.strip().lower()) if normalized is None: @@ -159,6 +166,19 @@ class BaseConfig: audit_batch_size: int = 20 audit_flush_interval_seconds: float = 1.0 + mq_enabled: bool = field(default_factory=lambda: env_bool("MQ_ENABLED", False)) + mq: dict[str, Any] = field( + default_factory=lambda: { + "backend": os.getenv("MQ_BACKEND", "kafka"), + "bootstrap_servers": os.getenv("KAFKA_BOOTSTRAP_SERVERS"), + "group_id": os.getenv("KAFKA_GROUP_ID"), + "client_id": os.getenv("KAFKA_CLIENT_ID"), + "auto_offset_reset": os.getenv("KAFKA_AUTO_OFFSET_RESET", "earliest"), + "failure_backoff_seconds": env_float("MQ_FAILURE_BACKOFF_SECONDS", 1.0), + "poll_timeout_seconds": env_float("MQ_POLL_TIMEOUT_SECONDS", 1.0), + } + ) + class DevConfig(BaseConfig): def __init__(self) -> None: @@ -188,6 +208,7 @@ class TestConfig(BaseConfig): ratelimit_enabled=False, log_file_enabled=False, audit_enabled=False, + mq_enabled=False, ) diff --git a/iti/exchange/base.py b/iti/exchange/base.py index 1e9efc0..b95e78b 100644 --- a/iti/exchange/base.py +++ b/iti/exchange/base.py @@ -260,6 +260,9 @@ class DataExchangeModule(Protocol): def register_tasks(self, app) -> None: ... + def register_mq(self, app) -> None: + ... + def _coerce_layout(value: ExchangeTemplateLayout | dict[str, Any] | None) -> ExchangeTemplateLayout: if value is None: diff --git a/iti/modules/base.py b/iti/modules/base.py index 6b3f763..e9fdfe6 100644 --- a/iti/modules/base.py +++ b/iti/modules/base.py @@ -65,3 +65,6 @@ class ItiModule(Protocol): def register_tasks(self, app) -> None: """Register module tasks.""" + + def register_mq(self, app) -> None: + """Register module MQ consumers and producers.""" diff --git a/iti/modules/registry.py b/iti/modules/registry.py index 2de0ea7..4f18dea 100644 --- a/iti/modules/registry.py +++ b/iti/modules/registry.py @@ -83,4 +83,5 @@ def init_modules(app, modules: Iterable[Any] | None = None) -> ModuleRegistry: registry.extend(modules) registry.run_phase("init_app", app) registry.run_phase("register_tasks", app) + registry.run_phase("register_mq", app) return registry diff --git a/iti/mq/__init__.py b/iti/mq/__init__.py new file mode 100644 index 0000000..bcc3cd6 --- /dev/null +++ b/iti/mq/__init__.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +from typing import Any + +from .backend import KafkaBackend +from .client import MQClient, MQSender, mq_client +from .errors import MQConfigError, MQError, MQPublishError +from .message import MQMessage +from .registry import ( + MQConsumerDefinition, + MQProducerDefinition, + MQRegistry, + mq_consumer, + mq_registry, +) +from .runner import MQConsumerRunner + + +def get_mq_registry(app) -> MQRegistry: + registry = getattr(app.state, "iti_mq_registry", None) + if registry is None: + registry = mq_registry + app.state.iti_mq_registry = registry + return registry + + +def init_mq( + app, + config: dict[str, Any] | None = None, + *, + registry: MQRegistry | None = None, + producer_factory: Any | None = None, + consumer_factory: Any | None = None, +) -> None: + config = dict(config or {}) + registry = registry or get_mq_registry(app) + backend_name = str(config.get("backend", "kafka")) + if backend_name != "kafka": + raise MQConfigError(f"unsupported mq backend: {backend_name}") + backend = KafkaBackend( + config, + producer_factory=producer_factory, + consumer_factory=consumer_factory, + ) + client = MQClient(backend.create_producer(), registry) + runner = MQConsumerRunner( + app, + backend, + registry, + group_id=config.get("group_id"), + failure_backoff_seconds=float(config.get("failure_backoff_seconds", 1.0)), + poll_timeout_seconds=float(config.get("poll_timeout_seconds", 1.0)), + ) + app.state.iti_mq_registry = registry + app.state.iti_mq_backend = backend + app.state.iti_mq_client = client + app.state.iti_mq_runner = runner + + +__all__ = [ + "KafkaBackend", + "MQClient", + "MQConfigError", + "MQConsumerDefinition", + "MQConsumerRunner", + "MQError", + "MQMessage", + "MQProducerDefinition", + "MQPublishError", + "MQRegistry", + "MQSender", + "get_mq_registry", + "init_mq", + "mq_client", + "mq_consumer", + "mq_registry", +] diff --git a/iti/mq/backend.py b/iti/mq/backend.py new file mode 100644 index 0000000..577f263 --- /dev/null +++ b/iti/mq/backend.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +from typing import Any + +from .errors import MQConfigError + + +class KafkaBackend: + def __init__( + self, + config: dict[str, Any], + *, + producer_factory: Any | None = None, + consumer_factory: Any | None = None, + ) -> None: + self.config = config + self._producer_factory = producer_factory + self._consumer_factory = consumer_factory + + def create_producer(self): + if self._producer_factory is not None: + return self._producer_factory(self.producer_config()) + return self._confluent_producer()(self.producer_config()) + + def create_consumer(self, group_id: str, config: dict[str, Any] | None = None): + consumer_config = self.consumer_config(group_id, config) + if self._consumer_factory is not None: + return self._consumer_factory(consumer_config) + return self._confluent_consumer()(consumer_config) + + def producer_config(self) -> dict[str, Any]: + base = self._common_config() + base.update(dict(self.config.get("producer") or {})) + return base + + def consumer_config(self, group_id: str, config: dict[str, Any] | None = None) -> dict[str, Any]: + base = self._common_config() + base.update(dict(self.config.get("consumer") or {})) + base.update(dict(config or {})) + base["group.id"] = group_id + base["enable.auto.commit"] = False + base.setdefault("auto.offset.reset", self.config.get("auto_offset_reset", "earliest")) + return base + + def _common_config(self) -> dict[str, Any]: + bootstrap_servers = self.config.get("bootstrap_servers") + if not bootstrap_servers: + raise MQConfigError("mq kafka bootstrap_servers is required") + common = {"bootstrap.servers": bootstrap_servers} + client_id = self.config.get("client_id") + if client_id: + common["client.id"] = client_id + common.update(dict(self.config.get("common") or {})) + return common + + def _confluent_producer(self): + try: + from confluent_kafka import Producer + except ImportError as exc: + raise MQConfigError( + "confluent-kafka is required for kafka mq; install iti-flask[mq-kafka]" + ) from exc + return Producer + + def _confluent_consumer(self): + try: + from confluent_kafka import Consumer + except ImportError as exc: + raise MQConfigError( + "confluent-kafka is required for kafka mq; install iti-flask[mq-kafka]" + ) from exc + return Consumer diff --git a/iti/mq/client.py b/iti/mq/client.py new file mode 100644 index 0000000..0b78c4c --- /dev/null +++ b/iti/mq/client.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +from typing import Any + +from .errors import MQConfigError +from .registry import MQRegistry +from .serialization import encode_message_key, encode_message_value + + +class MQClient: + def __init__(self, producer: Any, registry: MQRegistry) -> None: + self._producer = producer + self._registry = registry + + def send_json( + self, + topic: str, + value: Any, + *, + key: str | bytes | None = None, + headers: dict[str, str | bytes | None] | None = None, + ) -> None: + self.send(topic, value=value, key=key, headers=headers, value_format="json") + + def send( + self, + topic: str, + *, + value: Any, + key: str | bytes | None = None, + headers: dict[str, str | bytes | None] | None = None, + value_format: str = "bytes", + ) -> None: + self._producer.poll(0) + self._producer.produce( + topic, + value=encode_message_value(value, value_format), + key=encode_message_key(key), + headers=_encode_headers(headers), + ) + + def sender(self, name: str) -> "MQSender": + definition = self._registry.producers.get(name) + if definition is None: + raise MQConfigError(f"mq producer not registered: {name}") + return MQSender(self, definition.topic, definition.value_format) + + def flush(self, timeout: float | None = None) -> None: + if timeout is None: + self._producer.flush() + else: + self._producer.flush(timeout) + + +class MQSender: + def __init__(self, client: MQClient, topic: str, value_format: str) -> None: + self._client = client + self.topic = topic + self.value_format = value_format + + def send_json( + self, + value: Any, + *, + key: str | bytes | None = None, + headers: dict[str, str | bytes | None] | None = None, + ) -> None: + self._client.send( + self.topic, + value=value, + key=key, + headers=headers, + value_format="json", + ) + + def send( + self, + value: Any, + *, + key: str | bytes | None = None, + headers: dict[str, str | bytes | None] | None = None, + ) -> None: + self._client.send( + self.topic, + value=value, + key=key, + headers=headers, + value_format=self.value_format, + ) + + +def _encode_headers( + headers: dict[str, str | bytes | None] | None, +) -> list[tuple[str, bytes | None]] | None: + if headers is None: + return None + result: list[tuple[str, bytes | None]] = [] + for key, value in headers.items(): + if value is None or isinstance(value, bytes): + result.append((key, value)) + else: + result.append((key, str(value).encode("utf-8"))) + return result + + +def mq_client(app) -> MQClient: + client = getattr(app.state, "iti_mq_client", None) + if client is None: + raise MQConfigError("mq client is not configured") + return client diff --git a/iti/mq/errors.py b/iti/mq/errors.py new file mode 100644 index 0000000..6b0eac1 --- /dev/null +++ b/iti/mq/errors.py @@ -0,0 +1,13 @@ +from __future__ import annotations + + +class MQError(RuntimeError): + """Base MQ error.""" + + +class MQConfigError(MQError): + """Raised when MQ configuration is missing or invalid.""" + + +class MQPublishError(MQError): + """Raised when a message cannot be published.""" diff --git a/iti/mq/message.py b/iti/mq/message.py new file mode 100644 index 0000000..46d7859 --- /dev/null +++ b/iti/mq/message.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + + +@dataclass(frozen=True) +class MQMessage: + app: Any + topic: str + partition: int + offset: int + key: Any + raw_key: bytes | None + value: Any + raw_value: bytes | None + headers: dict[str, bytes | None] + timestamp: tuple[int, int] | None + raw_message: Any diff --git a/iti/mq/registry.py b/iti/mq/registry.py new file mode 100644 index 0000000..f91ad6e --- /dev/null +++ b/iti/mq/registry.py @@ -0,0 +1,116 @@ +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass, field +from typing import Any + + +@dataclass(frozen=True) +class MQProducerDefinition: + name: str + topic: str + value_format: str = "json" + config: dict[str, Any] = field(default_factory=dict) + + +@dataclass(frozen=True) +class MQConsumerDefinition: + name: str + topics: tuple[str, ...] + handler: Callable + group_id: str | None = None + value_format: str = "json" + failure_backoff_seconds: float | None = None + config: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class MQRegistry: + producers: dict[str, MQProducerDefinition] = field(default_factory=dict) + consumers: dict[str, MQConsumerDefinition] = field(default_factory=dict) + + def register_producer( + self, + *, + name: str, + topic: str, + value_format: str = "json", + config: dict[str, Any] | None = None, + ) -> MQProducerDefinition: + if not name: + raise ValueError("mq producer name is required") + if not topic: + raise ValueError("mq producer topic is required") + if name in self.producers: + raise ValueError(f"mq producer already registered: {name}") + definition = MQProducerDefinition( + name=name, + topic=topic, + value_format=value_format, + config=dict(config or {}), + ) + self.producers[name] = definition + return definition + + def register_consumer( + self, + *, + name: str, + topics: list[str] | tuple[str, ...] | str, + handler: Callable, + group_id: str | None = None, + value_format: str = "json", + failure_backoff_seconds: float | None = None, + config: dict[str, Any] | None = None, + ) -> MQConsumerDefinition: + if not name: + raise ValueError("mq consumer name is required") + topic_values = _normalize_topics(topics) + if not topic_values: + raise ValueError("mq consumer topics are required") + if name in self.consumers: + raise ValueError(f"mq consumer already registered: {name}") + definition = MQConsumerDefinition( + name=name, + topics=topic_values, + handler=handler, + group_id=group_id, + value_format=value_format, + failure_backoff_seconds=failure_backoff_seconds, + config=dict(config or {}), + ) + self.consumers[name] = definition + return definition + + +def _normalize_topics(topics: list[str] | tuple[str, ...] | str) -> tuple[str, ...]: + if isinstance(topics, str): + topics = (topics,) + return tuple(topic for topic in topics if topic) + + +mq_registry = MQRegistry() + + +def mq_consumer( + *topics: str, + name: str | None = None, + group_id: str | None = None, + value_format: str = "json", + failure_backoff_seconds: float | None = None, + config: dict[str, Any] | None = None, +): + def decorator(func: Callable) -> Callable: + consumer_name = name or ".".join(topics) or func.__name__ + mq_registry.register_consumer( + name=consumer_name, + topics=topics, + group_id=group_id, + handler=func, + value_format=value_format, + failure_backoff_seconds=failure_backoff_seconds, + config=config, + ) + return func + + return decorator diff --git a/iti/mq/runner.py b/iti/mq/runner.py new file mode 100644 index 0000000..4a754b3 --- /dev/null +++ b/iti/mq/runner.py @@ -0,0 +1,181 @@ +from __future__ import annotations + +import asyncio +import logging +import threading +import time +from collections.abc import Coroutine +from inspect import isawaitable +from typing import Any + +from .errors import MQConfigError +from .message import MQMessage +from .registry import MQConsumerDefinition, MQRegistry +from .serialization import decode_message_key, decode_message_value + + +logger = logging.getLogger("iti.mq") + + +class MQConsumerRunner: + def __init__( + self, + app, + backend, + registry: MQRegistry, + *, + group_id: str | None = None, + failure_backoff_seconds: float = 1.0, + poll_timeout_seconds: float = 1.0, + ) -> None: + self.app = app + self.backend = backend + self.registry = registry + self.group_id = group_id + self.failure_backoff_seconds = failure_backoff_seconds + self.poll_timeout_seconds = poll_timeout_seconds + self._workers: list[_ConsumerWorker] = [] + + def start(self) -> None: + if self._workers: + return + pending_workers: list[_ConsumerWorker] = [] + try: + for definition in self.registry.consumers.values(): + group_id = definition.group_id or self.group_id + if not group_id: + raise MQConfigError(f"mq consumer {definition.name} missing group_id") + consumer = self.backend.create_consumer(group_id, definition.config) + consumer.subscribe(list(definition.topics)) + worker = _ConsumerWorker( + app=self.app, + consumer=consumer, + definition=definition, + failure_backoff_seconds=( + definition.failure_backoff_seconds + if definition.failure_backoff_seconds is not None + else self.failure_backoff_seconds + ), + poll_timeout_seconds=self.poll_timeout_seconds, + ) + pending_workers.append(worker) + except Exception: + for worker in pending_workers: + worker.close() + raise + self._workers = pending_workers + for worker in self._workers: + worker.start() + + def stop(self) -> None: + for worker in self._workers: + worker.stop() + for worker in self._workers: + worker.join() + self._workers.clear() + + +class _ConsumerWorker: + def __init__( + self, + *, + app, + consumer: Any, + definition: MQConsumerDefinition, + failure_backoff_seconds: float, + poll_timeout_seconds: float, + ) -> None: + self.app = app + self.consumer = consumer + self.definition = definition + self.failure_backoff_seconds = failure_backoff_seconds + self.poll_timeout_seconds = poll_timeout_seconds + self._stop = threading.Event() + self._thread: threading.Thread | None = None + + def start(self) -> None: + if self._thread and self._thread.is_alive(): + return + self._thread = threading.Thread(target=self._loop, daemon=True) + self._thread.start() + + def stop(self) -> None: + self._stop.set() + + def join(self) -> None: + if self._thread: + self._thread.join(timeout=3) + self.close() + + def close(self) -> None: + self.consumer.close() + + def _loop(self) -> None: + while not self._stop.is_set(): + raw_message = self.consumer.poll(self.poll_timeout_seconds) + if raw_message is None: + continue + if raw_message.error(): + logger.warning("mq consumer error: %s", raw_message.error()) + continue + self._handle_raw_message(raw_message) + + def _handle_raw_message(self, raw_message: Any) -> None: + try: + message = self._build_message(raw_message) + result = self.definition.handler(message) + if isawaitable(result): + if not isinstance(result, Coroutine): + raise TypeError("mq async handler must return a coroutine") + asyncio.run(result) + self.consumer.commit(raw_message, asynchronous=False) + except Exception: + logger.exception( + "mq handler failed name=%s topic=%s partition=%s offset=%s", + self.definition.name, + _safe_call(raw_message, "topic"), + _safe_call(raw_message, "partition"), + _safe_call(raw_message, "offset"), + ) + self.consumer.seek(_seek_position(raw_message)) + self._stop.wait(self.failure_backoff_seconds) + + def _build_message(self, raw_message: Any) -> MQMessage: + raw_key = raw_message.key() + raw_value = raw_message.value() + return MQMessage( + app=self.app, + topic=raw_message.topic(), + partition=raw_message.partition(), + offset=raw_message.offset(), + key=decode_message_key(raw_key), + raw_key=raw_key, + value=decode_message_value(raw_value, self.definition.value_format), + raw_value=raw_value, + headers=_headers_to_dict(raw_message.headers()), + timestamp=raw_message.timestamp(), + raw_message=raw_message, + ) + + +def _headers_to_dict(headers: list[tuple[str, bytes | None]] | None) -> dict[str, bytes | None]: + return {key: value for key, value in headers or []} + + +def _safe_call(value: Any, method: str) -> Any: + try: + return getattr(value, method)() + except Exception: + return "-" + + +def _seek_position(raw_message: Any) -> Any: + try: + from confluent_kafka import TopicPartition + except ImportError: + return raw_message + return TopicPartition( + raw_message.topic(), + raw_message.partition(), + raw_message.offset(), + ) diff --git a/iti/mq/serialization.py b/iti/mq/serialization.py new file mode 100644 index 0000000..20e8bec --- /dev/null +++ b/iti/mq/serialization.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +import json +from typing import Any + +from .errors import MQConfigError + + +SUPPORTED_VALUE_FORMATS = {"json", "bytes"} + + +def encode_message_value(value: Any, value_format: str = "json") -> bytes | None: + _validate_value_format(value_format) + if value is None: + return None + if value_format == "bytes": + if isinstance(value, bytes): + return value + raise TypeError("bytes mq value must be bytes") + return json.dumps(value, ensure_ascii=False, separators=(",", ":")).encode("utf-8") + + +def decode_message_value(value: bytes | None, value_format: str = "json") -> Any: + _validate_value_format(value_format) + if value is None: + return None + if value_format == "bytes": + return value + return json.loads(value.decode("utf-8")) + + +def encode_message_key(key: str | bytes | None) -> bytes | None: + if key is None or isinstance(key, bytes): + return key + return str(key).encode("utf-8") + + +def decode_message_key(key: bytes | None) -> str | None: + if key is None: + return None + return key.decode("utf-8") + + +def _validate_value_format(value_format: str) -> None: + if value_format not in SUPPORTED_VALUE_FORMATS: + supported = ", ".join(sorted(SUPPORTED_VALUE_FORMATS)) + raise MQConfigError(f"unsupported mq value_format: {value_format!r}, supported: {supported}") diff --git a/pyproject.toml b/pyproject.toml index 1ab9c61..fab4c9d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,7 @@ erp = ["pyodbc>=5.3.0"] excel = ["pandas>=2.3.3", "openpyxl>=3.1.5"] image = ["Pillow>=12.0.0"] postgres = ["psycopg[binary]>=3.2.0"] +mq-kafka = ["confluent-kafka>=2.8,<3"] prod = ["gunicorn>=22.0.0"] dev = [ "mypy>=1.0.0", @@ -110,6 +111,7 @@ files = [ "iti/auth", "iti/db", "iti/modules", + "iti/mq", "iti/responses", "iti/service_client", "iti/tasks", diff --git a/tests/test_config.py b/tests/test_config.py index d3cfb1a..67a4fd8 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -74,3 +74,35 @@ def test_base_config_can_be_overridden_for_unit_tests(): assert config.database_url == "sqlite+pysqlite:///:memory:" assert config.testing is True + + +def test_base_config_reads_mq_environment(monkeypatch): + monkeypatch.setenv("MQ_ENABLED", "true") + monkeypatch.setenv("MQ_BACKEND", "kafka") + monkeypatch.setenv("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092") + monkeypatch.setenv("KAFKA_GROUP_ID", "group-a") + monkeypatch.setenv("KAFKA_CLIENT_ID", "client-a") + monkeypatch.setenv("KAFKA_AUTO_OFFSET_RESET", "latest") + monkeypatch.setenv("MQ_FAILURE_BACKOFF_SECONDS", "2.5") + monkeypatch.setenv("MQ_POLL_TIMEOUT_SECONDS", "0.5") + + config = BaseConfig(database_url="sqlite+pysqlite:///:memory:") + + assert config.mq_enabled is True + assert config.mq == { + "backend": "kafka", + "bootstrap_servers": "kafka:9092", + "group_id": "group-a", + "client_id": "client-a", + "auto_offset_reset": "latest", + "failure_backoff_seconds": 2.5, + "poll_timeout_seconds": 0.5, + } + + +def test_test_config_keeps_mq_disabled_by_default(monkeypatch): + monkeypatch.setenv("MQ_ENABLED", "true") + + config = FrameworkTestConfig() + + assert config.mq_enabled is False diff --git a/tests/test_modules.py b/tests/test_modules.py index 20f934c..71b5398 100644 --- a/tests/test_modules.py +++ b/tests/test_modules.py @@ -34,3 +34,45 @@ def test_create_app_runs_module_registration(): registry = app.state.iti_modules assert registry.permissions["demo.read"].name == "Demo Read" assert registry.list_menu_seeds()[0].id == "demo" + + +def test_create_app_runs_register_mq_between_tasks_and_routes(): + phases = [] + + class MQModule: + name = "mq-demo" + + def init_app(self, app): + phases.append("init_app") + + def register_tasks(self, app): + phases.append("register_tasks") + + def register_mq(self, app): + phases.append("register_mq") + + def register_routes(self, app): + phases.append("register_routes") + + def register_permissions(self, app): + phases.append("register_permissions") + + def register_menu_seed(self, app): + phases.append("register_menu_seed") + + create_app( + modules=[MQModule()], + config_mapping=BaseConfig( + database_url="sqlite+pysqlite:///:memory:", + exchange_enabled=False, + ), + ) + + assert phases == [ + "init_app", + "register_tasks", + "register_mq", + "register_routes", + "register_permissions", + "register_menu_seed", + ] diff --git a/tests/test_mq.py b/tests/test_mq.py new file mode 100644 index 0000000..86f31a4 --- /dev/null +++ b/tests/test_mq.py @@ -0,0 +1,383 @@ +import asyncio +import builtins +import sys + +import pytest + +from iti import create_app +from iti.config import BaseConfig +from iti.mq import MQConfigError, init_mq, mq_client, mq_consumer +from iti.mq.backend import KafkaBackend +from iti.mq.registry import MQRegistry as RegistryClass + + +class FakeProducer: + def __init__(self, config): + self.config = config + self.produced = [] + self.polled = [] + self.flushed = [] + + def poll(self, timeout): + self.polled.append(timeout) + + def produce(self, topic, *, value=None, key=None, headers=None): + self.produced.append( + {"topic": topic, "value": value, "key": key, "headers": headers} + ) + + def flush(self, timeout=None): + self.flushed.append(timeout) + + +class FakeConsumer: + def __init__(self, messages=None): + self.messages = list(messages or []) + self.subscribed = [] + self.committed = [] + self.sought = [] + self.closed = False + self.config = None + + def subscribe(self, topics): + self.subscribed.append(topics) + + def poll(self, timeout): + if self.messages: + return self.messages.pop(0) + return None + + def commit(self, message, asynchronous=False): + self.committed.append((message, asynchronous)) + + def seek(self, position): + if all(hasattr(position, name) for name in ("topic", "partition", "offset")): + self.sought.append( + ( + call_or_value(position, "topic"), + call_or_value(position, "partition"), + call_or_value(position, "offset"), + ) + ) + return + self.sought.append(position) + + def close(self): + self.closed = True + + +class FakeMessage: + def __init__( + self, + *, + topic="demo.topic", + partition=0, + offset=1, + key=b"k1", + value=b'{"ok":true}', + headers=None, + ): + self._topic = topic + self._partition = partition + self._offset = offset + self._key = key + self._value = value + self._headers = headers or [("source", b"test")] + + def topic(self): + return self._topic + + def partition(self): + return self._partition + + def offset(self): + return self._offset + + def key(self): + return self._key + + def value(self): + return self._value + + def headers(self): + return self._headers + + def timestamp(self): + return (0, 0) + + def error(self): + return None + + +def test_mq_registry_registers_decorator_and_explicit_consumer(): + registry = RegistryClass() + registry.register_consumer( + name="explicit", + topics=["demo.explicit"], + group_id="g1", + handler=lambda message: None, + ) + + assert registry.consumers["explicit"].topics == ("demo.explicit",) + + before = set(mq_consumer_registry_names()) + + try: + @mq_consumer("demo.decorated", name="decorated-test", group_id="g1") + def decorated(message): + return None + + assert "decorated-test" in mq_consumer_registry_names() - before + assert decorated.__name__ == "decorated" + finally: + from iti.mq import mq_registry + + mq_registry.consumers.pop("decorated-test", None) + + +def test_mq_registry_rejects_duplicate_names(): + registry = RegistryClass() + registry.register_producer(name="events", topic="demo.events") + registry.register_consumer( + name="consumer", + topics="demo.events", + handler=lambda message: None, + ) + + with pytest.raises(ValueError, match="producer already registered"): + registry.register_producer(name="events", topic="other") + with pytest.raises(ValueError, match="consumer already registered"): + registry.register_consumer( + name="consumer", + topics="other", + handler=lambda message: None, + ) + + +def test_mq_enabled_false_does_not_import_or_configure_kafka(monkeypatch): + sys.modules.pop("confluent_kafka", None) + + app = create_app( + config_mapping=BaseConfig( + database_url="sqlite+pysqlite:///:memory:", + testing=True, + mq_enabled=False, + exchange_enabled=False, + ) + ) + + assert not hasattr(app.state, "iti_mq_client") + assert "confluent_kafka" not in sys.modules + + +def test_mq_enabled_true_without_dependency_raises_install_hint(monkeypatch): + sys.modules.pop("confluent_kafka", None) + real_import = builtins.__import__ + + def missing_confluent_kafka(name, *args, **kwargs): + if name == "confluent_kafka": + raise ImportError("missing") + return real_import(name, *args, **kwargs) + + monkeypatch.setattr(builtins, "__import__", missing_confluent_kafka) + + with pytest.raises(MQConfigError, match=r"iti-flask\[mq-kafka\]"): + create_app( + config_mapping=BaseConfig( + database_url="sqlite+pysqlite:///:memory:", + testing=True, + mq_enabled=True, + mq={"backend": "kafka", "bootstrap_servers": "127.0.0.1:9092"}, + exchange_enabled=False, + ) + ) + + +def test_mq_client_sends_json_bytes_and_registered_sender(): + producer = FakeProducer({"bootstrap.servers": "localhost:9092"}) + registry = RegistryClass() + registry.register_producer(name="events", topic="demo.events") + app = create_app( + config_mapping=BaseConfig( + database_url="sqlite+pysqlite:///:memory:", + testing=True, + mq_enabled=False, + exchange_enabled=False, + ) + ) + init_mq( + app, + {"backend": "kafka", "bootstrap_servers": "localhost:9092"}, + registry=registry, + producer_factory=lambda config: producer, + ) + + client = mq_client(app) + client.send_json("demo.raw", {"id": "1"}, key="k1", headers={"h": "v"}) + client.send("demo.bytes", value=b"raw", key=b"k2") + client.sender("events").send_json({"id": "2"}) + client.flush(2) + + assert producer.produced[0] == { + "topic": "demo.raw", + "value": b'{"id":"1"}', + "key": b"k1", + "headers": [("h", b"v")], + } + assert producer.produced[1]["value"] == b"raw" + assert producer.produced[1]["key"] == b"k2" + assert producer.produced[2]["topic"] == "demo.events" + assert producer.flushed == [2] + + +def test_runner_commits_after_successful_sync_handler(): + handled = [] + message = FakeMessage() + fake_consumer = FakeConsumer([message]) + app = make_mq_app( + registry_with_consumer(lambda item: handled.append(item.value)), + fake_consumer, + ) + runner = app.state.iti_mq_runner + + runner.start() + wait_until(lambda: bool(fake_consumer.committed)) + runner.stop() + + assert handled == [{"ok": True}] + assert fake_consumer.committed == [(message, False)] + assert fake_consumer.sought == [] + assert fake_consumer.closed is True + + +def test_runner_executes_async_handler(): + handled = [] + + async def handler(message): + await asyncio.sleep(0) + handled.append(message.key) + + fake_consumer = FakeConsumer([FakeMessage()]) + app = make_mq_app(registry_with_consumer(handler), fake_consumer) + runner = app.state.iti_mq_runner + + runner.start() + wait_until(lambda: bool(fake_consumer.committed)) + runner.stop() + + assert handled == ["k1"] + + +def test_runner_does_not_commit_and_seeks_after_handler_failure(): + message = FakeMessage() + fake_consumer = FakeConsumer([message]) + + def handler(_message): + raise RuntimeError("boom") + + app = make_mq_app(registry_with_consumer(handler), fake_consumer) + runner = app.state.iti_mq_runner + + runner.start() + wait_until(lambda: bool(fake_consumer.sought)) + runner.stop() + + assert fake_consumer.committed == [] + assert fake_consumer.sought == [("demo.topic", 0, 1)] + + +def test_runner_raises_when_group_id_missing(): + registry = RegistryClass() + registry.register_consumer( + name="demo", + topics="demo.topic", + handler=lambda message: None, + ) + app = make_mq_app(registry, FakeConsumer()) + + with pytest.raises(MQConfigError, match="missing group_id"): + app.state.iti_mq_runner.start() + + +def test_kafka_backend_forces_manual_commit(): + backend = KafkaBackend( + { + "bootstrap_servers": "localhost:9092", + "group_id": "global", + "auto_offset_reset": "latest", + "consumer": {"enable.auto.commit": True}, + }, + producer_factory=lambda config: FakeProducer(config), + consumer_factory=lambda config: FakeConsumer(), + ) + + config = backend.consumer_config("g1") + + assert config["bootstrap.servers"] == "localhost:9092" + assert config["group.id"] == "g1" + assert config["auto.offset.reset"] == "latest" + assert config["enable.auto.commit"] is False + + +def mq_consumer_registry_names(): + from iti.mq import mq_registry + + return set(mq_registry.consumers.keys()) + + +def registry_with_consumer(handler): + registry = RegistryClass() + registry.register_consumer( + name="demo", + topics="demo.topic", + group_id="g1", + handler=handler, + ) + return registry + + +def make_mq_app(registry, fake_consumer): + app = create_app( + config_mapping=BaseConfig( + database_url="sqlite+pysqlite:///:memory:", + testing=True, + mq_enabled=False, + exchange_enabled=False, + ) + ) + + def consumer_factory(config): + fake_consumer.config = config + return fake_consumer + + init_mq( + app, + { + "backend": "kafka", + "bootstrap_servers": "localhost:9092", + "failure_backoff_seconds": 0.01, + "poll_timeout_seconds": 0.01, + }, + registry=registry, + producer_factory=lambda config: FakeProducer(config), + consumer_factory=consumer_factory, + ) + return app + + +def wait_until(predicate, timeout=1.0): + loop = asyncio.new_event_loop() + try: + deadline = loop.time() + timeout + while loop.time() < deadline: + if predicate(): + return + loop.run_until_complete(asyncio.sleep(0.01)) + finally: + loop.close() + raise AssertionError("condition not reached") + + +def call_or_value(value, name): + attr = getattr(value, name) + return attr() if callable(attr) else attr