#!/usr/bin/env python3 """ Production Board command-line interface. Stdlib-only; runs on any Python >=3.8 without an extra install. Mirrors the Production Board HTTP API one-to-one - every endpoint you can hit with curl is also a subcommand here. Auth uses Bearer tokens. Usage: prodcli app banner + sign-in state prodcli login --token pat_… sign in with a personal access token prodcli login interactive (token or email + password) prodcli logout prodcli whoami | version | update prodcli list [--filter k=v]... [--sort k] [--limit N] [--all] [--fields a,b] [--cache N] prodcli get [--fields a,b] [--cache N] prodcli create [-- ... | --json '{...}' | --file p | --stdin | --csv p [--map k=COL,…]] [--dry-run] prodcli update [-- ... | --json '{...}' | --file p | --stdin | --csv p] [--dry-run] prodcli upsert --unique [--file p | --stdin | --csv p] [--dry-run] prodcli delete [--yes] | --file p | --stdin [--dry-run] prodcli profile list | use NAME | show | delete NAME prodcli auth list | use NAME | show | delete NAME # alias of profile prodcli telemetry on | off | status prodcli autoupdate on | off | status prodcli schema [] # local cheatsheet, no network prodcli config list | get K | set K V | unset K | keys | reset Global flags (work on any subcommand, before or after the verb): --profile NAME use the named credential profile (also XCLI_PROFILE env) --compact single-line JSON output (default: pretty) --format F output as F: json (default) | ndjson | table --retry N retry transient failures (429/5xx/network) up to N times --backoff exp|linear|off curve for the retry sleep (default: exp) --stderr-json machine-readable error envelopes on stderr (good for pipelines) --idempotency-key K send Idempotency-Key on writes (or --auto-idempotency for per-item UUIDs) Environment overrides (win over saved config, useful for CI): XCLI_NO_AUTOUPDATE=1 skip the once-per-day update check XCLI_NO_TELEMETRY=1 skip the anonymous usage analytics XCLI_PROFILE=NAME pick a profile without --profile 
XCLI_BASE_URL=… override the server URL (testing only) XCLI_TOKEN=… use this PAT for the current invocation only Exit codes: 0 success 1 generic error 2 bad usage (argparse) 3 unauthenticated (401) 4 forbidden (403) 5 not found (404) 6 validation (400/422) 7 conflict (409) 8 rate limited (429) 9 server error (5xx) 10 network error / cannot reach server """ from __future__ import annotations import argparse import csv as _csv import getpass import hashlib import io import json import os import platform import random import shutil import ssl import sys import tempfile import textwrap import time import urllib.error import urllib.parse import urllib.request import uuid from pathlib import Path from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple # ── Configuration ────────────────────────────────────────────────────── APP_SLUG = "prod" APP_NAME = "Production Board" COMMAND_NAME = "prodcli" CLI_VERSION = "1.2.7" DEFAULT_BASE = "https://qtssystem.com" MODELS: Dict[str, Dict[str, Any]] = 
json.loads(r"""{"board":{"ops":["list","read","create","update","delete"],"create_fields":["name","description","accent","settings","tags","columns"],"update_fields":["name","description","accent","settings","tags","columns"],"allowed_filters":["data__name","data__accent","data__tags","status","is_archived","owned_by"],"allowed_sorts":["created_at","updated_at","data__name"],"default_sort":"created_at","max_limit":50,"fields":[{"name":"name","type":"string","max_len":200},{"name":"tags","type":"tags"},{"name":"accent","type":"enum","values":["slate","gray","blue","indigo","violet","fuchsia","amber","orange","emerald","green","rose","red"]},{"name":"settings","type":"dict"},{"name":"description","type":"string","max_len":2000}]},"card":{"ops":["list","read","create","update","delete"],"create_fields":["title","description","status","position","priority","tags","assignee","due_date","board_id"],"update_fields":["title","description","status","position","priority","tags","assignee","due_date","board_id"],"allowed_filters":["data__status","data__priority","data__tags","data__assignee","data__board_id","status","is_archived","owned_by"],"allowed_sorts":["created_at","updated_at","data__position","data__status","data__priority","data__due_date"],"default_sort":"data__position","max_limit":200,"fields":[{"name":"tags","type":"tags"},{"name":"title","type":"string","max_len":200},{"name":"status","type":"string","max_len":64},{"name":"assignee","type":"string","max_len":64},{"name":"board_id","type":"string","max_len":64,"ref":{"type":"board","owned":true,"optional":true}},{"name":"due_date","type":"string","max_len":32},{"name":"position","type":"number"},{"name":"priority","type":"enum","values":["low","medium","high","critical"]},{"name":"description","type":"string","max_len":4000}]}}""") REFRESH_HEADER = "x-auth-refresh-token" # Exit codes - documented in --help. 
# Process exit codes — mirrored in the module docstring so shell scripts
# can branch on them reliably.
EXIT_OK = 0
EXIT_ERROR = 1
EXIT_USAGE = 2  # argparse default
EXIT_AUTH = 3
EXIT_FORBIDDEN = 4
EXIT_NOT_FOUND = 5
EXIT_VALIDATION = 6
EXIT_CONFLICT = 7
EXIT_RATE_LIMITED = 8
EXIT_SERVER = 9
EXIT_NETWORK = 10

# Top-level columns the server lets us query directly. Anything else on
# `--filter` / `--fields` is assumed to live under `data.*`.
TOP_LEVEL_COLUMNS = frozenset((
    "id",
    "type",
    "status",
    "owned_by",
    "created_by",
    "updated_by",
    "is_archived",
    "is_deleted",
    "created_at",
    "updated_at",
    "data",
))

# Field names whose auto-generated ``--<field>`` flag would collide with one
# of our reserved CLI flags. We skip flag generation for these and let the
# user supply them via ``--json`` / ``--file`` instead - safer than crashing
# argparse at startup if a model declares a field with one of these names.
RESERVED_FIELD_FLAGS = frozenset((
    # write helpers
    "json", "file", "stdin", "csv", "map", "dry-run", "dry_run",
    "continue-on-error", "continue_on_error",
    "idempotency-key", "idempotency_key",
    "auto-idempotency", "auto_idempotency",
    # upsert
    "unique",
    # list-only (won't actually clash on writes, but reserve to keep
    # behaviour identical regardless of which subcommand the field is on)
    "limit", "offset", "sort", "filter", "all", "fields", "where", "cache", "q",
    # global
    "profile", "compact", "retry", "backoff", "stderr-json", "stderr_json",
    "format",
    # login
    "password-stdin", "password_stdin",
    # delete
    "yes",
    # always-reserved
    "help",
))

# Module-level toggles flipped by the global flags during ``main()``.
# Putting them here keeps the rest of the code free of arg-passing
# noise; argparse-driven config is the only reason any of them mutate.
_compact_output: bool = False
_stderr_json: bool = False
_retry_count: int = 0
_backoff_kind: str = "exp"      # "exp" | "linear" | "off"
_output_format: str = "json"    # "json" | "ndjson" | "table"
_idempotency_key: Optional[str] = None
_auto_idempotency: bool = False


# ── Root state directory + legacy migration ──────────────────────────────


def _root_dir() -> Path:
    """Return (creating it if needed) the per-user state root ``~/.prodcli``.

    The directory is chmod'd to 0o700 best-effort so saved tokens stay
    private on shared machines.
    """
    # Plain string literal: the previous ``f".prodcli"`` had no placeholders.
    p = Path(os.path.expanduser("~")) / ".prodcli"
    p.mkdir(parents=True, exist_ok=True)
    try:
        os.chmod(p, 0o700)
    except OSError:
        pass  # best effort (e.g. filesystems that reject chmod)
    return p


def _profiles_root() -> Path:
    """Return (creating if needed) the ``profiles/`` directory under the root."""
    p = _root_dir() / "profiles"
    p.mkdir(parents=True, exist_ok=True)
    return p


def _profile_dir(name: str) -> Path:
    """Return the directory for one named profile, sanitising path separators.

    An empty/whitespace name falls back to ``default``.
    """
    safe = name.strip().replace("/", "_").replace("\\", "_") or "default"
    p = _profiles_root() / safe
    p.mkdir(parents=True, exist_ok=True)
    try:
        os.chmod(p, 0o700)
    except OSError:
        pass
    return p


def _migrate_legacy_state() -> None:
    """
    Earlier versions kept credentials.json / device.json / update_check.json
    directly under the state root. Move them under profiles/default/ on first
    run so a quiet upgrade keeps the user signed in.

    Idempotent: if files already exist in the destination, we leave them alone.
    """
    root = _root_dir()
    dest = _profile_dir("default")
    for name in ("credentials.json", "device.json", "update_check.json"):
        src = root / name
        if src.exists() and not (dest / name).exists():
            try:
                shutil.move(str(src), str(dest / name))
            except OSError:
                pass  # migration is best-effort; worst case the user re-logs-in


# ── Active profile resolution ────────────────────────────────────────────

_active_profile: str = "default"


def _resolve_active_profile(flag: Optional[str]) -> str:
    """Pick the profile name: --profile flag > XCLI_PROFILE env > saved config."""
    if flag:
        return flag
    env = os.environ.get("XCLI_PROFILE")
    if env:
        return env
    return _load_config().get("active_profile") or "default"


# ── Global config (telemetry / autoupdate / active_profile) ──────────────


def _config_path() -> Path:
    return _root_dir() / "config.json"


def _load_config() -> Dict[str, Any]:
    """Read config.json; any read/parse failure degrades to an empty config."""
    p = _config_path()
    if not p.exists():
        return {}
    try:
        return json.loads(p.read_text(encoding="utf-8") or "{}")
    except (json.JSONDecodeError, OSError):
        return {}


def _save_config(cfg: Dict[str, Any]) -> None:
    """Write config.json atomically (tmp file + os.replace), then chmod 0600."""
    p = _config_path()
    tmp = p.with_suffix(p.suffix + ".tmp")
    tmp.write_text(json.dumps(cfg, indent=2), encoding="utf-8")
    os.replace(tmp, p)
    try:
        os.chmod(p, 0o600)
    except OSError:
        pass


def _config_set(key: str, value: Any) -> None:
    """Set one config key; ``value=None`` deletes the key."""
    cfg = _load_config()
    if value is None:
        cfg.pop(key, None)
    else:
        cfg[key] = value
    _save_config(cfg)


# ── Per-profile state files ──────────────────────────────────────────────


def _read_profile_json(name: str) -> Dict[str, Any]:
    """Read a JSON state file from the active profile dir; {} on any failure."""
    f = _profile_dir(_active_profile) / name
    if not f.exists():
        return {}
    try:
        return json.loads(f.read_text(encoding="utf-8") or "{}")
    except (json.JSONDecodeError, OSError):
        return {}


def _write_profile_json(name: str, data: Dict[str, Any]) -> None:
    """Atomically write a JSON state file in the active profile dir (0600)."""
    f = _profile_dir(_active_profile) / name
    tmp = f.with_suffix(f.suffix + ".tmp")
    tmp.write_text(json.dumps(data, indent=2), encoding="utf-8")
    os.replace(tmp, f)
    try:
        os.chmod(f, 0o600)
    except OSError:
        pass


def _load_creds() -> Dict[str, Any]:
    return _read_profile_json("credentials.json")


def _save_creds(data: Dict[str, Any]) -> None:
    _write_profile_json("credentials.json", data)


def _clear_creds() -> None:
    """Delete the active profile's credentials file, ignoring FS errors."""
    f = _profile_dir(_active_profile) / "credentials.json"
    if f.exists():
        try:
            f.unlink()
        except OSError:
            pass


