diff --git a/.codex/skills/iti-flask-framework/SKILL.md b/.codex/skills/iti-flask-framework/SKILL.md index b930fdf..817c723 100644 --- a/.codex/skills/iti-flask-framework/SKILL.md +++ b/.codex/skills/iti-flask-framework/SKILL.md @@ -26,7 +26,7 @@ iTi-Flask 是 FastAPI 后端框架基座。 - `iti/modules/*`:模块协议、权限元数据、菜单 seed 元数据。 - `iti/responses/*`:envelope、raw response 逃逸。 - `iti/service_client/*`:同步 HTTP JSON 客户端和注册表。 -- `iti/mq/*`:通用 MQ 入口,v1 Kafka backend,生产者、消费者和 runner。 +- `iti/mq/*`:通用 MQ 入口,v1 Kafka backend,生产者、消费者、offset store 和 runner。 - `iti/tasks/*`:单进程任务注册和 runner。 - `iti/audit.py`:审计事件发送器,不拥有系统日志表。 - `iti/storage/*`:存储后端接口和实现。 @@ -42,7 +42,9 @@ iTi-Flask 是 FastAPI 后端框架基座。 - `/docs` 是文档入口,按 `docs_ui_enabled` 展示 Swagger、Scalar、ReDoc 等已启用 UI。 - 模块元数据使用 `ModulePermission` 和 `ModuleMenuSeed`。 - MQ 默认关闭;Kafka 客户端依赖放在 `mq-kafka` extra,不放核心依赖。 -- MQ 消费成功后提交 offset;handler 失败不提交,seek 回当前消息并按 backoff 重试。 +- MQ 默认 `subscribe` 模式,消费成功后提交 broker offset;handler 失败不提交,seek 回当前消息并按 backoff 重试。 +- `assign` 模式用于 Kafka consumer group / commit 异常场景,按 partition 直接消费并把 offset 写入本地 store。 +- MQ 相关环境变量包括 `MQ_CONSUMER_MODE`、`MQ_OFFSET_STORE_PATH`。 - migration 归生成项目所有。框架不要静默接管业务项目 migration 流。 - 审计保持异步、非阻塞。框架只发事件,接收方在框架外。 - 不为未发生的需求加宽泛兼容层。 diff --git a/README.md b/README.md index eb6bce9..2fab120 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,7 @@ iticli install ```toml dependencies = [ - "iti-flask @ git+https://git.noahlan.cn/iti-framework/iTi-Flask.git@v0.4.0", + "iti-flask @ git+https://git.noahlan.cn/iti-framework/iTi-Flask.git@v0.4.1", ] ``` @@ -118,14 +118,14 @@ iticli help ```bash iticli release -iticli release v0.4.0 +iticli release v0.4.1 ``` Windows: ```bat iticli release -iticli release v0.4.0 +iticli release v0.4.1 ``` ## 文档 diff --git a/copier-template/.codex/skills/{{ project_slug | lower | replace('_', '-') }}-project/SKILL.md.jinja b/copier-template/.codex/skills/{{ project_slug | lower | replace('_', '-') }}-project/SKILL.md.jinja index 592c9fa..9c79870 100644 --- a/copier-template/.codex/skills/{{ project_slug | lower | replace('_', '-') }}-project/SKILL.md.jinja +++ b/copier-template/.codex/skills/{{ project_slug | lower | replace('_', '-') }}-project/SKILL.md.jinja @@ -64,6 +64,7 @@ description: "{{ project_name }} 业务项目 skill。用于当前由 iTi-Flask - 服务间内部 API 使用 service token。 - 项目级测试使用 `fastapi.testclient.TestClient`。 - 请求体使用 Pydantic schema。 +- Kafka 默认走框架 `subscribe` 模式;如果 broker consumer group / commit 异常,可在 `.env` 里切到 `MQ_CONSUMER_MODE=assign`,并使用本地 offset store。 {{ "- seed 前先同步 iTi-System migration。\n" if include_system else "" }} ## 命令 diff --git a/copier-template/.env.example.jinja b/copier-template/.env.example.jinja index 174edce..51a39eb 100644 --- a/copier-template/.env.example.jinja +++ b/copier-template/.env.example.jinja @@ -25,5 +25,7 @@ KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 KAFKA_GROUP_ID={{ project_slug | lower | replace('-', '_') }} KAFKA_CLIENT_ID={{ project_slug | lower | replace('-', '_') }} KAFKA_AUTO_OFFSET_RESET=earliest +MQ_CONSUMER_MODE=subscribe +MQ_OFFSET_STORE_PATH=runtime/mq_offsets.sqlite MQ_FAILURE_BACKOFF_SECONDS=1 MQ_POLL_TIMEOUT_SECONDS=1 diff --git a/copier-template/README.md.jinja b/copier-template/README.md.jinja index 2558f18..b708b3a 100644 --- a/copier-template/README.md.jinja +++ b/copier-template/README.md.jinja @@ -42,6 +42,24 @@ iticli run dev iticli test ``` +MQ 默认关闭。 +使用 Kafka 时先安装带 `mq-kafka` extra 的框架依赖,并在 `.env` 里设置: + +```bash +MQ_ENABLED=true +MQ_BACKEND=kafka +KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 +KAFKA_GROUP_ID={{ project_slug | lower | replace('-', '_') }} +MQ_CONSUMER_MODE=subscribe +``` + +如果 Kafka broker 的 consumer group / offset commit 不可用,可切到: + +```bash +MQ_CONSUMER_MODE=assign +MQ_OFFSET_STORE_PATH=runtime/mq_offsets.sqlite +``` + 不同环境把环境名作为 `run` 的第一个参数。 迁移命令使用 `--env` 指定环境: diff --git a/copier-template/pyproject.toml.jinja b/copier-template/pyproject.toml.jinja index d340bcd..df8043f 100644 --- a/copier-template/pyproject.toml.jinja +++ b/copier-template/pyproject.toml.jinja @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "{{ project_slug | replace('_', '-') }}" -version = "0.4.0" +version = "0.4.1" description = "{{ project_name }}" readme = "README.md" requires-python = ">=3.11" diff --git a/copier.yml b/copier.yml index 44a6107..3443bfe 100644 --- a/copier.yml +++ b/copier.yml @@ -26,7 +26,7 @@ framework_git: framework_tag: type: str help: iTi-Flask Git tag - default: v0.4.0 + default: v0.4.1 include_system: type: bool @@ -49,4 +49,4 @@ system_git: system_tag: type: str help: iTi-System Git tag - default: v0.4.0 + default: v0.4.1 diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 54aad44..5bdfc85 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -127,6 +127,8 @@ KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 KAFKA_GROUP_ID=my-app KAFKA_CLIENT_ID=my-app-dev KAFKA_AUTO_OFFSET_RESET=earliest +MQ_CONSUMER_MODE=subscribe +MQ_OFFSET_STORE_PATH=runtime/mq_offsets.sqlite MQ_FAILURE_BACKOFF_SECONDS=1 MQ_POLL_TIMEOUT_SECONDS=1 ``` @@ -139,5 +141,6 @@ self.mq = { "backend": "kafka", "bootstrap_servers": "127.0.0.1:9092", "group_id": "my-app", + "consumer_mode": "subscribe", } ``` diff --git a/docs/COPIER_TEMPLATE.md b/docs/COPIER_TEMPLATE.md index e298b5a..eb4945b 100644 --- a/docs/COPIER_TEMPLATE.md +++ b/docs/COPIER_TEMPLATE.md @@ -149,6 +149,9 @@ iticli template update 模板拥有的文件包括 `main.py`、`app/app_factory.py`、`config.py`、`Dockerfile`、`docker-compose.yml`、`docker-compose.with-db.yml`、`.dockerignore`、`.env.example`、`.vscode/launch.json`、`pyproject.toml`、`migrations/`、示例模块、示例测试、README 和项目 skill。 业务项目自己的模块、模型、API 文档和业务 README 由业务项目维护。 +Kafka 相关环境变量也由模板渲染进 `.env.example`,包括: +`MQ_ENABLED`、`MQ_BACKEND`、`KAFKA_BOOTSTRAP_SERVERS`、`KAFKA_GROUP_ID`、`KAFKA_CLIENT_ID`、`KAFKA_AUTO_OFFSET_RESET`、`MQ_CONSUMER_MODE`、`MQ_OFFSET_STORE_PATH`。 + ## Docker 模板生成: diff --git a/docs/MODULES.md b/docs/MODULES.md index 29d5efe..1c51e0f 100644 --- a/docs/MODULES.md +++ b/docs/MODULES.md @@ -110,3 +110,16 @@ def register_mq(self, app): handler=handle_event, ) ``` + +如果目标 Kafka 的 consumer group 不可用,也可以用: + +```python +def register_mq(self, app): + mq_registry.register_consumer( + name="example-events-direct", + topics=["example.events"], + mode="assign", + partitions="all", + handler=handle_event, + ) +``` diff --git a/docs/MQ.md b/docs/MQ.md index a046120..7e4195e 100644 --- a/docs/MQ.md +++ b/docs/MQ.md @@ -41,6 +41,8 @@ KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 KAFKA_GROUP_ID=my-app KAFKA_CLIENT_ID=my-app-dev KAFKA_AUTO_OFFSET_RESET=earliest +MQ_CONSUMER_MODE=subscribe +MQ_OFFSET_STORE_PATH=runtime/mq_offsets.sqlite MQ_FAILURE_BACKOFF_SECONDS=1 MQ_POLL_TIMEOUT_SECONDS=1 ``` @@ -80,6 +82,8 @@ async def handle_work_order(message: MQMessage) -> None: payload = message.value ``` +默认 `mode="subscribe"`,依赖 Kafka consumer group。 + 也可以显式注册: ```python @@ -100,6 +104,32 @@ mq_registry.register_consumer( 业务模块也可以在 `register_mq(app)` 阶段注册 consumer 和 producer。 +如果 Kafka broker 的 consumer group / offset commit 异常,也可以切到 `assign` 模式: + +```python +from iti.mq import mq_registry + + +mq_registry.register_consumer( + name="work-order-created-direct", + topics=["mes.work_order.created"], + mode="assign", + partitions="all", + offset_store={"type": "sqlite", "path": "runtime/mq_offsets.sqlite"}, + handler=handle_work_order, +) +``` + +或者全局启用: + +```python +self.mq = { + "backend": "kafka", + "bootstrap_servers": "127.0.0.1:9092", + "consumer_mode": "assign", +} +``` + ## 消息格式 默认 `value_format="json"`。 @@ -129,11 +159,18 @@ Kafka consumer 强制设置: enable.auto.commit = False ``` -handler 正常结束后,框架同步提交当前消息 offset。 +`subscribe` 模式下,handler 正常结束后框架同步提交当前消息 offset。 handler 抛错或 JSON 解码失败时,框架记录日志、不提交 offset、seek 回当前消息,并按 `failure_backoff_seconds` 等待后重试。 +`assign` 模式下,框架不走 Kafka consumer group,不调用 broker offset commit。 +handler 成功后把下一 offset 写入本地 offset store,默认路径是 `runtime/mq_offsets.sqlite`。 +如果 broker 返回 offset out of range,框架按 `auto_offset_reset` 重置到 `earliest` 或 `latest`。 + 每个 consumer definition 启动一个线程,单线程顺序处理消息。 +`assign` 模式适合当前 broker group 协议不可用的场景。 +它不做自动 rebalance;多实例部署时需要业务方自己分配 partition,或者接受重复消费风险。 + ## 不支持 v1 不支持: diff --git a/iti/__about__.py b/iti/__about__.py index 937c64b..bbb740c 100644 --- a/iti/__about__.py +++ b/iti/__about__.py @@ -1,4 +1,4 @@ # SPDX-FileCopyrightText: 2025-present NoahLan <6995syu@163.com> # # SPDX-License-Identifier: MIT -__version__ = "0.4.0" +__version__ = "0.4.1" diff --git a/iti/config.py b/iti/config.py index c010985..4eb91b9 100644 --- a/iti/config.py +++ b/iti/config.py @@ -174,6 +174,8 @@ class BaseConfig: "group_id": os.getenv("KAFKA_GROUP_ID"), "client_id": os.getenv("KAFKA_CLIENT_ID"), "auto_offset_reset": os.getenv("KAFKA_AUTO_OFFSET_RESET", "earliest"), + "consumer_mode": os.getenv("MQ_CONSUMER_MODE", "subscribe"), + "offset_store_path": os.getenv("MQ_OFFSET_STORE_PATH"), "failure_backoff_seconds": env_float("MQ_FAILURE_BACKOFF_SECONDS", 1.0), "poll_timeout_seconds": env_float("MQ_POLL_TIMEOUT_SECONDS", 1.0), } diff --git a/iti/mq/__init__.py b/iti/mq/__init__.py index bcc3cd6..07c795e 100644 --- a/iti/mq/__init__.py +++ b/iti/mq/__init__.py @@ -6,6 +6,7 @@ from .backend import KafkaBackend from .client import MQClient, MQSender, mq_client from .errors import MQConfigError, MQError, MQPublishError from .message import MQMessage +from .offset_store import MemoryMQOffsetStore, MQOffsetStore, SQLiteMQOffsetStore from .registry import ( MQConsumerDefinition, MQProducerDefinition, @@ -48,6 +49,9 @@ def init_mq( backend, registry, group_id=config.get("group_id"), + consumer_mode=config.get("consumer_mode", "subscribe"), + offset_store_config=config.get("offset_store"), + offset_store_path=config.get("offset_store_path"), failure_backoff_seconds=float(config.get("failure_backoff_seconds", 1.0)), poll_timeout_seconds=float(config.get("poll_timeout_seconds", 1.0)), ) @@ -65,10 +69,13 @@ __all__ = [ "MQConsumerRunner", "MQError", "MQMessage", + "MQOffsetStore", "MQProducerDefinition", "MQPublishError", "MQRegistry", "MQSender", + "MemoryMQOffsetStore", + "SQLiteMQOffsetStore", "get_mq_registry", "init_mq", "mq_client", diff --git a/iti/mq/backend.py b/iti/mq/backend.py index 577f263..f9c4cb3 100644 --- a/iti/mq/backend.py +++ b/iti/mq/backend.py @@ -1,10 +1,14 @@ from __future__ import annotations +import logging from typing import Any from .errors import MQConfigError +logger = logging.getLogger("iti.mq") + + class KafkaBackend: def __init__( self, @@ -46,7 +50,10 @@ class KafkaBackend: bootstrap_servers = self.config.get("bootstrap_servers") if not bootstrap_servers: raise MQConfigError("mq kafka bootstrap_servers is required") - common = {"bootstrap.servers": bootstrap_servers} + common = { + "bootstrap.servers": bootstrap_servers, + "error_cb": _log_kafka_error, + } client_id = self.config.get("client_id") if client_id: common["client.id"] = client_id @@ -70,3 +77,7 @@ class KafkaBackend: "confluent-kafka is required for kafka mq; install iti-flask[mq-kafka]" ) from exc return Consumer + + +def _log_kafka_error(error) -> None: + logger.warning("mq kafka client error: %s", error) diff --git a/iti/mq/client.py b/iti/mq/client.py index 0b78c4c..80ee5a0 100644 --- a/iti/mq/client.py +++ b/iti/mq/client.py @@ -19,8 +19,16 @@ class MQClient: *, key: str | bytes | None = None, headers: dict[str, str | bytes | None] | None = None, + flush: bool | float = False, ) -> None: - self.send(topic, value=value, key=key, headers=headers, value_format="json") + self.send( + topic, + value=value, + key=key, + headers=headers, + value_format="json", + flush=flush, + ) def send( self, @@ -30,6 +38,7 @@ class MQClient: key: str | bytes | None = None, headers: dict[str, str | bytes | None] | None = None, value_format: str = "bytes", + flush: bool | float = False, ) -> None: self._producer.poll(0) self._producer.produce( @@ -38,6 +47,8 @@ class MQClient: key=encode_message_key(key), headers=_encode_headers(headers), ) + if flush: + self.flush(None if flush is True else float(flush)) def sender(self, name: str) -> "MQSender": definition = self._registry.producers.get(name) @@ -64,6 +75,7 @@ class MQSender: *, key: str | bytes | None = None, headers: dict[str, str | bytes | None] | None = None, + flush: bool | float = False, ) -> None: self._client.send( self.topic, @@ -71,6 +83,7 @@ class MQSender: key=key, headers=headers, value_format="json", + flush=flush, ) def send( @@ -79,6 +92,7 @@ class MQSender: *, key: str | bytes | None = None, headers: dict[str, str | bytes | None] | None = None, + flush: bool | float = False, ) -> None: self._client.send( self.topic, @@ -86,6 +100,7 @@ class MQSender: key=key, headers=headers, value_format=self.value_format, + flush=flush, ) diff --git a/iti/mq/offset_store.py b/iti/mq/offset_store.py new file mode 100644 index 0000000..4cef953 --- /dev/null +++ b/iti/mq/offset_store.py @@ -0,0 +1,104 @@ +from __future__ import annotations + +import sqlite3 +import threading +from pathlib import Path +from typing import Protocol + + +class MQOffsetStore(Protocol): + def get(self, consumer_name: str, topic: str, partition: int) -> int | None: + """Return the next offset to consume.""" + + def set(self, consumer_name: str, topic: str, partition: int, offset: int) -> None: + """Persist the next offset to consume.""" + + def close(self) -> None: + """Release store resources.""" + + +class SQLiteMQOffsetStore: + def __init__(self, path: str | Path) -> None: + self.path = Path(path) + self.path.parent.mkdir(parents=True, exist_ok=True) + self._lock = threading.Lock() + self._conn = sqlite3.connect(self.path, check_same_thread=False) + self._closed = False + self._conn.execute( + """ + CREATE TABLE IF NOT EXISTS mq_offsets ( + consumer_name TEXT NOT NULL, + topic TEXT NOT NULL, + partition INTEGER NOT NULL, + offset INTEGER NOT NULL, + updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (consumer_name, topic, partition) + ) + """ + ) + self._conn.commit() + + def get(self, consumer_name: str, topic: str, partition: int) -> int | None: + with self._lock: + if self._closed: + return None + row = self._conn.execute( + """ + SELECT offset + FROM mq_offsets + WHERE consumer_name = ? AND topic = ? AND partition = ? + """, + (consumer_name, topic, partition), + ).fetchone() + if row is None: + return None + return int(row[0]) + + def set(self, consumer_name: str, topic: str, partition: int, offset: int) -> None: + with self._lock: + if self._closed: + return + self._conn.execute( + """ + INSERT INTO mq_offsets (consumer_name, topic, partition, offset, updated_at) + VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP) + ON CONFLICT(consumer_name, topic, partition) + DO UPDATE SET offset = excluded.offset, updated_at = CURRENT_TIMESTAMP + """, + (consumer_name, topic, partition, offset), + ) + self._conn.commit() + + def close(self) -> None: + with self._lock: + if self._closed: + return + self._conn.close() + self._closed = True + + +class MemoryMQOffsetStore: + def __init__(self) -> None: + self._offsets: dict[tuple[str, str, int], int] = {} + self._lock = threading.Lock() + + def get(self, consumer_name: str, topic: str, partition: int) -> int | None: + with self._lock: + return self._offsets.get((consumer_name, topic, partition)) + + def set(self, consumer_name: str, topic: str, partition: int, offset: int) -> None: + with self._lock: + self._offsets[(consumer_name, topic, partition)] = offset + + def close(self) -> None: + return None + + +def create_offset_store(config: dict | None, *, default_path: str | Path) -> MQOffsetStore: + config = dict(config or {}) + store_type = str(config.get("type", "sqlite")).strip().lower() + if store_type == "memory": + return MemoryMQOffsetStore() + if store_type == "sqlite": + return SQLiteMQOffsetStore(config.get("path") or default_path) + raise ValueError(f"unsupported mq offset store type: {store_type!r}") diff --git a/iti/mq/registry.py b/iti/mq/registry.py index f91ad6e..acd32ec 100644 --- a/iti/mq/registry.py +++ b/iti/mq/registry.py @@ -19,6 +19,9 @@ class MQConsumerDefinition: topics: tuple[str, ...] handler: Callable group_id: str | None = None + mode: str | None = None + partitions: Any = "all" + offset_store: dict[str, Any] = field(default_factory=dict) value_format: str = "json" failure_backoff_seconds: float | None = None config: dict[str, Any] = field(default_factory=dict) @@ -59,6 +62,9 @@ class MQRegistry: topics: list[str] | tuple[str, ...] | str, handler: Callable, group_id: str | None = None, + mode: str | None = None, + partitions: Any = "all", + offset_store: dict[str, Any] | None = None, value_format: str = "json", failure_backoff_seconds: float | None = None, config: dict[str, Any] | None = None, @@ -68,6 +74,9 @@ class MQRegistry: topic_values = _normalize_topics(topics) if not topic_values: raise ValueError("mq consumer topics are required") + mode = _normalize_mode(mode) if mode is not None else None + if mode == "assign" and partitions != "all" and not partitions: + raise ValueError("mq assign consumer partitions are required") if name in self.consumers: raise ValueError(f"mq consumer already registered: {name}") definition = MQConsumerDefinition( @@ -75,6 +84,9 @@ class MQRegistry: topics=topic_values, handler=handler, group_id=group_id, + mode=mode, + partitions=partitions, + offset_store=dict(offset_store or {}), value_format=value_format, failure_backoff_seconds=failure_backoff_seconds, config=dict(config or {}), @@ -89,6 +101,13 @@ def _normalize_topics(topics: list[str] | tuple[str, ...] | str) -> tuple[str, . return tuple(topic for topic in topics if topic) +def _normalize_mode(mode: str) -> str: + mode = str(mode or "subscribe").strip().lower() + if mode not in {"subscribe", "assign"}: + raise ValueError("mq consumer mode must be 'subscribe' or 'assign'") + return mode + + mq_registry = MQRegistry() @@ -96,6 +115,9 @@ def mq_consumer( *topics: str, name: str | None = None, group_id: str | None = None, + mode: str | None = None, + partitions: Any = "all", + offset_store: dict[str, Any] | None = None, value_format: str = "json", failure_backoff_seconds: float | None = None, config: dict[str, Any] | None = None, @@ -106,6 +128,9 @@ def mq_consumer( name=consumer_name, topics=topics, group_id=group_id, + mode=mode, + partitions=partitions, + offset_store=offset_store, handler=func, value_format=value_format, failure_backoff_seconds=failure_backoff_seconds, diff --git a/iti/mq/runner.py b/iti/mq/runner.py index 4a754b3..ad92834 100644 --- a/iti/mq/runner.py +++ b/iti/mq/runner.py @@ -5,11 +5,14 @@ import logging import threading import time from collections.abc import Coroutine +from dataclasses import dataclass from inspect import isawaitable +from pathlib import Path from typing import Any from .errors import MQConfigError from .message import MQMessage +from .offset_store import MQOffsetStore, create_offset_store from .registry import MQConsumerDefinition, MQRegistry from .serialization import decode_message_key, decode_message_value @@ -25,6 +28,9 @@ class MQConsumerRunner: registry: MQRegistry, *, group_id: str | None = None, + consumer_mode: str = "subscribe", + offset_store_config: dict[str, Any] | None = None, + offset_store_path: str | Path | None = None, failure_backoff_seconds: float = 1.0, poll_timeout_seconds: float = 1.0, ) -> None: @@ -32,6 +38,9 @@ class MQConsumerRunner: self.backend = backend self.registry = registry self.group_id = group_id + self.consumer_mode = _normalize_consumer_mode(consumer_mode) + self.offset_store_config = dict(offset_store_config or {}) + self.offset_store_path = offset_store_path self.failure_backoff_seconds = failure_backoff_seconds self.poll_timeout_seconds = poll_timeout_seconds self._workers: list[_ConsumerWorker] = [] @@ -43,14 +52,34 @@ class MQConsumerRunner: try: for definition in self.registry.consumers.values(): group_id = definition.group_id or self.group_id - if not group_id: - raise MQConfigError(f"mq consumer {definition.name} missing group_id") - consumer = self.backend.create_consumer(group_id, definition.config) - consumer.subscribe(list(definition.topics)) + mode = definition.mode or self.consumer_mode + if mode == "assign": + consumer = self.backend.create_consumer(group_id or definition.name, definition.config) + offset_store = self._create_offset_store(definition) + self._assign_consumer(consumer, definition, offset_store) + logger.info( + "mq consumer started name=%s mode=assign topics=%s", + definition.name, + ",".join(definition.topics), + ) + else: + if not group_id: + raise MQConfigError(f"mq consumer {definition.name} missing group_id") + consumer = self.backend.create_consumer(group_id, definition.config) + consumer.subscribe(list(definition.topics)) + offset_store = None + logger.info( + "mq consumer started name=%s mode=subscribe group_id=%s topics=%s", + definition.name, + group_id, + ",".join(definition.topics), + ) worker = _ConsumerWorker( app=self.app, consumer=consumer, definition=definition, + offset_store=offset_store, + auto_offset_reset=str(self.backend.config.get("auto_offset_reset", "earliest")), failure_backoff_seconds=( definition.failure_backoff_seconds if definition.failure_backoff_seconds is not None @@ -74,6 +103,53 @@ class MQConsumerRunner: worker.join() self._workers.clear() + def _create_offset_store(self, definition: MQConsumerDefinition) -> MQOffsetStore: + config = dict(self.offset_store_config) + config.update(definition.offset_store) + default_path = self.offset_store_path + if default_path is None: + base_dir = getattr(getattr(self.app, "state", None), "config", None) + base_dir = getattr(base_dir, "base_dir", Path.cwd()) + default_path = Path(base_dir) / "runtime" / "mq_offsets.sqlite" + try: + return create_offset_store( + config, + default_path=default_path, + ) + except ValueError as exc: + raise MQConfigError(str(exc)) from exc + + def _assign_consumer( + self, + consumer: Any, + definition: MQConsumerDefinition, + offset_store: MQOffsetStore, + ) -> None: + partitions = _resolve_partitions(consumer, definition) + if not partitions: + raise MQConfigError(f"mq consumer {definition.name} has no partitions to assign") + consumer.assign( + [ + self._topic_partition(definition.name, topic, partition, offset_store) + for topic, partition in partitions + ] + ) + + def _topic_partition( + self, + consumer_name: str, + topic: str, + partition: int, + offset_store: MQOffsetStore, + ) -> Any: + offset = offset_store.get(consumer_name, topic, partition) + try: + from confluent_kafka import TopicPartition + except ImportError: + logger.debug("confluent-kafka unavailable; using test topic partition fallback") + return _AssignedPartition(topic, partition, offset if offset is not None else _auto_offset(self)) + return TopicPartition(topic, partition, offset if offset is not None else _auto_offset(self)) + class _ConsumerWorker: def __init__( @@ -82,12 +158,16 @@ class _ConsumerWorker: app, consumer: Any, definition: MQConsumerDefinition, + offset_store: MQOffsetStore | None, + auto_offset_reset: str, failure_backoff_seconds: float, poll_timeout_seconds: float, ) -> None: self.app = app self.consumer = consumer self.definition = definition + self.offset_store = offset_store + self.auto_offset_reset = auto_offset_reset self.failure_backoff_seconds = failure_backoff_seconds self.poll_timeout_seconds = poll_timeout_seconds self._stop = threading.Event() @@ -109,6 +189,8 @@ class _ConsumerWorker: def close(self) -> None: self.consumer.close() + if self.offset_store is not None: + self.offset_store.close() def _loop(self) -> None: while not self._stop.is_set(): @@ -117,6 +199,13 @@ class _ConsumerWorker: continue if raw_message.error(): logger.warning("mq consumer error: %s", raw_message.error()) + _handle_offset_error( + self.consumer, + raw_message, + self.definition, + self.offset_store, + self.auto_offset_reset, + ) continue self._handle_raw_message(raw_message) @@ -128,7 +217,15 @@ class _ConsumerWorker: if not isinstance(result, Coroutine): raise TypeError("mq async handler must return a coroutine") asyncio.run(result) - self.consumer.commit(raw_message, asynchronous=False) + if self.offset_store is None: + self.consumer.commit(raw_message, asynchronous=False) + else: + self.offset_store.set( + self.definition.name, + raw_message.topic(), + raw_message.partition(), + raw_message.offset() + 1, + ) except Exception: logger.exception( "mq handler failed name=%s topic=%s partition=%s offset=%s", @@ -179,3 +276,86 @@ def _seek_position(raw_message: Any) -> Any: raw_message.partition(), raw_message.offset(), ) + + +def _resolve_partitions(consumer: Any, definition: MQConsumerDefinition) -> list[tuple[str, int]]: + if definition.partitions == "all": + metadata = consumer.list_topics(timeout=10) + result: list[tuple[str, int]] = [] + for topic in definition.topics: + topic_metadata = metadata.topics.get(topic) + if topic_metadata is None or getattr(topic_metadata, "error", None): + raise MQConfigError(f"mq topic metadata unavailable: {topic}") + result.extend((topic, partition) for partition in sorted(topic_metadata.partitions)) + return result + if isinstance(definition.partitions, dict): + return [ + (topic, int(partition)) + for topic, partitions in definition.partitions.items() + for partition in partitions + ] + result = [] + for value in definition.partitions: + if isinstance(value, tuple) and len(value) == 2: + result.append((str(value[0]), int(value[1]))) + continue + if isinstance(value, dict): + result.append((str(value["topic"]), int(value["partition"]))) + continue + raise MQConfigError(f"unsupported mq partitions item: {value!r}") + return result + + +def _auto_offset(runner: MQConsumerRunner) -> int: + try: + from confluent_kafka import OFFSET_BEGINNING, OFFSET_END + except ImportError: + OFFSET_BEGINNING = -2 + OFFSET_END = -1 + return OFFSET_END if str(runner.backend.config.get("auto_offset_reset")) == "latest" else OFFSET_BEGINNING + + +def _handle_offset_error( + consumer: Any, + raw_message: Any, + definition: MQConsumerDefinition, + offset_store: MQOffsetStore | None, + auto_offset_reset: str, +) -> None: + if offset_store is None: + return + error = raw_message.error() + code = error.code() if hasattr(error, "code") else None + try: + from confluent_kafka import KafkaError, TopicPartition + except ImportError: + return + if code != KafkaError._OFFSET_OUT_OF_RANGE: + return + topic = raw_message.topic() + partition = raw_message.partition() + try: + low, high = consumer.get_watermark_offsets( + TopicPartition(topic, partition), + timeout=10, + cached=False, + ) + except Exception: + low = 0 + offset = high if auto_offset_reset == "latest" else low + offset_store.set(definition.name, topic, partition, offset) + consumer.seek(TopicPartition(topic, partition, offset)) + + +def _normalize_consumer_mode(mode: str) -> str: + mode = str(mode or "subscribe").strip().lower() + if mode not in {"subscribe", "assign"}: + raise MQConfigError("mq consumer_mode must be 'subscribe' or 'assign'") + return mode + + +@dataclass(frozen=True) +class _AssignedPartition: + topic: str + partition: int + offset: int diff --git a/tests/test_config.py b/tests/test_config.py index 67a4fd8..4b06662 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -95,6 +95,8 @@ def test_base_config_reads_mq_environment(monkeypatch): "group_id": "group-a", "client_id": "client-a", "auto_offset_reset": "latest", + "consumer_mode": "subscribe", + "offset_store_path": None, "failure_backoff_seconds": 2.5, "poll_timeout_seconds": 0.5, } diff --git a/tests/test_mq.py b/tests/test_mq.py index 86f31a4..ca9e15a 100644 --- a/tests/test_mq.py +++ b/tests/test_mq.py @@ -1,6 +1,7 @@ import asyncio import builtins import sys +from pathlib import Path import pytest @@ -8,6 +9,7 @@ from iti import create_app from iti.config import BaseConfig from iti.mq import MQConfigError, init_mq, mq_client, mq_consumer from iti.mq.backend import KafkaBackend +from iti.mq.offset_store import MemoryMQOffsetStore, SQLiteMQOffsetStore from iti.mq.registry import MQRegistry as RegistryClass @@ -34,15 +36,32 @@ class FakeConsumer: def __init__(self, messages=None): self.messages = list(messages or []) self.subscribed = [] + self.assigned = [] self.committed = [] self.sought = [] self.closed = False self.config = None + self.raise_offset_out_of_range_once = False def subscribe(self, topics): self.subscribed.append(topics) + def assign(self, partitions): + self.assigned.append( + [ + ( + call_or_value(partition, "topic"), + call_or_value(partition, "partition"), + call_or_value(partition, "offset"), + ) + for partition in partitions + ] + ) + def poll(self, timeout): + if self.raise_offset_out_of_range_once: + self.raise_offset_out_of_range_once = False + return FakeErrorMessage(topic="demo.topic", partition=0) if self.messages: return self.messages.pop(0) return None @@ -65,6 +84,65 @@ class FakeConsumer: def close(self): self.closed = True + def list_topics(self, timeout=10): + return FakeMetadata( + { + "demo.topic": FakeTopicMetadata([0, 1]), + "demo.other": FakeTopicMetadata([2]), + } + ) + + def get_watermark_offsets(self, partition, timeout=10, cached=False): + return (5, 11) + + +class FakeKafkaError: + _OFFSET_OUT_OF_RANGE = 1 + + def __init__(self, code=1, message="offset out of range"): + self._code = code + self._message = message + + def code(self): + return self._code + + def __str__(self): + return self._message + + +class FakeErrorMessage: + def __init__(self, *, topic="demo.topic", partition=0): + self._topic = topic + self._partition = partition + self._error = FakeKafkaError() + + def error(self): + return self._error + + def topic(self): + return self._topic + + def partition(self): + return self._partition + + +class FakeTopicPartition: + def __init__(self, topic, partition, offset=None): + self.topic = topic + self.partition = partition + self.offset = offset + + +class FakeTopicMetadata: + def __init__(self, partitions, error=None): + self.partitions = {partition: object() for partition in partitions} + self.error = error + + +class FakeMetadata: + def __init__(self, topics): + self.topics = topics + class FakeMessage: def __init__( @@ -230,6 +308,28 @@ def test_mq_client_sends_json_bytes_and_registered_sender(): assert producer.flushed == [2] +def test_mq_client_supports_inline_flush(): + producer = FakeProducer({"bootstrap.servers": "localhost:9092"}) + app = create_app( + config_mapping=BaseConfig( + database_url="sqlite+pysqlite:///:memory:", + testing=True, + mq_enabled=False, + exchange_enabled=False, + ) + ) + init_mq( + app, + {"backend": "kafka", "bootstrap_servers": "localhost:9092"}, + registry=RegistryClass(), + producer_factory=lambda config: producer, + ) + + mq_client(app).send_json("demo.flush", {"id": "1"}, flush=True) + + assert producer.flushed == [None] + + def test_runner_commits_after_successful_sync_handler(): handled = [] message = FakeMessage() @@ -319,6 +419,139 @@ def test_kafka_backend_forces_manual_commit(): assert config["enable.auto.commit"] is False +def test_runner_assign_mode_uses_topic_metadata_and_offset_store(monkeypatch, tmp_path): + monkeypatch_confluent(monkeypatch) + handled = [] + fake_consumer = FakeConsumer([FakeMessage(topic="demo.topic", partition=1, offset=7)]) + registry = RegistryClass() + registry.register_consumer( + name="assign-demo", + topics="demo.topic", + mode="assign", + handler=lambda message: handled.append((message.partition, message.offset)), + ) + + app = create_app( + config_mapping=BaseConfig( + database_url="sqlite+pysqlite:///:memory:", + testing=True, + exchange_enabled=False, + ) + ) + init_mq( + app, + { + "backend": "kafka", + "bootstrap_servers": "localhost:9092", + "consumer_mode": "assign", + "offset_store": {"type": "memory"}, + "failure_backoff_seconds": 0.01, + "poll_timeout_seconds": 0.01, + }, + registry=registry, + producer_factory=lambda config: FakeProducer(config), + consumer_factory=lambda config: fake_consumer, + ) + + runner = app.state.iti_mq_runner + runner.start() + wait_until(lambda: handled == [(1, 7)]) + store = runner._workers[0].offset_store + runner.stop() + + assert fake_consumer.assigned == [[("demo.topic", 0, -2), ("demo.topic", 1, -2)]] + assert isinstance(store, MemoryMQOffsetStore) + assert store.get("assign-demo", "demo.topic", 1) == 8 + + +def test_runner_assign_mode_uses_global_consumer_mode(monkeypatch): + monkeypatch_confluent(monkeypatch) + fake_consumer = FakeConsumer() + registry = RegistryClass() + registry.register_consumer( + name="assign-global", + topics="demo.topic", + handler=lambda message: None, + ) + app = make_mq_app( + registry, + fake_consumer, + config_overrides={ + "consumer_mode": "assign", + "offset_store": {"type": "memory"}, + }, + ) + + app.state.iti_mq_runner.start() + app.state.iti_mq_runner.stop() + + assert fake_consumer.assigned == [[("demo.topic", 0, -2), ("demo.topic", 1, -2)]] + assert fake_consumer.subscribed == [] + + +def test_runner_assign_mode_rejects_invalid_offset_store(monkeypatch): + monkeypatch_confluent(monkeypatch) + fake_consumer = FakeConsumer() + registry = RegistryClass() + registry.register_consumer( + name="assign-invalid-store", + topics="demo.topic", + mode="assign", + handler=lambda message: None, + ) + app = make_mq_app( + registry, + fake_consumer, + config_overrides={ + "consumer_mode": "assign", + "offset_store": {"type": "unsupported"}, + }, + ) + + with pytest.raises(MQConfigError, match="unsupported mq offset store type"): + app.state.iti_mq_runner.start() + + +def test_runner_assign_mode_recovers_from_offset_out_of_range(monkeypatch): + monkeypatch_confluent(monkeypatch) + fake_consumer = FakeConsumer([FakeMessage(topic="demo.topic", partition=0, offset=5)]) + fake_consumer.raise_offset_out_of_range_once = True + registry = RegistryClass() + registry.register_consumer( + name="assign-recover", + topics="demo.topic", + mode="assign", + handler=lambda message: None, + ) + app = make_mq_app( + registry, + fake_consumer, + config_overrides={ + "consumer_mode": "assign", + "offset_store": {"type": "memory"}, + "auto_offset_reset": "latest", + }, + ) + + runner = app.state.iti_mq_runner + runner.start() + wait_until(lambda: ("demo.topic", 0, 11) in fake_consumer.sought) + runner.stop() + + assert ("demo.topic", 0, 11) in fake_consumer.sought + + +def test_sqlite_offset_store_round_trip(tmp_path): + path = tmp_path / "mq-offsets.sqlite" + store = SQLiteMQOffsetStore(path) + try: + store.set("demo", "topic", 1, 9) + assert store.get("demo", "topic", 1) == 9 + finally: + store.close() + assert Path(path).exists() + + def mq_consumer_registry_names(): from iti.mq import mq_registry @@ -336,7 +569,7 @@ def registry_with_consumer(handler): return registry -def make_mq_app(registry, fake_consumer): +def make_mq_app(registry, fake_consumer, config_overrides=None): app = create_app( config_mapping=BaseConfig( database_url="sqlite+pysqlite:///:memory:", @@ -350,14 +583,18 @@ def make_mq_app(registry, fake_consumer): fake_consumer.config = config return fake_consumer + config = { + "backend": "kafka", + "bootstrap_servers": "localhost:9092", + "failure_backoff_seconds": 0.01, + "poll_timeout_seconds": 0.01, + } + if config_overrides: + config.update(config_overrides) + init_mq( app, - { - "backend": "kafka", - "bootstrap_servers": "localhost:9092", - "failure_backoff_seconds": 0.01, - "poll_timeout_seconds": 0.01, - }, + config, registry=registry, producer_factory=lambda config: FakeProducer(config), consumer_factory=consumer_factory, @@ -381,3 +618,15 @@ def wait_until(predicate, timeout=1.0): def call_or_value(value, name): attr = getattr(value, name) return attr() if callable(attr) else attr + + +def monkeypatch_confluent(monkeypatch): + import iti.mq.runner as runner_module + + monkeypatch.setattr(runner_module, "_auto_offset", lambda runner: -1 if runner.backend.config.get("auto_offset_reset") == "latest" else -2) + monkeypatch.setitem(sys.modules, "confluent_kafka", type("FakeConfluent", (), { + "TopicPartition": FakeTopicPartition, + "KafkaError": FakeKafkaError, + "OFFSET_BEGINNING": -2, + "OFFSET_END": -1, + }))