File
Local panel · 127.0.0.1:8858
File:
/home/webhost/domains/cvt8858.aurorum.co/public/panelctl.py
#!/usr/bin/env python3
"""CVT8858 headless core CLI.

Batch 5 completes the core loop: provider-brokered auth/permissions,
whitelisted actions, recovery CLI, plugin registry/integrity, and runtime.

Every ``cmd_*`` function is an argparse handler: it receives the parsed
namespace and returns the process exit code.
"""
from __future__ import annotations

import argparse
import getpass
import json
import sys
from dataclasses import asdict
from typing import Any

from core.actions import (
    ActionErrorCVT,
    action_to_dict,
    list_actions,
    list_runs,
    run_action,
)
from core.auth import (
    AuthErrorCVT,
    auth_provider_to_dict,
    get_auth_provider,
    login,
    logout,
    set_auth_provider,
    user_create,
    user_list,
    verify_session,
)
from core.config import CORE_VERSION, PROJECT_NAME, get_paths
from core.install import doctor, install, status, to_json, upgrade
from core.permissions import (
    PermissionErrorCVT,
    check_permission,
    get_permission_provider,
    grant_permission,
    list_permissions,
    provider_to_dict,
    revoke_permission,
    set_permission_provider,
)
from core.plugins import (
    disable_plugin,
    enable_plugin,
    get_plugin,
    list_plugins,
    plugin_to_dict,
    scan_plugins,
    lock_plugin,
    verify_all_plugins,
    verify_plugin,
)
from core.recovery import (
    RecoveryErrorCVT,
    recovery_disable_plugin,
    recovery_reset_auth,
    recovery_reset_permissions,
    recovery_status_dict,
)
from core.runtime import (
    RuntimeErrorCVT,
    call_plugin,
    list_runtime_states,
    plugin_health,
    read_plugin_logs,
    runtime_to_dict,
    start_plugin,
    stop_plugin,
)
from core.setupwizard import main as setupwizard_main


# ---------------------------------------------------------------------------
# Output helpers
# ---------------------------------------------------------------------------


def _print_kv(title: str, data: dict[str, object] | None) -> None:
    """Print *title* followed by one ``key: value`` line per entry of *data*.

    Dict and list values are rendered as indented JSON; an empty or missing
    mapping prints a ``<none>`` placeholder instead.
    """
    print(title)
    if not data:
        print(" <none>")
        return
    for key, value in data.items():
        if isinstance(value, (dict, list)):
            rendered = json.dumps(value, ensure_ascii=False, indent=2)
            print(f" {key}: {rendered}")
        else:
            print(f" {key}: {value}")


def _emit_result(args: argparse.Namespace, title: str, data: Any) -> None:
    """Print *data* as JSON when ``--json`` was given, else as a key/value listing."""
    if args.json:
        print(to_json(data))
    else:
        _print_kv(title, data)


def _print_plugin_table(records) -> None:
    """Print plugin records as a table, or a hint to run a scan when there are none."""
    if not records:
        print("No plugins registered. Run: python3 panelctl.py plugin scan")
        return
    headers = ["id", "status", "locked", "changed", "type", "version", "name"]
    rows = [
        [
            r.id,
            r.status,
            "yes" if r.locked else "no",
            "yes" if r.changed else "no",
            r.type,
            r.version,
            r.name,
        ]
        for r in records
    ]
    _print_table(headers, rows)


def _print_runtime_table(records) -> None:
    """Print plugin runtime records as a table; falsy optional fields show as ``-``."""
    if not records:
        print("No plugin runtime records yet.")
        return
    headers = ["plugin", "status", "pid", "mode", "started", "stopped", "error"]
    rows = [
        [
            r.plugin_id,
            r.status,
            r.pid or "-",
            r.mode,
            r.started_at or "-",
            r.stopped_at or "-",
            r.last_error or "-",
        ]
        for r in records
    ]
    _print_table(headers, rows)


def _print_table(headers: list[str], rows: list[list[Any]]) -> None:
    """Print *rows* under *headers* as left-aligned, space-separated columns.

    Each column is as wide as its longest cell (or its header).  Cells are
    converted with ``str()`` before formatting: applying a ``{:<N}`` spec to
    the raw object raises ``TypeError`` for ``None`` and renders ``bool`` as
    ``1``/``0``, which would also disagree with the measured widths.
    """
    cells = [[str(item) for item in row] for row in rows]
    widths = [len(h) for h in headers]
    for row in cells:
        for i, item in enumerate(row):
            widths[i] = max(widths[i], len(item))
    fmt = " ".join("{:<" + str(w) + "}" for w in widths)
    print(fmt.format(*headers))
    print(fmt.format(*["-" * w for w in widths]))
    for row in cells:
        print(fmt.format(*row))


