You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
iTi-Flask/tests/test_mq.py

633 lines
17 KiB
Python

import asyncio
import builtins
import sys
from pathlib import Path
import pytest
from iti import create_app
from iti.config import BaseConfig
from iti.mq import MQConfigError, init_mq, mq_client, mq_consumer
from iti.mq.backend import KafkaBackend
from iti.mq.offset_store import MemoryMQOffsetStore, SQLiteMQOffsetStore
from iti.mq.registry import MQRegistry as RegistryClass
class FakeProducer:
    """In-memory producer double that records every call it receives."""

    def __init__(self, config):
        # Keep the config so tests can inspect what the factory was given.
        self.config = config
        self.produced, self.polled, self.flushed = [], [], []

    def poll(self, timeout):
        """Record the poll timeout; nothing is actually polled."""
        self.polled.append(timeout)

    def produce(self, topic, *, value=None, key=None, headers=None):
        """Record one produced message as a plain dict."""
        record = dict(topic=topic, value=value, key=key, headers=headers)
        self.produced.append(record)

    def flush(self, timeout=None):
        """Record the flush timeout (``None`` when called without one)."""
        self.flushed.append(timeout)
class FakeConsumer:
    """In-memory consumer double that serves queued messages and logs calls."""

    def __init__(self, messages=None):
        self.messages = list(messages) if messages else []
        self.subscribed = []
        self.assigned = []
        self.committed = []
        self.sought = []
        self.closed = False
        # Filled in by the test's consumer factory, when one is used.
        self.config = None
        # When set, the next poll() yields a single error message instead.
        self.raise_offset_out_of_range_once = False

    def subscribe(self, topics):
        """Record the topics passed to a subscribe call."""
        self.subscribed.append(topics)

    def assign(self, partitions):
        """Record one assignment as a list of (topic, partition, offset)."""
        snapshot = []
        for item in partitions:
            snapshot.append(
                (
                    call_or_value(item, "topic"),
                    call_or_value(item, "partition"),
                    call_or_value(item, "offset"),
                )
            )
        self.assigned.append(snapshot)

    def poll(self, timeout):
        """Return the pending error once, else the next message or None."""
        if self.raise_offset_out_of_range_once:
            self.raise_offset_out_of_range_once = False
            return FakeErrorMessage(topic="demo.topic", partition=0)
        return self.messages.pop(0) if self.messages else None

    def commit(self, message, asynchronous=False):
        """Record the committed message together with the async flag."""
        self.committed.append((message, asynchronous))

    def seek(self, position):
        """Record a seek, normalising partition-like objects to a tuple."""
        fields = ("topic", "partition", "offset")
        partition_like = all(hasattr(position, name) for name in fields)
        if not partition_like:
            # Anything else is stored verbatim.
            self.sought.append(position)
            return
        self.sought.append(
            (
                call_or_value(position, "topic"),
                call_or_value(position, "partition"),
                call_or_value(position, "offset"),
            )
        )

    def close(self):
        """Mark the consumer as closed."""
        self.closed = True

    def list_topics(self, timeout=10):
        """Return fixed metadata: two topics with partitions [0, 1] and [2]."""
        topics = {
            "demo.topic": FakeTopicMetadata([0, 1]),
            "demo.other": FakeTopicMetadata([2]),
        }
        return FakeMetadata(topics)

    def get_watermark_offsets(self, partition, timeout=10, cached=False):
        """Return a fixed pair of watermark offsets for any partition."""
        low, high = 5, 11
        return (low, high)
class FakeKafkaError:
    """Minimal error double exposing ``code()`` and a string message."""

    # Error code constant looked up on the class by code under test.
    _OFFSET_OUT_OF_RANGE = 1

    def __init__(self, code=1, message="offset out of range"):
        self._message = message
        self._code = code

    def code(self):
        """Return the numeric error code given at construction."""
        return self._code

    def __str__(self):
        return self._message
class FakeErrorMessage:
    """Message double whose ``error()`` returns a ``FakeKafkaError``."""

    def __init__(self, *, topic="demo.topic", partition=0):
        # Always carries a default-constructed FakeKafkaError.
        self._error = FakeKafkaError()
        self._partition = partition
        self._topic = topic

    def error(self):
        """Return the attached error object."""
        return self._error

    def topic(self):
        """Return the topic name given at construction."""
        return self._topic

    def partition(self):
        """Return the partition id given at construction."""
        return self._partition
