You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
384 lines
10 KiB
Python
384 lines
10 KiB
Python
import asyncio
|
|
import builtins
|
|
import sys
|
|
|
|
import pytest
|
|
|
|
from iti import create_app
|
|
from iti.config import BaseConfig
|
|
from iti.mq import MQConfigError, init_mq, mq_client, mq_consumer
|
|
from iti.mq.backend import KafkaBackend
|
|
from iti.mq.registry import MQRegistry as RegistryClass
|
|
|
|
|
|
class FakeProducer:
    """In-memory producer double that records every call it receives.

    The call logs are plain lists so tests can assert on them directly.
    """

    def __init__(self, config):
        # Configuration handed to the constructor, stored untouched.
        self.config = config
        # Call logs, each kept in invocation order.
        self.produced, self.polled, self.flushed = [], [], []

    def poll(self, timeout):
        """Log the requested poll timeout; nothing is actually polled."""
        self.polled.append(timeout)

    def produce(self, topic, *, value=None, key=None, headers=None):
        """Log one outgoing record as a topic/value/key/headers dict."""
        record = dict(topic=topic, value=value, key=key, headers=headers)
        self.produced.append(record)

    def flush(self, timeout=None):
        """Log the flush timeout (``None`` when the caller gave none)."""
        self.flushed.append(timeout)
|
|
|
|
|
|
class FakeConsumer:
    """Scripted consumer double: replays queued messages and logs calls."""

    def __init__(self, messages=None):
        # Copy the input so poll() never mutates the caller's list.
        self.messages = [] if not messages else list(messages)
        self.subscribed = []
        self.committed = []
        self.sought = []
        self.closed = False
        # Left as None here; make_mq_app's consumer factory assigns it later.
        self.config = None

    def subscribe(self, topics):
        """Log the topics passed to one subscribe call."""
        self.subscribed.append(topics)

    def poll(self, timeout):
        """Return the next queued message, or ``None`` once none are left.

        ``timeout`` is accepted but ignored; the call never blocks.
        """
        return self.messages.pop(0) if self.messages else None

    def commit(self, message, asynchronous=False):
        """Log a commit as a ``(message, asynchronous)`` pair."""
        entry = (message, asynchronous)
        self.committed.append(entry)

    def seek(self, position):
        """Log a seek request.

        A position exposing ``topic``, ``partition`` and ``offset`` is
        flattened to a ``(topic, partition, offset)`` tuple, resolving each
        member through ``call_or_value``. Anything else is logged as given.
        """
        fields = ("topic", "partition", "offset")
        if not all(hasattr(position, field) for field in fields):
            self.sought.append(position)
            return
        self.sought.append(
            tuple(call_or_value(position, field) for field in fields)
        )

    def close(self):
        """Mark the consumer as closed."""
        self.closed = True
|
|
|
|
|
|
class FakeMessage:
    """Message double whose fields are read through accessor methods.

    Every field is keyword-only and has a default, so tests only spell out
    what they care about. ``timestamp()`` and ``error()`` return fixed values.
    """

    def __init__(
        self,
        *,
        topic="demo.topic",
        partition=0,
        offset=1,
        key=b"k1",
        value=b'{"ok":true}',
        headers=None,
    ):
        self._topic = topic
        self._partition = partition
        self._offset = offset
        self._key = key
        self._value = value
        # Only substitute the default when no headers were given at all; an
        # explicitly empty list must be kept so a header-less message can be
        # modelled.
        self._headers = [("source", b"test")] if headers is None else headers

    def topic(self):
        """Return the topic name."""
        return self._topic

    def partition(self):
        """Return the partition number."""
        return self._partition

    def offset(self):
        """Return the message offset."""
        return self._offset

    def key(self):
        """Return the message key as given to the constructor."""
        return self._key

    def value(self):
        """Return the message payload as given to the constructor."""
        return self._value

    def headers(self):
        """Return the header list (the default pair when none was given)."""
        return self._headers

    def timestamp(self):
        """Return a fixed ``(0, 0)`` timestamp tuple."""
        return (0, 0)

    def error(self):
        """Return ``None``: this fake never reports an error."""
        return None
