You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
iTi-Flask/iti/mq/offset_store.py

105 lines
3.5 KiB
Python

from __future__ import annotations
import sqlite3
import threading
from pathlib import Path
from typing import Protocol
class MQOffsetStore(Protocol):
    """Structural interface for stores keyed by (consumer name, topic, partition)."""

    def get(self, consumer_name: str, topic: str, partition: int) -> int | None:
        """Return the next offset to consume."""
        ...

    def set(self, consumer_name: str, topic: str, partition: int, offset: int) -> None:
        """Persist the next offset to consume."""
        ...

    def close(self) -> None:
        """Release store resources."""
        ...
class SQLiteMQOffsetStore:
    """Offset store backed by a single SQLite database file.

    One connection is opened with ``check_same_thread=False`` and shared by
    every caller; each operation holds ``self._lock`` so access to that
    connection is serialized.

    After ``close()`` has been called, ``get()`` returns ``None`` and
    ``set()`` silently does nothing instead of raising.
    """

    def __init__(self, path: str | Path) -> None:
        """Open the database at *path* and make sure the offsets table exists.

        Missing parent directories of *path* are created first.

        Raises:
            sqlite3.Error: If the schema cannot be initialised. The
                connection is closed before the error propagates.
        """
        self.path = Path(path)
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self._lock = threading.Lock()
        self._conn = sqlite3.connect(self.path, check_same_thread=False)
        self._closed = False
        try:
            # The connection context manager commits on success and rolls
            # back if the statement raises.
            with self._conn:
                self._conn.execute(
                    """
                    CREATE TABLE IF NOT EXISTS mq_offsets (
                        consumer_name TEXT NOT NULL,
                        topic TEXT NOT NULL,
                        partition INTEGER NOT NULL,
                        offset INTEGER NOT NULL,
                        updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                        PRIMARY KEY (consumer_name, topic, partition)
                    )
                    """
                )
        except sqlite3.Error:
            # Do not leak the open connection when construction fails.
            self._conn.close()
            self._closed = True
            raise

    def get(self, consumer_name: str, topic: str, partition: int) -> int | None:
        """Return the stored offset for the given key.

        Returns:
            The stored offset as an ``int``, or ``None`` when no row exists
            for the key or the store has already been closed.
        """
        with self._lock:
            if self._closed:
                return None
            row = self._conn.execute(
                """
                SELECT offset
                FROM mq_offsets
                WHERE consumer_name = ? AND topic = ? AND partition = ?
                """,
                (consumer_name, topic, partition),
            ).fetchone()
        if row is None:
            return None
        return int(row[0])

    def set(self, consumer_name: str, topic: str, partition: int, offset: int) -> None:
        """Insert or overwrite the offset for the given key.

        Does nothing when the store has already been closed.

        Raises:
            sqlite3.Error: If the write fails. The transaction is rolled
                back, so no open transaction is left on the shared
                connection.
        """
        with self._lock:
            if self._closed:
                return
            # sqlite3 opens an implicit transaction before running the
            # INSERT; using the connection as a context manager guarantees
            # it is committed on success and rolled back on any error.
            with self._conn:
                self._conn.execute(
                    """
                    INSERT INTO mq_offsets (consumer_name, topic, partition, offset, updated_at)
                    VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
                    ON CONFLICT(consumer_name, topic, partition)
                    DO UPDATE SET offset = excluded.offset, updated_at = CURRENT_TIMESTAMP
                    """,
                    (consumer_name, topic, partition, offset),
                )

    def close(self) -> None:
        """Close the underlying connection; calling it again is a no-op."""
        with self._lock:
            if self._closed:
                return
            self._conn.close()
            self._closed = True
class MemoryMQOffsetStore:
    """Offset store that keeps offsets in a lock-guarded in-process dict."""

    def __init__(self) -> None:
        """Start with an empty offset table."""
        self._lock = threading.Lock()
        self._offsets: dict[tuple[str, str, int], int] = {}

    def get(self, consumer_name: str, topic: str, partition: int) -> int | None:
        """Return the offset recorded for the key, or ``None`` if there is none."""
        key = (consumer_name, topic, partition)
        with self._lock:
            return self._offsets.get(key)

    def set(self, consumer_name: str, topic: str, partition: int, offset: int) -> None:
        """Record *offset* for the key, replacing any previous value."""
        key = (consumer_name, topic, partition)
        with self._lock:
            self._offsets[key] = offset

    def close(self) -> None:
        """Do nothing; this store holds no external resources."""
        return None
def create_offset_store(config: dict | None, *, default_path: str | Path) -> MQOffsetStore:
    """Build the offset store described by *config*.

    Args:
        config: Optional mapping. ``"type"`` selects the backend
            (``"sqlite"`` when absent; surrounding whitespace and letter
            case are ignored). ``"path"`` gives the SQLite file location.
        default_path: SQLite file location used when ``config`` has no
            truthy ``"path"`` entry.

    Returns:
        A ``MemoryMQOffsetStore`` for type ``"memory"`` or a
        ``SQLiteMQOffsetStore`` for type ``"sqlite"``.

    Raises:
        ValueError: If the normalised type is neither ``"memory"`` nor
            ``"sqlite"``.
    """
    # Work on a private copy so the caller's mapping is never touched.
    settings = dict(config) if config else {}
    backend = str(settings.get("type", "sqlite")).strip().lower()

    if backend == "sqlite":
        db_path = settings.get("path") or default_path
        return SQLiteMQOffsetStore(db_path)
    if backend == "memory":
        return MemoryMQOffsetStore()
    raise ValueError(f"unsupported mq offset store type: {backend!r}")