You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
iTi-Flask/iti/tasks/runner.py

90 lines
2.7 KiB
Python

from __future__ import annotations
import re
import threading
import time
import atexit
from .registry import TaskRegistry, task_registry
class TaskRunner:
"""Single-process task scheduler."""
def __init__(self, registry: TaskRegistry, *, tick_seconds: int = 1) -> None:
self.registry = registry
self.tick_seconds = tick_seconds
self._last_run: dict[str, float] = {}
self._stop = threading.Event()
self._thread: threading.Thread | None = None
def start(self) -> None:
if self._thread and self._thread.is_alive():
return
self._thread = threading.Thread(target=self._loop, daemon=True)
self._thread.start()
def stop(self) -> None:
self._stop.set()
if self._thread:
self._thread.join(timeout=3)
def _loop(self) -> None:
while not self._stop.is_set():
now = time.time()
for task in list(self.registry.tasks.values()):
if self._due(task.schedule, task.name, now):
self._last_run[task.name] = now
threading.Thread(
target=self.registry.trigger,
args=(task.name,),
daemon=True,
).start()
self._stop.wait(self.tick_seconds)
def _due(self, schedule: str | None, name: str, now: float) -> bool:
if not schedule:
return False
interval = _parse_interval(schedule)
if interval is None:
interval = _parse_simple_cron(schedule)
if interval is None:
return False
last = self._last_run.get(name)
return last is None or now - last >= interval
def _parse_interval(schedule: str) -> int | None:
match = re.fullmatch(r"interval:(\d+)", schedule.strip())
if not match:
return None
return max(int(match.group(1)), 1)
def _parse_simple_cron(schedule: str) -> int | None:
value = schedule.strip()
if value.startswith("cron:"):
value = value[5:].strip()
parts = value.split()
if len(parts) != 5:
return None
minute = parts[0]
if minute == "*":
return 60
match = re.fullmatch(r"\*/(\d+)", minute)
if match:
return max(int(match.group(1)), 1) * 60
return None
def init_task_runner(app, registry: TaskRegistry | None = None) -> TaskRunner:
runner = TaskRunner(registry or task_registry)
app.extensions["iti_task_registry"] = registry or task_registry
app.extensions["iti_task_runner"] = runner
if app.config.get("TASKS_ENABLED", False):
app.logger.info("starting single-process task runner")
runner.start()
atexit.register(runner.stop)
return runner