# ---------------------------------------------------------------------------
# Input helpers
# ---------------------------------------------------------------------------


def _parse_json_params(raw: str | None) -> dict[str, Any]:
    """Decode a JSON-object command-line value into a dict.

    Returns an empty dict when *raw* is empty or ``None``.

    Raises:
        argparse.ArgumentTypeError: if *raw* is not valid JSON or does not
            decode to a JSON object.
    """
    if not raw:
        return {}
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise argparse.ArgumentTypeError(f"invalid JSON params: {exc}") from exc
    if not isinstance(payload, dict):
        raise argparse.ArgumentTypeError("params must be a JSON object")
    return payload


def _extract_rpc_result(result: dict[str, Any]) -> dict[str, Any]:
    """Return ``result["response"]["result"]`` when it is a dict, else *result* itself."""
    response = result.get("response", {})
    if isinstance(response, dict) and isinstance(response.get("result"), dict):
        return response["result"]
    return result


def _password_arg_or_prompt(args: argparse.Namespace, *, confirm: bool = False) -> str:
    """Return ``args.password`` if set, otherwise prompt for it without echo.

    Args:
        args: parsed namespace carrying a ``password`` attribute.
        confirm: when prompting, ask a second time and require both to match.

    Raises:
        ValueError: if *confirm* is set and the two entries differ.
    """
    if args.password:
        return args.password
    first = getpass.getpass("Password: ")
    if confirm:
        second = getpass.getpass("Confirm password: ")
        if first != second:
            raise ValueError("passwords do not match")
    return first


# ---------------------------------------------------------------------------
# Core commands
# ---------------------------------------------------------------------------


def cmd_install(args: argparse.Namespace) -> int:
    """Run ``install()`` and print its result."""
    _emit_result(args, "Installed / verified CVT8858 core:", install())
    return 0


def cmd_upgrade(args: argparse.Namespace) -> int:
    """Run ``upgrade()`` and print its result."""
    _emit_result(args, "Upgrade complete:", upgrade())
    return 0


def cmd_doctor(args: argparse.Namespace) -> int:
    """Print the ``doctor()`` checks; exit 1 if any check has status ``fail``."""
    results = doctor()
    if args.json:
        print(to_json(results))
    else:
        print("Doctor checks:")
        for item in results:
            # The marker is simply the upper-cased status (ok -> OK, ...).
            mark = item.status.upper()
            print(f" [{mark}] {item.name}: {item.message}")
    return 1 if any(item.status == "fail" for item in results) else 0


def cmd_status(args: argparse.Namespace) -> int:
    """Print the ``status()`` record (a dataclass, flattened for text output)."""
    result = status()
    if args.json:
        print(to_json(result))
    else:
        _print_kv("Core status:", asdict(result))
    return 0


def cmd_paths(args: argparse.Namespace) -> int:
    """Print every field of ``get_paths()`` as a string."""
    paths = get_paths()
    data = {key: str(value) for key, value in asdict(paths).items()}
    _emit_result(args, "Core paths:", data)
    return 0


def cmd_version(args: argparse.Namespace) -> int:
    """Print the project name and core version."""
    if args.json:
        print(to_json({"project": PROJECT_NAME, "version": CORE_VERSION}))
    else:
        print(f"{PROJECT_NAME} {CORE_VERSION}")
    return 0


def cmd_setup(args: argparse.Namespace) -> int:
    """Forward ``--host``/``--port``/``--token`` to the setup wizard and return its result."""
    argv = []
    if args.host:
        argv += ["--host", args.host]
    # Compare against None so that an explicit ``--port 0`` is still forwarded.
    if args.port is not None:
        argv += ["--port", str(args.port)]
    if args.token:
        argv += ["--token", args.token]
    return setupwizard_main(argv)


# ---------------------------------------------------------------------------
# Plugin commands
# ---------------------------------------------------------------------------


def cmd_plugin_scan(args: argparse.Namespace) -> int:
    """Run ``scan_plugins()`` and report it; exit 1 if any plugin is broken."""
    result = scan_plugins()
    if args.json:
        print(to_json(result))
    else:
        print(f"Scanned {result['scanned']} plugin directorie(s).")
        if result["registered"]:
            print("Registered: " + ", ".join(result["registered"]))
        if result["broken"]:
            print("Broken plugins:")
            for pid, err in result["broken"].items():
                print(f" - {pid}: {err}")
    return 1 if result["broken"] else 0


def cmd_plugin_list(args: argparse.Namespace) -> int:
    """List plugins, including broken ones unless ``--hide-broken`` is set."""
    records = list_plugins(include_broken=not args.hide_broken)
    if args.json:
        print(to_json([plugin_to_dict(r) for r in records]))
    else:
        _print_plugin_table(records)
    return 0


