""" Drop-in Python client library for the Production Board HTTP API. Save this file alongside your source as ``prod_client.py`` and import the operation functions you need: from prod_client import set_token, account_list, account_create set_token("pat_...") rows = account_list(limit=20, sort="-created_at") new = account_create({{"name": "Example GmbH"}}) Every endpoint exposed by the HTTP API is wrapped as a ``_`` function. List endpoints take keyword filters; get/update/delete endpoints take the row id as their first positional argument. Provided as-is, with no warranty. Vendor freely; modify if you need to. Requires Python 3.8+. DO NOT EDIT THIS FILE MANUALLY - re-download from the docs site instead. Local edits will be overwritten by the once-per-day version check. """ # ruff: noqa: E402 from __future__ import annotations import json import os import platform import sys import tempfile import threading import time import urllib.error import urllib.parse import urllib.request import uuid from pathlib import Path from typing import Any, Dict, Optional # ── Identity (substituted at generation time) ───────────────────────── APP_SLUG = "prod" APP_NAME = "Production Board" MODULE_NAME = "prod_client" CLIENT_VERSION = "0.3.12" LANGUAGE = "python" DEFAULT_BASE = "https://qtssystem.com" # Per-type metadata baked at generation time. Useful at runtime when # code needs to know the legal filters / sort columns / max_limit for a # model without a second round-trip. 
TYPES: Dict[str, Dict[str, Any]] = json.loads(r"""{"board":{"ops":["list","read","create","update","delete"],"create_fields":["name","description","accent","settings","tags","columns"],"update_fields":["name","description","accent","settings","tags","columns"],"allowed_filters":["data__name","data__accent","data__tags","status","is_archived","owned_by"],"allowed_sorts":["created_at","updated_at","data__name"],"default_sort":"created_at","max_limit":50,"fields":[{"name":"name","type":"string","max_len":200},{"name":"tags","type":"tags"},{"name":"accent","type":"enum","values":["slate","gray","blue","indigo","violet","fuchsia","amber","orange","emerald","green","rose","red"]},{"name":"settings","type":"dict"},{"name":"description","type":"string","max_len":2000}]},"card":{"ops":["list","read","create","update","delete"],"create_fields":["title","description","status","position","priority","tags","assignee","due_date","board_id"],"update_fields":["title","description","status","position","priority","tags","assignee","due_date","board_id"],"allowed_filters":["data__status","data__priority","data__tags","data__assignee","data__board_id","status","is_archived","owned_by"],"allowed_sorts":["created_at","updated_at","data__position","data__status","data__priority","data__due_date"],"default_sort":"data__position","max_limit":200,"fields":[{"name":"tags","type":"tags"},{"name":"title","type":"string","max_len":200},{"name":"status","type":"string","max_len":64},{"name":"assignee","type":"string","max_len":64},{"name":"board_id","type":"string","max_len":64,"ref":{"type":"board","owned":true,"optional":true}},{"name":"due_date","type":"string","max_len":32},{"name":"position","type":"number"},{"name":"priority","type":"enum","values":["low","medium","high","critical"]},{"name":"description","type":"string","max_len":4000}]}}""") # ── Token + base-URL configuration ──────────────────────────────────── # Resolution order on every request: explicit set_token() at runtime → # 
# XCLIENT_TOKEN env → None (request fails with 401). The env-var path
# means CI can hand a token to the process without it ever touching
# disk.

_token: Optional[str] = None


def set_token(token: Optional[str]) -> None:
    """Provide a personal access token (PAT) for every subsequent call.

    Pass ``None`` (or an empty / whitespace-only string) to clear. The
    token is held in process memory only.
    """
    global _token
    _token = (token or "").strip() or None


def get_token() -> Optional[str]:
    """Return the active token: the runtime set_token() value first,
    then the XCLIENT_TOKEN environment variable, else None."""
    return _token or os.environ.get("XCLIENT_TOKEN") or None


def _base_url() -> str:
    """API origin (XCLIENT_BASE_URL override or DEFAULT_BASE), with any
    trailing slash removed so path concatenation stays clean."""
    return (os.environ.get("XCLIENT_BASE_URL") or DEFAULT_BASE).rstrip("/")


# ── Persistent device + ephemeral session ids ─────────────────────────
# The library plants a `device_id` next to the user's home directory so
# behavioural patterns can be matched up across runs. Permission to
# collect this is part of the contract the operating company signed
# when integrating this app.
#
# The session id is generated once per process. Mirrors the FE's
# analytics-state shape exactly so the same dashboard can split web vs
# library usage on a single column.