class FakeTopicPartition:
    """Plain-attribute stand-in for a topic/partition/offset triple."""

    def __init__(self, topic, partition, offset=None):
        self.topic, self.partition, self.offset = topic, partition, offset
class FakeTopicMetadata:
    """Topic metadata double keyed by partition id."""

    def __init__(self, partitions, error=None):
        # Each partition id maps to its own opaque placeholder object.
        by_id = {}
        for partition_id in partitions:
            by_id[partition_id] = object()
        self.partitions = by_id
        self.error = error
class FakeMetadata:
    """Cluster metadata double exposing only a ``topics`` attribute."""

    def __init__(self, topics):
        # Stored as given; FakeConsumer.list_topics passes a name -> metadata dict.
        self.topics = topics
class FakeMessage:
    """Successful message double with accessor methods for each field.

    All constructor arguments are keyword-only. ``headers=None`` selects the
    default ``[("source", b"test")]``; any other value, including an empty
    list, is kept as given so header-less messages can be simulated.
    """

    def __init__(
        self,
        *,
        topic="demo.topic",
        partition=0,
        offset=1,
        key=b"k1",
        value=b'{"ok":true}',
        headers=None,
    ):
        self._topic = topic
        self._partition = partition
        self._offset = offset
        self._key = key
        self._value = value
        # Only substitute the default when no headers were supplied at all;
        # a plain truthiness test would also discard an explicit empty list.
        self._headers = [("source", b"test")] if headers is None else headers

    def topic(self):
        """Return the topic name."""
        return self._topic

    def partition(self):
        """Return the partition id."""
        return self._partition

    def offset(self):
        """Return the message offset."""
        return self._offset

    def key(self):
        """Return the raw message key."""
        return self._key

    def value(self):
        """Return the raw message payload."""
        return self._value

    def headers(self):
        """Return the header list."""
        return self._headers

    def timestamp(self):
        """Return a fixed ``(0, 0)`` timestamp tuple."""
        return (0, 0)

    def error(self):
        """Return ``None``: this double always represents a good message."""
        return None
def test_mq_registry_registers_decorator_and_explicit_consumer():
    """Consumers can be registered explicitly or through ``@mq_consumer``."""
    # Explicit registration on a private registry instance.
    local_registry = RegistryClass()
    local_registry.register_consumer(
        handler=lambda message: None,
        group_id="g1",
        topics=["demo.explicit"],
        name="explicit",
    )
    explicit_entry = local_registry.consumers["explicit"]
    assert explicit_entry.topics == ("demo.explicit",)

    # The decorator is checked against the module-level registry, so take a
    # snapshot of its names first and always remove the entry afterwards.
    names_before = set(mq_consumer_registry_names())
    try:

        @mq_consumer("demo.decorated", name="decorated-test", group_id="g1")
        def decorated(message):
            return None

        newly_added = mq_consumer_registry_names() - names_before
        assert "decorated-test" in newly_added
        assert decorated.__name__ == "decorated"
    finally:
        from iti.mq import mq_registry

        mq_registry.consumers.pop("decorated-test", None)
def test_mq_registry_rejects_duplicate_names():
    """Re-registering a producer or consumer name raises ``ValueError``."""
    registry = RegistryClass()
    registry.register_producer(name="events", topic="demo.events")
    registry.register_consumer(
        name="consumer", topics="demo.events", handler=lambda message: None
    )

    # Second producer under the same name.
    expect_producer_clash = pytest.raises(
        ValueError, match="producer already registered"
    )
    with expect_producer_clash:
        registry.register_producer(name="events", topic="other")

    # Second consumer under the same name.
    expect_consumer_clash = pytest.raises(
        ValueError, match="consumer already registered"
    )
    with expect_consumer_clash:
        registry.register_consumer(
            name="consumer", topics="other", handler=lambda message: None
        )
