You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
iTi-Flask/tests/test_mq.py

384 lines
10 KiB
Python

import asyncio
import builtins
import sys
import pytest
from iti import create_app
from iti.config import BaseConfig
from iti.mq import MQConfigError, init_mq, mq_client, mq_consumer
from iti.mq.backend import KafkaBackend
from iti.mq.registry import MQRegistry as RegistryClass
class FakeProducer:
    """In-memory producer double that records every call made to it.

    The tests pass instances of this class through ``producer_factory`` and
    later inspect ``produced``, ``polled`` and ``flushed`` to verify what the
    code under test sent.
    """

    def __init__(self, config):
        # Keep the configuration so tests can inspect what the factory got.
        self.config = config
        self.produced = []
        self.polled = []
        self.flushed = []

    def poll(self, timeout):
        """Record the timeout of a ``poll`` call."""
        self.polled.append(timeout)

    def produce(self, topic, *, value=None, key=None, headers=None):
        """Record one outgoing message as a dict of its four fields."""
        record = {
            "topic": topic,
            "value": value,
            "key": key,
            "headers": headers,
        }
        self.produced.append(record)

    def flush(self, timeout=None):
        """Record the timeout of a ``flush`` call (``None`` when omitted)."""
        self.flushed.append(timeout)
class FakeConsumer:
    """In-memory consumer double fed from a pre-loaded list of messages.

    Every interaction (subscribe, commit, seek, close) is recorded on the
    instance so tests can assert on it afterwards.
    """

    def __init__(self, messages=None):
        # Copy the input so the caller's list is never mutated by poll().
        self.messages = list(messages or [])
        self.subscribed = []
        self.committed = []
        self.sought = []
        self.closed = False
        # Filled in by the consumer factory used in make_mq_app.
        self.config = None

    def subscribe(self, topics):
        """Record the topics passed to ``subscribe``."""
        self.subscribed.append(topics)

    def poll(self, timeout):
        """Return the next queued message, or ``None`` once the queue is empty."""
        if not self.messages:
            return None
        return self.messages.pop(0)

    def commit(self, message, asynchronous=False):
        """Record a commit as a ``(message, asynchronous)`` pair."""
        self.committed.append((message, asynchronous))

    def seek(self, position):
        """Record a seek request.

        Objects exposing ``topic``, ``partition`` and ``offset`` are stored as
        a ``(topic, partition, offset)`` tuple, each field resolved through
        ``call_or_value``; anything else is stored unchanged.
        """
        fields = ("topic", "partition", "offset")
        if not all(hasattr(position, field) for field in fields):
            self.sought.append(position)
            return
        self.sought.append(
            tuple(call_or_value(position, field) for field in fields)
        )

    def close(self):
        """Mark the consumer as closed."""
        self.closed = True
class FakeMessage:
    """Fake message whose fields are read through accessor methods.

    Fields are exposed as zero-argument methods (``topic()``, ``value()``,
    ...) rather than attributes; presumably this mirrors the message objects
    of the real Kafka client -- confirm against the backend.
    """

    def __init__(
        self,
        *,
        topic="demo.topic",
        partition=0,
        offset=1,
        key=b"k1",
        value=b'{"ok":true}',
        headers=None,
    ):
        self._topic = topic
        self._partition = partition
        self._offset = offset
        self._key = key
        self._value = value
        # Any falsy ``headers`` (None or an empty list) gets the default.
        self._headers = headers if headers else [("source", b"test")]

    def topic(self):
        """Return the topic name."""
        return self._topic

    def partition(self):
        """Return the partition number."""
        return self._partition

    def offset(self):
        """Return the message offset."""
        return self._offset

    def key(self):
        """Return the message key."""
        return self._key

    def value(self):
        """Return the message payload."""
        return self._value

    def headers(self):
        """Return the message headers."""
        return self._headers

    def timestamp(self):
        """Return a fixed ``(0, 0)`` timestamp pair."""
        return (0, 0)

    def error(self):
        """Return ``None``: this fake never reports an error."""
        return None