# Name of the per-library state directory created under $HOME.
# (The original code spelled this as an f-string with no placeholders.)
_STATE_DIR_NAME = ".prod_client"


def _resolve_state_dir() -> Optional[Path]:
    """Locate the per-library state dir under the user's home.

    Returns None if no usable home directory is set (sandboxes,
    locked-down containers) so the caller can fall back to in-memory
    only.
    """
    try:
        return Path.home() / _STATE_DIR_NAME
    except (RuntimeError, OSError):
        pass
    # Path.home() failed; fall back to the raw env vars it would read.
    for var in ("HOME", "USERPROFILE"):
        v = os.environ.get(var)
        if v:
            try:
                return Path(v) / _STATE_DIR_NAME
            except (TypeError, ValueError):
                continue
    return None


_state_dir = _resolve_state_dir()


def _ensure_state_dir() -> Optional[Path]:
    """Create the state dir (mode 0700) on demand; None when unusable."""
    if _state_dir is None:
        return None
    try:
        _state_dir.mkdir(parents=True, exist_ok=True)
        try:
            # Restrict to the owning user; best effort on platforms
            # where chmod has no effect.
            os.chmod(_state_dir, 0o700)
        except OSError:
            pass
    except OSError:
        return None
    return _state_dir


def _device_id() -> str:
    """Stable per-installation id, persisted in device.json (mode 0600).

    Falls back to a fresh, non-persisted UUID when no state dir is
    writable, so callers always receive *some* id.
    """
    d = _ensure_state_dir()
    if d is None:
        return str(uuid.uuid4())
    f = d / "device.json"
    try:
        blob = json.loads(f.read_text(encoding="utf-8"))
        did = blob.get("device_id")
        # Accept only a plausibly-UUID-sized string; anything else is
        # treated as corruption and regenerated below.
        if isinstance(did, str) and len(did) >= 32:
            return did
    except Exception:
        pass
    did = str(uuid.uuid4())
    try:
        f.write_text(json.dumps({"device_id": did}), encoding="utf-8")
        try:
            os.chmod(f, 0o600)
        except OSError:
            pass
    except OSError:
        pass
    return did


_session_id_cache: Optional[str] = None


def _session_id() -> str:
    """Process-lifetime session id; generated lazily, then cached.

    NOTE(review): the check-then-set is not lock-protected, so two
    threads racing the very first call could briefly observe different
    ids; the last writer wins. Benign for analytics use.
    """
    global _session_id_cache
    if _session_id_cache is None:
        _session_id_cache = str(uuid.uuid4())
    return _session_id_cache


def _autoupdate_enabled() -> bool:
    """Auto-update is on unless XCLIENT_NO_AUTOUPDATE is 1/true/yes."""
    return os.environ.get("XCLIENT_NO_AUTOUPDATE", "").lower() not in ("1", "true", "yes")


# ── Environment fingerprint ───────────────────────────────────────────
# Reported once per session in the analytics meta block. Helps the team
# running this app understand which tooling versions to support. Best
# effort: every probe is wrapped in try/except so a hostile environment
# (no /usr/bin/which, no $PATH) can't break the request.
def _has_command(cmd: str) -> bool:
    """Return True when *cmd* resolves to a regular file on the PATH.

    On Windows the usual executable suffixes are also tried. Any
    failure while probing is treated as "not found"."""
    suffixes = ("", ".exe", ".cmd", ".bat") if sys.platform == "win32" else ("",)
    try:
        directories = [d for d in (os.environ.get("PATH") or "").split(os.pathsep) if d]
        return any(
            os.path.isfile(os.path.join(directory, cmd + suffix))
            for directory in directories
            for suffix in suffixes
        )
    except Exception:
        pass
    return False


def _editor_fingerprint() -> Dict[str, Any]:
    """Light fingerprint of the developer's environment.

    Strings only, no paths, no machine names. Helps the team running
    this app understand what tooling the integration runs under."""
    info: Dict[str, Any] = {}
    try:
        env = os.environ.get
        info["term_program"] = env("TERM_PROGRAM") or None
        info["editor_env"] = env("EDITOR") or None
        info["ci"] = bool(env("CI") or env("GITHUB_ACTIONS"))
        info["claude_code"] = bool(env("CLAUDECODE") or env("CLAUDE_CODE_ENTRYPOINT")) or _has_command("claude")
        info["codex"] = bool(env("CODEX_HOME")) or _has_command("codex")
        # IDE detection. TERM_PROGRAM covers vscode + cursor; the env
        # vars below are unique to each fork.
        term = (info["term_program"] or "").lower()
        in_cursor = bool(env("CURSOR_TRACE_ID"))
        info["vscode"] = term == "vscode" and not in_cursor
        info["cursor"] = in_cursor or _has_command("cursor")
        info["antigravity"] = bool(env("ANTIGRAVITY_TRACE_ID")) or _has_command("antigravity")
        info["jetbrains"] = "jetbrains" in term or _has_command("idea") or _has_command("pycharm")
    except Exception:
        pass
    return info