def cmd_plugin_info(args: argparse.Namespace) -> int:
    """Print one plugin's record; exit 1 if ``get_plugin`` finds nothing."""
    record = get_plugin(args.plugin_id)
    if not record:
        print(f"Unknown plugin: {args.plugin_id}", file=sys.stderr)
        return 1
    _emit_result(args, f"Plugin {record.id}:", plugin_to_dict(record))
    return 0


def cmd_plugin_lock(args: argparse.Namespace) -> int:
    """Call ``lock_plugin`` and print the resulting locked hash; exit 1 on error."""
    try:
        record = lock_plugin(args.plugin_id)
    except Exception as exc:  # noqa: BLE001
        print(f"Lock failed: {exc}", file=sys.stderr)
        return 1
    if args.json:
        print(to_json(plugin_to_dict(record)))
    else:
        print(f"Locked plugin {record.id}: {record.locked_hash}")
    return 0


def cmd_plugin_verify(args: argparse.Namespace) -> int:
    """Call ``verify_plugin``; exit 0 only when the result's ``ok`` is truthy."""
    try:
        result = verify_plugin(args.plugin_id)
    except Exception as exc:  # noqa: BLE001
        print(f"Verify failed: {exc}", file=sys.stderr)
        return 1
    if args.json:
        print(to_json(result))
    else:
        mark = "OK" if result["ok"] else "FAIL"
        print(f"[{mark}] {result['id']}: {result['reason']}")
        print(f" current_hash: {result.get('current_hash')}")
        print(f" locked_hash: {result.get('locked_hash')}")
    return 0 if result["ok"] else 1


def cmd_plugin_verify_all(args: argparse.Namespace) -> int:
    """Call ``verify_all_plugins``; exit 0 only when every result is ``ok``."""
    results = verify_all_plugins()
    if args.json:
        print(to_json(results))
    else:
        for result in results:
            mark = "OK" if result["ok"] else "FAIL"
            print(f"[{mark}] {result['id']}: {result['reason']}")
    return 0 if all(r["ok"] for r in results) else 1


def cmd_plugin_enable(args: argparse.Namespace) -> int:
    """Call ``enable_plugin`` (passing ``--force`` through); exit 1 on error."""
    try:
        record = enable_plugin(args.plugin_id, force=args.force)
    except Exception as exc:  # noqa: BLE001
        print(f"Enable failed: {exc}", file=sys.stderr)
        return 1
    if args.json:
        print(to_json(plugin_to_dict(record)))
    else:
        print(f"Enabled plugin {record.id}")
    return 0


def cmd_plugin_disable(args: argparse.Namespace) -> int:
    """Call ``disable_plugin``; exit 1 on error."""
    try:
        record = disable_plugin(args.plugin_id)
    except Exception as exc:  # noqa: BLE001
        print(f"Disable failed: {exc}", file=sys.stderr)
        return 1
    if args.json:
        print(to_json(plugin_to_dict(record)))
    else:
        print(f"Disabled plugin {record.id}")
    return 0


def cmd_plugin_start(args: argparse.Namespace) -> int:
    """Call ``start_plugin`` and print the returned runtime state; exit 1 on error."""
    try:
        state = start_plugin(args.plugin_id, force=args.force)
    except Exception as exc:  # noqa: BLE001
        print(f"Start failed: {exc}", file=sys.stderr)
        return 1
    _emit_result(args, f"Runtime state for {args.plugin_id}:", runtime_to_dict(state))
    return 0


def cmd_plugin_stop(args: argparse.Namespace) -> int:
    """Call ``stop_plugin`` and print the returned runtime state; exit 1 on error."""
    try:
        state = stop_plugin(args.plugin_id)
    except Exception as exc:  # noqa: BLE001
        print(f"Stop failed: {exc}", file=sys.stderr)
        return 1
    _emit_result(args, f"Runtime state for {args.plugin_id}:", runtime_to_dict(state))
    return 0


def cmd_plugin_health(args: argparse.Namespace) -> int:
    """Call ``plugin_health``; exit 0 only when the result's ``healthy`` is truthy."""
    try:
        result = plugin_health(args.plugin_id)
    except Exception as exc:  # noqa: BLE001
        print(f"Health failed: {exc}", file=sys.stderr)
        return 1
    _emit_result(args, f"Health for {args.plugin_id}:", result)
    return 0 if result.get("healthy") else 1


def cmd_plugin_call(args: argparse.Namespace) -> int:
    """Make one ``call_plugin`` RPC; exit 0 only when the result's ``ok`` is truthy.

    Invalid ``--params`` JSON is reported through the same failure path as an
    RPC error, because parsing happens inside the ``try``.
    """
    try:
        result = call_plugin(
            args.plugin_id,
            args.method,
            _parse_json_params(args.params),
            timeout=args.timeout,
            force=args.force,
        )
    except Exception as exc:  # noqa: BLE001
        print(f"Call failed: {exc}", file=sys.stderr)
        return 1
    _emit_result(args, f"RPC call {args.plugin_id}.{args.method}:", result)
    return 0 if result.get("ok") else 1