def _device_id() -> str:
    """Return the stable per-profile device id, minting one on first use."""
    blob = _read_profile_json("device.json")
    did = blob.get("device_id")
    # A uuid4 string is 36 chars; the >= 32 check rejects junk values.
    if isinstance(did, str) and len(did) >= 32:
        return did
    did = str(uuid.uuid4())
    _write_profile_json("device.json", {"device_id": did})
    return did


_session_id_cache: Optional[str] = None


def _session_id() -> str:
    """Return one UUID per process invocation (lazily minted, then cached)."""
    global _session_id_cache
    if _session_id_cache is None:
        _session_id_cache = str(uuid.uuid4())
    return _session_id_cache


def _base_url() -> str:
    """Server URL: XCLI_BASE_URL override (testing) or the production default."""
    return os.environ.get("XCLI_BASE_URL") or DEFAULT_BASE


def _auth_token() -> Optional[str]:
    """Token resolution: one-shot XCLI_TOKEN env wins over saved credentials."""
    return os.environ.get("XCLI_TOKEN") or _load_creds().get("token") or None


# ── Telemetry + auto-update toggles ──────────────────────────────────────
# Resolution: explicit env wins (CI determinism), then saved config,
# then default-on. The off paths return False unconditionally so users
# in restricted environments stay opted-out.
def _telemetry_enabled() -> bool:
    """True unless telemetry is disabled by XCLI_NO_TELEMETRY or saved config."""
    if os.environ.get("XCLI_NO_TELEMETRY", "").lower() in ("1", "true", "yes"):
        return False
    saved = _load_config().get("telemetry")
    if saved == "off":
        return False
    return True


def _autoupdate_enabled() -> bool:
    """True unless the daily update check is disabled by env or saved config."""
    if os.environ.get("XCLI_NO_AUTOUPDATE", "").lower() in ("1", "true", "yes"):
        return False
    saved = _load_config().get("autoupdate")
    if saved == "off":
        return False
    return True


# ── HTTP transport ───────────────────────────────────────────────────────


class ApiError(RuntimeError):
    """Error raised for any failed API call.

    ``status`` is the HTTP status (0 means a network-level failure),
    ``message`` a human-readable summary, ``body`` the parsed JSON error
    payload when the server sent one.
    """

    def __init__(self, status: int, message: str, body: Any = None):
        super().__init__(f"HTTP {status}: {message}")
        self.status = status
        self.message = message
        self.body = body


def _exit_code_for_status(status: int) -> int:
    """Map an HTTP status (0 = network failure) to a documented exit code."""
    if status == 401:
        return EXIT_AUTH
    if status == 403:
        return EXIT_FORBIDDEN
    if status == 404:
        return EXIT_NOT_FOUND
    if status in (400, 422):
        return EXIT_VALIDATION
    if status == 409:
        return EXIT_CONFLICT
    if status == 429:
        return EXIT_RATE_LIMITED
    if status >= 500:
        return EXIT_SERVER
    if status == 0:
        return EXIT_NETWORK
    return EXIT_ERROR


# Derive the UA from CLI_VERSION so a release bump can never leave it stale
# (it was previously hard-coded to "1.2.7" here).
_USER_AGENT = f"prodcli/{CLI_VERSION} ({platform.system().lower()}; py{platform.python_version()})"


def _maybe_persist_refresh(headers: Any) -> None:
    """
    Slide the saved session token if the server rotated it.

    Skipped for PAT credentials (the BE never refreshes those) and for
    one-shot env overrides (XCLI_TOKEN is intentionally transient).
    """
    if os.environ.get("XCLI_TOKEN"):
        return
    try:
        new_tok = headers.get(REFRESH_HEADER)
    except AttributeError:
        new_tok = None
    if not new_tok:
        return
    creds = _load_creds()
    if creds.get("kind") != "session":
        return
    creds["token"] = new_tok
    creds["saved_at"] = int(time.time())
    _save_creds(creds)


# Statuses + classes of failure we'll retry when the user opts in.
_RETRYABLE_STATUSES = frozenset((408, 425, 429, 500, 502, 503, 504))


def _backoff_delay(attempt: int, retry_after: Optional[float]) -> float:
    """
    Sleep duration before the next retry.

    ``Retry-After`` (seconds) wins when the server provided one; otherwise
    the configured backoff curve drives. Exponential is bounded at 60 s and
    gets a little jitter so a fleet of CLIs doesn't synchronise.
    """
    if retry_after is not None and retry_after > 0:
        return min(retry_after, 60.0)
    if _backoff_kind == "off":
        return 0.0
    if _backoff_kind == "linear":
        return min(1.0 + attempt * 1.0, 30.0)
    base = min(2.0 ** attempt, 60.0)
    return base + random.uniform(0, base * 0.1)


def _parse_retry_after(headers: Any) -> Optional[float]:
    """Parse a numeric Retry-After header; None when absent or non-numeric."""
    try:
        raw = headers.get("Retry-After")
    except AttributeError:
        raw = None
    if not raw:
        return None
    try:
        return float(raw)
    except ValueError:
        # HTTP-date form is also legal; we don't bother parsing it
        # accurately because rate-limit responses overwhelmingly use
        # the integer-seconds variant. Fall through to backoff curve.
        return None


class _StripAuthRedirectHandler(urllib.request.HTTPRedirectHandler):
    """
    Default redirect handler keeps every header across hops, including the
    Authorization line. If the redirect target is on a different origin we
    strip the Bearer token before following - protects against a
    misconfigured proxy bouncing requests to an internal host where the
    PAT shouldn't land.
    """

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        new_req = super().redirect_request(req, fp, code, msg, headers, newurl)
        if new_req is None:
            return None
        try:
            from_host = urllib.parse.urlparse(req.full_url).netloc.lower()
            to_host = urllib.parse.urlparse(newurl).netloc.lower()
        except Exception:
            return new_req
        if from_host != to_host:
            # Scrub both the normal and unredirected header tables; urllib
            # stores Authorization in either depending on how it was set.
            for h in ("Authorization", "authorization"):
                try:
                    new_req.headers.pop(h, None)
                    new_req.unredirected_hdrs.pop(h, None)
                except Exception:
                    pass
        return new_req


def _ensure_opener() -> None:
    """
    Install our custom redirect handler as the default opener once.
    Idempotent - safe to call from multiple entry points (main, the
    autoupdate fetch in ``_maybe_autoupdate``, etc).
    """
    if getattr(_ensure_opener, "_done", False):
        return
    opener = urllib.request.build_opener(_StripAuthRedirectHandler())
    urllib.request.install_opener(opener)
    setattr(_ensure_opener, "_done", True)


def _http(
    method: str,
    path: str,
    *,
    params: Optional[Dict[str, Any]] = None,
    body: Any = None,
    auth: bool = True,
    timeout: float = 30.0,
) -> Any:
    """
    Issue one HTTP request and return parsed JSON (or None for 204).

    Honours the global retry / backoff policy on transient failures
    (429, 5xx, network errors) when ``--retry N`` is set.
    """
    url = _base_url().rstrip("/") + path
    if params:
        # Lists → repeated keys (matches the wire-form filter DSL).
        flat: List[Tuple[str, str]] = []
        for k, v in params.items():
            if v is None:
                continue
            if isinstance(v, (list, tuple)):
                for item in v:
                    flat.append((k, str(item)))
            else:
                flat.append((k, str(v)))
        url += "?" + urllib.parse.urlencode(flat)
    data = None
    headers = {"Accept": "application/json", "User-Agent": _USER_AGENT}
    if _telemetry_enabled():
        headers["X-Analytics-Device-Id"] = _device_id()
        headers["X-Analytics-Session-Id"] = _session_id()
    # Channel/version identify the client for compatibility purposes and
    # carry no analytics payload, so they are sent unconditionally.
    # NOTE(review): indentation was lost in extraction here — confirm these
    # two were not intended to sit inside the telemetry guard.
    headers["X-CLI-Channel"] = "cli"
    headers["X-CLI-Version"] = CLI_VERSION
    if body is not None:
        data = json.dumps(body).encode("utf-8")
        headers["Content-Type"] = "application/json"
    if auth:
        tok = _auth_token()
        if not tok:
            raise ApiError(401, f"Not signed in. Run `{COMMAND_NAME} login` first.")
        headers["Authorization"] = f"Bearer {tok}"
    # Idempotency-Key on writes (POST/PATCH/DELETE). Caller can pass an
    # explicit value or rely on --auto-idempotency for a per-item UUID.
    # Server may not act on it today; emitting it is forward-compatible
    # and free for safe-retry consumers.
    if method.upper() in ("POST", "PATCH", "DELETE"):
        if _idempotency_key:
            headers["Idempotency-Key"] = _idempotency_key
        elif _auto_idempotency:
            headers["Idempotency-Key"] = str(uuid.uuid4())
    _ensure_opener()
    ctx = ssl.create_default_context()
    last_err: Optional[ApiError] = None
    attempts = max(1, _retry_count + 1)
    for attempt in range(attempts):
        req = urllib.request.Request(url, data=data, method=method, headers=headers)
        try:
            with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp:
                # Refresh-token persistence is gated on auth=True so an
                # accidental refresh header on an open route never
                # rewrites our credentials file.
                if auth:
                    _maybe_persist_refresh(resp.headers)
                raw = resp.read()
                if not raw:
                    return None
                return json.loads(raw.decode("utf-8"))
        except urllib.error.HTTPError as e:
            raw = b""
            try:
                raw = e.read() or b""
            except Exception:
                pass
            parsed: Any = None
            msg = e.reason or "request failed"
            try:
                parsed = json.loads(raw.decode("utf-8")) if raw else None
                if isinstance(parsed, dict):
                    i18n = parsed.get("i18n")
                    if isinstance(i18n, dict) and i18n.get("en"):
                        msg = i18n["en"]
                    elif parsed.get("detail"):
                        msg = str(parsed["detail"])
            except (UnicodeDecodeError, json.JSONDecodeError):
                pass
            last_err = ApiError(e.code, msg, parsed)
            if e.code in _RETRYABLE_STATUSES and attempt + 1 < attempts:
                time.sleep(_backoff_delay(attempt, _parse_retry_after(e.headers)))
                continue
            raise last_err
        except urllib.error.URLError as e:
            last_err = ApiError(0, f"network error: {e.reason}")
            if attempt + 1 < attempts:
                time.sleep(_backoff_delay(attempt, None))
                continue
            raise last_err
    # Unreachable - the loop either returns or re-raises.
    if last_err:
        raise last_err
    raise ApiError(0, "request failed")


