You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

709 lines
26 KiB
Python

from __future__ import annotations
import argparse
import os
import re
import subprocess
import sys
import tomllib
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Sequence
DEFAULT_FRAMEWORK_GIT = os.environ.get(
"ITICLI_FRAMEWORK_GIT",
"https://git.noahlan.cn/iti-framework/iTi-Flask.git",
)
DEFAULT_SYSTEM_GIT = os.environ.get(
"ITICLI_SYSTEM_GIT",
"https://git.noahlan.cn/iti-framework/iTi-System.git",
)
DEFAULT_COPIER_REF = os.environ.get("ITICLI_COPIER_REF", "HEAD")
ENV_NAMES = {"dev", "test", "prod", "default"}
class CliError(Exception):
def __init__(self, message: str, code: int = 2) -> None:
super().__init__(message)
self.code = code
@dataclass(frozen=True)
class ProjectContext:
root: Path
kind: str
pyproject: dict[str, Any]
has_system: bool
def main(argv: Sequence[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
try:
return int(args.handler(args))
except CliError as exc:
print(str(exc), file=sys.stderr)
return exc.code
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="iticli",
description="iTi-Flask framework and business project CLI.",
)
parser.set_defaults(handler=lambda _args: parser.print_help() or 0)
subparsers = parser.add_subparsers(dest="command")
help_parser = subparsers.add_parser("help", help="show help")
help_parser.set_defaults(handler=lambda _args: parser.print_help() or 0)
install_parser = subparsers.add_parser("install", help="install project dependencies")
install_parser.add_argument(
"profile",
nargs="?",
choices=["dev", "odbc"],
help="dev installs project dev dependencies; odbc also installs pyodbc",
)
install_parser.set_defaults(handler=handle_install)
test_parser = subparsers.add_parser("test", help="run tests")
test_parser.set_defaults(handler=handle_test)
add_run_parser(subparsers, "run")
add_run_parser(subparsers, "serve")
migrate_parser = subparsers.add_parser("migrate", help="run Alembic commands")
migrate_parser.add_argument("action", nargs="?", help="upgrade target, heads, current, revision, ...")
migrate_parser.add_argument("args", nargs=argparse.REMAINDER)
migrate_parser.add_argument("-m", "--message", help="migration message for revision")
migrate_parser.set_defaults(handler=handle_migrate)
create_parser = subparsers.add_parser("create", help="create a business project")
create_parser.add_argument("project_name", help="target directory or project name")
create_parser.add_argument("--with-system", action="store_true", help="include iti-system")
create_parser.add_argument("--target", help="target directory, defaults to project_name")
create_parser.add_argument("--slug", help="distribution/project slug, defaults to target basename")
create_parser.add_argument("--display-name", help="business project display name")
create_parser.add_argument("--source", help="Copier template source, defaults to iTi-Flask private Git")
create_parser.add_argument("--ref", default=DEFAULT_COPIER_REF, help="Copier source ref, defaults to HEAD")
create_parser.add_argument("--local", action="store_true", help="use current iTi-Flask framework checkout as template source")
create_parser.add_argument("--framework-git", help="framework dependency Git URL rendered into the project")
create_parser.add_argument("--framework-tag", help="framework dependency tag rendered into the project")
create_parser.add_argument("--system-git", help="system dependency Git URL rendered into the project")
create_parser.add_argument("--system-tag", help="system dependency tag rendered into the project")
create_parser.set_defaults(handler=handle_create)
release_parser = subparsers.add_parser("release", help="release iTi-Flask framework")
release_parser.add_argument("version", nargs="?", help="target version, defaults to next patch")
release_parser.set_defaults(handler=handle_release)
sync_parser = subparsers.add_parser("sync", help="sync framework or system package")
sync_parser.add_argument("target", choices=["flask", "system", "all"], nargs="?", default="all")
sync_parser.set_defaults(handler=handle_sync)
template_parser = subparsers.add_parser("template", help="check or update Copier template")
template_subparsers = template_parser.add_subparsers(dest="template_action", required=True)
template_check = template_subparsers.add_parser("check", help="check template updates")
template_check.set_defaults(handler=handle_template)
template_update = template_subparsers.add_parser("update", help="apply template updates")
template_update.set_defaults(handler=handle_template)
init_parser = subparsers.add_parser("init", help="initialize a business project")
init_parser.add_argument("scope", nargs="?", choices=["system"], help="only run system migration and seed")
init_parser.set_defaults(handler=handle_init)
docker_parser = subparsers.add_parser("docker", help="run Docker Compose commands")
docker_parser.add_argument("action", choices=["build", "up", "down", "logs"])
docker_parser.add_argument("--db", action="store_true", help="include docker-compose.with-db.yml")
docker_parser.add_argument("--service", default="app", help="service name for logs")
docker_parser.set_defaults(handler=handle_docker)
return parser
def add_run_parser(subparsers: argparse._SubParsersAction[argparse.ArgumentParser], name: str) -> None:
run_parser = subparsers.add_parser(name, help="run the app")
run_parser.add_argument("env_or_port", nargs="?", help="dev, test, prod, or port")
run_parser.add_argument("port", nargs="?", help="port when env is provided")
run_parser.set_defaults(handler=handle_run)
def handle_install(args: argparse.Namespace) -> int:
ctx = require_project()
if args.profile == "odbc":
extra_name = find_optional_extra(ctx.pyproject, ["odbc", "erp"])
if extra_name:
return run(["uv", "sync", "--extra", "dev", "--extra", extra_name], ctx.root)
code = run(["uv", "sync", "--extra", "dev"], ctx.root)
if code:
return code
return run(["uv", "pip", "install", "pyodbc>=5.3.0"], ctx.root)
return run(["uv", "sync", "--extra", "dev"], ctx.root)
def handle_test(_args: argparse.Namespace) -> int:
ctx = require_project()
if ctx.kind == "business":
return run(["uv", "run", "--extra", "dev", "pytest", "-q"], ctx.root)
return run(["uv", "run", "pytest", "-q"], ctx.root)
def handle_run(args: argparse.Namespace) -> int:
ctx = require_project()
env_name, port = parse_run_args(args.env_or_port, args.port)
env = {"APP_ENV": env_name}
if ctx.kind == "framework":
cmd = ["uv", "run", "uvicorn", "iti.app:create_app", "--factory", "--port", port]
else:
cmd = ["uv", "run", "uvicorn", "main:app", "--port", port]
if env_name == "prod":
cmd.extend(["--host", "0.0.0.0"])
else:
cmd.append("--reload")
return run(cmd, ctx.root, extra_env=env)
def handle_migrate(args: argparse.Namespace) -> int:
ctx = require_project()
action = args.action
rest = list(args.args or [])
base_cmd = alembic_base_cmd(ctx)
if not action:
return run([*base_cmd, "upgrade", "head"], ctx.root)
if action in {"revision", "migration"}:
message = args.message or extract_message(rest) or " ".join(rest).strip()
if not message:
raise CliError('missing migration message, example: iticli migrate revision "alice add order table"')
return run([*base_cmd, "revision", "--autogenerate", "-m", message], ctx.root)
if action == "upgrade":
target = rest[0] if rest else "head"
return run([*base_cmd, "upgrade", target, *rest[1:]], ctx.root)
if action in {"heads", "current", "history", "branches", "downgrade", "stamp", "show"}:
return run([*base_cmd, action, *rest], ctx.root)
return run([*base_cmd, "upgrade", action, *rest], ctx.root)
def handle_create(args: argparse.Namespace) -> int:
cwd = Path.cwd()
source = args.source or DEFAULT_FRAMEWORK_GIT
if args.local:
ctx = require_project({"framework"})
source = str(ctx.root)
target = Path(args.target or args.project_name).expanduser()
if not target.is_absolute():
target = cwd / target
slug = args.slug or target.name
display_name = args.display_name or target.name
cmd = [
"uvx",
"copier",
"copy",
"--defaults",
"--vcs-ref",
args.ref,
source,
str(target),
"-d",
f"project_name={display_name}",
"-d",
f"project_slug={slug}",
"-d",
f"include_system={str(args.with_system).lower()}",
]
if args.framework_git:
cmd.extend(["-d", f"framework_git={args.framework_git}"])
if args.framework_tag is not None:
cmd.extend(["-d", f"framework_tag={args.framework_tag}"])
if args.system_git:
cmd.extend(["-d", f"system_git={args.system_git}"])
if args.system_tag is not None:
cmd.extend(["-d", f"system_tag={args.system_tag}"])
return run(cmd, cwd)
def handle_release(args: argparse.Namespace) -> int:
ctx = require_project({"framework", "system"})
if ctx.kind == "system":
return release_system(ctx.root, args.version)
return release_framework(ctx.root, args.version)
def handle_sync(args: argparse.Namespace) -> int:
ctx = require_project({"business"})
if args.target in {"flask", "all"}:
cmd = ["uv", "sync", "--extra", "dev", "--upgrade-package", "iti-flask"]
if args.target == "all" and ctx.has_system:
cmd.extend(["--upgrade-package", "iti-system"])
code = run(cmd, ctx.root)
if code:
return code
if args.target in {"system", "all"}:
if not ctx.has_system:
raise CliError("current business project does not depend on iti-system")
code = run(["uv", "run", "iti-system", "migrations", "sync", "--target", "migrations/versions"], ctx.root)
if code:
return code
return 0
def handle_template(args: argparse.Namespace) -> int:
ctx = require_project({"business"})
cmd = ["uvx", "copier", "update", "--defaults", "--vcs-ref", "HEAD", str(ctx.root)]
if args.template_action == "check":
cmd.insert(4, "--pretend")
return run(cmd, ctx.root)
def handle_init(args: argparse.Namespace) -> int:
ctx = require_project({"business"})
if args.scope == "system":
if not ctx.has_system:
raise CliError("current business project does not depend on iti-system")
code = sync_system_migrations(ctx)
if code:
return code
code = run([*alembic_base_cmd(ctx), "upgrade", "head"], ctx.root)
if code:
return code
return seed_system(ctx)
code = run(["uv", "sync", "--extra", "dev"], ctx.root)
if code:
return code
if ctx.has_system:
code = sync_system_migrations(ctx)
if code:
return code
code = run([*alembic_base_cmd(ctx), "upgrade", "head"], ctx.root)
if code:
return code
if ctx.has_system:
return seed_system(ctx)
return 0
def handle_docker(args: argparse.Namespace) -> int:
ctx = require_project({"business"})
compose = ["docker", "compose"]
if args.db:
compose.extend(["-f", "docker-compose.yml", "-f", "docker-compose.with-db.yml"])
if args.action == "build":
return run([*compose, "build"], ctx.root)
if args.action == "up":
return run([*compose, "up", "-d", "--build"], ctx.root)
if args.action == "down":
return run([*compose, "down"], ctx.root)
return run([*compose, "logs", "-f", args.service], ctx.root)
def require_project(allowed: set[str] | None = None) -> ProjectContext:
ctx = detect_project(Path.cwd())
if ctx.kind == "unknown":
raise CliError("current directory is not an iTi-Flask framework or business project")
if allowed and ctx.kind not in allowed:
expected = " or ".join(sorted(allowed))
raise CliError(f"command requires {expected} project, current project is {ctx.kind}")
return ctx
def detect_project(cwd: Path) -> ProjectContext:
root = find_project_root(cwd)
if root is None:
return ProjectContext(cwd, "unknown", {}, False)
pyproject = read_pyproject(root / "pyproject.toml")
project_name = str(pyproject.get("project", {}).get("name", ""))
is_framework = (
project_name == "iti-flask"
and (root / "iti" / "app.py").is_file()
and (root / "copier.yml").is_file()
)
has_flask = has_package(pyproject, "iti-flask")
has_system = has_package(pyproject, "iti-system")
is_business = (
has_flask
and (root / "main.py").is_file()
and (root / "app").is_dir()
and (root / "migrations" / "alembic.ini").is_file()
)
is_system = (
project_name == "iti-system"
and (root / "iti_system" / "module.py").is_file()
and (root / "iti_system" / "cli.py").is_file()
)
if is_framework:
kind = "framework"
elif is_system:
kind = "system"
elif is_business:
kind = "business"
else:
kind = "unknown"
return ProjectContext(root, kind, pyproject, has_system)
def find_project_root(cwd: Path) -> Path | None:
current = cwd.resolve()
for path in (current, *current.parents):
if (path / "pyproject.toml").is_file():
return path
return None
def read_pyproject(path: Path) -> dict[str, Any]:
try:
with path.open("rb") as file:
return tomllib.load(file)
except FileNotFoundError:
return {}
except tomllib.TOMLDecodeError as exc:
raise CliError(f"invalid pyproject.toml: {exc}") from exc
def has_package(pyproject: dict[str, Any], package_name: str) -> bool:
project = pyproject.get("project", {})
dependencies = list(project.get("dependencies", []) or [])
optional = project.get("optional-dependencies", {}) or {}
for values in optional.values():
dependencies.extend(values or [])
normalized = package_name.lower().replace("_", "-")
for dependency in dependencies:
name = re.split(r"\s|@|>=|<=|==|~=|!=|>|<|\[", str(dependency), maxsplit=1)[0]
if name.lower().replace("_", "-") == normalized:
return True
uv_sources = pyproject.get("tool", {}).get("uv", {}).get("sources", {}) or {}
return normalized in {str(key).lower().replace("_", "-") for key in uv_sources}
def find_optional_extra(pyproject: dict[str, Any], names: Sequence[str]) -> str | None:
optional = pyproject.get("project", {}).get("optional-dependencies", {}) or {}
normalized = {str(name).lower().replace("_", "-"): str(name) for name in optional}
for name in names:
found = normalized.get(name.lower().replace("_", "-"))
if found:
return found
return None
def parse_run_args(env_or_port: str | None, port_arg: str | None) -> tuple[str, str]:
env_name = os.environ.get("APP_ENV", "dev")
port = "8000"
if env_or_port:
if env_or_port in ENV_NAMES:
env_name = "dev" if env_or_port == "default" else env_or_port
port = port_arg or port
elif env_or_port.isdigit():
port = env_or_port
else:
env_name = env_or_port
port = port_arg or port
if port_arg and not env_or_port:
port = port_arg
if not str(port).isdigit():
raise CliError(f"invalid port: {port}")
return env_name, str(port)
def extract_message(rest: list[str]) -> str | None:
for flag in ("-m", "--message"):
if flag in rest:
index = rest.index(flag)
if index + 1 >= len(rest):
raise CliError(f"{flag} requires a message")
message = rest[index + 1]
del rest[index : index + 2]
return message
for item in list(rest):
if item.startswith("--message="):
rest.remove(item)
return item.partition("=")[2]
return None
def alembic_base_cmd(ctx: ProjectContext) -> list[str]:
cmd = ["uv", "run", "alembic"]
if ctx.kind == "business":
cmd.extend(["-c", "migrations/alembic.ini"])
return cmd
def sync_system_migrations(ctx: ProjectContext) -> int:
return run(["uv", "run", "iti-system", "migrations", "sync", "--target", "migrations/versions"], ctx.root)
def seed_system(ctx: ProjectContext) -> int:
return run(
["uv", "run", "iti-system", "seed", "system", "app:create_app"],
ctx.root,
extra_env={"PYTHONPATH": "."},
)
def run(cmd: Sequence[str], cwd: Path, extra_env: dict[str, str] | None = None) -> int:
env = project_env(cwd, extra_env)
completed = subprocess.run(list(cmd), cwd=str(cwd), env=env, check=False)
return int(completed.returncode)
def project_env(root: Path, extra_env: dict[str, str] | None = None) -> dict[str, str]:
env = os.environ.copy()
virtual_env = env.get("VIRTUAL_ENV")
project_venv = root / ".venv"
if virtual_env:
try:
same_venv = Path(virtual_env).resolve() == project_venv.resolve()
except OSError:
same_venv = False
if not same_venv:
env.pop("VIRTUAL_ENV", None)
if extra_env:
env.update(extra_env)
return env
def release_framework(root: Path, version_arg: str | None) -> int:
current_version = read_current_version(root / "iti" / "__about__.py")
target_version = normalize_version(version_arg) if version_arg else bump_patch(current_version)
target_tag = f"v{target_version}"
if target_version == current_version:
raise CliError(f"version already set to {target_version}")
if not version_is_newer(current_version, target_version):
raise CliError(f"target version must be newer than {current_version}")
if git_output(root, ["status", "--porcelain"]).strip():
raise CliError("working tree is not clean")
branch = git_output(root, ["symbolic-ref", "--quiet", "--short", "HEAD"]).strip()
if not branch:
raise CliError("release requires a branch checkout")
if subprocess.run(["git", "rev-parse", "--verify", "--quiet", f"refs/tags/{target_tag}"], cwd=root).returncode == 0:
raise CliError(f"tag already exists: {target_tag}")
code = run(["uv", "run", "pytest", "-q"], root)
if code:
return code
write_framework_release_files(root, target_version)
code = run(
[
"git",
"add",
"iti/__about__.py",
"copier.yml",
"copier-template/pyproject.toml.jinja",
"README.md",
],
root,
)
if code:
return code
code = run(["git", "commit", "-m", f"chore: release {target_tag}"], root)
if code:
return code
code = run(["git", "tag", "-a", target_tag, "-m", f"release {target_tag}"], root)
if code:
return code
code = run(["git", "push", "origin", branch, target_tag], root)
if code:
return code
print(f"released {target_tag}")
return 0
def release_system(root: Path, version_arg: str | None) -> int:
current_version = read_current_version(root / "iti_system" / "__about__.py")
target_version = normalize_version(version_arg) if version_arg else bump_patch(current_version)
target_tag = f"v{target_version}"
if target_version == current_version:
raise CliError(f"version already set to {target_version}")
if not version_is_newer(current_version, target_version):
raise CliError(f"target version must be newer than {current_version}")
if git_output(root, ["status", "--porcelain"]).strip():
raise CliError("working tree is not clean")
branch = git_output(root, ["symbolic-ref", "--quiet", "--short", "HEAD"]).strip()
if not branch:
raise CliError("release requires a branch checkout")
if subprocess.run(["git", "rev-parse", "--verify", "--quiet", f"refs/tags/{target_tag}"], cwd=root).returncode == 0:
raise CliError(f"tag already exists: {target_tag}")
code = run(["uv", "run", "pytest", "-q"], root)
if code:
return code
write_system_release_files(root, target_version)
code = run(["git", "add", "iti_system/__about__.py", "pyproject.toml", "README.md"], root)
if code:
return code
code = run(["git", "commit", "-m", f"chore: release {target_tag}"], root)
if code:
return code
code = run(["git", "tag", "-a", target_tag, "-m", f"release {target_tag}"], root)
if code:
return code
code = run(["git", "push", "origin", branch, target_tag], root)
if code:
return code
print(f"released {target_tag}")
return 0
def git_output(root: Path, args: Sequence[str]) -> str:
completed = subprocess.run(
["git", *args],
cwd=root,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
)
if completed.returncode:
raise CliError(completed.stderr.strip() or f"git {' '.join(args)} failed")
return completed.stdout
def read_current_version(about_file: Path) -> str:
match = re.search(r'^__version__\s*=\s*"([^"]+)"', about_file.read_text(encoding="utf-8"), re.MULTILINE)
if not match:
raise CliError(f"version not found in {about_file}")
return match.group(1)
def write_framework_release_files(root: Path, target_version: str) -> None:
replace_first_line(
root / "iti" / "__about__.py",
lambda line: line.startswith("__version__ = "),
f'__version__ = "{target_version}"',
)
replace_section_default(root / "copier.yml", "framework_tag:", f" default: v{target_version}")
replace_section_default(root / "copier.yml", "system_tag:", f" default: v{target_version}")
replace_first_line(
root / "copier-template" / "pyproject.toml.jinja",
lambda line: line.startswith("version = "),
f'version = "{target_version}"',
)
replace_first_line(
root / "README.md",
lambda line: line.startswith(' "iti-flask @ git+https://git.noahlan.cn/iti-framework/iTi-Flask.git@v'),
f' "iti-flask @ git+https://git.noahlan.cn/iti-framework/iTi-Flask.git@v{target_version}",',
)
replace_first_line(
root / "README.md",
lambda line: line.startswith("iticli release v"),
f"iticli release v{target_version}",
count=2,
)
def write_system_release_files(root: Path, target_version: str) -> None:
replace_first_line(
root / "iti_system" / "__about__.py",
lambda line: line.startswith("__version__ = "),
f'__version__ = "{target_version}"',
)
replace_first_line(
root / "pyproject.toml",
lambda line: line.startswith('iti-flask = { git = "https://git.noahlan.cn/iti-framework/iTi-Flask.git", tag = "'),
f'iti-flask = {{ git = "https://git.noahlan.cn/iti-framework/iTi-Flask.git", tag = "v{target_version}" }}',
)
replace_first_line(
root / "README.md",
lambda line: line.startswith(' "iti-flask @ git+https://git.noahlan.cn/iti-framework/iTi-Flask.git@v'),
f' "iti-flask @ git+https://git.noahlan.cn/iti-framework/iTi-Flask.git@v{target_version}",',
)
replace_first_line(
root / "README.md",
lambda line: line.startswith(' "iti-system @ git+https://git.noahlan.cn/iti-framework/iTi-System.git@v'),
f' "iti-system @ git+https://git.noahlan.cn/iti-framework/iTi-System.git@v{target_version}",',
)
replace_first_line(
root / "README.md",
lambda line: line.startswith("iticli release v"),
f"iticli release v{target_version}",
count=2,
)
def replace_first_line(path: Path, predicate: Any, new_line: str, count: int = 1) -> None:
lines = path.read_text(encoding="utf-8").splitlines()
updated = 0
for index, line in enumerate(lines):
if predicate(line):
lines[index] = new_line
updated += 1
if updated == count:
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
return
raise CliError(f"line not updated in {path}")
def replace_section_default(path: Path, section_header: str, new_line: str) -> None:
lines = path.read_text(encoding="utf-8").splitlines()
in_section = False
for index, line in enumerate(lines):
if line == section_header:
in_section = True
continue
if in_section and line and not line.startswith((" ", "\t")):
break
if in_section and line.startswith(" default: v"):
lines[index] = new_line
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
return
raise CliError(f"line not updated in {path}")
def normalize_version(raw: str) -> str:
major, minor, patch = parse_version(raw)
return f"{major}.{minor}.{patch}"
def bump_patch(raw: str) -> str:
major, minor, patch = parse_version(raw)
return f"{major}.{minor}.{patch + 1}"
def version_is_newer(current: str, target: str) -> bool:
return parse_version(target) > parse_version(current)
def parse_version(raw: str) -> tuple[int, int, int]:
version = raw.removeprefix("v")
parts = version.split(".")
if len(parts) != 3:
raise CliError(f"invalid version: {raw}")
try:
return tuple(int(part) for part in parts) # type: ignore[return-value]
except ValueError as exc:
raise CliError(f"invalid version: {raw}") from exc
if __name__ == "__main__":
raise SystemExit(main())