def cmd_plugin_logs(args: argparse.Namespace) -> int:
    """Print the lines returned by ``read_plugin_logs`` for the chosen stream."""
    try:
        result = read_plugin_logs(args.plugin_id, args.stream, args.lines)
    except Exception as exc:  # noqa: BLE001
        print(f"Logs failed: {exc}", file=sys.stderr)
        return 1
    if args.json:
        print(to_json(result))
    else:
        print(f"{result['path']} ({result['stream']}):")
        for line in result["lines"]:
            print(line)
    return 0


def cmd_runtime_list(args: argparse.Namespace) -> int:
    """List the records returned by ``list_runtime_states``."""
    records = list_runtime_states()
    if args.json:
        print(to_json([runtime_to_dict(r) for r in records]))
    else:
        _print_runtime_table(records)
    return 0


# ---------------------------------------------------------------------------
# Permission commands
# ---------------------------------------------------------------------------


def cmd_permission_provider_show(args: argparse.Namespace) -> int:
    """Print the active permission provider."""
    data = provider_to_dict(get_permission_provider())
    _emit_result(args, "Permission provider:", data)
    return 0


def cmd_permission_provider_set(args: argparse.Namespace) -> int:
    """Call ``set_permission_provider`` and print the new state; exit 1 on error."""
    try:
        state = set_permission_provider(args.plugin_id, force=args.force)
    # NOTE(review): PermissionErrorCVT is presumably an Exception subclass, in
    # which case naming it here is redundant -- confirm before simplifying.
    except (PermissionErrorCVT, Exception) as exc:  # noqa: BLE001
        print(f"Set permission provider failed: {exc}", file=sys.stderr)
        return 1
    _emit_result(args, "Permission provider set:", provider_to_dict(state))
    return 0


def cmd_permission_grant(args: argparse.Namespace) -> int:
    """Call ``grant_permission`` for actor/capability/scope; exit 1 on error."""
    try:
        result = grant_permission(args.actor, args.capability, _parse_json_params(args.scope))
    except Exception as exc:  # noqa: BLE001
        print(f"Grant failed: {exc}", file=sys.stderr)
        return 1
    _emit_result(args, "Permission grant:", _extract_rpc_result(result))
    return 0


def cmd_permission_revoke(args: argparse.Namespace) -> int:
    """Call ``revoke_permission`` for actor/capability/scope; exit 1 on error."""
    try:
        result = revoke_permission(args.actor, args.capability, _parse_json_params(args.scope))
    except Exception as exc:  # noqa: BLE001
        print(f"Revoke failed: {exc}", file=sys.stderr)
        return 1
    _emit_result(args, "Permission revoke:", _extract_rpc_result(result))
    return 0


def cmd_permission_check(args: argparse.Namespace) -> int:
    """Call ``check_permission``; exit 0 if allowed, 2 if denied, 1 on error."""
    try:
        result = check_permission(args.actor, args.capability, _parse_json_params(args.scope))
    except Exception as exc:  # noqa: BLE001
        print(f"Check failed: {exc}", file=sys.stderr)
        return 1
    data = _extract_rpc_result(result)
    _emit_result(args, "Permission check:", data)
    return 0 if data.get("allow", False) else 2


def cmd_permission_list(args: argparse.Namespace) -> int:
    """Print the grants returned by ``list_permissions`` (optionally per actor)."""
    try:
        result = list_permissions(args.actor)
    except Exception as exc:  # noqa: BLE001
        print(f"List permissions failed: {exc}", file=sys.stderr)
        return 1
    _emit_result(args, "Permission grants:", _extract_rpc_result(result))
    return 0


# ---------------------------------------------------------------------------
# Auth and user commands
# ---------------------------------------------------------------------------


def cmd_auth_provider_show(args: argparse.Namespace) -> int:
    """Print the active auth provider."""
    data = auth_provider_to_dict(get_auth_provider())
    _emit_result(args, "Auth provider:", data)
    return 0


def cmd_auth_provider_set(args: argparse.Namespace) -> int:
    """Call ``set_auth_provider`` and print the new state; exit 1 on error."""
    try:
        state = set_auth_provider(args.plugin_id, force=args.force)
    except Exception as exc:  # noqa: BLE001
        print(f"Set auth provider failed: {exc}", file=sys.stderr)
        return 1
    _emit_result(args, "Auth provider set:", auth_provider_to_dict(state))
    return 0


