You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
iTi-Flask/docs/MQ.md

146 lines
2.8 KiB
Markdown

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# MQ
iTi-Flask 提供通用 MQ 入口。
v1 内置 Kafka backend底层使用 `confluent-kafka`
## 安装
Kafka 客户端是可选依赖:
```bash
uv add "iti-flask[mq-kafka]"
```
未启用 MQ 的项目不需要安装 Kafka 依赖。
## 配置
MQ 默认关闭。
```python
class DevConfig(BaseDevConfig):
def __init__(self) -> None:
super().__init__()
self.mq_enabled = True
self.mq = {
"backend": "kafka",
"bootstrap_servers": "127.0.0.1:9092",
"group_id": "my-app",
"client_id": "my-app-dev",
"auto_offset_reset": "earliest",
"failure_backoff_seconds": 1.0,
}
```
也可以使用环境变量:
```bash
MQ_ENABLED=true
MQ_BACKEND=kafka
KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092
KAFKA_GROUP_ID=my-app
KAFKA_CLIENT_ID=my-app-dev
KAFKA_AUTO_OFFSET_RESET=earliest
MQ_FAILURE_BACKOFF_SECONDS=1
MQ_POLL_TIMEOUT_SECONDS=1
```
## 发送消息
```python
from iti.mq import mq_client
mq_client(app).send_json("mes.work_order.events", {"id": "MO001"}, key="MO001")
mq_client(app).send("raw.topic", value=b"bytes", key=b"k")
```
也可以先注册 producer
```python
from iti.mq import mq_registry
mq_registry.register_producer(
name="work-order-events",
topic="mes.work_order.events",
)
mq_client(app).sender("work-order-events").send_json({"id": "MO001"})
```
## 监听消息
使用装饰器:
```python
from iti.mq import MQMessage, mq_consumer
@mq_consumer("mes.work_order.created", group_id="hsyh-mes")
async def handle_work_order(message: MQMessage) -> None:
payload = message.value
```
也可以显式注册:
```python
from iti.mq import mq_registry
def handle_work_order(message):
payload = message.value
mq_registry.register_consumer(
name="work-order-created",
topics=["mes.work_order.created"],
group_id="hsyh-mes",
handler=handle_work_order,
)
```
业务模块也可以在 `register_mq(app)` 阶段注册 consumer 和 producer。
## 消息格式
默认 `value_format="json"`
框架用 UTF-8 JSON 解码 `message.value`,并保留 `message.raw_value`
`value_format="bytes"` 时,`message.value` 是原始 bytes。
`MQMessage` 包含:
- `app`
- `topic`
- `partition`
- `offset`
- `key`
- `raw_key`
- `value`
- `raw_value`
- `headers`
- `timestamp`
- `raw_message`
## 消费语义
Kafka consumer 强制设置:
```python
enable.auto.commit = False
```
handler 正常结束后,框架同步提交当前消息 offset。
handler 抛错或 JSON 解码失败时,框架记录日志、不提交 offset、seek 回当前消息,并按 `failure_backoff_seconds` 等待后重试。
每个 consumer definition 启动一个线程,单线程顺序处理消息。
## 不支持
v1 不支持:
- DLQ。
- 批量消费。
- exactly-once 事务。
- 多 backend。
- Kafka topic 自动创建。