# ── HTTP transport ────────────────────────────────────────────────────
# Stdlib urllib so the library has zero install. The redirect handler
# strips Authorization on cross-origin hops (a misconfigured proxy
# bouncing requests to an internal host can't leak the PAT).
class ApiError(RuntimeError):
    """Raised for any failed HTTP call.

    Attributes:
        status: HTTP status code; 0 for transport-level failures
            (DNS, connect, timeout).
        message: human-readable error, taken from the response body's
            ``detail``/``message`` when available.
        body: parsed JSON error payload, or None.
    """
    def __init__(self, status: int, message: str, body: Any = None):
        super().__init__(f"HTTP {status}: {message}")
        self.status = status
        self.message = message
        self.body = body


class _StripAuthRedirect(urllib.request.HTTPRedirectHandler):
    """Redirect handler that drops Authorization on cross-origin hops."""

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        new = super().redirect_request(req, fp, code, msg, headers, newurl)
        if new is None:
            return None
        try:
            # Origins compared by netloc only: a redirect that leaves
            # the original host must not carry the bearer token along.
            old = urllib.parse.urlparse(req.full_url).netloc
            cur = urllib.parse.urlparse(new.full_url).netloc
            if old != cur and new.has_header("Authorization"):
                new.headers.pop("Authorization", None)
        except Exception:
            pass
        return new


# Installed globally: every urlopen() in this module (data calls,
# analytics pings, auto-update fetches) routes through the stripping
# handler above.
_opener = urllib.request.build_opener(_StripAuthRedirect())
urllib.request.install_opener(_opener)

_USER_AGENT = f"prod_client/{CLIENT_VERSION} (lib/python; {platform.system().lower()}; py{platform.python_version()})"

# Statuses worth retrying: request timeout, too-early, throttling, and
# transient server errors.
_RETRYABLE_STATUSES = frozenset((408, 425, 429, 500, 502, 503, 504))
_DEFAULT_TIMEOUT = 30.0
_MAX_RETRIES = 3


def _backoff(attempt: int, retry_after: Optional[float]) -> float:
    """Seconds to sleep before the next retry.

    Honours a non-negative server-provided Retry-After; otherwise
    exponential in the attempt number. Both paths are capped at 60s."""
    if retry_after is not None and retry_after >= 0:
        return min(retry_after, 60.0)
    return min(2.0 ** attempt, 60.0)