def cmd_user_create(args: argparse.Namespace) -> int:
    """Create a user via ``user_create``; prompts (with confirmation) if no ``--password``."""
    try:
        password = _password_arg_or_prompt(args, confirm=not bool(args.password))
        result = user_create(args.username, password)
    except Exception as exc:  # noqa: BLE001
        print(f"Create user failed: {exc}", file=sys.stderr)
        return 1
    _emit_result(args, "User created:", _extract_rpc_result(result))
    return 0


def cmd_user_list(args: argparse.Namespace) -> int:
    """Print the result of ``user_list``; exit 1 on error."""
    try:
        result = user_list()
    except Exception as exc:  # noqa: BLE001
        print(f"List users failed: {exc}", file=sys.stderr)
        return 1
    _emit_result(args, "Users:", _extract_rpc_result(result))
    return 0


def cmd_auth_login(args: argparse.Namespace) -> int:
    """Call ``login`` with the given or prompted password; exit 1 on error."""
    try:
        password = _password_arg_or_prompt(args)
        result = login(args.username, password)
    except Exception as exc:  # noqa: BLE001
        print(f"Login failed: {exc}", file=sys.stderr)
        return 1
    _emit_result(args, "Login result:", _extract_rpc_result(result))
    return 0


def cmd_auth_verify(args: argparse.Namespace) -> int:
    """Call ``verify_session``; exit 0 if valid, 2 if not, 1 on error."""
    try:
        result = verify_session(args.token)
    except Exception as exc:  # noqa: BLE001
        print(f"Verify session failed: {exc}", file=sys.stderr)
        return 1
    data = _extract_rpc_result(result)
    _emit_result(args, "Session verification:", data)
    return 0 if data.get("valid", False) else 2


def cmd_auth_logout(args: argparse.Namespace) -> int:
    """Call ``logout`` for the given token; exit 1 on error."""
    try:
        result = logout(args.token)
    except Exception as exc:  # noqa: BLE001
        print(f"Logout failed: {exc}", file=sys.stderr)
        return 1
    _emit_result(args, "Logout result:", _extract_rpc_result(result))
    return 0


# ---------------------------------------------------------------------------
# Action commands
# ---------------------------------------------------------------------------


def cmd_action_list(args: argparse.Namespace) -> int:
    """List the actions returned by ``list_actions``; exit 1 on ``ActionErrorCVT``."""
    try:
        actions = list_actions()
    except ActionErrorCVT as exc:
        print(f"List actions failed: {exc}", file=sys.stderr)
        return 1
    data = [action_to_dict(a) for a in actions]
    if args.json:
        print(to_json(data))
    elif not actions:
        print("No actions found.")
    else:
        headers = ["id", "capability", "dangerous", "runtime", "name"]
        rows = [
            [a.id, a.required_capability, "yes" if a.dangerous else "no", a.runtime, a.name]
            for a in actions
        ]
        _print_table(headers, rows)
    return 0


def cmd_action_run(args: argparse.Namespace) -> int:
    """Call ``run_action`` with the parsed params, actor and force flag; exit 1 on error."""
    try:
        result = run_action(
            args.action_id,
            _parse_json_params(args.params),
            actor=args.actor,
            force=args.force,
        )
    except Exception as exc:  # noqa: BLE001
        print(f"Action failed: {exc}", file=sys.stderr)
        return 1
    _emit_result(args, f"Action {args.action_id}:", result)
    return 0


def cmd_action_runs(args: argparse.Namespace) -> int:
    """Print up to ``--limit`` entries from ``list_runs``; exit 1 on error."""
    try:
        runs = list_runs(args.limit)
    except Exception as exc:  # noqa: BLE001
        print(f"List action runs failed: {exc}", file=sys.stderr)
        return 1
    if args.json:
        print(to_json(runs))
    elif not runs:
        print("No action runs yet.")
    else:
        headers = ["id", "created", "actor", "action", "status", "ms", "error"]
        rows = [
            [
                r["id"],
                r["created_at"],
                r["actor"],
                r["action_id"],
                r["status"],
                # Only a missing duration becomes "-"; 0 ms is a real value.
                "-" if r["duration_ms"] is None else r["duration_ms"],
                r["error"] or "-",
            ]
            for r in runs
        ]
        _print_table(headers, rows)
    return 0


# ---------------------------------------------------------------------------
# Recovery commands
# ---------------------------------------------------------------------------


def cmd_recovery_status(args: argparse.Namespace) -> int:
    """Print ``recovery_status_dict()``."""
    _emit_result(args, "Recovery status:", recovery_status_dict())
    return 0


def cmd_recovery_disable_plugin(args: argparse.Namespace) -> int:
    """Call ``recovery_disable_plugin``; exit 1 on error."""
    try:
        result = recovery_disable_plugin(args.plugin_id)
    except Exception as exc:  # noqa: BLE001
        print(f"Recovery disable failed: {exc}", file=sys.stderr)
        return 1
    _emit_result(args, "Recovery disabled plugin:", result)
    return 0