# ── Telemetry events ─────────────────────────────────────────────────────


def _emit_event(event_type: str, **fields: Any) -> None:
    """Fire-and-forget analytics. Never raises into the caller."""
    if not _telemetry_enabled():
        return
    try:
        evt: Dict[str, Any] = {
            "type": event_type,
            "ts_client": int(time.time()),
            "meta": {k: v for k, v in fields.items() if v is not None},
        }
        body = {
            "device_id": _device_id(),
            "session_id": _session_id(),
            "events": [evt],
            "meta": {
                "channel": "cli",
                "cli_version": CLI_VERSION,
                "command_name": COMMAND_NAME,
                "os": f"{platform.system()} {platform.release()}",
                "py": platform.python_version(),
            },
        }
        _http("POST", "/xapi2/analytics/track", body=body, auth=False, timeout=4.0)
    except Exception:
        pass  # analytics must never break a user command


# ── Auto-update ──────────────────────────────────────────────────────────


def _check_due() -> bool:
    """True when the last update check is missing or older than 24 h."""
    blob = _read_profile_json("update_check.json")
    last = blob.get("checked_at")
    if not isinstance(last, (int, float)):
        return True
    return (time.time() - float(last)) >= 86400


def _record_check(latest: Optional[str]) -> None:
    """Persist the timestamp (and latest-seen version) of an update check."""
    _write_profile_json(
        "update_check.json", {"checked_at": int(time.time()), "latest": latest}
    )


def _cleanup_stale_old() -> None:
    """Remove any leftover ``.old`` from a previous Windows-style update."""
    try:
        target = Path(__file__).resolve()
    except OSError:
        return
    leftover = target.with_name(target.name + ".old")
    if leftover.exists():
        try:
            leftover.unlink()
        except OSError:
            pass


def _self_replace(new_source: bytes) -> None:
    """
    Atomically swap the running file for the new source.

    POSIX is straightforward (``os.replace`` honours an open file handle).
    On Windows the in-use file lock can block ``os.replace`` outright - we
    rename the live file aside first, write the new one in place, then
    remove the leftover at the next CLI invocation via ``_cleanup_stale_old``.
    """
    target = Path(__file__).resolve()
    fd, tmp_name = tempfile.mkstemp(prefix=COMMAND_NAME + ".", dir=str(target.parent))
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(new_source)
        os.chmod(tmp_name, 0o755)
    except OSError as e:
        try:
            os.unlink(tmp_name)
        except OSError:
            pass
        raise e
    aside = target.with_name(target.name + ".old")
    try:
        os.replace(tmp_name, target)
        return
    except OSError:
        pass
    # Windows path: move current aside, then move new into place.
    try:
        if aside.exists():
            aside.unlink()
    except OSError:
        pass
    try:
        os.replace(target, aside)
        os.replace(tmp_name, target)
    except OSError as e:
        # Best effort: try to clean up the temp file we wrote.
        try:
            os.unlink(tmp_name)
        except OSError:
            pass
        raise e


def _looks_like_valid_client(blob: bytes) -> bool:
    """Sanity-check the bytes the server returned before we replace ourselves."""
    if not blob or len(blob) < 1024:
        return False
    if not blob.startswith(b"#!"):
        return False
    for marker in (b"COMMAND_NAME", b"APP_SLUG", b"CLI_VERSION", b"def main"):
        if marker not in blob:
            return False
    try:
        compile(blob, "", "exec")
    except SyntaxError:
        return False
    return True


def _maybe_autoupdate(force: bool = False) -> Optional[str]:
    """Check for and apply a newer CLI build.

    Returns the new version string when a replacement was installed,
    otherwise None. Every failure path is silent - updating must never
    block the user's actual command.
    """
    if not force:
        if not _autoupdate_enabled():
            return None
        if not _check_due():
            return None
    try:
        info = _http("GET", "/xapi2/cli/version", auth=False, timeout=5.0)
    except ApiError:
        _record_check(None)
        return None
    if not isinstance(info, dict):
        _record_check(None)
        return None
    latest = info.get("version")
    _record_check(latest if isinstance(latest, str) else None)
    if not isinstance(latest, str) or latest == CLI_VERSION:
        return None
    try:
        url = _base_url().rstrip("/") + "/xapi2/cli/script"
        req = urllib.request.Request(url, headers={"User-Agent": _USER_AGENT})
        with urllib.request.urlopen(req, timeout=10.0) as resp:
            new_source = resp.read()
    except Exception:
        return None
    if not _looks_like_valid_client(new_source):
        return None
    try:
        _self_replace(new_source)
    except OSError:
        return None
    return latest