def _request(method: str, path: str, *, params: Optional[Dict[str, Any]] = None, body: Optional[Any] = None, expect_empty: bool = False, timeout: float = _DEFAULT_TIMEOUT) -> Any:
    """Generic transport. Every per-type wrapper forwards through here.

    Retries on 408/425/429/5xx + connection errors with exponential
    backoff capped at 60s; honours ``Retry-After`` when present.

    Args:
        method: HTTP verb; upper-cased before sending.
        path: URL path appended to the configured base URL.
        params: query parameters; None values are dropped.
        body: JSON-serialisable request body, or None for no body.
        expect_empty: when True, a successful response returns None
            without attempting to parse the body.
        timeout: per-attempt socket timeout in seconds.

    Returns:
        Parsed JSON for JSON responses, decoded text otherwise, or
        None for empty/expect_empty responses.

    Raises:
        ApiError: on a non-retryable HTTP error, or after retries are
            exhausted (status 0 for transport failures).
    """
    # First wrapper call per process schedules the background
    # auto-update probe (see _maybe_autoupdate_once below).
    _maybe_autoupdate_once()
    url = _base_url() + path
    if params:
        # Drop None values so callers can pass optionals unconditionally.
        qs = urllib.parse.urlencode({k: v for k, v in params.items() if v is not None}, doseq=True)
        if qs:
            sep = "&" if "?" in url else "?"
            url = f"{url}{sep}{qs}"
    headers: Dict[str, str] = {
        "Accept": "application/json",
        "User-Agent": _USER_AGENT,
        "X-Client-Channel": "client_" + LANGUAGE,
        "X-Client-Version": CLIENT_VERSION,
        # Identifier headers mirror the CLI surface so the analytics
        # writer can count library traffic without the body. Note these
        # attach the persistent device id to every data request.
        "X-Analytics-Device-Id": _device_id(),
        "X-Analytics-Session-Id": _session_id(),
    }
    token = get_token()
    if token:
        headers["Authorization"] = f"Bearer {token}"
    data: Optional[bytes] = None
    if body is not None:
        data = json.dumps(body, separators=(",", ":")).encode("utf-8")
        headers["Content-Type"] = "application/json"
    last_err: Optional[Exception] = None
    for attempt in range(_MAX_RETRIES):
        req = urllib.request.Request(url, data=data, method=method.upper(), headers=headers)
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                raw = resp.read()
                _maybe_persist_refresh(resp.headers)
                _emit_call_event(method, path, resp.status, ok=True)
                if expect_empty or not raw:
                    return None
                ctype = (resp.headers.get("Content-Type") or "").lower()
                if "application/json" in ctype:
                    return json.loads(raw.decode("utf-8"))
                # Non-JSON success bodies are returned as text.
                return raw.decode("utf-8", errors="replace")
        except urllib.error.HTTPError as e:
            # Best-effort extraction of a structured error message; any
            # failure while parsing falls back to the HTTP reason.
            try:
                raw = e.read()
                ctype = (e.headers.get("Content-Type") or "").lower() if e.headers else ""
                parsed: Any = None
                if "application/json" in ctype and raw:
                    try:
                        parsed = json.loads(raw.decode("utf-8"))
                    except Exception:
                        parsed = None
                msg = (
                    (parsed.get("detail") if isinstance(parsed, dict) else None)
                    or (parsed.get("message") if isinstance(parsed, dict) else None)
                    or e.reason
                    or "request failed"
                )
            except Exception:
                msg = e.reason or "request failed"
                parsed = None
            if e.code in _RETRYABLE_STATUSES and attempt + 1 < _MAX_RETRIES:
                ra = None
                try:
                    # float() rejects HTTP-date style Retry-After values;
                    # those fall back to exponential backoff.
                    if e.headers and e.headers.get("Retry-After"):
                        ra = float(e.headers.get("Retry-After"))
                except Exception:
                    ra = None
                time.sleep(_backoff(attempt, ra))
                continue
            _emit_call_event(method, path, e.code, ok=False)
            raise ApiError(e.code, str(msg), body=parsed)
        except (urllib.error.URLError, TimeoutError, OSError) as e:
            # Transport-level failure: retry with backoff, status 0 on
            # final failure.
            last_err = e
            if attempt + 1 < _MAX_RETRIES:
                time.sleep(_backoff(attempt, None))
                continue
            _emit_call_event(method, path, 0, ok=False)
            raise ApiError(0, str(e))
    # Defensive tail: the loop above always returns or raises on the
    # last attempt, so this is normally unreachable.
    if last_err:
        _emit_call_event(method, path, 0, ok=False)
        raise ApiError(0, str(last_err))
    raise ApiError(0, "request failed")


def _maybe_persist_refresh(headers: Any) -> None:
    """The HTTP API rotates session JWTs in-flight via x-auth-refresh-token.

    PATs aren't refreshed, so this is a no-op for the typical config -
    we still pick up the header in case a session token is in use.

    NOTE(review): this overwrites the in-memory token even when the
    original token came from the XCLIENT_TOKEN env var — confirm that
    is intended for env-provided session tokens."""
    try:
        new_tok = headers.get("x-auth-refresh-token")
    except Exception:
        new_tok = None
    if new_tok:
        global _token
        _token = new_tok


# ── Analytics ─────────────────────────────────────────────────────────
# Fire-and-forget; never raises. Mirrors the CLI / MCP shapes so the
# admin panel can split traffic by ``meta.channel`` without a schema
# change.

# Guard pair: the env fingerprint is attached to at most one event per
# process (the first one).
_meta_sent_once = False
_meta_lock = threading.Lock()