def cmd_recovery_reset_auth(args: argparse.Namespace) -> int:
    """Call ``recovery_reset_auth``; exit 1 on error."""
    try:
        result = recovery_reset_auth()
    except Exception as exc:  # noqa: BLE001
        print(f"Recovery reset auth failed: {exc}", file=sys.stderr)
        return 1
    _emit_result(args, "Recovery reset auth provider:", result)
    return 0


def cmd_recovery_reset_permissions(args: argparse.Namespace) -> int:
    """Call ``recovery_reset_permissions``; exit 1 on error."""
    try:
        result = recovery_reset_permissions()
    except Exception as exc:  # noqa: BLE001
        print(f"Recovery reset permissions failed: {exc}", file=sys.stderr)
        return 1
    _emit_result(args, "Recovery reset permission provider:", result)
    return 0


# ---------------------------------------------------------------------------
# Argument parser
# ---------------------------------------------------------------------------


def _add_core_commands(sub: Any) -> None:
    """Register the argument-less core commands and ``setup``."""
    for name, help_text, func in [
        ("install", "initialize directories, secret, config, and database", cmd_install),
        ("upgrade", "apply pending database migrations", cmd_upgrade),
        ("doctor", "check whether the core installation is healthy", cmd_doctor),
        ("status", "show installation status", cmd_status),
        ("paths", "show resolved core paths", cmd_paths),
        ("version", "show core version", cmd_version),
    ]:
        p = sub.add_parser(name, help=help_text)
        p.set_defaults(func=func)

    p_setup = sub.add_parser("setup", help="start the visual Python+HTML setup wizard")
    p_setup.add_argument(
        "--host",
        default="127.0.0.1",
        help="bind host; use 0.0.0.0 for token-protected temporary remote access",
    )
    p_setup.add_argument("--port", type=int, default=8859, help="bind port")
    p_setup.add_argument("--token", help="setup token; default is random")
    p_setup.set_defaults(func=cmd_setup)


def _add_plugin_commands(sub: Any) -> None:
    """Register the ``plugin`` command group."""
    plugin = sub.add_parser("plugin", help="manage plugins")
    plugin_sub = plugin.add_subparsers(dest="plugin_command", required=True)

    p_scan = plugin_sub.add_parser("scan", help="scan plugins/*/manifest.json")
    p_scan.set_defaults(func=cmd_plugin_scan)

    p_list = plugin_sub.add_parser("list", help="list registered plugins")
    p_list.add_argument("--hide-broken", action="store_true", help="hide broken plugin records")
    p_list.set_defaults(func=cmd_plugin_list)

    p_info = plugin_sub.add_parser("info", help="show plugin metadata")
    p_info.add_argument("plugin_id")
    p_info.set_defaults(func=cmd_plugin_info)

    p_lock = plugin_sub.add_parser("lock", help="optionally save current plugin file hash")
    p_lock.add_argument("plugin_id")
    p_lock.set_defaults(func=cmd_plugin_lock)

    p_verify = plugin_sub.add_parser(
        "verify", help="optionally verify current files against saved lock"
    )
    p_verify.add_argument("plugin_id")
    p_verify.set_defaults(func=cmd_plugin_verify)

    p_verify_all = plugin_sub.add_parser("verify-all", help="verify all registered plugins")
    p_verify_all.set_defaults(func=cmd_plugin_verify_all)

    p_enable = plugin_sub.add_parser("enable", help="enable a loaded plugin")
    p_enable.add_argument("plugin_id")
    p_enable.add_argument(
        "--force",
        action="store_true",
        help="reserved; enable no longer requires hash verification",
    )
    p_enable.set_defaults(func=cmd_plugin_enable)

    p_disable = plugin_sub.add_parser("disable", help="disable a plugin")
    p_disable.add_argument("plugin_id")
    p_disable.set_defaults(func=cmd_plugin_disable)

    p_start = plugin_sub.add_parser("start", help="start a plugin process or run oneshot hello")
    p_start.add_argument("plugin_id")
    p_start.add_argument("--force", action="store_true", help="start even when plugin is disabled")
    p_start.set_defaults(func=cmd_plugin_start)

    p_stop = plugin_sub.add_parser("stop", help="stop a detached daemon plugin")
    p_stop.add_argument("plugin_id")
    p_stop.set_defaults(func=cmd_plugin_stop)

    p_health = plugin_sub.add_parser("health", help="check plugin runtime health")
    p_health.add_argument("plugin_id")
    p_health.set_defaults(func=cmd_plugin_health)

    p_call = plugin_sub.add_parser("call", help="make one JSON Lines RPC call to a plugin")
    p_call.add_argument("plugin_id")
    p_call.add_argument("method")
    p_call.add_argument("--params", help='JSON object, e.g. \'{"message":"hi"}\'')
    p_call.add_argument("--timeout", type=float, help="RPC timeout seconds")
    p_call.add_argument("--force", action="store_true", help="call even when plugin is disabled")
    p_call.set_defaults(func=cmd_plugin_call)

    p_logs = plugin_sub.add_parser("logs", help="show plugin runtime logs")
    p_logs.add_argument("plugin_id")
    p_logs.add_argument("--stream", choices=["stdout", "stderr"], default="stdout")
    p_logs.add_argument("--lines", type=int, default=80)
    p_logs.set_defaults(func=cmd_plugin_logs)