# ── Output ───────────────────────────────────────────────────────────────
───────────────────────────────────────────────────────────── def _is_tty() -> bool: return sys.stdout.isatty() def _color(text: str, code: str) -> str: if not _is_tty(): return text return f"\033[{code}m{text}\033[0m" def _ok(msg: str) -> None: print(_color("✓", "32") + " " + msg) def _warn(msg: str) -> None: print(_color("!", "33") + " " + msg, file=sys.stderr) def _die(msg: str, code: int = EXIT_ERROR, *, status: Optional[int] = None, hint: Optional[str] = None) -> None: if _stderr_json: envelope: Dict[str, Any] = { "error": msg, "exit_code": code, } # Omit status when it's 0 (network) - "status: 0" is confusing # in machine-readable output. The exit_code already carries the # network signal (10). if status is not None and status != 0: envelope["http_status"] = status if hint: envelope["hint"] = hint print(json.dumps(envelope, ensure_ascii=False), file=sys.stderr) else: print(_color("✗", "31") + " " + msg, file=sys.stderr) if hint: print(" " + hint, file=sys.stderr) sys.exit(code) def _die_api(e: ApiError) -> None: hint = None if e.status == 401: hint = f"run `prodcli login` (token may have expired)" elif e.status == 403: hint = "if using a personal access token, check that the token's scope includes this op (Integrations menu)" elif e.status == 404: hint = "the object doesn't exist or you can't read it (the two cases are conflated for security)" elif e.status == 409: hint = "use `upsert --unique ` instead of `create` to make this idempotent" elif e.status in (400, 422): hint = "use --dry-run to inspect the request body the server is rejecting" elif e.status == 429: hint = "rate limited; pass --retry N --backoff exp to auto-retry" elif e.status == 0: hint = "could not reach the server; check your network and `--profile` settings" _die(e.message, _exit_code_for_status(e.status), status=e.status, hint=hint) def _print_json(value: Any) -> None: """Pure JSON sink. Prefer ``_emit`` for command output - it respects the ``--format`` choice. 
Use this directly only for machine-only blobs (dry-run envelopes, schema dumps that have to stay JSON regardless of user format).""" if _compact_output: print(json.dumps(value, separators=(",", ":"), ensure_ascii=False)) else: print(json.dumps(value, indent=2, ensure_ascii=False)) def _emit(value: Any, *, fields: Optional[List[str]] = None) -> None: """ Render command output per ``--format``. Falls back to JSON for shapes the chosen format can't represent (e.g. ``table`` on a non-list response). """ fmt = _output_format if fmt == "ndjson": _emit_ndjson(value) return if fmt == "table": if _emit_table(value, fields): return # Fallthrough: the value wasn't tabular; honour --compact. _print_json(value) def _emit_ndjson(value: Any) -> None: """One JSON object per line. Lists are unwrapped; envelopes too.""" rows: Iterable[Any] if isinstance(value, list): rows = value elif isinstance(value, dict) and isinstance(value.get("data"), list): rows = value["data"] else: rows = [value] for row in rows: sys.stdout.write(json.dumps(row, separators=(",", ":"), ensure_ascii=False)) sys.stdout.write("\n") sys.stdout.flush() def _flatten_for_table(obj: Any, fields: Optional[List[str]]) -> Optional[Dict[str, Any]]: """ Resolve a list of paths into a flat key/value dict. Returns ``None`` when the object isn't a tabular row (e.g. dry-run envelopes, error blobs, schema dumps) so the caller can fall back to JSON. """ if not isinstance(obj, dict): return None if fields: return {p: _resolve_path(obj, p) for p in fields} # Heuristic: tabular rows look like CRUD objects - they carry an # ``id`` and usually a ``data`` dict. Anything else (dry-run blobs, # error envelopes, schema responses) gets None so JSON wins. 
if "id" not in obj or not isinstance(obj.get("data"), dict): return None out: Dict[str, Any] = {"id": obj["id"]} if "status" in obj: out["status"] = obj["status"] data = obj["data"] for key in ("name", "email", "title", "label"): if key in data: out[key] = data[key] if len(out) <= (2 if "status" in out else 1): for k, v in list(data.items())[:4]: if isinstance(v, (str, int, float, bool)) or v is None: out[k] = v return out def _emit_table(value: Any, fields: Optional[List[str]]) -> bool: """Render value as a fixed-width ASCII table. Returns False when the value can't sensibly be tabulated; the caller falls back to JSON.""" rows: List[Dict[str, Any]] if isinstance(value, list): rows = [] for item in value: flat = _flatten_for_table(item, fields) if flat is None: return False rows.append(flat) elif isinstance(value, dict) and isinstance(value.get("data"), list): rows = [] for item in value["data"]: flat = _flatten_for_table(item, fields) if flat is None: return False rows.append(flat) elif isinstance(value, dict): flat = _flatten_for_table(value, fields) if flat is None: return False rows = [flat] else: return False if not rows: print("(no rows)") return True # Stable column order: union of keys, preserving first-seen order. 
cols: List[str] = [] seen: set = set() for row in rows: for k in row: if k not in seen: seen.add(k); cols.append(k) def _cell(v: Any) -> str: if v is None: return "" if isinstance(v, (dict, list)): return json.dumps(v, separators=(",", ":"), ensure_ascii=False) s = str(v) return s if len(s) <= 60 else s[:57] + "…" widths = [len(c) for c in cols] text_rows: List[List[str]] = [] for row in rows: text = [_cell(row.get(c)) for c in cols] for i, t in enumerate(text): if len(t) > widths[i]: widths[i] = len(t) text_rows.append(text) sep = " " header = sep.join(c.ljust(widths[i]) for i, c in enumerate(cols)) rule = sep.join("-" * widths[i] for i in range(len(cols))) print(header) print(rule) for text in text_rows: print(sep.join(text[i].ljust(widths[i]) for i in range(len(cols)))) return True # ── Field projection (--fields) ──────────────────────────────────────── def _resolve_path(value: Any, path: str) -> Any: """ Walk a dotted path. Unprefixed names that aren't a top-level column auto-resolve to ``data.`` so users can write ``--fields name`` instead of ``--fields data.name``. """ if "." 
not in path and path not in TOP_LEVEL_COLUMNS: path = f"data.{path}" cur: Any = value for part in path.split("."): if isinstance(cur, dict): cur = cur.get(part) else: return None return cur def _project_one(obj: Dict[str, Any], paths: List[str]) -> Dict[str, Any]: out: Dict[str, Any] = {} for p in paths: out[p] = _resolve_path(obj, p) return out def _project_response(resp: Any, paths: Optional[List[str]]) -> Any: if not paths: return resp if isinstance(resp, dict) and isinstance(resp.get("data"), list): return { **{k: v for k, v in resp.items() if k != "data"}, "data": [_project_one(item, paths) if isinstance(item, dict) else item for item in resp["data"]], } if isinstance(resp, list): return [_project_one(item, paths) if isinstance(item, dict) else item for item in resp] if isinstance(resp, dict): return _project_one(resp, paths) return resp # ── Friendly filter mapping ──────────────────────────────────────────── def _friendly_filter_map(type_name: str) -> Dict[str, str]: """ Build {friendly_key: wire_key}. Wire keys live in ``allowed_filters`` (e.g. ``data__name``); the friendly form drops the ``data__`` prefix and converts the remaining ``__`` to dots so nested paths read naturally. """ out: Dict[str, str] = {} cfg = MODELS.get(type_name) or {} for wire in (cfg.get("allowed_filters") or []): if not isinstance(wire, str): continue if wire.startswith("data__"): friendly = wire[len("data__"):].replace("__", ".") else: friendly = wire out.setdefault(friendly, wire) return out def _resolve_filter_key(type_name: str, key: str) -> str: """ Translate a user-supplied key to the wire form. Pass-through if the key already looks wire-form (``data__…`` or a top-level column), otherwise look it up in the friendly map. 
""" if key.startswith("data__") or key in TOP_LEVEL_COLUMNS: return key fmap = _friendly_filter_map(type_name) return fmap.get(key, key) # ── Bulk input loader ────────────────────────────────────────────────── def _load_bulk_input(file_arg: Optional[str], stdin_arg: bool) -> Optional[Any]: """ Returns the parsed JSON from --file / --stdin, or None when neither was supplied. ``--file -`` is also accepted as a stdin alias so `cmd create --file - < foo.json` works. UTF-8 BOMs (common from Windows editors) are silently stripped. """ if not file_arg and not stdin_arg: return None raw: str src_label: str if stdin_arg or file_arg == "-": raw = sys.stdin.read() src_label = "" else: try: raw = Path(file_arg or "").read_text(encoding="utf-8-sig") except OSError as e: _die(f"--file: {e}") return None # unreachable src_label = file_arg or "" if raw.startswith(""): raw = raw[1:] try: return json.loads(raw) except json.JSONDecodeError as e: _die(f"{src_label}: invalid JSON ({e})", EXIT_VALIDATION) return None # unreachable def _load_csv_input(csv_arg: Optional[str], mapping_arg: Optional[str]) -> Optional[List[Dict[str, Any]]]: """ Read a CSV file (or stdin if ``csv_arg == '-'``) and return a list of {column: value} dicts. ``--map "out=In Header,email=Email"`` lets callers rename headers without editing the source file - keys land in the API body under the *output* names. 
""" if not csv_arg: return None if csv_arg == "-": raw = sys.stdin.read() src_label = "" else: try: raw = Path(csv_arg).read_text(encoding="utf-8-sig") except OSError as e: _die(f"--csv: {e}") return None # unreachable src_label = csv_arg if raw.startswith(""): raw = raw[1:] rename: Dict[str, str] = {} if mapping_arg: for pair in mapping_arg.split(","): pair = pair.strip() if not pair: continue if "=" not in pair: _die(f"--map: expected key=value, got {pair!r}", EXIT_USAGE) k, _, v = pair.partition("=") rename[v.strip()] = k.strip() # rename FROM CSV header → output key try: reader = _csv.DictReader(io.StringIO(raw)) rows: List[Dict[str, Any]] = [] for row in reader: cleaned: Dict[str, Any] = {} for csv_header, value in row.items(): if csv_header is None: continue key = rename.get(csv_header, csv_header) if value is None or value == "": continue cleaned[key] = _coerce_value(value) if cleaned: rows.append(cleaned) return rows except _csv.Error as e: _die(f"{src_label}: invalid CSV ({e})", EXIT_VALIDATION) return None # unreachable # ── Banner ───────────────────────────────────────────────────────────── def _banner() -> None: title = f"Production Board CLI v1.2.7" rule = "─" * len(title) print() print(_color(title, "1")) print(rule) print(f"server: {_base_url()}") print(f"profile: {_active_profile}") creds = _load_creds() if creds.get("token"): kind = creds.get("kind") or ( "personal access token" if str(creds.get("token", "")).startswith("pat_") else "session" ) ident = creds.get("email") or creds.get("user_id") or "?" print("signed in as " + _color(str(ident), "1") + f" ({kind})") else: print("not signed in - run `" + COMMAND_NAME + " login` to start.") print() print("Common commands:") print(f" prodcli login --token pat_…") print(f" prodcli whoami") for tname in list(MODELS.keys())[:3]: print(f" prodcli {tname} list") if len(MODELS) > 3: print(f" …and {len(MODELS) - 3} more models. 
def _validate_token_and_save(token: str, *, kind: str, email: Optional[str] = None) -> Dict[str, Any]:
    """Persist *token*, probe /auth/me with it, then enrich the saved record.

    The bare token is written first so the probe request authenticates
    with it; on any failure the half-saved credentials are wiped before
    exiting.
    """
    _save_creds({"token": token, "kind": kind})
    try:
        me = _http("GET", "/xapi2/auth/me")
    except ApiError as e:
        _clear_creds()
        _die(f"login failed: {e.message}", _exit_code_for_status(e.status))
    if not isinstance(me, dict):
        _clear_creds()
        _die("login failed: unexpected response")
    record = {
        "token": token,
        "kind": kind,
        "user_id": me.get("id"),
        "email": me.get("email") or email,
        "saved_at": int(time.time()),
    }
    _save_creds(record)
    return me


def _login_with_password(email: str, password: str) -> Dict[str, Any]:
    """Exchange email + password for a session token and persist it."""
    try:
        resp = _http(
            "POST",
            "/xapi2/auth/login",
            body={"email": email, "password": password},
            auth=False,
        )
    except ApiError as e:
        _die(f"login failed: {e.message}", _exit_code_for_status(e.status))
    if not isinstance(resp, dict) or not resp.get("access_token"):
        _die("login failed: unexpected response")
    user = resp.get("user") or {}
    _save_creds({
        "token": resp["access_token"],
        "kind": "session",
        "user_id": user.get("id"),
        "email": user.get("email") or email,
        "saved_at": int(time.time()),
    })
    return user if isinstance(user, dict) else {}
def cmd_login(args: argparse.Namespace) -> int:
    """Sign in with a token or email + password (interactive when bare).

    Returns 0 on success, 130 when the user interrupts a prompt; dies
    with EXIT_USAGE on incomplete input.
    """
    token: Optional[str] = args.token
    email: Optional[str] = args.email
    password: Optional[str] = args.password
    if getattr(args, "password_stdin", False):
        # Read until EOF and strip the trailing newline shells add.
        # `printf` / `echo -n` produce different results; tolerating
        # both keeps `echo "$P" | login --password-stdin` working
        # without surprises.
        password = sys.stdin.read().rstrip("\r\n")
        if not password:
            _die("--password-stdin: stdin was empty", EXIT_USAGE)
    if not token and not email:
        # Fully interactive: token prompt first, empty answer falls
        # through to the email + password path.
        print(f"Sign in to {APP_NAME} (profile: {_active_profile})")
        print("Paste a personal access token, or press Enter to use email + password.")
        try:
            token = (input("Token: ").strip() or None)
        except (EOFError, KeyboardInterrupt):
            print()
            return 130
        if not token:
            try:
                email = input("Email: ").strip() or None
            except (EOFError, KeyboardInterrupt):
                print()
                return 130
            if not email:
                _die("email is required", EXIT_USAGE)
    if token:
        kind = "pat" if token.startswith("pat_") else "session"
        me = _validate_token_and_save(token, kind=kind)
        _emit_event("cli.login", method="token")
        _ok(f"signed in as {me.get('email') or me.get('id') or '?'} (profile: {_active_profile})")
        return 0
    # Email path.  Prompt for the password if it wasn't passed - flagged
    # passwords leak into shell history, so the prompt is actually the
    # ergonomic default, not a fallback.
    if email and not password:
        try:
            password = getpass.getpass("Password: ")
        except (EOFError, KeyboardInterrupt):
            print()
            return 130
    if not email or not password:
        _die("email and password required (or use --token)", EXIT_USAGE)
    user = _login_with_password(email, password)
    _emit_event("cli.login", method="password")
    _ok(f"signed in as {user.get('email') or email} (profile: {_active_profile})")
    return 0


def cmd_logout(_args: argparse.Namespace) -> int:
    """Forget the saved token for the current profile."""
    if not _load_creds().get("token"):
        _warn("not signed in")
        return 0
    _clear_creds()
    _emit_event("cli.logout")
    _ok(f"signed out (profile: {_active_profile})")
    return 0


def cmd_whoami(_args: argparse.Namespace) -> int:
    """Print the signed-in user, as the server sees it."""
    if not _auth_token():
        _die("not signed in", EXIT_AUTH)
    try:
        me = _http("GET", "/xapi2/auth/me")
    except ApiError as e:
        _die_api(e)
    _print_json(me)
    return 0


def cmd_version(_args: argparse.Namespace) -> int:
    """Print the client version plus the server / profile it targets.

    BUG FIX: the version line was hard-coded ("prodcli 1.2.7") and could
    drift from CLI_VERSION; it is now built from the constants.
    """
    print(f"{COMMAND_NAME} {CLI_VERSION}")
    print(f"server: {_base_url()}")
    print(f"profile: {_active_profile}")
    return 0
def cmd_update(_args: argparse.Namespace) -> int:
    """Run the update check now, install a newer client if one exists,
    and (on POSIX) re-exec into the freshly written file."""
    if not _autoupdate_enabled():
        # Be honest: the user explicitly asked us to update, so we run
        # the check anyway, but tell them their saved/env setting is
        # off so they understand why future runs won't auto-pick it up.
        _warn("autoupdate is disabled in your config / environment - running this check anyway.")
        _warn(f"to enable persistent auto-updates: `prodcli autoupdate on`")
    # force=True bypasses the once-per-day throttle used at startup.
    new_version = _maybe_autoupdate(force=True)
    if not new_version:
        print("already up to date")
        return 0
    _ok(f"updated to {new_version}")
    # POSIX: re-exec into the new bytes so the user sees an immediate
    # `version` output. Windows: `os.execv` has historic quirks (parent
    # detach, lost stdout) - just print + exit, next invocation picks
    # up the new bytes naturally.
    if sys.platform != "win32":
        try:
            os.execv(sys.executable, [sys.executable, str(Path(__file__).resolve()), "version"])
        except OSError:
            # exec failed (permissions, missing file) — fall through to
            # the plain-print path below instead of crashing.
            pass
    print(f" next run of `prodcli` will use the new version.")
    return 0
def cmd_profile(args: argparse.Namespace) -> int:
    """Handle `profile list | show | use NAME | delete NAME`.

    Also serves the `auth` alias; `args.profile_op` is None when the
    user typed the bare subcommand, which behaves like `list`.
    """
    op = args.profile_op
    if op == "list" or op is None:
        # Union of on-disk profile dirs + the active profile name so a
        # freshly-`profile use'd` name appears even before its first
        # login materialises a directory.
        names = {p.name for p in _profiles_root().iterdir() if p.is_dir()}
        names.add(_active_profile)
        names.add("default")
        for n in sorted(names):
            marker = "* " if n == _active_profile else " "
            creds = _profile_dir(n) / "credentials.json"
            state = " (signed in)" if creds.exists() else ""
            print(f"{marker}{n}{state}")
        return 0
    if op == "show":
        creds = _load_creds()
        print(json.dumps({
            "profile": _active_profile,
            "signed_in": bool(creds.get("token")),
            "kind": creds.get("kind"),
            "email": creds.get("email"),
            "user_id": creds.get("user_id"),
        }, indent=2))
        return 0
    if op == "use":
        if not args.name:
            _die("profile use: NAME required", EXIT_USAGE)
        _config_set("active_profile", args.name)
        _ok(f"active profile: {args.name}")
        return 0
    if op == "delete":
        if not args.name:
            _die("profile delete: NAME required", EXIT_USAGE)
        target = _profiles_root() / args.name
        if not target.exists():
            _die(f"no such profile: {args.name}", EXIT_NOT_FOUND)
        # The default profile is special: require an explicit --yes.
        if args.name == "default" and not args.yes:
            _die("refusing to delete the default profile without --yes", EXIT_USAGE)
        try:
            shutil.rmtree(target)
        except OSError as e:
            _die(f"could not delete profile: {e}")
        # Deleting the active profile also clears the saved pointer so
        # the next run falls back to the default.
        cfg = _load_config()
        if cfg.get("active_profile") == args.name:
            cfg.pop("active_profile", None)
            _save_config(cfg)
        _ok(f"deleted profile {args.name}")
        return 0
    _die(f"unknown profile op: {op}", EXIT_USAGE)
    return EXIT_USAGE  # unreachable
""" name = getattr(args, "model", None) if name and name not in MODELS: _die( f"unknown model: {name}", EXIT_NOT_FOUND, hint=f"try `prodcli schema` to list available models", ) if not name: rows: List[Dict[str, Any]] = [] for tname, cfg in MODELS.items(): rows.append({ "model": tname, "ops": ", ".join(cfg.get("ops") or []), "max": cfg.get("max_limit"), "fields": len(cfg.get("create_fields") or []), }) _emit(rows) return 0 cfg = MODELS[name] friendly = sorted(_friendly_filter_map(name).keys()) schema = { "model": name, "ops": list(cfg.get("ops") or []), "create_fields": list(cfg.get("create_fields") or []), "update_fields": list(cfg.get("update_fields") or []), "filter_keys": friendly, "sort_keys": list(cfg.get("allowed_sorts") or []), "default_sort": cfg.get("default_sort"), "max_limit": cfg.get("max_limit"), "docs_url": f"{_base_url().rstrip('/')}/docs/types/{name}", } _print_json(schema) return 0 _CONFIG_KEYS_DOC = ( "format default --format (json | ndjson | table)\n" "compact default --compact (true / false)\n" "stderr_json default --stderr-json (true / false)\n" "retry default --retry N (integer)\n" "backoff default --backoff (exp | linear | off)\n" "default_fields. default --fields for that model (e.g. 
id,name,email)\n" "telemetry on / off (also via `telemetry on|off`)\n" "autoupdate on / off (also via `autoupdate on|off`)\n" "active_profile current profile name (also via `profile use NAME`)" ) def _coerce_config_value(key: str, raw: str) -> Any: bool_keys = {"compact", "stderr_json"} int_keys = {"retry"} if key in bool_keys: if raw.lower() in ("true", "1", "yes", "on"): return True if raw.lower() in ("false", "0", "no", "off"): return False _die(f"`{key}` must be a boolean (true/false)", EXIT_USAGE) if key in int_keys: try: return int(raw) except ValueError: _die(f"`{key}` must be an integer", EXIT_USAGE) return raw def cmd_config(args: argparse.Namespace) -> int: op = args.config_op or "list" cfg = _load_config() if op in ("list", "get") and not getattr(args, "key", None): view = { "saved": cfg, "effective": { "telemetry": "on" if _telemetry_enabled() else "off", "autoupdate": "on" if _autoupdate_enabled() else "off", "active_profile": _active_profile, }, } _print_json(view) return 0 if op == "get": cur: Any = cfg for part in args.key.split("."): if isinstance(cur, dict): cur = cur.get(part) else: cur = None break _print_json({args.key: cur}) return 0 if op == "set": if not args.key or args.value is None: _die("config set: KEY VALUE required", EXIT_USAGE) coerced = _coerce_config_value(args.key.split(".")[-1], args.value) cur = cfg parts = args.key.split(".") for part in parts[:-1]: nxt = cur.get(part) if not isinstance(nxt, dict): nxt = {} cur[part] = nxt cur = nxt cur[parts[-1]] = coerced _save_config(cfg) _ok(f"set {args.key} = {json.dumps(coerced)}") return 0 if op == "unset": if not args.key: _die("config unset: KEY required", EXIT_USAGE) cur = cfg parts = args.key.split(".") for part in parts[:-1]: nxt = cur.get(part) if not isinstance(nxt, dict): _ok(f"{args.key} was not set") return 0 cur = nxt if parts[-1] in cur: del cur[parts[-1]] _save_config(cfg) _ok(f"unset {args.key}") else: _ok(f"{args.key} was not set") return 0 if op == "keys": 
def cmd_config(args: argparse.Namespace) -> int:
    """Dispatch `config list | get | set | unset | keys | reset`.

    A bare `config` behaves like `config list`; `config get` without a
    KEY also falls through to the full listing.
    """
    op = args.config_op or "list"
    saved = _load_config()
    if op in ("list", "get") and not getattr(args, "key", None):
        # Show both the raw saved values and the effective ones, since
        # environment variables can override what's on disk.
        _print_json({
            "saved": saved,
            "effective": {
                "telemetry": "on" if _telemetry_enabled() else "off",
                "autoupdate": "on" if _autoupdate_enabled() else "off",
                "active_profile": _active_profile,
            },
        })
        return 0
    if op == "get":
        node: Any = saved
        for part in args.key.split("."):
            if isinstance(node, dict):
                node = node.get(part)
            else:
                node = None
                break
        _print_json({args.key: node})
        return 0
    if op == "set":
        if not args.key or args.value is None:
            _die("config set: KEY VALUE required", EXIT_USAGE)
        # Coercion keys off the last dotted segment (e.g. "retry").
        coerced = _coerce_config_value(args.key.split(".")[-1], args.value)
        parts = args.key.split(".")
        node = saved
        for part in parts[:-1]:
            child = node.get(part)
            if not isinstance(child, dict):
                child = {}
                node[part] = child
            node = child
        node[parts[-1]] = coerced
        _save_config(saved)
        _ok(f"set {args.key} = {json.dumps(coerced)}")
        return 0
    if op == "unset":
        if not args.key:
            _die("config unset: KEY required", EXIT_USAGE)
        parts = args.key.split(".")
        node = saved
        for part in parts[:-1]:
            child = node.get(part)
            if not isinstance(child, dict):
                _ok(f"{args.key} was not set")
                return 0
            node = child
        if parts[-1] in node:
            del node[parts[-1]]
            _save_config(saved)
            _ok(f"unset {args.key}")
        else:
            _ok(f"{args.key} was not set")
        return 0
    if op == "keys":
        print(_CONFIG_KEYS_DOC)
        return 0
    if op == "reset":
        if not args.yes and _is_tty():
            try:
                answer = input("clear all saved config? [y/N] ").strip().lower()
            except (EOFError, KeyboardInterrupt):
                print()
                return 130
            if answer not in ("y", "yes"):
                print("aborted")
                return 0
        elif not args.yes:
            _die("config reset: pass --yes (no TTY for confirmation prompt)", EXIT_USAGE)
        _save_config({})
        _ok("config reset")
        return 0
    _die(f"unknown config op: {op}", EXIT_USAGE)
    return EXIT_USAGE