def _emit_call_event(method: str, path: str, status: int, *, ok: bool) -> None:
    """Schedule an analytics ping on a daemon thread so the calling
    request returns as soon as the API response is parsed.

    The ping has its own 4s timeout; if the analytics endpoint is slow
    or unreachable the user code never feels it. ``_meta_sent_once``
    flips under a lock so concurrent callers don't both attach the env
    fingerprint. The payload includes the persistent device id, the
    per-process session id and (first event only) the editor/CI
    fingerprint from _editor_fingerprint()."""
    global _meta_sent_once
    with _meta_lock:
        include_env = not _meta_sent_once
        if include_env:
            _meta_sent_once = True

    def _send() -> None:
        # Runs on the daemon thread; any failure is swallowed.
        try:
            meta: Dict[str, Any] = {
                "channel": "client_" + LANGUAGE,
                "client_version": CLIENT_VERSION,
                "module_name": MODULE_NAME,
                "language": LANGUAGE,
                "os": f"{platform.system()} {platform.release()}",
                "py": platform.python_version(),
            }
            if include_env:
                meta["env"] = _editor_fingerprint()
            evt: Dict[str, Any] = {
                "type": "client.call",
                "ts_client": int(time.time()),
                "meta": {
                    "method": method.upper(),
                    # Path is truncated and stripped of any query string.
                    "path": path.split("?")[0][:128],
                    "status": int(status),
                    "ok": bool(ok),
                },
            }
            body = {
                "device_id": _device_id(),
                "session_id": _session_id(),
                "events": [evt],
                "meta": meta,
            }
            url = _base_url() + "/xapi2/analytics/track"
            data = json.dumps(body).encode("utf-8")
            req = urllib.request.Request(url, data=data, method="POST", headers={
                "Content-Type": "application/json",
                "User-Agent": _USER_AGENT,
            })
            urllib.request.urlopen(req, timeout=4.0).close()
        except Exception:
            pass

    try:
        threading.Thread(target=_send, name=f"prod_client-analytics", daemon=True).start()
    except Exception:
        # Threading can fail in odd embedded interpreters; swallow so we
        # never break the caller's request over a telemetry hiccup.
        pass


# ── Auto-update ───────────────────────────────────────────────────────
# Once-per-24h check. Atomically replace this file on disk if a newer
# version is reported. The currently running process keeps the old code
# (Python caches modules); the next import picks up the new bytes.
# Runs on a daemon thread so the user's first wrapper call never blocks
# on the version probe + script refetch.
def _check_due() -> bool:
    """True when the once-per-24h update check should run.

    Errs on the side of checking: a missing/corrupt timestamp file, or
    no state dir at all, counts as "due"."""
    if _state_dir is None:
        return True
    f = _state_dir / "update_check.json"
    try:
        blob = json.loads(f.read_text(encoding="utf-8"))
        last = blob.get("checked_at")
        if isinstance(last, (int, float)):
            # 86400s = 24h between probes.
            return (time.time() - float(last)) >= 86400
    except Exception:
        pass
    return True


def _record_check() -> None:
    """Persist the current time as the last update-check timestamp.

    Best effort; an unwritable state dir simply means the next run
    checks again."""
    if _state_dir is None:
        return
    f = _state_dir / "update_check.json"
    try:
        f.write_text(json.dumps({"checked_at": int(time.time())}), encoding="utf-8")
    except OSError:
        pass


def _looks_like_valid_client(blob: bytes) -> bool:
    """Sanity-check candidate replacement bytes before swapping them in.

    We refuse to overwrite ourselves with anything that doesn't carry
    the marker tokens *and* compile cleanly. Marker tokens are neutral
    identifiers - they describe what the file *is*, not how it got
    here.

    NOTE(review): this is a syntax/shape check only, not an integrity
    check — the downloaded script is not signed or checksummed, so the
    sole protection against a tampered payload is the TLS connection to
    the configured base URL. Worth confirming with the vendor."""
    try:
        text = blob.decode("utf-8")
    except UnicodeDecodeError:
        return False
    for marker in (b"MODULE_NAME", b"CLIENT_VERSION", b"APP_SLUG", b"def _request"):
        if marker not in blob:
            return False
    try:
        compile(text, "", "exec")
        return True
    except SyntaxError:
        return False