def _add_runtime_commands(sub: Any) -> None:
    """Register the ``runtime`` command group."""
    runtime = sub.add_parser("runtime", help="inspect runtime process records")
    runtime_sub = runtime.add_subparsers(dest="runtime_command", required=True)

    p_runtime_list = runtime_sub.add_parser("list", help="list plugin runtime records")
    p_runtime_list.set_defaults(func=cmd_runtime_list)


def _add_permission_commands(sub: Any) -> None:
    """Register the ``permission`` command group."""
    permission = sub.add_parser("permission", help="manage permission provider and grants")
    permission_sub = permission.add_subparsers(dest="permission_command", required=True)

    permission_provider = permission_sub.add_parser(
        "provider", help="show or set active permission provider"
    )
    permission_provider_sub = permission_provider.add_subparsers(
        dest="permission_provider_command", required=True
    )
    p_perm_provider_show = permission_provider_sub.add_parser(
        "show", help="show active permission provider"
    )
    p_perm_provider_show.set_defaults(func=cmd_permission_provider_show)
    p_perm_provider_set = permission_provider_sub.add_parser(
        "set", help="set active permission provider plugin"
    )
    p_perm_provider_set.add_argument("plugin_id")
    p_perm_provider_set.add_argument("--force", action="store_true")
    p_perm_provider_set.set_defaults(func=cmd_permission_provider_set)

    p_perm_grant = permission_sub.add_parser("grant", help="grant a capability to an actor")
    p_perm_grant.add_argument(
        "actor", help="e.g. user:admin, plugin:node-apps, local-cli, *"
    )
    p_perm_grant.add_argument(
        "capability", help="e.g. action.run:restart_service or site.*"
    )
    p_perm_grant.add_argument(
        "--scope", help='JSON object scope, e.g. \'{"site":"example.com"}\''
    )
    p_perm_grant.set_defaults(func=cmd_permission_grant)

    p_perm_revoke = permission_sub.add_parser("revoke", help="revoke a capability from an actor")
    p_perm_revoke.add_argument("actor")
    p_perm_revoke.add_argument("capability")
    p_perm_revoke.add_argument("--scope", help="JSON object scope")
    p_perm_revoke.set_defaults(func=cmd_permission_revoke)

    p_perm_check = permission_sub.add_parser(
        "check", help="check whether an actor has a capability"
    )
    p_perm_check.add_argument("actor")
    p_perm_check.add_argument("capability")
    p_perm_check.add_argument("--scope", help="JSON object scope")
    p_perm_check.set_defaults(func=cmd_permission_check)

    p_perm_list = permission_sub.add_parser("list", help="list grants")
    p_perm_list.add_argument("--actor", help="filter by actor")
    p_perm_list.set_defaults(func=cmd_permission_list)


def _add_auth_commands(sub: Any) -> None:
    """Register the ``auth`` command group."""
    auth = sub.add_parser("auth", help="manage auth provider and sessions")
    auth_sub = auth.add_subparsers(dest="auth_command", required=True)

    auth_provider = auth_sub.add_parser("provider", help="show or set active auth provider")
    auth_provider_sub = auth_provider.add_subparsers(
        dest="auth_provider_command", required=True
    )
    p_auth_provider_show = auth_provider_sub.add_parser("show", help="show active auth provider")
    p_auth_provider_show.set_defaults(func=cmd_auth_provider_show)
    p_auth_provider_set = auth_provider_sub.add_parser(
        "set", help="set active auth provider plugin"
    )
    p_auth_provider_set.add_argument("plugin_id")
    p_auth_provider_set.add_argument("--force", action="store_true")
    p_auth_provider_set.set_defaults(func=cmd_auth_provider_set)

    p_auth_login = auth_sub.add_parser("login", help="create a session through auth provider")
    p_auth_login.add_argument("username")
    p_auth_login.add_argument("--password", help="password; omit to prompt")
    p_auth_login.set_defaults(func=cmd_auth_login)

    p_auth_verify = auth_sub.add_parser(
        "verify", help="verify a session token through auth provider"
    )
    p_auth_verify.add_argument("token")
    p_auth_verify.set_defaults(func=cmd_auth_verify)

    p_auth_logout = auth_sub.add_parser(
        "logout", help="logout a session token through auth provider"
    )
    p_auth_logout.add_argument("token")
    p_auth_logout.set_defaults(func=cmd_auth_logout)