def test_mq_enabled_false_does_not_import_or_configure_kafka(monkeypatch):
    """With ``mq_enabled=False`` the app gets no MQ client and Kafka stays unimported."""
    # Drop any cached module through monkeypatch so the original entry is
    # restored on teardown instead of leaking the removal into other tests.
    monkeypatch.delitem(sys.modules, "confluent_kafka", raising=False)
    app = create_app(
        config_mapping=BaseConfig(
            database_url="sqlite+pysqlite:///:memory:",
            testing=True,
            mq_enabled=False,
            exchange_enabled=False,
        )
    )
    assert not hasattr(app.state, "iti_mq_client")
    assert "confluent_kafka" not in sys.modules
def test_mq_enabled_true_without_dependency_raises_install_hint(monkeypatch):
    """Enabling MQ without ``confluent_kafka`` raises ``MQConfigError`` naming the extra."""
    # Remove any cached module via monkeypatch so it is restored on teardown.
    monkeypatch.delitem(sys.modules, "confluent_kafka", raising=False)
    real_import = builtins.__import__

    def missing_confluent_kafka(name, *args, **kwargs):
        # Block the package and any dotted submodule import of it; every
        # other import is delegated to the real implementation.
        if name == "confluent_kafka" or name.startswith("confluent_kafka."):
            raise ImportError("missing")
        return real_import(name, *args, **kwargs)

    monkeypatch.setattr(builtins, "__import__", missing_confluent_kafka)
    with pytest.raises(MQConfigError, match=r"iti-flask\[mq-kafka\]"):
        create_app(
            config_mapping=BaseConfig(
                database_url="sqlite+pysqlite:///:memory:",
                testing=True,
                mq_enabled=True,
                mq={"backend": "kafka", "bootstrap_servers": "127.0.0.1:9092"},
                exchange_enabled=False,
            )
        )
def test_mq_client_sends_json_bytes_and_registered_sender():
    """The client encodes JSON, passes raw bytes through, and resolves named senders."""
    fake_producer = FakeProducer({"bootstrap.servers": "localhost:9092"})
    registry = RegistryClass()
    registry.register_producer(name="events", topic="demo.events")

    app_config = BaseConfig(
        database_url="sqlite+pysqlite:///:memory:",
        testing=True,
        mq_enabled=False,
        exchange_enabled=False,
    )
    app = create_app(config_mapping=app_config)
    # MQ is wired up by hand so the fake producer can be injected.
    mq_settings = {"backend": "kafka", "bootstrap_servers": "localhost:9092"}
    init_mq(
        app,
        mq_settings,
        registry=registry,
        producer_factory=lambda config: fake_producer,
    )

    client = mq_client(app)
    client.send_json("demo.raw", {"id": "1"}, key="k1", headers={"h": "v"})
    client.send("demo.bytes", value=b"raw", key=b"k2")
    client.sender("events").send_json({"id": "2"})
    client.flush(2)

    sent = fake_producer.produced
    expected_first = {
        "topic": "demo.raw",
        "value": b'{"id":"1"}',
        "key": b"k1",
        "headers": [("h", b"v")],
    }
    assert sent[0] == expected_first
    assert sent[1]["value"] == b"raw"
    assert sent[1]["key"] == b"k2"
    assert sent[2]["topic"] == "demo.events"
    assert fake_producer.flushed == [2]
def test_mq_client_supports_inline_flush():
    """``send_json(..., flush=True)`` flushes the producer without a timeout."""
    fake_producer = FakeProducer({"bootstrap.servers": "localhost:9092"})
    app_config = BaseConfig(
        database_url="sqlite+pysqlite:///:memory:",
        testing=True,
        mq_enabled=False,
        exchange_enabled=False,
    )
    app = create_app(config_mapping=app_config)
    mq_settings = {"backend": "kafka", "bootstrap_servers": "localhost:9092"}
    init_mq(
        app,
        mq_settings,
        registry=RegistryClass(),
        producer_factory=lambda config: fake_producer,
    )

    client = mq_client(app)
    client.send_json("demo.flush", {"id": "1"}, flush=True)

    # A single flush call recorded with timeout None.
    assert fake_producer.flushed == [None]