def test_mq_registry_registers_decorator_and_explicit_consumer():
    """Consumers can be registered explicitly and through ``@mq_consumer``."""
    local_registry = RegistryClass()
    local_registry.register_consumer(
        name="explicit",
        topics=["demo.explicit"],
        group_id="g1",
        handler=lambda message: None,
    )
    explicit = local_registry.consumers["explicit"]
    # A list of topics is stored as a tuple.
    assert explicit.topics == ("demo.explicit",)

    # Snapshot the shared registry so only the newly added name is checked.
    names_before = set(mq_consumer_registry_names())
    try:

        @mq_consumer("demo.decorated", name="decorated-test", group_id="g1")
        def decorated(message):
            return None

        added_names = mq_consumer_registry_names() - names_before
        assert "decorated-test" in added_names
        # The decorator must hand back a function that keeps its own name.
        assert decorated.__name__ == "decorated"
    finally:
        # Remove the decorated consumer from the shared registry again.
        from iti.mq import mq_registry

        mq_registry.consumers.pop("decorated-test", None)
def test_mq_registry_rejects_duplicate_names():
    """Registering a producer or consumer name twice raises ``ValueError``."""

    def ignore(message):
        return None

    registry = RegistryClass()
    registry.register_producer(name="events", topic="demo.events")
    registry.register_consumer(
        name="consumer",
        topics="demo.events",
        handler=ignore,
    )

    # Same producer name, different topic.
    with pytest.raises(ValueError, match="producer already registered"):
        registry.register_producer(name="events", topic="other")

    # Same consumer name, different topics.
    with pytest.raises(ValueError, match="consumer already registered"):
        registry.register_consumer(
            name="consumer",
            topics="other",
            handler=ignore,
        )
def test_mq_enabled_false_does_not_import_or_configure_kafka(monkeypatch):
    """With ``mq_enabled=False`` the app neither imports Kafka nor sets up MQ.

    Asserts that the created app has no ``iti_mq_client`` on ``app.state`` and
    that ``confluent_kafka`` is absent from ``sys.modules`` afterwards.
    """
    # Drop any cached module through monkeypatch (not a bare ``pop``) so the
    # entry is restored at teardown and later tests still see it.
    monkeypatch.delitem(sys.modules, "confluent_kafka", raising=False)

    app = create_app(
        config_mapping=BaseConfig(
            database_url="sqlite+pysqlite:///:memory:",
            testing=True,
            mq_enabled=False,
            exchange_enabled=False,
        )
    )

    assert not hasattr(app.state, "iti_mq_client")
    assert "confluent_kafka" not in sys.modules
def test_mq_enabled_true_without_dependency_raises_install_hint(monkeypatch):
    """Enabling MQ without ``confluent_kafka`` raises an install hint.

    The import machinery is patched so importing ``confluent_kafka`` fails;
    ``create_app`` must then raise ``MQConfigError`` whose message mentions
    the ``iti-flask[mq-kafka]`` extra.
    """
    # Remove any cached module through monkeypatch so it is restored at
    # teardown instead of staying evicted for the rest of the session.
    monkeypatch.delitem(sys.modules, "confluent_kafka", raising=False)
    real_import = builtins.__import__

    def missing_confluent_kafka(name, *args, **kwargs):
        # Block the package and its submodules, as a real missing install
        # would; everything else goes to the genuine importer.
        if name == "confluent_kafka" or name.startswith("confluent_kafka."):
            raise ImportError("missing")
        return real_import(name, *args, **kwargs)

    monkeypatch.setattr(builtins, "__import__", missing_confluent_kafka)

    with pytest.raises(MQConfigError, match=r"iti-flask\[mq-kafka\]"):
        create_app(
            config_mapping=BaseConfig(
                database_url="sqlite+pysqlite:///:memory:",
                testing=True,
                mq_enabled=True,
                mq={"backend": "kafka", "bootstrap_servers": "127.0.0.1:9092"},
                exchange_enabled=False,
            )
        )