|
|
|
|
|
|
def test_mq_registry_registers_decorator_and_explicit_consumer():
    """Consumers can be registered explicitly and through the decorator."""
    local_registry = RegistryClass()
    local_registry.register_consumer(
        name="explicit",
        topics=["demo.explicit"],
        group_id="g1",
        handler=lambda message: None,
    )
    explicit = local_registry.consumers["explicit"]
    # The list of topics given above is expected back as a tuple.
    assert explicit.topics == ("demo.explicit",)

    # Snapshot the names in iti.mq's registry so only additions are checked.
    known_names = mq_consumer_registry_names()

    try:

        @mq_consumer("demo.decorated", name="decorated-test", group_id="g1")
        def decorated(message):
            return None

        added_names = mq_consumer_registry_names() - known_names
        assert "decorated-test" in added_names
        # The decorator must hand back the function under its original name.
        assert decorated.__name__ == "decorated"
    finally:
        from iti.mq import mq_registry

        # Always drop the entry so iti.mq's registry is left as it was found.
        mq_registry.consumers.pop("decorated-test", None)
|
|
|
|
|
|
def test_mq_registry_rejects_duplicate_names():
    """Re-using a producer or consumer name raises ``ValueError``."""
    registry = RegistryClass()

    # First registrations succeed and occupy the names.
    registry.register_producer(name="events", topic="demo.events")
    registry.register_consumer(
        name="consumer",
        topics="demo.events",
        handler=lambda message: None,
    )

    # Second registrations under the same names must be refused.
    duplicate_producer = pytest.raises(
        ValueError, match="producer already registered"
    )
    with duplicate_producer:
        registry.register_producer(name="events", topic="other")

    duplicate_consumer = pytest.raises(
        ValueError, match="consumer already registered"
    )
    with duplicate_consumer:
        registry.register_consumer(
            name="consumer",
            topics="other",
            handler=lambda message: None,
        )
|
|
|
|
|
|
def test_mq_enabled_false_does_not_import_or_configure_kafka(monkeypatch):
    """With ``mq_enabled=False`` no MQ client is attached and kafka stays unimported."""
    # Remove any cached module through monkeypatch so the original
    # sys.modules entry is restored after the test instead of leaking the
    # removal into later tests.
    monkeypatch.delitem(sys.modules, "confluent_kafka", raising=False)

    app = create_app(
        config_mapping=BaseConfig(
            database_url="sqlite+pysqlite:///:memory:",
            testing=True,
            mq_enabled=False,
            exchange_enabled=False,
        )
    )

    assert not hasattr(app.state, "iti_mq_client")
    # App creation must not have imported the kafka client library.
    assert "confluent_kafka" not in sys.modules
|
|
|
|
|
|
def test_mq_enabled_true_without_dependency_raises_install_hint(monkeypatch):
    """Enabling MQ without ``confluent_kafka`` raises an error naming the extra."""
    # Remove any cached module through monkeypatch so the original
    # sys.modules entry is restored after the test.
    monkeypatch.delitem(sys.modules, "confluent_kafka", raising=False)
    real_import = builtins.__import__

    def missing_confluent_kafka(name, *args, **kwargs):
        # Simulate an uninstalled dependency: block the package itself and
        # any of its submodules, and delegate every other import.
        if name == "confluent_kafka" or name.startswith("confluent_kafka."):
            raise ImportError("missing")
        return real_import(name, *args, **kwargs)

    monkeypatch.setattr(builtins, "__import__", missing_confluent_kafka)

    with pytest.raises(MQConfigError, match=r"iti-flask\[mq-kafka\]"):
        create_app(
            config_mapping=BaseConfig(
                database_url="sqlite+pysqlite:///:memory:",
                testing=True,
                mq_enabled=True,
                mq={"backend": "kafka", "bootstrap_servers": "127.0.0.1:9092"},
                exchange_enabled=False,
            )
        )