def _toggle_cmd(key: str, label: str):
    """Build the handler for the `telemetry` / `autoupdate` toggles."""
    def run(args: argparse.Namespace) -> int:
        op = args.toggle_op or "status"
        if op == "status":
            if key == "telemetry":
                env_var = "XCLI_NO_TELEMETRY"
                effective = _telemetry_enabled()
            else:
                env_var = "XCLI_NO_AUTOUPDATE"
                effective = _autoupdate_enabled()
            env_off = os.environ.get(env_var, "").lower() in ("1", "true", "yes")
            saved = _load_config().get(key)
            print(f"{label}: " + ("on" if effective else "off"))
            if env_off:
                print(f" ({env_var} is set in the environment - it overrides any saved setting)")
            elif saved:
                print(f" (saved setting: {saved})")
            else:
                print(" (default; no saved setting)")
            return 0
        if op in ("on", "off"):
            _config_set(key, op)
            _ok(f"{label}: {op}")
            return 0
        _die(f"{label}: expected on, off, or status", EXIT_USAGE)
        return EXIT_USAGE
    return run


# ── CRUD ──────────────────────────────────────────────────────────────


def _coerce_value(raw: str) -> Any:
    """Best-effort conversion of a CLI string into a JSON-ish value.

    Order matters: literal true/false/null → int → float → JSON
    container (only when bracketed) → plain string fallthrough.
    """
    if raw == "true":
        return True
    if raw == "false":
        return False
    if raw == "null":
        return None
    for cast in (int, float):
        try:
            return cast(raw)
        except ValueError:
            pass
    bracketed = (raw.startswith("{") and raw.endswith("}")) or (
        raw.startswith("[") and raw.endswith("]")
    )
    if bracketed:
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            pass
    return raw
def _collect_body_one(args: argparse.Namespace, fields: List[str]) -> Dict[str, Any]:
    """Build a single-object request body from --json plus field flags.

    --json is parsed first; individual --field flags are layered on top,
    so flags win whenever both supply the same key.
    """
    body: Dict[str, Any] = {}
    raw_json = getattr(args, "json", None)
    if raw_json:
        try:
            parsed = json.loads(raw_json)
        except json.JSONDecodeError as e:
            _die(f"--json: {e}", EXIT_VALIDATION)
        if not isinstance(parsed, dict):
            _die("--json must be a JSON object", EXIT_VALIDATION)
        body.update(parsed)
    for name in fields:
        supplied = getattr(args, "_field_" + name, None)
        if supplied is not None:
            body[name] = _coerce_value(supplied)
    return body