def test_runner_commits_after_successful_sync_handler():
    """A sync handler that succeeds leads to one commit and no seek."""
    handled = []
    message = FakeMessage()
    fake_consumer = FakeConsumer([message])
    app = make_mq_app(
        registry_with_consumer(lambda item: handled.append(item.value)),
        fake_consumer,
    )
    runner = app.state.iti_mq_runner
    runner.start()
    try:
        wait_until(lambda: bool(fake_consumer.committed))
    finally:
        # Stop the runner even when the wait times out, so a failing test
        # does not leave a started runner behind for later tests.
        runner.stop()
    assert handled == [{"ok": True}]
    assert fake_consumer.committed == [(message, False)]
    assert fake_consumer.sought == []
    assert fake_consumer.closed is True
def test_runner_executes_async_handler():
    """A coroutine handler is executed and its message is committed."""
    handled = []

    async def handler(message):
        # Yield to the event loop once so the handler really runs as a coroutine.
        await asyncio.sleep(0)
        handled.append(message.key)

    fake_consumer = FakeConsumer([FakeMessage()])
    app = make_mq_app(registry_with_consumer(handler), fake_consumer)
    runner = app.state.iti_mq_runner
    runner.start()
    try:
        wait_until(lambda: bool(fake_consumer.committed))
    finally:
        # Stop the runner even when the wait times out.
        runner.stop()
    assert handled == ["k1"]
def test_runner_does_not_commit_and_seeks_after_handler_failure():
    """A raising handler results in a seek back to the message and no commit."""
    message = FakeMessage()
    fake_consumer = FakeConsumer([message])

    def handler(_message):
        raise RuntimeError("boom")

    app = make_mq_app(registry_with_consumer(handler), fake_consumer)
    runner = app.state.iti_mq_runner
    runner.start()
    try:
        wait_until(lambda: bool(fake_consumer.sought))
    finally:
        # Stop the runner even when the wait times out.
        runner.stop()
    assert fake_consumer.committed == []
    # The seek targets the failed message's own topic/partition/offset.
    assert fake_consumer.sought == [("demo.topic", 0, 1)]
def test_runner_raises_when_group_id_missing():
    """Starting a runner for a consumer with no group id raises ``MQConfigError``."""
    registry = RegistryClass()
    # Deliberately registered without group_id; make_mq_app sets none either.
    registry.register_consumer(
        handler=lambda message: None,
        topics="demo.topic",
        name="demo",
    )
    app = make_mq_app(registry, FakeConsumer())
    runner = app.state.iti_mq_runner
    with pytest.raises(MQConfigError, match="missing group_id"):
        runner.start()
def test_kafka_backend_forces_manual_commit():
    """``consumer_config`` turns auto-commit off even when the user enables it."""
    settings = {
        "bootstrap_servers": "localhost:9092",
        "group_id": "global",
        "auto_offset_reset": "latest",
        # User override that the backend is expected to ignore.
        "consumer": {"enable.auto.commit": True},
    }
    backend = KafkaBackend(
        settings,
        producer_factory=lambda config: FakeProducer(config),
        consumer_factory=lambda config: FakeConsumer(),
    )

    config = backend.consumer_config("g1")

    expected_values = {
        "bootstrap.servers": "localhost:9092",
        "group.id": "g1",
        "auto.offset.reset": "latest",
    }
    for option, expected in expected_values.items():
        assert config[option] == expected
    assert config["enable.auto.commit"] is False
