You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
iTi-Flask/iti/exchange/registry.py

172 lines
5.1 KiB
Python

from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from .base import ExchangeBusinessSpec, ExchangeScope, ExchangeTaskHandler
@dataclass
class ExchangeRegistry:
    """In-memory registry of exchange specs, task handlers and sources.

    Each of the three tables is a plain dict:

    * ``specs`` is keyed by ``spec.scope.key()``.
    * ``handlers`` is keyed by handler name.
    * ``sources`` is keyed by source name and holds arbitrary objects.

    Registration never overwrites: registering an existing key raises
    ``ValueError``.
    """

    specs: dict[str, ExchangeBusinessSpec] = field(default_factory=dict)
    handlers: dict[str, ExchangeTaskHandler] = field(default_factory=dict)
    sources: dict[str, object] = field(default_factory=dict)

    def register_spec(
        self,
        spec: ExchangeBusinessSpec,
        *,
        handler: ExchangeTaskHandler | None = None,
        handler_name: str | None = None,
    ) -> ExchangeBusinessSpec:
        """Store *spec* under its scope key and return the stored instance.

        The handler name is the first truthy value among ``handler_name``,
        ``spec.handler_name`` and the scope key. When *handler* is given it
        is registered under that name first, so a handler-name clash aborts
        the call before the spec is stored.

        The stored (and returned) spec is a new ``ExchangeBusinessSpec``
        built from the scope, name, description, layout, variables and code
        of *spec* plus the resolved handler name.

        Raises:
            ValueError: if a spec with the same scope key already exists, or
                if *handler* is given and its resolved name is already taken.
        """
        scope_key = spec.scope.key()
        if scope_key in self.specs:
            raise ValueError(f"exchange spec already registered: {scope_key}")

        effective_name = handler_name or spec.handler_name or scope_key
        if handler is not None:
            self.register_handler(effective_name, handler)

        stored = ExchangeBusinessSpec(
            scope=spec.scope,
            name=spec.name,
            description=spec.description,
            layout=spec.layout,
            variables=spec.variables,
            code=spec.code,
            handler_name=effective_name,
        )
        self.specs[scope_key] = stored
        return stored

    def get_spec(
        self,
        *,
        biz_domain: str,
        biz_obj: str,
        operation: str,
    ) -> ExchangeBusinessSpec | None:
        """Return the spec registered for the given scope, or ``None``.

        The lookup key is produced by ``ExchangeScope.from_mapping(...).key()``.
        """
        scope = ExchangeScope.from_mapping(
            biz_domain=biz_domain,
            biz_obj=biz_obj,
            operation=operation,
        )
        return self.specs.get(scope.key())

    def list_specs(self) -> list[ExchangeBusinessSpec]:
        """Return all specs ordered by domain, object, then ``str(operation)``."""

        def _ordering(item: ExchangeBusinessSpec) -> tuple[Any, Any, str]:
            scope = item.scope
            return (scope.biz_domain, scope.biz_obj, str(scope.operation))

        return sorted(self.specs.values(), key=_ordering)

    def register_handler(
        self,
        name: str,
        handler: ExchangeTaskHandler,
    ) -> ExchangeTaskHandler:
        """Register *handler* under *name* and return it.

        Raises:
            ValueError: if *name* is falsy or already registered.
        """
        if not name:
            raise ValueError("handler name is required")
        known_handlers = self.handlers
        if name in known_handlers:
            raise ValueError(f"exchange handler already registered: {name}")
        known_handlers[name] = handler
        return handler

    def get_handler(self, name: str) -> ExchangeTaskHandler | None:
        """Return the handler registered under *name*, or ``None``."""
        return self.handlers.get(name)

    def get_scope_handler(
        self,
        *,
        biz_domain: str,
        biz_obj: str,
        operation: str,
    ) -> ExchangeTaskHandler | None:
        """Return the handler bound to the spec of the given scope.

        Returns ``None`` when no spec is registered for the scope or when no
        handler exists under the spec's handler name (falling back to the
        scope key when that name is falsy).
        """
        matched = self.get_spec(
            biz_domain=biz_domain,
            biz_obj=biz_obj,
            operation=operation,
        )
        if matched is None:
            return None
        lookup_name = matched.handler_name or matched.scope.key()
        return self.handlers.get(lookup_name)

    def catalog(self) -> list[dict[str, Any]]:
        """Return the specs grouped as domain -> objects -> operations.

        Each element has the shape
        ``{"biz_domain": ..., "objects": [{"biz_obj": ..., "operations": [...]}]}``
        where every operation entry is ``spec.as_payload()``. Ordering follows
        ``list_specs()``.
        """
        # domain -> object -> {"biz_obj": ..., "operations": [...]}
        by_domain: dict[Any, dict[Any, dict[str, Any]]] = {}
        for spec in self.list_specs():
            scope = spec.scope
            objects = by_domain.setdefault(scope.biz_domain, {})
            entry = objects.setdefault(
                scope.biz_obj,
                {"biz_obj": scope.biz_obj, "operations": []},
            )
            entry["operations"].append(spec.as_payload())

        return [
            {"biz_domain": domain_name, "objects": list(objects.values())}
            for domain_name, objects in by_domain.items()
        ]

    def register_source(self, name: str, source: object) -> object:
        """Register *source* under *name* and return it.

        Raises:
            ValueError: if *name* is falsy or already registered.
        """
        if not name:
            raise ValueError("source name is required")
        known_sources = self.sources
        if name in known_sources:
            raise ValueError(f"exchange source already registered: {name}")
        known_sources[name] = source
        return source

    def get_source(self, name: str) -> object | None:
        """Return the source registered under *name*, or ``None``."""
        return self.sources.get(name)
def get_exchange_registry(app) -> ExchangeRegistry:
    """Return the registry stored on ``app.state.iti_exchange``.

    When that attribute is missing or ``None``, a fresh ``ExchangeRegistry``
    is created, stored on ``app.state.iti_exchange`` and returned.
    """
    existing = getattr(app.state, "iti_exchange", None)
    if existing is not None:
        return existing

    created = ExchangeRegistry()
    app.state.iti_exchange = created
    return created
def register_exchange_spec(
    app,
    spec: ExchangeBusinessSpec,
    *,
    handler: ExchangeTaskHandler | None = None,
    handler_name: str | None = None,
) -> ExchangeBusinessSpec:
    """Register *spec* on the registry obtained from ``get_exchange_registry(app)``.

    ``handler`` and ``handler_name`` are forwarded unchanged to the registry's
    ``register_spec``; its return value is returned as-is.
    """
    registry = get_exchange_registry(app)
    registered = registry.register_spec(
        spec,
        handler=handler,
        handler_name=handler_name,
    )
    return registered
def register_exchange_handler(
    app,
    name: str,
    handler: ExchangeTaskHandler,
) -> ExchangeTaskHandler:
    """Register *handler* under *name* on the registry obtained from ``get_exchange_registry(app)``.

    Returns whatever the registry's ``register_handler`` returns.
    """
    registry = get_exchange_registry(app)
    return registry.register_handler(name, handler)
def register_exchange_source(app, name: str, source: object) -> object:
    """Register *source* under *name* on the registry obtained from ``get_exchange_registry(app)``.

    Returns whatever the registry's ``register_source`` returns.
    """
    registry = get_exchange_registry(app)
    return registry.register_source(name, source)
def get_exchange_source_by_name(app, name: str) -> object | None:
    """Look up *name* via ``get_source`` on the registry from ``get_exchange_registry(app)``."""
    registry = get_exchange_registry(app)
    return registry.get_source(name)