|
|
|
|
|
|
def test_mq_client_sends_json_bytes_and_registered_sender():
    """The client encodes JSON sends, passes bytes through, and resolves senders."""
    producer = FakeProducer({"bootstrap.servers": "localhost:9092"})
    registry = RegistryClass()
    registry.register_producer(name="events", topic="demo.events")

    # MQ stays disabled in the app config; it is wired up by hand below so
    # the fake producer can be injected.
    app_config = BaseConfig(
        database_url="sqlite+pysqlite:///:memory:",
        testing=True,
        mq_enabled=False,
        exchange_enabled=False,
    )
    app = create_app(config_mapping=app_config)
    mq_settings = {"backend": "kafka", "bootstrap_servers": "localhost:9092"}
    init_mq(
        app,
        mq_settings,
        registry=registry,
        producer_factory=lambda config: producer,
    )

    client = mq_client(app)
    client.send_json("demo.raw", {"id": "1"}, key="k1", headers={"h": "v"})
    client.send("demo.bytes", value=b"raw", key=b"k2")
    client.sender("events").send_json({"id": "2"})
    client.flush(2)

    sent = producer.produced
    # JSON send: compact JSON bytes, key and header values encoded to bytes.
    expected_json_record = {
        "topic": "demo.raw",
        "value": b'{"id":"1"}',
        "key": b"k1",
        "headers": [("h", b"v")],
    }
    assert sent[0] == expected_json_record
    # Raw send: bytes arrive unchanged.
    assert sent[1]["value"] == b"raw"
    assert sent[1]["key"] == b"k2"
    # Named sender: the topic comes from the registered producer.
    assert sent[2]["topic"] == "demo.events"
    assert producer.flushed == [2]
|
|
|
|
|
|
def test_runner_commits_after_successful_sync_handler():
    """A sync handler that returns normally leads to one commit and no seek."""
    handled = []
    message = FakeMessage()
    fake_consumer = FakeConsumer([message])
    app = make_mq_app(
        registry_with_consumer(lambda item: handled.append(item.value)),
        fake_consumer,
    )
    runner = app.state.iti_mq_runner

    runner.start()
    try:
        wait_until(lambda: bool(fake_consumer.committed))
    finally:
        # Stop the runner even when the wait times out, so a failing test
        # does not leave a started runner behind for later tests.
        runner.stop()

    # The handler sees the payload already decoded from JSON.
    assert handled == [{"ok": True}]
    # The commit is for the original message and is not asynchronous.
    assert fake_consumer.committed == [(message, False)]
    assert fake_consumer.sought == []
    assert fake_consumer.closed is True
|
|
|
|
|
|
def test_runner_executes_async_handler():
    """A coroutine handler is run to completion before the commit."""
    handled = []

    async def handler(message):
        # Yield once so the handler only finishes if it is really awaited.
        await asyncio.sleep(0)
        handled.append(message.key)

    fake_consumer = FakeConsumer([FakeMessage()])
    app = make_mq_app(registry_with_consumer(handler), fake_consumer)
    runner = app.state.iti_mq_runner

    runner.start()
    try:
        wait_until(lambda: bool(fake_consumer.committed))
    finally:
        # Stop the runner even when the wait times out, so a failing test
        # does not leave a started runner behind for later tests.
        runner.stop()

    # The handler sees the key decoded to text (the fake holds b"k1").
    assert handled == ["k1"]
|
|
|
|
|
|
def test_runner_does_not_commit_and_seeks_after_handler_failure():
    """A raising handler leads to a seek back to the message and no commit."""
    message = FakeMessage()
    fake_consumer = FakeConsumer([message])

    def handler(_message):
        raise RuntimeError("boom")

    app = make_mq_app(registry_with_consumer(handler), fake_consumer)
    runner = app.state.iti_mq_runner

    runner.start()
    try:
        wait_until(lambda: bool(fake_consumer.sought))
    finally:
        # Stop the runner even when the wait times out, so a failing test
        # does not leave a started runner behind for later tests.
        runner.stop()

    assert fake_consumer.committed == []
    # The seek targets the failed message: FakeMessage's default topic,
    # partition and offset.
    assert fake_consumer.sought == [("demo.topic", 0, 1)]
|
|
|
|
|
|
def test_runner_raises_when_group_id_missing():
    """Starting the runner with a consumer lacking ``group_id`` fails."""
    registry = RegistryClass()
    # Deliberately registered without a group_id.
    consumer_spec = {
        "name": "demo",
        "topics": "demo.topic",
        "handler": lambda message: None,
    }
    registry.register_consumer(**consumer_spec)
    app = make_mq_app(registry, FakeConsumer())

    expected_failure = pytest.raises(MQConfigError, match="missing group_id")
    with expected_failure:
        app.state.iti_mq_runner.start()