def test_runner_assign_mode_uses_topic_metadata_and_offset_store(monkeypatch, tmp_path):
    """Assign mode assigns every partition from metadata and stores the next offset.

    ``tmp_path`` is not used by the body; it is kept so the signature stays
    unchanged.
    """
    monkeypatch_confluent(monkeypatch)
    handled = []
    fake_consumer = FakeConsumer([FakeMessage(topic="demo.topic", partition=1, offset=7)])
    registry = RegistryClass()
    registry.register_consumer(
        name="assign-demo",
        topics="demo.topic",
        mode="assign",
        handler=lambda message: handled.append((message.partition, message.offset)),
    )
    app = create_app(
        config_mapping=BaseConfig(
            database_url="sqlite+pysqlite:///:memory:",
            testing=True,
            exchange_enabled=False,
        )
    )
    init_mq(
        app,
        {
            "backend": "kafka",
            "bootstrap_servers": "localhost:9092",
            "consumer_mode": "assign",
            "offset_store": {"type": "memory"},
            "failure_backoff_seconds": 0.01,
            "poll_timeout_seconds": 0.01,
        },
        registry=registry,
        producer_factory=lambda config: FakeProducer(config),
        consumer_factory=lambda config: fake_consumer,
    )
    runner = app.state.iti_mq_runner
    runner.start()
    try:
        wait_until(lambda: handled == [(1, 7)])
        # Grab the store before stopping, while the worker is still in place.
        store = runner._workers[0].offset_store
    finally:
        # Stop the runner even when the wait times out.
        runner.stop()
    assert fake_consumer.assigned == [[("demo.topic", 0, -2), ("demo.topic", 1, -2)]]
    assert isinstance(store, MemoryMQOffsetStore)
    # Offset 7 was handled, so the stored position is 8.
    assert store.get("assign-demo", "demo.topic", 1) == 8
def test_runner_assign_mode_uses_global_consumer_mode(monkeypatch):
    """A global ``consumer_mode`` of "assign" applies to consumers with no mode."""
    monkeypatch_confluent(monkeypatch)
    fake_consumer = FakeConsumer()
    registry = RegistryClass()
    # No per-consumer mode here: only the global setting selects assign mode.
    registry.register_consumer(
        handler=lambda message: None,
        topics="demo.topic",
        name="assign-global",
    )
    overrides = {
        "consumer_mode": "assign",
        "offset_store": {"type": "memory"},
    }
    app = make_mq_app(registry, fake_consumer, config_overrides=overrides)

    app.state.iti_mq_runner.start()
    app.state.iti_mq_runner.stop()

    expected_assignment = [("demo.topic", 0, -2), ("demo.topic", 1, -2)]
    assert fake_consumer.assigned == [expected_assignment]
    assert fake_consumer.subscribed == []
def test_runner_assign_mode_rejects_invalid_offset_store(monkeypatch):
    """An unknown offset store type makes ``start()`` raise ``MQConfigError``."""
    monkeypatch_confluent(monkeypatch)
    fake_consumer = FakeConsumer()
    registry = RegistryClass()
    registry.register_consumer(
        handler=lambda message: None,
        mode="assign",
        topics="demo.topic",
        name="assign-invalid-store",
    )
    overrides = {
        "consumer_mode": "assign",
        "offset_store": {"type": "unsupported"},
    }
    app = make_mq_app(registry, fake_consumer, config_overrides=overrides)
    runner = app.state.iti_mq_runner
    with pytest.raises(MQConfigError, match="unsupported mq offset store type"):
        runner.start()
def test_runner_assign_mode_recovers_from_offset_out_of_range(monkeypatch):
    """After an offset-out-of-range error the runner seeks to offset 11."""
    monkeypatch_confluent(monkeypatch)
    fake_consumer = FakeConsumer([FakeMessage(topic="demo.topic", partition=0, offset=5)])
    # The first poll() returns an error message instead of the queued one.
    fake_consumer.raise_offset_out_of_range_once = True
    registry = RegistryClass()
    registry.register_consumer(
        name="assign-recover",
        topics="demo.topic",
        mode="assign",
        handler=lambda message: None,
    )
    app = make_mq_app(
        registry,
        fake_consumer,
        config_overrides={
            "consumer_mode": "assign",
            "offset_store": {"type": "memory"},
            "auto_offset_reset": "latest",
        },
    )
    runner = app.state.iti_mq_runner
    runner.start()
    try:
        # 11 is the high watermark reported by FakeConsumer.get_watermark_offsets.
        wait_until(lambda: ("demo.topic", 0, 11) in fake_consumer.sought)
    finally:
        # Stop the runner even when the wait times out.
        runner.stop()
    assert ("demo.topic", 0, 11) in fake_consumer.sought
def test_sqlite_offset_store_round_trip(tmp_path):
    """A value written to the SQLite store can be read back and the file exists."""
    db_file = tmp_path / "mq-offsets.sqlite"
    store = SQLiteMQOffsetStore(db_file)
    try:
        store.set("demo", "topic", 1, 9)
        read_back = store.get("demo", "topic", 1)
        assert read_back == 9
    finally:
        # Close the store whether or not the assertion held.
        store.close()
    assert Path(db_file).exists()