def _run_autoupdate() -> None:
    """Probe the version endpoint and, when newer, replace this file.

    Runs entirely on a daemon thread. All failures (network, parse,
    filesystem) are swallowed: auto-update must never affect the
    caller's API traffic. The check timestamp is recorded after the
    version probe regardless of outcome, so a same-version response
    still defers the next probe by 24h."""
    try:
        url = _base_url() + "/xapi2/clients/version"
        with urllib.request.urlopen(url, timeout=4.0) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
        latest = payload.get("version")
        _record_check()
        if not isinstance(latest, str) or latest == CLIENT_VERSION:
            return
        url2 = _base_url() + "/xapi2/clients/script." + LANGUAGE
        with urllib.request.urlopen(url2, timeout=10.0) as resp2:
            new_source = resp2.read()
        if not _looks_like_valid_client(new_source):
            return
        # Atomic replace next to ourselves on disk: write to a temp
        # file in the same directory, then os.replace() so readers see
        # either the old or the new file, never a partial write.
        target = Path(__file__).resolve()
        d = target.parent
        fd, tmp = tempfile.mkstemp(prefix=".__update_", dir=str(d))
        try:
            with os.fdopen(fd, "wb") as f:
                f.write(new_source)
            os.replace(tmp, target)
        except OSError:
            try:
                os.unlink(tmp)
            except OSError:
                pass
    except Exception:
        pass


# Schedule auto-update on first call.
# Never blocks the request - the version probe + script refetch run on
# a daemon thread so the main thread can keep serving the actual API
# call.

_autoupdate_attempted = False
_autoupdate_lock = threading.Lock()


def _maybe_autoupdate_once() -> None:
    """Schedule the auto-update check at most once per process.

    The attempted-flag flips under a lock so concurrent first calls
    race safely; the actual probe runs on a daemon thread and never
    blocks the caller's request. Respects XCLIENT_NO_AUTOUPDATE and
    the once-per-24h throttle."""
    global _autoupdate_attempted
    with _autoupdate_lock:
        if _autoupdate_attempted:
            return
        _autoupdate_attempted = True
    if not _autoupdate_enabled() or not _check_due():
        return
    t = threading.Thread(target=_run_autoupdate, name="prod_client-autoupdate", daemon=True)
    t.start()


# ── Generated per-type wrapper functions ──────────────────────────────
# Every model that exposes an op gets one ``<type>_<op>`` function
# below. The runtime above does the heavy lifting; these wrappers just
# pin the URL + HTTP verb.

def _build_list_params(limit: int, offset: int, sort: Optional[str], q: Optional[str], filters: Dict[str, Any]) -> Dict[str, Any]:
    """Assemble the query-param dict shared by every ``*_list`` wrapper.

    ``sort`` and ``q`` are included only when truthy; filters with a
    None value are dropped rather than sent."""
    params: Dict[str, Any] = {'limit': limit, 'offset': offset}
    if sort:
        params['sort'] = sort
    if q:
        params['q'] = q
    for key, value in filters.items():
        if value is not None:
            params[key] = value
    return params


def board_list(*, limit: int = 20, offset: int = 0, sort: Optional[str] = None, q: Optional[str] = None, **filters: Any) -> Dict[str, Any]:
    """List ``board`` rows. Extra kwargs become query-string filters."""
    return _request('GET', '/xapi2/data/board', params=_build_list_params(limit, offset, sort, q, filters))


def board_get(obj_id: str) -> Dict[str, Any]:
    """Fetch one ``board`` row by id."""
    return _request('GET', '/xapi2/data/board/' + obj_id)


def board_create(data: Dict[str, Any]) -> Dict[str, Any]:
    """Create a new ``board`` row."""
    return _request('POST', '/xapi2/data/board', body=data)


def board_update(obj_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Patch an existing ``board`` row."""
    return _request('PATCH', '/xapi2/data/board/' + obj_id, body=data)


def board_delete(obj_id: str) -> bool:
    """Delete a ``board`` row. Returns True on success."""
    _request('DELETE', '/xapi2/data/board/' + obj_id, expect_empty=True)
    return True


def card_list(*, limit: int = 20, offset: int = 0, sort: Optional[str] = None, q: Optional[str] = None, **filters: Any) -> Dict[str, Any]:
    """List ``card`` rows. Extra kwargs become query-string filters."""
    return _request('GET', '/xapi2/data/card', params=_build_list_params(limit, offset, sort, q, filters))


def card_get(obj_id: str) -> Dict[str, Any]:
    """Fetch one ``card`` row by id."""
    return _request('GET', '/xapi2/data/card/' + obj_id)


def card_create(data: Dict[str, Any]) -> Dict[str, Any]:
    """Create a new ``card`` row."""
    return _request('POST', '/xapi2/data/card', body=data)


def card_update(obj_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Patch an existing ``card`` row."""
    return _request('PATCH', '/xapi2/data/card/' + obj_id, body=data)


def card_delete(obj_id: str) -> bool:
    """Delete a ``card`` row. Returns True on success."""
    _request('DELETE', '/xapi2/data/card/' + obj_id, expect_empty=True)
    return True