|
|
|
|
|
|
def test_kafka_backend_forces_manual_commit():
    """The consumer config turns auto-commit off even when asked to enable it."""
    settings = {
        "bootstrap_servers": "localhost:9092",
        "group_id": "global",
        "auto_offset_reset": "latest",
        # Tries to switch auto-commit on; the backend must override this.
        "consumer": {"enable.auto.commit": True},
    }
    backend = KafkaBackend(
        settings,
        producer_factory=FakeProducer,
        consumer_factory=lambda config: FakeConsumer(),
    )

    config = backend.consumer_config("g1")

    expected_options = {
        "bootstrap.servers": "localhost:9092",
        # The group passed to consumer_config wins over the global "group_id".
        "group.id": "g1",
        "auto.offset.reset": "latest",
    }
    for option, expected in expected_options.items():
        assert config[option] == expected
    assert config["enable.auto.commit"] is False
|
|
|
|
|
|
def mq_consumer_registry_names():
    """Return a fresh set of the consumer names in ``iti.mq.mq_registry``."""
    from iti.mq import mq_registry

    consumers = mq_registry.consumers
    return set(consumers)
|
|
|
|
|
|
def registry_with_consumer(handler):
    """Return a new registry holding one "demo" consumer bound to ``handler``.

    The consumer listens on "demo.topic" in group "g1".
    """
    fresh_registry = RegistryClass()
    consumer_spec = {
        "name": "demo",
        "topics": "demo.topic",
        "group_id": "g1",
        "handler": handler,
    }
    fresh_registry.register_consumer(**consumer_spec)
    return fresh_registry
|
|
|
|
|
|
def make_mq_app(registry, fake_consumer):
    """Create an app and initialise MQ on it with fake producer and consumer.

    The app is built with ``mq_enabled=False``; ``init_mq`` is then called
    directly so the factories can hand out the fakes. Short backoff and poll
    timeouts (0.01 s) keep the runner tests quick.

    Args:
        registry: Registry passed through to ``init_mq``.
        fake_consumer: Object returned by the consumer factory; the config
            the factory is called with is stored on its ``config`` attribute.

    Returns:
        The created app.
    """
    base_config = BaseConfig(
        database_url="sqlite+pysqlite:///:memory:",
        testing=True,
        mq_enabled=False,
        exchange_enabled=False,
    )
    app = create_app(config_mapping=base_config)

    def consumer_factory(config):
        # Record the config the factory is called with, then hand out the fake.
        fake_consumer.config = config
        return fake_consumer

    mq_settings = {
        "backend": "kafka",
        "bootstrap_servers": "localhost:9092",
        "failure_backoff_seconds": 0.01,
        "poll_timeout_seconds": 0.01,
    }
    init_mq(
        app,
        mq_settings,
        registry=registry,
        producer_factory=FakeProducer,
        consumer_factory=consumer_factory,
    )
    return app
|
|
|
|
|
|
def wait_until(predicate, timeout=1.0):
    """Poll ``predicate`` until it returns a truthy value or ``timeout`` elapses.

    The predicate is evaluated at least once, even with ``timeout=0``, and
    once more after the final sleep. A condition that becomes true during the
    last interval is therefore still seen.

    Args:
        predicate: Zero-argument callable evaluated repeatedly.
        timeout: Maximum time to wait, in seconds.

    Raises:
        AssertionError: If ``predicate`` is still falsy after the deadline.
    """
    import time  # local import keeps this helper self-contained

    deadline = time.monotonic() + timeout
    while True:
        # Check the condition before the clock so it always runs at least
        # once and gets a last chance after the deadline has passed.
        if predicate():
            return
        if time.monotonic() >= deadline:
            raise AssertionError("condition not reached")
        time.sleep(0.01)
|
|
|
|
|
|
def call_or_value(value, name):
    """Return attribute ``name`` of ``value``, calling it first if callable.

    This lets callers read a field the same way whether the object exposes
    it as a plain attribute or as a zero-argument method.
    """
    member = getattr(value, name)
    if callable(member):
        return member()
    return member
|