def test_mq_client_sends_json_bytes_and_registered_sender():
    """The MQ client forwards JSON, raw bytes and named-sender messages.

    Uses a ``FakeProducer`` so the exact arguments reaching ``produce`` and
    ``flush`` can be asserted.
    """
    fake_producer = FakeProducer({"bootstrap.servers": "localhost:9092"})
    producers = RegistryClass()
    producers.register_producer(name="events", topic="demo.events")

    app_config = BaseConfig(
        database_url="sqlite+pysqlite:///:memory:",
        testing=True,
        mq_enabled=False,
        exchange_enabled=False,
    )
    app = create_app(config_mapping=app_config)
    # MQ is wired manually so the fake producer can be injected.
    init_mq(
        app,
        {"backend": "kafka", "bootstrap_servers": "localhost:9092"},
        registry=producers,
        producer_factory=lambda config: fake_producer,
    )

    client = mq_client(app)
    client.send_json("demo.raw", {"id": "1"}, key="k1", headers={"h": "v"})
    client.send("demo.bytes", value=b"raw", key=b"k2")
    client.sender("events").send_json({"id": "2"})
    client.flush(2)

    # JSON payload, string key and header values all arrive as bytes.
    expected_json_record = {
        "topic": "demo.raw",
        "value": b'{"id":"1"}',
        "key": b"k1",
        "headers": [("h", b"v")],
    }
    assert fake_producer.produced[0] == expected_json_record

    # Raw bytes pass through unchanged.
    assert fake_producer.produced[1]["value"] == b"raw"
    assert fake_producer.produced[1]["key"] == b"k2"

    # The named sender resolves to its registered topic.
    assert fake_producer.produced[2]["topic"] == "demo.events"
    assert fake_producer.flushed == [2]
def test_runner_commits_after_successful_sync_handler():
    """A sync handler that succeeds leads to one commit and no seek.

    Also checks that the handler received the decoded JSON value and that the
    consumer was closed once the runner stopped.
    """
    handled = []
    message = FakeMessage()
    fake_consumer = FakeConsumer([message])
    app = make_mq_app(
        registry_with_consumer(lambda item: handled.append(item.value)),
        fake_consumer,
    )
    runner = app.state.iti_mq_runner

    runner.start()
    try:
        wait_until(lambda: bool(fake_consumer.committed))
    finally:
        # Always stop the runner, even when the wait times out, so a failing
        # test does not leave a started runner behind for later tests.
        runner.stop()

    assert handled == [{"ok": True}]
    assert fake_consumer.committed == [(message, False)]
    assert fake_consumer.sought == []
    assert fake_consumer.closed is True
def test_runner_executes_async_handler():
    """A coroutine handler is awaited by the runner and sees the message key."""
    handled = []

    async def handler(message):
        # Yield once so the handler genuinely suspends before recording.
        await asyncio.sleep(0)
        handled.append(message.key)

    fake_consumer = FakeConsumer([FakeMessage()])
    app = make_mq_app(registry_with_consumer(handler), fake_consumer)
    runner = app.state.iti_mq_runner

    runner.start()
    try:
        wait_until(lambda: bool(fake_consumer.committed))
    finally:
        # Stop the runner even if the commit never happens, so a failure
        # here does not leak a started runner into later tests.
        runner.stop()

    assert handled == ["k1"]
def test_runner_does_not_commit_and_seeks_after_handler_failure():
    """A raising handler causes a seek to the failed message and no commit."""
    message = FakeMessage()
    fake_consumer = FakeConsumer([message])

    def handler(_message):
        raise RuntimeError("boom")

    app = make_mq_app(registry_with_consumer(handler), fake_consumer)
    runner = app.state.iti_mq_runner

    runner.start()
    try:
        wait_until(lambda: bool(fake_consumer.sought))
    finally:
        # Guarantee shutdown even when the seek is never observed, so a
        # failing run does not leave a started runner behind.
        runner.stop()

    assert fake_consumer.committed == []
    # The seek targets the failed message's topic, partition and offset.
    assert fake_consumer.sought == [("demo.topic", 0, 1)]
def test_runner_raises_when_group_id_missing():
    """Starting the runner with a consumer lacking ``group_id`` must fail."""

    def ignore(message):
        return None

    registry = RegistryClass()
    # Deliberately registered without a ``group_id``.
    registry.register_consumer(
        name="demo",
        topics="demo.topic",
        handler=ignore,
    )
    idle_consumer = FakeConsumer()
    app = make_mq_app(registry, idle_consumer)

    with pytest.raises(MQConfigError, match="missing group_id"):
        app.state.iti_mq_runner.start()