def _add_user_commands(sub: Any) -> None:
    """Register the ``user`` command group."""
    user = sub.add_parser("user", help="manage users through the active auth provider")
    user_sub = user.add_subparsers(dest="user_command", required=True)

    p_user_create = user_sub.add_parser("create", help="create user through auth provider")
    p_user_create.add_argument("username")
    p_user_create.add_argument("--password", help="password; omit to prompt")
    p_user_create.set_defaults(func=cmd_user_create)

    p_user_list = user_sub.add_parser("list", help="list users through auth provider")
    p_user_list.set_defaults(func=cmd_user_list)


def _add_action_commands(sub: Any) -> None:
    """Register the ``action`` command group."""
    action = sub.add_parser("action", help="run whitelisted local actions")
    action_sub = action.add_subparsers(dest="action_command", required=True)

    p_action_list = action_sub.add_parser("list", help="list available actions")
    p_action_list.set_defaults(func=cmd_action_list)

    p_action_run = action_sub.add_parser("run", help="run a whitelisted action")
    p_action_run.add_argument("action_id")
    p_action_run.add_argument(
        "--params", help='JSON object params, e.g. \'{"message":"hi"}\''
    )
    p_action_run.add_argument(
        "--actor",
        default="local-cli",
        help="actor id used for permission check, default: local-cli",
    )
    p_action_run.add_argument(
        "--force", action="store_true", help="bypass permission provider; still audited"
    )
    p_action_run.set_defaults(func=cmd_action_run)

    p_action_runs = action_sub.add_parser("runs", help="show recent action runs")
    p_action_runs.add_argument("--limit", type=int, default=20)
    p_action_runs.set_defaults(func=cmd_action_runs)


def _add_recovery_commands(sub: Any) -> None:
    """Register the ``recovery`` command group."""
    recovery = sub.add_parser(
        "recovery", help="local CLI recovery when auth/permission/UI plugins break"
    )
    recovery_sub = recovery.add_subparsers(dest="recovery_command", required=True)

    p_recovery_status = recovery_sub.add_parser("status", help="show recovery status")
    p_recovery_status.set_defaults(func=cmd_recovery_status)

    p_recovery_disable = recovery_sub.add_parser(
        "disable-plugin",
        help="disable a plugin and detach it if it is an active provider",
    )
    p_recovery_disable.add_argument("plugin_id")
    p_recovery_disable.set_defaults(func=cmd_recovery_disable_plugin)

    p_recovery_reset_auth = recovery_sub.add_parser(
        "reset-auth", help="detach active auth provider"
    )
    p_recovery_reset_auth.set_defaults(func=cmd_recovery_reset_auth)

    p_recovery_reset_permissions = recovery_sub.add_parser(
        "reset-permissions", help="detach active permission provider"
    )
    p_recovery_reset_permissions.set_defaults(func=cmd_recovery_reset_permissions)


def build_parser() -> argparse.ArgumentParser:
    """Build the full ``panelctl.py`` argument parser.

    Every leaf sub-command stores its handler in ``func`` via ``set_defaults``;
    ``main`` dispatches on that attribute.  Groups are registered in a fixed
    order, which is also the order they appear in ``--help``.
    """
    parser = argparse.ArgumentParser(
        prog="panelctl.py", description="CVT8858 headless micro-core CLI"
    )
    parser.add_argument(
        "--json", action="store_true", help="output machine-readable JSON where supported"
    )
    sub = parser.add_subparsers(dest="command", required=True)

    _add_core_commands(sub)
    _add_plugin_commands(sub)
    _add_runtime_commands(sub)
    _add_permission_commands(sub)
    _add_auth_commands(sub)
    _add_user_commands(sub)
    _add_action_commands(sub)
    _add_recovery_commands(sub)
    return parser


def main(argv: list[str] | None = None) -> int:
    """Parse *argv* (default: ``sys.argv[1:]``) and run the selected command.

    Returns the exit code produced by the command handler.
    """
    # Be forgiving: allow --json either before or after subcommands. argparse
    # normally only accepts global options before the first subcommand, so
    # every "--json" token is removed and a single one is put at the front.
    argv = sys.argv[1:] if argv is None else list(argv)
    if "--json" in argv:
        argv = ["--json", *(item for item in argv if item != "--json")]
    parser = build_parser()
    args = parser.parse_args(argv)
    return args.func(args)


if __name__ == "__main__":
    raise SystemExit(main())
Save
Back
Encoding: utf-8, size: 31199, truncated: False