You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
iTi-Flask/iti/service_client/client.py

206 lines
7.1 KiB
Python

from __future__ import annotations
import time
import uuid
from typing import Any
import httpx
from .config import ServiceConfig
from .errors import ServiceBusinessError, ServiceHTTPError, ServiceUnavailableError
ENVELOPE_FIELDS = {"data", "code", "message"}
class ServiceClient:
"""Synchronous HTTP JSON service client."""
def __init__(
self,
config: ServiceConfig,
*,
transport: httpx.BaseTransport | None = None,
) -> None:
self.config = config
timeout = httpx.Timeout(
connect=config.timeout.connect,
read=config.timeout.read,
write=config.timeout.write,
pool=config.timeout.pool,
)
self._client = httpx.Client(
base_url=config.base_url,
timeout=timeout,
transport=transport,
)
self._fail_count = 0
self._opened_at: float | None = None
def get(self, endpoint: str, **kwargs: Any) -> Any:
return self.request("GET", endpoint, **kwargs)
def post(self, endpoint: str, **kwargs: Any) -> Any:
return self.request("POST", endpoint, **kwargs)
def put(self, endpoint: str, **kwargs: Any) -> Any:
return self.request("PUT", endpoint, **kwargs)
def delete(self, endpoint: str, **kwargs: Any) -> Any:
return self.request("DELETE", endpoint, **kwargs)
def request(
self,
method: str,
endpoint: str,
*,
path: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
json: Any = None,
headers: dict[str, str] | None = None,
retry: bool | None = None,
expect_json: bool = True,
) -> Any:
method = method.upper()
url = endpoint.format(**(path or {}))
self._raise_if_open()
attempts = self._attempts_for(method, retry)
last_error: Exception | None = None
for attempt in range(1, attempts + 1):
try:
response = self._client.request(
method,
url,
params=params,
json=json,
headers=self._headers(headers),
)
if not expect_json:
if response.status_code >= 400:
if self._should_retry_status(method, response.status_code, attempt, attempts):
time.sleep(self.config.retry.backoff * attempt)
continue
self._record_failure()
raise ServiceHTTPError(
self.config.name, response.status_code, response.text
)
self._record_success()
return response
if response.status_code >= 400 and self._should_retry_status(
method, response.status_code, attempt, attempts
):
time.sleep(self.config.retry.backoff * attempt)
continue
if not response.content:
if response.status_code >= 400:
self._record_failure()
raise ServiceHTTPError(
self.config.name, response.status_code, response.text
)
self._record_success()
return None
return self._parse_response(response)
except (httpx.TimeoutException, httpx.TransportError) as exc:
last_error = exc
if attempt < attempts:
time.sleep(self.config.retry.backoff * attempt)
continue
self._record_failure()
raise ServiceUnavailableError(
f"service {self.config.name} unavailable: {exc}"
) from exc
raise ServiceUnavailableError(
f"service {self.config.name} unavailable: {last_error}"
)
def close(self) -> None:
self._client.close()
def _headers(self, headers: dict[str, str] | None) -> dict[str, str]:
result = dict(headers or {})
result.setdefault("Accept", "application/json")
result.setdefault("Content-Type", "application/json")
if self.config.token:
result.setdefault("Authorization", f"Bearer {self.config.token}")
result.setdefault("X-Trace-Id", uuid.uuid4().hex)
return result
def _attempts_for(self, method: str, retry: bool | None) -> int:
if retry is False:
return 1
if retry is True:
return self.config.retry.attempts
if method in self.config.retry.methods:
return self.config.retry.attempts
return 1
def _should_retry_status(
self, method: str, status_code: int, attempt: int, attempts: int
) -> bool:
return (
attempt < attempts
and method in self.config.retry.methods
and status_code in self.config.retry.statuses
)
def _raise_if_open(self) -> None:
breaker = self.config.circuit_breaker
if not breaker.enabled or self._opened_at is None:
return
elapsed = time.monotonic() - self._opened_at
if elapsed < breaker.reset_timeout:
raise ServiceUnavailableError(
f"service {self.config.name} circuit breaker is open"
)
self._opened_at = None
self._fail_count = 0
def _record_success(self) -> None:
self._fail_count = 0
self._opened_at = None
def _record_failure(self) -> None:
breaker = self.config.circuit_breaker
if not breaker.enabled:
return
self._fail_count += 1
if self._fail_count >= breaker.fail_max:
self._opened_at = time.monotonic()
def _parse_response(self, response: httpx.Response) -> Any:
try:
payload = response.json()
except ValueError as exc:
if response.status_code >= 400:
self._record_failure()
raise ServiceHTTPError(
self.config.name, response.status_code, response.text
) from exc
self._record_success()
raise
if not self._is_envelope(payload):
if response.status_code >= 400:
self._record_failure()
raise ServiceHTTPError(
self.config.name, response.status_code, response.text
)
self._record_success()
return payload
code = int(payload["code"])
if response.status_code < 400 and code == 200:
self._record_success()
return payload.get("data")
self._record_failure()
raise ServiceBusinessError(
self.config.name,
code,
str(payload.get("message") or ""),
payload.get("data"),
status_code=response.status_code,
)
def _is_envelope(self, payload: Any) -> bool:
return isinstance(payload, dict) and ENVELOPE_FIELDS.issubset(payload.keys())