def test_kafka_backend_forces_manual_commit():
    """``consumer_config`` overrides a user-supplied ``enable.auto.commit``.

    The settings ask for auto-commit; the resulting consumer config must
    still report ``False``, use the per-consumer group id, and carry over
    the bootstrap servers and offset-reset policy.
    """
    settings = {
        "bootstrap_servers": "localhost:9092",
        "group_id": "global",
        "auto_offset_reset": "latest",
        "consumer": {"enable.auto.commit": True},
    }
    backend = KafkaBackend(
        settings,
        producer_factory=lambda config: FakeProducer(config),
        consumer_factory=lambda config: FakeConsumer(),
    )

    consumer_config = backend.consumer_config("g1")

    expected_options = (
        ("bootstrap.servers", "localhost:9092"),
        ("group.id", "g1"),
        ("auto.offset.reset", "latest"),
    )
    for option, expected in expected_options:
        assert consumer_config[option] == expected
    assert consumer_config["enable.auto.commit"] is False
def mq_consumer_registry_names():
    """Return the names in the shared ``iti.mq`` consumer registry as a set."""
    # Imported lazily, matching how the registry is accessed elsewhere here.
    from iti.mq import mq_registry

    return {name for name in mq_registry.consumers.keys()}
def registry_with_consumer(handler):
    """Build a fresh registry holding one ``demo`` consumer for *handler*.

    The consumer listens on ``demo.topic`` in group ``g1``.
    """
    consumer_spec = {
        "name": "demo",
        "topics": "demo.topic",
        "group_id": "g1",
        "handler": handler,
    }
    fresh_registry = RegistryClass()
    fresh_registry.register_consumer(**consumer_spec)
    return fresh_registry
def make_mq_app(registry, fake_consumer):
    """Create a test app with MQ wired to fakes.

    The app is created with MQ disabled and ``init_mq`` is then called
    explicitly so that *registry* and *fake_consumer* are used. The config
    handed to the consumer factory is stored on ``fake_consumer.config``.
    """
    app_config = BaseConfig(
        database_url="sqlite+pysqlite:///:memory:",
        testing=True,
        mq_enabled=False,
        exchange_enabled=False,
    )
    app = create_app(config_mapping=app_config)

    def consumer_factory(config):
        # Expose the received config to the test, then reuse the one fake.
        fake_consumer.config = config
        return fake_consumer

    # Short backoff and poll timeouts, as in the original settings.
    mq_settings = {
        "backend": "kafka",
        "bootstrap_servers": "localhost:9092",
        "failure_backoff_seconds": 0.01,
        "poll_timeout_seconds": 0.01,
    }
    init_mq(
        app,
        mq_settings,
        registry=registry,
        producer_factory=lambda config: FakeProducer(config),
        consumer_factory=consumer_factory,
    )
    return app
def wait_until(predicate, timeout=1.0, interval=0.01):
    """Block until ``predicate()`` is truthy or *timeout* seconds elapse.

    Args:
        predicate: Zero-argument callable polled for a truthy result.
        timeout: Maximum number of seconds to wait.
        interval: Seconds to sleep between polls.

    Raises:
        AssertionError: If the predicate is still falsy at the deadline.
    """
    import time

    deadline = time.monotonic() + timeout
    while True:
        # Check before looking at the clock: the predicate is evaluated at
        # least once (even with ``timeout=0``) and once more after the final
        # sleep, so a condition met during the last interval still counts.
        if predicate():
            return
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise AssertionError("condition not reached")
        # Never sleep past the deadline.
        time.sleep(max(0.0, min(interval, remaining)))
def call_or_value(value, name):
    """Return attribute *name* of *value*, calling it first if it is callable.

    This lets callers read a field the same way whether the object exposes
    it as a plain attribute or as a zero-argument method.
    """
    member = getattr(value, name)
    if callable(member):
        return member()
    return member