def _bulk_items_from_input(blob: Any, *, expected: str) -> List[Any]:
    """
    Normalise a --file / --stdin payload into a list of items.

    A JSON array means one body per element; a single object is treated
    as a one-element batch; ``None`` (no input) yields an empty list.
    """
    if blob is None:
        return []
    if isinstance(blob, list):
        return blob
    if isinstance(blob, dict):
        return [blob]
    _die(f"--file/--stdin: expected JSON array or object, got {type(blob).__name__}", EXIT_VALIDATION)
    return []  # unreachable


def _do_one(method: str, path: str, body: Any, *, dry_run: bool) -> Any:
    """Send one request, or just describe it when *dry_run* is set."""
    if dry_run:
        # The planned-request envelope flows through the caller's normal
        # _print_json path; bulk loops naturally collect one envelope
        # per item into an array.
        return {"dry_run": True, "method": method, "path": path, "body": body}
    return _http(method, path, body=body)
def _bulk_run(
    items: List[Any],
    operation: Callable[[Any], Any],
    *,
    continue_on_error: bool,
) -> Tuple[List[Any], int]:
    """Apply *operation* to every item and collect the results.

    ``operation`` returns the API response on success or raises
    ApiError.  Without ``continue_on_error`` the first ApiError aborts
    the batch — after streaming any partial results to stdout so a
    downstream consumer can resume, with the error itself on stderr.
    With it, failures are recorded per item and the walk keeps going.
    Returns ``(results, error_count)`` so the caller can pick a non-zero
    exit code on partial failure.
    """
    results: List[Any] = []
    failures = 0
    for index, item in enumerate(items):
        try:
            results.append(operation(item))
        except ApiError as e:
            if not continue_on_error:
                if results:
                    _print_json({"completed": results, "failed_at": index, "error": e.message})
                _die_api(e)
            results.append({"error": e.message, "status": e.status, "input": item})
            failures += 1
    return results, failures


# ── Per-model default --fields (set via ` config`) ──────────────


def _default_fields_for(type_name: str) -> Optional[List[str]]:
    """Saved default --fields for one model, or None when unset."""
    cfg = _load_config()
    defaults = cfg.get("default_fields")
    if not isinstance(defaults, dict) or not defaults:
        return None
    raw = defaults.get(type_name)
    if not isinstance(raw, str) or not raw.strip():
        return None
    return _split_fields(raw)


# ── Local read-cache (--cache TTL) ─────────────────────────────────────
# Per-profile JSON cache keyed by the request shape + a fingerprint of
# the active token.  Off by default; opt-in per call via `--cache N`.
# Cache is best-effort: a corrupt entry is just discarded.


def _cache_dir() -> Path:
    """Return (and create) the active profile's cache directory."""
    cache = _profile_dir(_active_profile) / "cache"
    cache.mkdir(parents=True, exist_ok=True)
    return cache


def _cache_fingerprint(method: str, path: str, params: Optional[Dict[str, Any]]) -> str:
    """Stable key for one request shape, bound to token + base URL."""
    token_prefix = (_auth_token() or "")[:16]
    payload = json.dumps(
        {"m": method, "p": path, "q": params or {}, "t": token_prefix, "b": _base_url()},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:32]