def mq_consumer_registry_names():
    """Return the consumer names in ``iti.mq.mq_registry`` as a new set."""
    # Imported lazily, inside the helper, as in the rest of this module.
    from iti.mq import mq_registry

    consumer_names = mq_registry.consumers.keys()
    return set(consumer_names)
def registry_with_consumer(handler):
    """Build a fresh registry holding one "demo" consumer bound to *handler*."""
    consumer_spec = {
        "name": "demo",
        "topics": "demo.topic",
        "group_id": "g1",
        "handler": handler,
    }
    fresh_registry = RegistryClass()
    fresh_registry.register_consumer(**consumer_spec)
    return fresh_registry
def make_mq_app(registry, fake_consumer, config_overrides=None):
    """Create a test app and initialise MQ on it with fake Kafka factories.

    Args:
        registry: Registry passed straight to ``init_mq``.
        fake_consumer: Consumer double returned by the consumer factory; its
            ``config`` attribute is set to whatever the factory receives.
        config_overrides: Optional settings merged over the default MQ config.

    Returns:
        The app created by ``create_app`` after ``init_mq`` has run on it.
    """
    app_config = BaseConfig(
        database_url="sqlite+pysqlite:///:memory:",
        testing=True,
        mq_enabled=False,
        exchange_enabled=False,
    )
    app = create_app(config_mapping=app_config)

    def consumer_factory(config):
        # Expose the received config on the fake for later inspection.
        fake_consumer.config = config
        return fake_consumer

    # Short backoff and poll timeouts keep the runner tests fast.
    mq_settings = {
        "backend": "kafka",
        "bootstrap_servers": "localhost:9092",
        "failure_backoff_seconds": 0.01,
        "poll_timeout_seconds": 0.01,
    }
    if config_overrides:
        mq_settings.update(config_overrides)

    init_mq(
        app,
        mq_settings,
        registry=registry,
        producer_factory=lambda config: FakeProducer(config),
        consumer_factory=consumer_factory,
    )
    return app
def wait_until(predicate, timeout=1.0, interval=0.01):
    """Block until ``predicate()`` is truthy or *timeout* seconds have passed.

    The predicate is always evaluated at least once, and once more after the
    final sleep, so a condition that becomes true right at the deadline is
    not reported as a failure.

    Args:
        predicate: Zero-argument callable polled for a truthy result.
        timeout: Maximum number of seconds to keep polling.
        interval: Seconds to sleep between polls.

    Raises:
        AssertionError: If the predicate is still falsy once the deadline
            has passed.
    """
    import time

    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return
        # Check the deadline only after the predicate, so the last sleep is
        # always followed by one more evaluation.
        if time.monotonic() >= deadline:
            break
        time.sleep(interval)
    raise AssertionError("condition not reached")
def call_or_value(value, name):
    """Return attribute *name* of *value*, calling it first if it is callable."""
    member = getattr(value, name)
    if callable(member):
        # Accessor-style attribute: invoke it with no arguments.
        return member()
    return member
def monkeypatch_confluent(monkeypatch):
    """Stub the runner's ``_auto_offset`` and install a fake ``confluent_kafka``."""
    import iti.mq.runner as runner_module

    def fake_auto_offset(runner):
        # "latest" maps to -1; any other reset policy maps to -2. These match
        # the OFFSET_END / OFFSET_BEGINNING values set on the fake module below.
        reset_policy = runner.backend.config.get("auto_offset_reset")
        if reset_policy == "latest":
            return -1
        return -2

    monkeypatch.setattr(runner_module, "_auto_offset", fake_auto_offset)

    # A throwaway class serves as the module object; only attribute access
    # on it is needed.
    fake_module_attrs = {
        "TopicPartition": FakeTopicPartition,
        "KafkaError": FakeKafkaError,
        "OFFSET_BEGINNING": -2,
        "OFFSET_END": -1,
    }
    fake_module = type("FakeConfluent", (), fake_module_attrs)
    monkeypatch.setitem(sys.modules, "confluent_kafka", fake_module)