You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
iTi-Flask/tests/test_tasks.py

46 lines
1.2 KiB
Python

import time
from iti.tasks import TaskRegistry
from iti.tasks.runner import TaskRunner, _parse_interval, _parse_simple_cron
def test_task_registry_trigger_success():
    """Triggering a registered task yields a successful, finished run.

    Registers a handler that returns ``"done"`` under the name ``"demo"``,
    triggers it, and checks the ``status``, ``result`` and ``finished_at``
    attributes of the object returned by ``trigger``.
    """
    tasks = TaskRegistry()
    tasks.register(name="demo", handler=lambda: "done")

    outcome = tasks.trigger("demo")

    assert outcome.status == "success"
    assert outcome.result == "done"
    assert outcome.finished_at is not None
def test_task_registry_skips_duplicate_running_task():
    """Triggering a task that is already marked running is skipped.

    The test inserts ``"demo"`` into the registry's private ``_running``
    attribute by hand before calling ``trigger`` and expects the returned
    run to carry the status ``"skipped"``.
    """
    tasks = TaskRegistry()

    def handler():
        # Task body: a short sleep. The assertion below expects "skipped".
        time.sleep(0.05)

    tasks.register(name="demo", handler=handler)

    # Flag the task as in progress directly instead of starting a real run.
    tasks._running.add("demo")

    outcome = tasks.trigger("demo")

    assert outcome.status == "skipped"
def test_schedule_parsers():
    """Check the schedule-string parsers against known input/output pairs.

    Each case names the parser, the schedule string passed to it, and the
    integer it must return.
    """
    # NOTE(review): the expected values look like seconds (every 2 minutes
    # -> 120) -- confirm against the parser implementations.
    cases = (
        (_parse_interval, "interval:5", 5),
        (_parse_simple_cron, "*/2 * * * *", 120),
        (_parse_simple_cron, "cron:* * * * *", 60),
    )
    for parse, spec, expected in cases:
        assert parse(spec) == expected
def test_runner_due_calculation():
    """Exercise ``TaskRunner._due`` before and after a last-run timestamp.

    Calls ``_due(schedule, task_name, now)`` on a runner built around a
    fresh registry and checks the boolean result in three situations.
    """
    runner = TaskRunner(TaskRegistry())
    task_name = "demo"

    # This test has not recorded any run for the task yet: expected due.
    assert runner._due("interval:1", task_name, 100.0) is True

    # Record a last run at t=100.0, then probe around a 10-unit interval.
    runner._last_run[task_name] = 100.0
    ten_interval = "interval:10"

    # 5 units after the recorded run: not yet due.
    assert runner._due(ten_interval, task_name, 105.0) is False
    # Exactly 10 units after the recorded run: due.
    assert runner._due(ten_interval, task_name, 110.0) is True