def _cache_get(method: str, path: str, params: Optional[Dict[str, Any]], ttl: int) -> Optional[Any]:
    """Return the cached body when present and younger than *ttl* seconds."""
    if ttl <= 0:
        return None
    entry = _cache_dir() / (_cache_fingerprint(method, path, params) + ".json")
    if not entry.exists():
        return None
    try:
        blob = json.loads(entry.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        return None
    if (time.time() - float(blob.get("at") or 0)) >= ttl:
        return None
    return blob.get("body")


def _cache_set(method: str, path: str, params: Optional[Dict[str, Any]], body: Any) -> None:
    """Atomically write one cache entry (tmp + rename); silent on failure."""
    entry = _cache_dir() / (_cache_fingerprint(method, path, params) + ".json")
    tmp = entry.with_suffix(entry.suffix + ".tmp")
    try:
        tmp.write_text(json.dumps({"at": int(time.time()), "body": body}), encoding="utf-8")
        os.replace(tmp, entry)
    except OSError:
        pass


# Hard cap so the cache directory can't grow without bound.  The TTL on
# each entry is the user-visible knob; this is the safety net.
_CACHE_MAX_AGE_SECONDS = 7 * 24 * 60 * 60  # 1 week
_CACHE_MAX_FILES = 200


def _prune_cache() -> None:
    """Best-effort sweep at startup.  Drops entries older than the hard
    cap and trims to the ``_CACHE_MAX_FILES`` newest.  Silent on every
    failure."""
    try:
        # Use the *active* profile's cache dir if it exists.  Don't
        # touch other profiles - they'll prune themselves on next use.
        cache = _profile_dir(_active_profile) / "cache"
        if not cache.exists():
            return
        now = time.time()
        survivors = []
        for entry in cache.iterdir():
            if not entry.is_file() or not entry.name.endswith(".json"):
                continue
            try:
                mtime = entry.stat().st_mtime
            except OSError:
                continue
            if now - mtime > _CACHE_MAX_AGE_SECONDS:
                try:
                    entry.unlink()
                except OSError:
                    pass
                continue
            survivors.append((mtime, entry))
        if len(survivors) > _CACHE_MAX_FILES:
            survivors.sort(reverse=True)  # newest first
            for _, victim in survivors[_CACHE_MAX_FILES:]:
                try:
                    victim.unlink()
                except OSError:
                    pass
    except OSError:
        pass
def _make_list_cmd(type_name: str):
    # Factory: returns the `list` handler for one model.  Supports
    # filtering, sorting, field projection, an opt-in read cache, and a
    # keyset-paginated --all walk.
    def run(args: argparse.Namespace) -> int:
        # Build params from --filter (with friendly→wire mapping), --sort,
        # --limit, --offset, --q.
        params: Dict[str, Any] = {}
        if args.limit is not None:
            params["limit"] = args.limit
        if args.offset is not None:
            params["offset"] = args.offset
        if args.sort:
            params["sort"] = args.sort
        if args.q:
            params["q"] = args.q
        for entry in args.filter or []:
            if "=" not in entry:
                _die(f"--filter expects key=value, got: {entry!r}", EXIT_USAGE)
            k, _, v = entry.partition("=")
            wire = _resolve_filter_key(type_name, k)
            # Repeated --filter on the same key accumulates into a list.
            cur = params.get(wire)
            if cur is None:
                params[wire] = [v]
            elif isinstance(cur, list):
                cur.append(v)
            else:
                params[wire] = [cur, v]
        fields = _split_fields(args.fields) or _default_fields_for(type_name)
        ttl = int(getattr(args, "cache", 0) or 0)
        path = f"/xapi2/data/{type_name}"
        if not args.all:
            # Single page: serve from cache when fresh, else hit the API.
            cached = _cache_get("GET", path, params, ttl)
            if cached is not None:
                _emit_event("cli.crud", model=type_name, op="list", cached=True)
                _emit(_project_response(cached, fields), fields=fields)
                return 0
            try:
                resp = _http("GET", path, params=params)
            except ApiError as e:
                _die_api(e)
            if ttl > 0:
                _cache_set("GET", path, params, resp)
            _emit_event("cli.crud", model=type_name, op="list")
            _emit(_project_response(resp, fields), fields=fields)
            return 0
        # --all: walk pages until has_more is false. Keyset pagination
        # via ``after=`` is more efficient than offset and
        # matches what the FE store does. The cache key includes the
        # ``--all`` flavour so a single-page hit doesn't satisfy a
        # full-walk request.
        all_key = {**params, "_all": True}
        cached = _cache_get("GET", path, all_key, ttl)
        if cached is not None:
            _emit_event("cli.crud", model=type_name, op="list_all", cached=True)
            _emit(_project_response(cached, fields), fields=fields)
            return 0
        # Stream-friendly when ``--format ndjson``: emit each page's
        # rows as we receive them so big tables don't have to fit in
        # memory. Other formats accumulate (jq + table need the whole
        # set anyway).
        streaming = (_output_format == "ndjson")
        merged: List[Any] = []
        page_meta: Dict[str, Any] = {}
        cursor: Optional[str] = None
        pages = 0
        try:
            while True:
                if cursor:
                    params["after"] = cursor
                resp = _http("GET", path, params=params)
                pages += 1
                if not isinstance(resp, dict):
                    break
                data = resp.get("data") or []
                if streaming:
                    for row in (data if isinstance(data, list) else []):
                        projected = _project_one(row, fields) if fields and isinstance(row, dict) else row
                        sys.stdout.write(json.dumps(projected, separators=(",", ":"), ensure_ascii=False))
                        sys.stdout.write("\n")
                        sys.stdout.flush()
                else:
                    merged.extend(data if isinstance(data, list) else [])
                meta = resp.get("meta") or {}
                page_meta = meta if isinstance(meta, dict) else {}
                if not page_meta.get("has_more") or not data:
                    break
                # Next cursor = last row's id; bail if the page shape is
                # unexpected rather than looping forever.
                last = data[-1]
                cursor = (last or {}).get("id") if isinstance(last, dict) else None
                if not cursor:
                    break
        except ApiError as e:
            _die_api(e)
        if streaming:
            # Rows were already written page-by-page; nothing to emit.
            _emit_event("cli.crud", model=type_name, op="list_all", pages=pages, streamed=True)
            return 0
        envelope = {"data": merged, "meta": {**page_meta, "count": len(merged), "has_more": False, "pages": pages}}
        if ttl > 0:
            _cache_set("GET", path, all_key, envelope)
        _emit_event("cli.crud", model=type_name, op="list_all", pages=pages)
        _emit(_project_response(envelope, fields), fields=fields)
        return 0
    return run
def _split_fields(value: Optional[str]) -> Optional[List[str]]:
    """Turn a comma-separated --fields string into a clean list.

    Blank segments are dropped; an empty or all-blank input yields None.
    """
    if not value:
        return None
    names = [piece.strip() for piece in value.split(",") if piece.strip()]
    return names if names else None


def _make_get_cmd(type_name: str):
    """Build the `get` (single read) handler for one model."""
    def run(args: argparse.Namespace) -> int:
        ttl = int(getattr(args, "cache", 0) or 0)
        path = f"/xapi2/data/{type_name}/{args.id}"
        fields = _split_fields(args.fields) or _default_fields_for(type_name)
        # Opt-in read cache: serve a fresh-enough local copy first.
        cached = _cache_get("GET", path, None, ttl)
        if cached is not None:
            _emit_event("cli.crud", model=type_name, op="read", cached=True)
            _emit(_project_response(cached, fields), fields=fields)
            return 0
        try:
            payload = _http("GET", path)
        except ApiError as e:
            _die_api(e)
        if ttl > 0:
            _cache_set("GET", path, None, payload)
        _emit_event("cli.crud", model=type_name, op="read")
        _emit(_project_response(payload, fields), fields=fields)
        return 0
    return run
def _resolve_bulk_items(args: argparse.Namespace) -> Optional[List[Any]]:
    """Pick whichever bulk source the user supplied (JSON or CSV).

    Returns None when no bulk source was given at all.  An empty batch
    is refused instead of silently exiting 0 — it is almost always a
    bug: a misnamed file, an empty CSV with just headers, an upstream
    filter that produced nothing.
    """
    csv_arg = getattr(args, "csv", None)
    if csv_arg:
        rows = _load_csv_input(csv_arg, getattr(args, "map", None))
        items: Optional[List[Any]] = rows if rows is not None else []
    else:
        blob = _load_bulk_input(getattr(args, "file", None), getattr(args, "stdin", False))
        if blob is None:
            return None
        items = _bulk_items_from_input(blob, expected="object")
    if not items:
        _die(
            "no items to process (input was empty)",
            EXIT_VALIDATION,
            hint="check the --file / --csv / --stdin source; pipe `--dry-run` first to verify shape",
        )
    return items


def _exit_for_bulk(errors: int) -> int:
    """1 = at least one item failed (continue-on-error path), 0 otherwise."""
    return EXIT_ERROR if errors > 0 else EXIT_OK


def _writable_fields_hint(fields: List[str]) -> str:
    """Human-readable list of field flags, falling back to bulk inputs."""
    if not fields:
        return "--json, --file, --stdin, or --csv"
    flags = " | ".join("--" + name.replace("_", "-") for name in fields)
    return f"{flags} (or --json, --file, --stdin, --csv)"
def _make_create_cmd(type_name: str, type_cfg: Dict[str, Any]):
    """Build the `create` handler for one model (single + bulk paths)."""
    create_fields = list(type_cfg.get("create_fields") or [])

    def run(args: argparse.Namespace) -> int:
        batch = _resolve_bulk_items(args)
        if batch is not None:
            results, failures = _bulk_run(
                batch,
                lambda item: _do_one("POST", f"/xapi2/data/{type_name}", item, dry_run=args.dry_run),
                continue_on_error=args.continue_on_error,
            )
            _emit_event("cli.crud", model=type_name, op="create", bulk=len(batch), failed=failures or None)
            _emit(results)
            return _exit_for_bulk(failures)
        body = _collect_body_one(args, create_fields)
        if not body:
            _die(f"nothing to send: pass {_writable_fields_hint(create_fields)}", EXIT_USAGE)
        try:
            resp = _do_one("POST", f"/xapi2/data/{type_name}", body, dry_run=args.dry_run)
        except ApiError as e:
            _die_api(e)
        _emit_event("cli.crud", model=type_name, op="create")
        _emit(resp)
        return 0

    return run


def _make_update_cmd(type_name: str, type_cfg: Dict[str, Any]):
    """Build the `update` handler for one model (single + bulk paths)."""
    update_fields = list(type_cfg.get("update_fields") or [])

    def run(args: argparse.Namespace) -> int:
        batch = _resolve_bulk_items(args)
        if batch is not None:
            def patch_one(item: Any) -> Any:
                # Bulk updates carry their target id inside each item.
                if not isinstance(item, dict) or not item.get("id"):
                    raise ApiError(EXIT_VALIDATION, "each bulk update item needs an `id` field")
                obj_id = item["id"]
                body = {k: v for k, v in item.items() if k != "id"}
                return _do_one("PATCH", f"/xapi2/data/{type_name}/{obj_id}", body, dry_run=args.dry_run)

            results, failures = _bulk_run(batch, patch_one, continue_on_error=args.continue_on_error)
            _emit_event("cli.crud", model=type_name, op="update", bulk=len(batch), failed=failures or None)
            _emit(results)
            return _exit_for_bulk(failures)
        if not args.id:
            _die("update: is required (or use --file/--stdin/--csv)", EXIT_USAGE)
        body = _collect_body_one(args, update_fields)
        if not body:
            _die(f"nothing to update: pass {_writable_fields_hint(update_fields)}", EXIT_USAGE)
        try:
            resp = _do_one("PATCH", f"/xapi2/data/{type_name}/{args.id}", body, dry_run=args.dry_run)
        except ApiError as e:
            _die_api(e)
        _emit_event("cli.crud", model=type_name, op="update")
        _emit(resp)
        return 0

    return run
def _make_delete_cmd(type_name: str):
    # Factory: returns the `delete` handler for one model.  Accepts a
    # single positional id or a bulk list (ids or {id: ...} objects)
    # via --file/--stdin.
    def run(args: argparse.Namespace) -> int:
        bulk = _load_bulk_input(args.file, args.stdin)
        if bulk is not None:
            items = bulk if isinstance(bulk, list) else [bulk]
            ids: List[str] = []
            for it in items:
                if isinstance(it, str):
                    ids.append(it)
                elif isinstance(it, dict) and isinstance(it.get("id"), str):
                    ids.append(it["id"])
                else:
                    _die("--file/--stdin for delete must be a list of ids or {id} objects", EXIT_VALIDATION)
            # Bulk deletes never prompt; they demand an explicit --yes
            # (unless this is only a --dry-run preview).
            if not args.yes and not args.dry_run:
                _die(f"refusing to delete {len(ids)} item(s) without --yes", EXIT_USAGE)
            results, errors = _bulk_run(
                ids,
                # A successful DELETE returns an empty body; normalise it
                # to a {id, deleted} marker so the output is useful.
                lambda obj_id: _do_one("DELETE", f"/xapi2/data/{type_name}/{obj_id}", None, dry_run=args.dry_run) or {"id": obj_id, "deleted": True},
                continue_on_error=args.continue_on_error,
            )
            _emit_event("cli.crud", model=type_name, op="delete", bulk=len(ids), failed=errors or None)
            _emit(results)
            return _exit_for_bulk(errors)
        if not args.id:
            _die("delete: is required (or use --file/--stdin)", EXIT_USAGE)
        # Single-id delete: confirm interactively, refuse silently in scripts.
        if not args.yes and not args.dry_run:
            if not _is_tty():
                _die("delete refused: pass --yes (no TTY for confirmation prompt)", EXIT_USAGE)
            try:
                confirm = input(f"delete {type_name} {args.id}? [y/N] ").strip().lower()
            except (EOFError, KeyboardInterrupt):
                print()
                return 130
            if confirm not in ("y", "yes"):
                print("aborted")
                return 0
        try:
            _do_one("DELETE", f"/xapi2/data/{type_name}/{args.id}", None, dry_run=args.dry_run)
        except ApiError as e:
            _die_api(e)
        _emit_event("cli.crud", model=type_name, op="delete")
        if not args.dry_run:
            _ok(f"{type_name} {args.id} deleted")
        return 0
    return run
def _make_upsert_cmd(type_name: str, type_cfg: Dict[str, Any]):
    # Factory: returns the `upsert` handler — look each item up by the
    # --unique key, PATCH the match if one exists, otherwise POST.
    create_fields = set(type_cfg.get("create_fields") or [])
    update_fields = set(type_cfg.get("update_fields") or [])
    def run(args: argparse.Namespace) -> int:
        unique = args.unique
        if not unique:
            _die("upsert: --unique is required", EXIT_USAGE)
        items = _resolve_bulk_items(args)
        if items is None:
            # Allow a single inline object too (--json + --field flags).
            body = _collect_body_one(args, list(create_fields | update_fields))
            if not body:
                _die("nothing to upsert: pass --json, --file, --stdin, or --csv", EXIT_USAGE)
            items = [body]
        # Filter map for the lookup query - the wire form lives on the
        # query string as ?data__<key>=<value>.
        wire_lookup = _resolve_filter_key(type_name, unique)
        def op(item: Any) -> Any:
            if not isinstance(item, dict):
                raise ApiError(EXIT_VALIDATION, "upsert items must be JSON objects")
            value = item.get(unique)
            if value is None:
                raise ApiError(EXIT_VALIDATION, f"upsert item missing --unique key {unique!r}")
            # Dry-run: skip the lookup entirely and emit a single
            # "would upsert" envelope - keeps the operation usable for
            # offline validation (no auth, no server round-trip).
            if args.dry_run:
                return {
                    "dry_run": True,
                    "operation": "upsert",
                    "lookup": {
                        "method": "GET",
                        "path": f"/xapi2/data/{type_name}",
                        "params": {wire_lookup: value, "limit": 1},
                    },
                    "body": item,
                }
            existing = _http(
                "GET",
                f"/xapi2/data/{type_name}",
                params={wire_lookup: value, "limit": 1},
            )
            data = (existing or {}).get("data") or [] if isinstance(existing, dict) else []
            if data:
                obj_id = data[0].get("id") if isinstance(data[0], dict) else None
                if obj_id:
                    # Match found: PATCH it, stripping any stray `id`
                    # field from the body itself.
                    body = {k: v for k, v in item.items() if k != "id"}
                    return _do_one("PATCH", f"/xapi2/data/{type_name}/{obj_id}", body, dry_run=False)
            # No match (or unusable match shape): create fresh.
            return _do_one("POST", f"/xapi2/data/{type_name}", item, dry_run=False)
        results, errors = _bulk_run(items, op, continue_on_error=args.continue_on_error)
        _emit_event("cli.crud", model=type_name, op="upsert", bulk=len(items), failed=errors or None)
        _emit(results)
        return _exit_for_bulk(errors)
    return run
""" for f in fields: flag = f.replace("_", "-") if flag in RESERVED_FIELD_FLAGS or f in RESERVED_FIELD_FLAGS: continue p.add_argument( "--" + flag, dest="_field_" + f, metavar=f.upper(), default=None, help=f"set `{f}`", ) p.add_argument("--json", default=None, metavar="JSON", help="full JSON body (merged with --field flags; flags win on overlap)") def _add_bulk_flags(p: argparse.ArgumentParser) -> None: p.add_argument("--file", default=None, metavar="PATH", help="read body from a JSON file (array → bulk; `-` reads stdin)") p.add_argument("--stdin", action="store_true", help="read body from stdin (alias of --file -)") p.add_argument("--csv", default=None, metavar="PATH", help="read rows from a CSV file (`-` reads stdin); each row is one item") p.add_argument("--map", default=None, metavar="K=COL,K=COL", help="rename CSV columns: --map name=Full Name,email=E-Mail") p.add_argument("--continue-on-error", dest="continue_on_error", action="store_true", help="bulk: don't stop at the first failed item (exit 1 if any failed)") p.add_argument("--idempotency-key", dest="idempotency_key", default=None, metavar="KEY", help="send Idempotency-Key header on every write in this command") p.add_argument("--auto-idempotency", dest="auto_idempotency", action="store_true", help="generate a fresh idempotency key per item (safe to retry blind)") def _add_dry_run(p: argparse.ArgumentParser) -> None: p.add_argument("--dry-run", dest="dry_run", action="store_true", help="print the request(s) that would be sent and exit 0") def _add_global_flags(p: argparse.ArgumentParser) -> None: """ Mirror the global flags onto every subparser so users don't have to remember they belong "before the verb". Defaults are ``argparse.SUPPRESS`` so a subparser's no-flag default doesn't silently overwrite a value set on the parent (the well-known argparse-subparser-default footgun). 
""" p.add_argument("--profile", default=argparse.SUPPRESS, metavar="NAME", help="credential profile (also XCLI_PROFILE env)") p.add_argument("--compact", action="store_true", default=argparse.SUPPRESS, help="single-line JSON output (default: pretty)") p.add_argument("--retry", type=int, default=argparse.SUPPRESS, metavar="N", help="retry transient failures (429/5xx/network) up to N times") p.add_argument("--backoff", choices=("exp", "linear", "off"), default=argparse.SUPPRESS, help="backoff curve when --retry > 0 (default: exp)") p.add_argument("--stderr-json", dest="stderr_json", action="store_true", default=argparse.SUPPRESS, help="emit machine-readable error envelopes on stderr") p.add_argument("--format", choices=("json", "ndjson", "table"), default=argparse.SUPPRESS, help="output format (default: json; ndjson streams list --all)") def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( prog=COMMAND_NAME, description=f"Production Board command-line interface (v1.2.7).", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=textwrap.dedent(f"""\ Run `prodcli` with no arguments to see your sign-in state. Full reference: {DEFAULT_BASE.rstrip('/')}/docs/cli Exit codes: 0 ok; 2 usage; 3 auth; 4 forbidden; 5 not found; 6 validation; 7 conflict; 8 rate limited; 9 server; 10 network. 
"""), ) _add_global_flags(parser) sub = parser.add_subparsers(dest="cmd") def _sub(name: str, **kw: Any) -> argparse.ArgumentParser: sp = sub.add_parser(name, **kw) _add_global_flags(sp) return sp p_login = _sub("login", help="sign in with a token or email + password") p_login.add_argument("--token", default=None, help="personal access token (pat_…)") p_login.add_argument("--email", default=None) p_login.add_argument("--password", default=None, help="prompted via getpass when omitted (don't put on the command line)") p_login.add_argument("--password-stdin", dest="password_stdin", action="store_true", help="read the password from stdin (CI-friendly, no shell history)") p_login.set_defaults(_run=cmd_login) p_logout = _sub("logout", help="forget the saved token (current profile)") p_logout.set_defaults(_run=cmd_logout) p_who = _sub("whoami", help="show the signed-in user") p_who.set_defaults(_run=cmd_whoami) p_ver = _sub("version", help="print the CLI version") p_ver.set_defaults(_run=cmd_version) p_upd = _sub("update", help="check for and install a newer client") p_upd.set_defaults(_run=cmd_update) # ── profile ── def _build_profile_sub(parent: argparse.ArgumentParser) -> None: sp = parent.add_subparsers(dest="profile_op") for op_name, h in ( ("list", "list profiles"), ("show", "show the active profile"), ("use", "set the active profile"), ("delete", "delete a profile"), ): p = sp.add_parser(op_name, help=h) _add_global_flags(p) if op_name in ("use", "delete"): p.add_argument("name", nargs="?") if op_name == "delete": p.add_argument("--yes", action="store_true", help="confirm deleting the default profile") p_prof = _sub("profile", help="manage credential profiles") _build_profile_sub(p_prof) p_prof.set_defaults(_run=cmd_profile) # `auth` is a familiar alias for `profile` so muscle-memory from # other tools (`gh auth`, `claude auth`, `gcloud auth`) lands. 
p_auth = _sub("auth", help="alias of `profile` (use / show / list / delete)") _build_profile_sub(p_auth) p_auth.set_defaults(_run=cmd_profile) # ── telemetry / autoupdate ── for key, label in (("telemetry", "telemetry"), ("autoupdate", "autoupdate")): p_t = _sub(key, help=f"manage {label} (status / on / off)") p_t.add_argument("toggle_op", nargs="?", choices=("on", "off", "status"), default=None) p_t.set_defaults(_run=_toggle_cmd(key, label)) # ── schema introspection ── p_sch = _sub("schema", help="show fields / ops / limits for one model (or all)") p_sch.add_argument("model", nargs="?") p_sch.set_defaults(_run=cmd_schema) # ── persistent config ── p_cfg = _sub("config", help="manage persistent CLI defaults (format / fields / …)") p_cfg_sub = p_cfg.add_subparsers(dest="config_op") for op_name, h in ( ("list", "show every saved value (and effective env-overridden ones)"), ("get", "read one key (dotted: default_fields.contact)"), ("set", "write one key (`config set format ndjson`)"), ("unset", "remove one key"), ("keys", "list known keys + what they do"), ("reset", "wipe all saved config"), ): cfg_p = p_cfg_sub.add_parser(op_name, help=h) _add_global_flags(cfg_p) if op_name in ("get", "set", "unset"): cfg_p.add_argument("key", nargs="?") if op_name == "set": cfg_p.add_argument("value", nargs="?") if op_name == "reset": cfg_p.add_argument("--yes", action="store_true", help="skip the confirmation prompt") p_cfg.set_defaults(_run=cmd_config) # ── per-model subcommands ── for type_name, cfg in MODELS.items(): ops = set(cfg.get("ops") or []) tp = _sub(type_name, help=f"manage {type_name} objects") tp_sub = tp.add_subparsers(dest="op") def _verb(parent_sub, name: str, **kw: Any) -> argparse.ArgumentParser: sp = parent_sub.add_parser(name, **kw) _add_global_flags(sp) return sp if "list" in ops: p = _verb(tp_sub, "list", help=f"list {type_name}") p.add_argument("--limit", type=int, default=None) p.add_argument("--offset", type=int, default=None) p.add_argument("--sort", 
default=None, help="sort key (prefix - for desc)") p.add_argument("--q", default=None, help="substring search") p.add_argument("--filter", action="append", default=None, help="repeatable: key=value (friendly key auto-mapped to wire form)") p.add_argument("--all", action="store_true", help="auto-paginate until done") p.add_argument("--fields", default=None, metavar="A,B,C", help="comma-separated dotted paths to keep (e.g. id,data.name)") p.add_argument("--cache", type=int, default=0, metavar="SECONDS", help="serve from local on-disk cache for SECONDS (default: off)") p.set_defaults(_run=_make_list_cmd(type_name)) if "read" in ops: p = _verb(tp_sub, "get", help=f"read one {type_name}") p.add_argument("id") p.add_argument("--fields", default=None, metavar="A,B,C", help="comma-separated dotted paths to keep") p.add_argument("--cache", type=int, default=0, metavar="SECONDS", help="serve from local on-disk cache for SECONDS (default: off)") p.set_defaults(_run=_make_get_cmd(type_name)) if "create" in ops: p = _verb(tp_sub, "create", help=f"create a {type_name}") _add_field_flags(p, list(cfg.get("create_fields") or [])) _add_bulk_flags(p) _add_dry_run(p) p.set_defaults(_run=_make_create_cmd(type_name, cfg)) if "update" in ops: p = _verb(tp_sub, "update", help=f"update a {type_name}") p.add_argument("id", nargs="?") _add_field_flags(p, list(cfg.get("update_fields") or [])) _add_bulk_flags(p) _add_dry_run(p) p.set_defaults(_run=_make_update_cmd(type_name, cfg)) if "delete" in ops: p = _verb(tp_sub, "delete", help=f"delete a {type_name}") p.add_argument("id", nargs="?") p.add_argument("-y", "--yes", action="store_true", help="skip confirmation prompt") _add_bulk_flags(p) _add_dry_run(p) p.set_defaults(_run=_make_delete_cmd(type_name)) # Upsert is synthetic - fine to expose anywhere create+update+list are open. 
if {"create", "update", "list"}.issubset(ops): p = _verb(tp_sub, "upsert", help=f"create or update a {type_name} by --unique key") p.add_argument("--unique", default=None, metavar="FIELD", help="field used to look up existing rows (required)") _add_field_flags(p, sorted(set(cfg.get("create_fields") or []) | set(cfg.get("update_fields") or []))) _add_bulk_flags(p) _add_dry_run(p) p.set_defaults(_run=_make_upsert_cmd(type_name, cfg)) # Per-model schema verb. `` contact schema`` is the # discovery path users reach for first ("what fields does this # have?") - without it they have to know about the top-level # `` schema contact`` form. ``describe`` is the same thing # under the more SQL-flavoured name. for verb_name in ("schema", "describe"): p = _verb(tp_sub, verb_name, help=f"show fields / ops / limits for {type_name}") p.set_defaults(_run=cmd_schema, model=type_name) return parser # Subcommands cheap enough that we skip the autoupdate hop on them. _AUTOUPDATE_SKIP = frozenset(( "version", "logout", "profile", "auth", "telemetry", "autoupdate", "schema", "config", )) def main(argv: Optional[List[str]] = None) -> int: global _active_profile, _compact_output, _stderr_json, _retry_count, _backoff_kind global _output_format, _idempotency_key, _auto_idempotency argv = list(sys.argv[1:] if argv is None else argv) _migrate_legacy_state() _cleanup_stale_old() _prune_cache() if not argv: _active_profile = _resolve_active_profile(None) _banner() _emit_event("cli.banner") return 0 parser = build_parser() args = parser.parse_args(argv) _active_profile = _resolve_active_profile(getattr(args, "profile", None)) # Saved config provides defaults; explicit flags win. 
cfg = _load_config() _compact_output = bool(getattr(args, "compact", cfg.get("compact", False))) _stderr_json = bool(getattr(args, "stderr_json", cfg.get("stderr_json", False))) _output_format = (getattr(args, "format", None) or cfg.get("format") or "json") if _output_format not in ("json", "ndjson", "table"): _output_format = "json" _idempotency_key = getattr(args, "idempotency_key", None) _auto_idempotency = bool(getattr(args, "auto_idempotency", False)) retry_arg = getattr(args, "retry", None) if retry_arg is None: retry_arg = cfg.get("retry") if isinstance(retry_arg, int) and retry_arg > 0: _retry_count = retry_arg _backoff_kind = (getattr(args, "backoff", None) or cfg.get("backoff") or "exp") else: backoff = getattr(args, "backoff", None) or cfg.get("backoff") if backoff: _backoff_kind = backoff runner = getattr(args, "_run", None) if runner is None: parser.print_help() return EXIT_USAGE if args.cmd not in _AUTOUPDATE_SKIP: try: _maybe_autoupdate(force=False) except Exception: pass try: return int(runner(args) or 0) except KeyboardInterrupt: print() return 130 if __name__ == "__main__": sys.exit(main())