From 632f8b937d6147f34b9b898f7fd650821f614f9b Mon Sep 17 00:00:00 2001 From: marwaneltoukhy Date: Thu, 26 Feb 2026 15:32:43 +0200 Subject: [PATCH] Add platform integration: login, link, push, and status commands Browser-based OAuth login flow (cf login) that obtains a long-lived cfk_live_ API key via PKCE and localhost callback. New commands: - cf login/logout/whoami -- authenticate with the ChipFoundry platform - cf link/unlink -- connect local projects to platform projects - cf push -- extended to create/link platform projects on first push - cf status -- extended with platform project panel, --all, --json flags New modules: api.py (PlatformAPI client), auth.py (browser OAuth flow). Updated utils.py with API key and platform project ID helpers. Made-with: Cursor --- chipfoundry_cli/api.py | 180 +++++++++++++ chipfoundry_cli/auth.py | 197 ++++++++++++++ chipfoundry_cli/main.py | 536 ++++++++++++++++++++++++++++++++++++--- chipfoundry_cli/utils.py | 50 ++++ 4 files changed, 928 insertions(+), 35 deletions(-) create mode 100644 chipfoundry_cli/api.py create mode 100644 chipfoundry_cli/auth.py diff --git a/chipfoundry_cli/api.py b/chipfoundry_cli/api.py new file mode 100644 index 0000000..68e71a7 --- /dev/null +++ b/chipfoundry_cli/api.py @@ -0,0 +1,180 @@ +""" +Platform API client for communicating with the chipIgnite backend. + +All CLI commands that need platform data go through this module. +Authentication is via a long-lived API key (cfk_live_...) stored in config.toml. +""" + +import httpx +from typing import Any, Dict, List, Optional + + +class PlatformAPIError(Exception): + """Raised when a platform API call fails.""" + + def __init__(self, message: str, status_code: int = 0): + self.status_code = status_code + super().__init__(message) + + +class PlatformAPI: + """Thin wrapper around httpx for authenticated platform API calls.""" + + def __init__(self, api_url: str, api_key: str, timeout: float = 30.0): + self._base = api_url.rstrip("/") + self._api_key = api_key + self._timeout = timeout + + def _headers(self) -> Dict[str, str]: + return {"Authorization": f"Bearer {self._api_key}"} + + def _url(self, path: str) -> str: + return f"{self._base}/api/v1{path}" + + def _handle_response(self, resp: httpx.Response) -> Any: + if resp.status_code == 401: + raise PlatformAPIError( + "Your API key is no longer valid. Run `cf login` to generate a new one.", + status_code=401, + ) + if resp.status_code == 403: + raise PlatformAPIError( + "You do not have permission to perform this action.", + status_code=403, + ) + if resp.status_code == 404: + raise PlatformAPIError( + "Resource not found. It may have been deleted or you may not have access.", + status_code=404, + ) + if resp.status_code >= 400: + detail = "" + try: + body = resp.json() + detail = body.get("error") or body.get("detail") or str(body) + except Exception: + detail = resp.text[:200] + raise PlatformAPIError( + f"API error ({resp.status_code}): {detail}", + status_code=resp.status_code, + ) + return resp.json() + + # --- Auth --- + + def validate(self) -> Dict[str, Any]: + """Validate the API key and return user info.""" + with httpx.Client(timeout=self._timeout) as client: + resp = client.get(self._url("/auth/validate"), headers=self._headers()) + return self._handle_response(resp) + + def generate_api_key(self, cognito_token: str) -> Dict[str, Any]: + """ + Generate a CLI API key using a transient Cognito access token. + This is called during `cf login` — not with the stored API key. + """ + headers = {"Authorization": f"Bearer {cognito_token}"} + with httpx.Client(timeout=self._timeout) as client: + resp = client.post(self._url("/auth/cli/api-key"), headers=headers) + return self._handle_response(resp) + + def revoke_api_key(self) -> Dict[str, Any]: + """Revoke the current API key (used by `cf logout`).""" + with httpx.Client(timeout=self._timeout) as client: + resp = client.delete(self._url("/auth/cli/api-key"), headers=self._headers()) + return self._handle_response(resp) + + def exchange_portal_code( + self, + code: str, + code_verifier: str, + redirect_uri: str, + state: Optional[str] = None, + ) -> Dict[str, Any]: + """Exchange an OAuth authorization code for tokens via the portal callback.""" + payload: Dict[str, Any] = { + "code": code, + "code_verifier": code_verifier, + "redirect_uri": redirect_uri, + } + if state: + payload["state"] = state + with httpx.Client(timeout=self._timeout) as client: + resp = client.post( + self._url("/auth/portal/callback"), + json=payload, + ) + return self._handle_response(resp) + + def get_authorize_url(self) -> Dict[str, Any]: + """Get the Cognito OAuth authorization URL for the portal.""" + with httpx.Client(timeout=self._timeout) as client: + resp = client.get(self._url("/auth/portal/authorize")) + return self._handle_response(resp) + + # --- Projects --- + + def create_project( + self, + name: str, + shuttle_id: Optional[str] = None, + description: Optional[str] = None, + design_type: Optional[str] = None, + ) -> Dict[str, Any]: + """Create a new project on the platform.""" + payload: Dict[str, Any] = { + "name": name, + "registration_source": "cli", + } + if shuttle_id: + payload["shuttle_id"] = shuttle_id + if description: + payload["description"] = description + if design_type: + payload["design_type"] = design_type + with httpx.Client(timeout=self._timeout) as client: + resp = client.post( + self._url("/projects"), + json=payload, + headers=self._headers(), + ) + return self._handle_response(resp) + + def get_project(self, project_id: str) -> Dict[str, Any]: + """Get a project by ID.""" + with httpx.Client(timeout=self._timeout) as client: + resp = client.get( + self._url(f"/projects/{project_id}"), + headers=self._headers(), + ) + return self._handle_response(resp) + + def list_my_projects(self) -> List[Dict[str, Any]]: + """List all projects belonging to the authenticated user.""" + with httpx.Client(timeout=self._timeout) as client: + resp = client.get( + self._url("/projects/me"), + headers=self._headers(), + ) + return self._handle_response(resp) + + def update_project(self, project_id: str, **fields: Any) -> Dict[str, Any]: + """Update a project on the platform.""" + with httpx.Client(timeout=self._timeout) as client: + resp = client.put( + self._url(f"/projects/{project_id}"), + json=fields, + headers=self._headers(), + ) + return self._handle_response(resp) + + # --- Shuttles --- + + def list_shuttles(self) -> List[Dict[str, Any]]: + """List shuttles available for submission.""" + with httpx.Client(timeout=self._timeout) as client: + resp = client.get( + self._url("/shuttles/available"), + headers=self._headers(), + ) + return self._handle_response(resp) diff --git a/chipfoundry_cli/auth.py b/chipfoundry_cli/auth.py new file mode 100644 index 0000000..87e72ed --- /dev/null +++ b/chipfoundry_cli/auth.py @@ -0,0 +1,197 @@ +""" +Browser-based OAuth login flow for the CLI. + +Implements the localhost callback pattern: +1. Start a temporary HTTP server on a random port. +2. Open the Cognito hosted UI in the user's browser. +3. Receive the callback with the authorization code. +4. Exchange the code for a transient Cognito token. +5. Use that token to generate a long-lived API key. +""" + +import hashlib +import base64 +import secrets +import threading +import webbrowser +from http.server import HTTPServer, BaseHTTPRequestHandler +from urllib.parse import urlparse, parse_qs +from typing import Optional, Tuple + +from chipfoundry_cli.api import PlatformAPI, PlatformAPIError + +_LOGIN_SUCCESS_HTML = """ + +ChipFoundry CLI + + +
+

Login successful!

+

You can close this tab and return to the terminal.

+
""" + +_LOGIN_ERROR_HTML = """ + +ChipFoundry CLI + + +
+

Login failed

+

{error}

+
""" + + +def _generate_pkce() -> Tuple[str, str]: + """Generate PKCE code_verifier and code_challenge (S256).""" + code_verifier = secrets.token_urlsafe(64) + digest = hashlib.sha256(code_verifier.encode("ascii")).digest() + code_challenge = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=") + return code_verifier, code_challenge + + +class _CallbackHandler(BaseHTTPRequestHandler): + """HTTP handler that captures the OAuth callback.""" + + auth_code: Optional[str] = None + auth_state: Optional[str] = None + error: Optional[str] = None + + def do_GET(self): # noqa: N802 + parsed = urlparse(self.path) + params = parse_qs(parsed.query) + + error = params.get("error") + if error: + _CallbackHandler.error = error[0] + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.end_headers() + self.wfile.write( + _LOGIN_ERROR_HTML.format(error=error[0]).encode("utf-8") + ) + else: + _CallbackHandler.auth_code = (params.get("code") or [None])[0] + _CallbackHandler.auth_state = (params.get("state") or [None])[0] + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.end_headers() + self.wfile.write(_LOGIN_SUCCESS_HTML.encode("utf-8")) + + # Shut down the server after handling the callback + threading.Thread(target=self.server.shutdown, daemon=True).start() + + def log_message(self, format, *args): + pass # suppress request logging + + +def browser_login(api_url: str) -> str: + """ + Run the full browser-based login flow. + + Returns the plaintext API key (cfk_live_...) on success. + Raises PlatformAPIError on failure. + """ + api = PlatformAPI(api_url=api_url, api_key="") + + # Start a temporary server on a random port + server = HTTPServer(("127.0.0.1", 0), _CallbackHandler) + port = server.server_address[1] + redirect_uri = f"http://localhost:{port}/callback" + + # Get the authorize URL from the backend (includes Cognito domain, client_id) + try: + auth_info = api.get_authorize_url() + except PlatformAPIError as e: + raise PlatformAPIError(f"Could not get authorization URL: {e}") + + authorize_url: str = auth_info["authorization_url"] + backend_code_verifier: str = auth_info.get("code_verifier", "") + + # Generate our own PKCE pair for the CLI redirect + code_verifier, code_challenge = _generate_pkce() + + # Rebuild the authorize URL to use our localhost redirect and PKCE + from urllib.parse import urlencode, urlparse as _urlparse, parse_qs as _parse_qs + + parsed_auth = _urlparse(authorize_url) + auth_params = {k: v[0] for k, v in _parse_qs(parsed_auth.query).items()} + state = secrets.token_urlsafe(32) + auth_params.update({ + "redirect_uri": redirect_uri, + "code_challenge": code_challenge, + "code_challenge_method": "S256", + "state": state, + }) + final_url = f"{parsed_auth.scheme}://{parsed_auth.netloc}{parsed_auth.path}?{urlencode(auth_params)}" + + # Reset handler state + _CallbackHandler.auth_code = None + _CallbackHandler.auth_state = None + _CallbackHandler.error = None + + # Open browser and wait for callback + webbrowser.open(final_url) + + try: + server.serve_forever() + except KeyboardInterrupt: + server.server_close() + raise PlatformAPIError("Login cancelled.") + finally: + server.server_close() + + if _CallbackHandler.error: + raise PlatformAPIError(f"Authentication failed: {_CallbackHandler.error}") + + auth_code = _CallbackHandler.auth_code + if not auth_code: + raise PlatformAPIError("No authorization code received.") + + callback_state = _CallbackHandler.auth_state + if callback_state != state: + raise PlatformAPIError("State mismatch — possible CSRF attack. Login aborted.") + + # Exchange code for tokens via the backend + try: + token_data = api.exchange_portal_code( + code=auth_code, + code_verifier=code_verifier, + redirect_uri=redirect_uri, + state=state, + ) + except PlatformAPIError: + raise + except Exception as e: + raise PlatformAPIError(f"Token exchange failed: {e}") + + access_token = token_data.get("access_token") + if not access_token: + raise PlatformAPIError("No access token in response.") + + # Use the transient access token to generate a long-lived API key + try: + key_data = api.generate_api_key(cognito_token=access_token) + except PlatformAPIError: + raise + except Exception as e: + raise PlatformAPIError(f"API key generation failed: {e}") + + api_key = key_data.get("api_key") + if not api_key: + raise PlatformAPIError("No API key in response.") + + return api_key diff --git a/chipfoundry_cli/main.py b/chipfoundry_cli/main.py index d202562..8e70ad9 100644 --- a/chipfoundry_cli/main.py +++ b/chipfoundry_cli/main.py @@ -7,8 +7,11 @@ open_html_in_browser, download_with_progress, update_repo_files, fetch_versions_from_upstream, parse_user_defines_v, update_user_defines_v, get_gpio_config_from_project_json, save_gpio_config_to_project_json, - GPIO_MODES, GPIO_MODE_DESCRIPTIONS, GPIO_HEX_TO_MODE + GPIO_MODES, GPIO_MODE_DESCRIPTIONS, GPIO_HEX_TO_MODE, + get_api_key, get_api_url, get_platform_project_id, set_platform_project_id, + remove_platform_project_id, load_project_json, ) +from chipfoundry_cli.api import PlatformAPI, PlatformAPIError import os from pathlib import Path from rich.console import Console @@ -294,6 +297,207 @@ def keyview(): console.print("3. Wait for account approval") console.print("4. Use 'cf config' to configure your SFTP credentials") +@main.command('login') +def login_cmd(): + """Authenticate with the ChipFoundry platform via browser.""" + from chipfoundry_cli.auth import browser_login + + api_url = get_api_url() + config = load_user_config() + existing_key = config.get("api_key") + if existing_key: + console.print("[yellow]You are already logged in. Logging in again will replace your existing API key.[/yellow]") + proceed = console.input("Continue? (y/N): ").strip().lower() + if proceed != "y": + console.print("Cancelled.") + return + + console.print(f"Opening browser for authentication...") + console.print("[dim]Waiting for authentication... (press Ctrl+C to cancel)[/dim]") + + try: + api_key = browser_login(api_url=api_url) + except PlatformAPIError as e: + console.print(f"[red]{e}[/red]") + raise click.Abort() + except KeyboardInterrupt: + console.print("\n[yellow]Login cancelled.[/yellow]") + raise click.Abort() + + config["api_key"] = api_key + if "api_url" not in config: + config["api_url"] = api_url + save_user_config(config) + + # Validate and show who we logged in as + try: + api = PlatformAPI(api_url=api_url, api_key=api_key) + user_info = api.validate() + email = user_info.get("email", "unknown") + console.print(f"[green]Logged in as {email}.[/green]") + except Exception: + console.print("[green]API key saved.[/green]") + + console.print(f"[dim]API key saved to {get_config_path()}[/dim]") + + +@main.command('logout') +@click.option('--revoke', is_flag=True, help='Also revoke the API key on the server.') +def logout_cmd(revoke): + """Log out of the ChipFoundry platform.""" + config = load_user_config() + api_key = config.get("api_key") + if not api_key: + console.print("[yellow]Not logged in.[/yellow]") + return + + if revoke: + try: + api = PlatformAPI(api_url=get_api_url(), api_key=api_key) + api.revoke_api_key() + console.print("[dim]API key revoked on the server.[/dim]") + except PlatformAPIError as e: + console.print(f"[yellow]Could not revoke key on server: {e}[/yellow]") + + config.pop("api_key", None) + save_user_config(config) + console.print("[green]Logged out.[/green]") + + +@main.command('whoami') +def whoami_cmd(): + """Show the currently authenticated user.""" + api_key = get_api_key() + if not api_key: + console.print("[yellow]Not logged in. Run 'cf login' to authenticate.[/yellow]") + raise click.Abort() + + try: + api = PlatformAPI(api_url=get_api_url(), api_key=api_key) + user_info = api.validate() + except PlatformAPIError as e: + console.print(f"[red]{e}[/red]") + raise click.Abort() + + lines = [] + lines.append(f"[bold]Email:[/bold] {user_info.get('email', '—')}") + name_parts = [user_info.get("first_name"), user_info.get("last_name")] + name = " ".join(p for p in name_parts if p) or "—" + lines.append(f"[bold]Name:[/bold] {name}") + lines.append(f"[bold]Organization:[/bold] {user_info.get('organization') or '—'}") + lines.append(f"[bold]SFTP Username:[/bold] {user_info.get('sftp_username') or '—'}") + + console.print(Panel("\n".join(lines), title="ChipFoundry Account", border_style="cyan")) + + +@main.command('link') +@click.option('--id', 'project_uuid', required=False, help='Platform project UUID to link directly.') +@click.option('--name', 'project_name_filter', required=False, help='Search your projects by name.') +@click.option('--project-root', required=False, type=click.Path(exists=True, file_okay=False), help='Project directory (defaults to current directory).') +def link_cmd(project_uuid, project_name_filter, project_root): + """Link this local project to a platform project.""" + if not project_root: + project_root = os.getcwd() + + api_key = get_api_key() + if not api_key: + console.print("[yellow]Not logged in. Run 'cf login' first.[/yellow]") + raise click.Abort() + + # Check if project.json exists; if not, prompt to run cf init + project_json_path = Path(project_root) / ".cf" / "project.json" + if not project_json_path.exists(): + console.print("[yellow]No .cf/project.json found. Run 'cf init' first to create a local project.[/yellow]") + raise click.Abort() + + existing_id = get_platform_project_id(project_root) + if existing_id: + console.print(f"[yellow]This project is already linked to platform project {existing_id}.[/yellow]") + proceed = console.input("Replace the link? (y/N): ").strip().lower() + if proceed != "y": + console.print("Cancelled.") + return + + api = PlatformAPI(api_url=get_api_url(), api_key=api_key) + + if project_uuid: + try: + proj = api.get_project(project_uuid) + except PlatformAPIError as e: + console.print(f"[red]{e}[/red]") + raise click.Abort() + platform_id = str(proj["id"]) + else: + try: + projects = api.list_my_projects() + except PlatformAPIError as e: + console.print(f"[red]{e}[/red]") + raise click.Abort() + + if project_name_filter: + projects = [p for p in projects if project_name_filter.lower() in p.get("name", "").lower()] + + if not projects: + console.print("[yellow]No matching projects found on the platform.[/yellow]") + raise click.Abort() + + if len(projects) == 1 and project_name_filter: + proj = projects[0] + platform_id = str(proj["id"]) + else: + console.print("\n[bold]Your platform projects:[/bold]") + for i, p in enumerate(projects, 1): + status_str = p.get("status", "?") + shuttle_name = p.get("shuttle_name") or "—" + console.print(f" [bold][{i}][/bold] {p.get('name', '?')} — {status_str} (shuttle: {shuttle_name})") + console.print("") + pick = console.input(f"Select project (1-{len(projects)}): ").strip() + try: + idx = int(pick) - 1 + if 0 <= idx < len(projects): + proj = projects[idx] + platform_id = str(proj["id"]) + else: + console.print("[red]Invalid selection.[/red]") + raise click.Abort() + except ValueError: + console.print("[red]Invalid selection.[/red]") + raise click.Abort() + + set_platform_project_id(project_root, platform_id) + + console.print(f"[green]✓ Linked to platform project '{proj.get('name', platform_id)}'[/green]") + console.print(f" Platform ID: {platform_id}") + console.print(f" Status: {proj.get('status', '?')}") + shuttle_name = proj.get("shuttle_name") or "—" + console.print(f" Shuttle: {shuttle_name}") + portal_url = f"{get_api_url().rstrip('/')}/projects/{platform_id}" + console.print(f" Portal: {portal_url}") + + +@main.command('unlink') +@click.option('--project-root', required=False, type=click.Path(exists=True, file_okay=False), help='Project directory (defaults to current directory).') +def unlink_cmd(project_root): + """Remove the platform link from this local project.""" + if not project_root: + project_root = os.getcwd() + + platform_id = get_platform_project_id(project_root) + if not platform_id: + console.print("[yellow]This project is not linked to any platform project.[/yellow]") + return + + console.print(f"This will remove the link to platform project {platform_id}.") + console.print("[dim]The project will not be deleted on the platform.[/dim]") + proceed = console.input("Continue? (y/N): ").strip().lower() + if proceed != "y": + console.print("Cancelled.") + return + + remove_platform_project_id(project_root) + console.print("[green]✓ Platform link removed.[/green]") + + @main.command('init') @click.option('--project-root', required=False, type=click.Path(file_okay=False), help='Directory to create the project in (defaults to current directory).') def init(project_root): @@ -348,6 +552,11 @@ def init(project_root): json.dump(data, f, indent=2) console.print(f"[green]Initialized project at {project_json_path}[/green]") + if get_api_key(): + console.print("[dim]Tip: Run 'cf push' to upload and register this project on the platform.[/dim]") + else: + console.print("[dim]Tip: Run 'cf login' to connect this project to the ChipFoundry platform.[/dim]") + @main.command('gpio-config') @click.option('--project-root', required=False, type=click.Path(exists=True, file_okay=False), help='Path to the project directory (defaults to current directory).') @click.option('--view', is_flag=True, help='Display current GPIO configuration summary without editing.') @@ -1220,7 +1429,11 @@ def action_quit(self) -> None: @click.option('--project-type', help='Project type (auto-detected if not provided).', default=None) @click.option('--force-overwrite', is_flag=True, help='Overwrite existing files on SFTP without prompting.') @click.option('--dry-run', is_flag=True, help='Preview actions without uploading files.') -def push(project_root, sftp_host, sftp_username, sftp_key, project_id, project_name, project_type, force_overwrite, dry_run): +@click.option('--create-project', is_flag=True, help='Create a new project on the platform (skip prompt).') +@click.option('--shuttle', 'shuttle_flag', required=False, help='Shuttle name or ID for project creation.') +@click.option('--link-project', required=False, help='Link to an existing platform project by UUID (skip prompt).') +@click.option('--skip-platform', is_flag=True, help='Push to SFTP only, skip platform integration.') +def push(project_root, sftp_host, sftp_username, sftp_key, project_id, project_name, project_type, force_overwrite, dry_run, create_project, shuttle_flag, link_project, skip_platform): """Upload your project files to the ChipFoundry SFTP server.""" # If .cf/project.json exists in cwd, use it as default project_root and project_name cwd_root, cwd_project_name = get_project_json_from_cwd() @@ -1306,6 +1519,116 @@ def push(project_root, sftp_host, sftp_username, sftp_key, project_id, project_n existing_json_path=collected.get(".cf/project.json") ) + # --- Platform integration (first-push create/link/skip) --- + api_key = get_api_key() + platform_id = get_platform_project_id(project_root) + + if api_key and not platform_id and not dry_run and not skip_platform: + api = PlatformAPI(api_url=get_api_url(), api_key=api_key) + try: + user_info = api.validate() + user_email = user_info.get("email", "unknown") + except PlatformAPIError: + user_email = "unknown" + + if create_project: + choice = "1" + elif link_project: + choice = "2" + else: + console.print(f"\n[cyan]You're logged in as {user_email} but this project isn't connected to the platform yet.[/cyan]\n") + console.print(" [bold][1][/bold] Create a new project on the platform") + console.print(" [bold][2][/bold] Link to an existing platform project") + console.print(" [bold][3][/bold] Push to SFTP only (skip platform)\n") + choice = console.input("Choice (1/2/3): ").strip() + + if choice == "1": + try: + shuttles = api.list_shuttles() + except PlatformAPIError as e: + console.print(f"[yellow]Could not fetch shuttles: {e}[/yellow]") + shuttles = [] + + selected_shuttle_id = None + if shuttle_flag: + for s in shuttles: + if shuttle_flag in (s.get("name", ""), str(s.get("id", ""))): + selected_shuttle_id = str(s["id"]) + break + if not selected_shuttle_id: + console.print(f"[yellow]Shuttle '{shuttle_flag}' not found. Skipping shuttle selection.[/yellow]") + elif shuttles: + console.print("\n[bold]Select a shuttle:[/bold]") + for i, s in enumerate(shuttles, 1): + tapeout = s.get("tapeout_date") or "TBD" + console.print(f" [bold][{i}][/bold] {s['name']} ({s.get('process_node', '?')}, tapeout: {tapeout})") + console.print(f" [bold][{len(shuttles) + 1}][/bold] Skip — choose later\n") + shuttle_choice = console.input(f"Choice (1-{len(shuttles) + 1}): ").strip() + try: + idx = int(shuttle_choice) - 1 + if 0 <= idx < len(shuttles): + selected_shuttle_id = str(shuttles[idx]["id"]) + except (ValueError, IndexError): + pass + + local_name = project_name or Path(project_root).name + try: + proj = api.create_project( + name=local_name, + shuttle_id=selected_shuttle_id, + design_type=detected_type, + ) + platform_id = str(proj["id"]) + set_platform_project_id(project_root, platform_id) + console.print(f"[green]✓ Created platform project '{proj.get('name', local_name)}'[/green]") + console.print(f" Platform ID: {platform_id}") + shuttle_name = proj.get("shuttle_name") or "(none)" + console.print(f" Shuttle: {shuttle_name}") + console.print(f" Status: Draft") + portal_url = f"{get_api_url().rstrip('/')}/projects/{platform_id}" + console.print(f" Portal: {portal_url}\n") + except PlatformAPIError as e: + console.print(f"[yellow]Could not create platform project: {e}[/yellow]") + console.print("[dim]Proceeding with SFTP upload only.[/dim]\n") + + elif choice == "2": + try: + if link_project: + proj = api.get_project(link_project) + platform_id = str(proj["id"]) + set_platform_project_id(project_root, platform_id) + console.print(f"[green]✓ Linked to platform project '{proj.get('name')}'[/green]\n") + else: + projects = api.list_my_projects() + if not projects: + console.print("[yellow]No projects found on the platform. Proceeding with SFTP only.[/yellow]\n") + else: + console.print("\n[bold]Your platform projects:[/bold]") + for i, p in enumerate(projects, 1): + status_str = p.get("status", "?") + console.print(f" [bold][{i}][/bold] {p.get('name', '?')} — {status_str}") + console.print("") + pick = console.input(f"Select project (1-{len(projects)}): ").strip() + try: + idx = int(pick) - 1 + if 0 <= idx < len(projects): + selected = projects[idx] + platform_id = str(selected["id"]) + set_platform_project_id(project_root, platform_id) + console.print(f"[green]✓ Linked to platform project '{selected.get('name')}'[/green]\n") + else: + console.print("[yellow]Invalid choice. Proceeding with SFTP only.[/yellow]\n") + except ValueError: + console.print("[yellow]Invalid choice. Proceeding with SFTP only.[/yellow]\n") + except PlatformAPIError as e: + console.print(f"[yellow]Could not link project: {e}[/yellow]") + console.print("[dim]Proceeding with SFTP upload only.[/dim]\n") + else: + console.print("[dim]Skipping platform registration.[/dim]\n") + + elif not api_key and not dry_run: + console.print("[dim]Tip: Run 'cf login' to register this project on the platform.[/dim]") + # SFTP upload or dry-run final_project_name = project_name or ( cli_overrides.get("project_name") or Path(project_root).name @@ -1356,7 +1679,26 @@ def push(project_root, sftp_host, sftp_username, sftp_key, project_id, project_n force_overwrite=force_overwrite ) console.print(f"[green]✓ Uploaded to {sftp_base}[/green]") - + + # Update platform project metadata after successful upload + platform_id = get_platform_project_id(project_root) + if platform_id and api_key: + try: + pj_data = load_project_json(project_json_path) + gds_hash = pj_data.get("project", {}).get("user_project_wrapper_hash") + cli_version = pj_data.get("project", {}).get("version") + update_fields = {} + if gds_hash: + update_fields["gds_hash"] = gds_hash + if cli_version: + update_fields["cli_project_version"] = cli_version + if update_fields: + api = PlatformAPI(api_url=get_api_url(), api_key=api_key) + api.update_project(platform_id, **update_fields) + console.print("[dim]Platform project updated with latest GDS hash.[/dim]") + except Exception: + pass # non-critical + except Exception as e: console.print(f"[red]Upload failed: {e}[/red]") raise click.Abort() @@ -1463,22 +1805,162 @@ def pull(project_name, output_dir, sftp_host, sftp_username, sftp_key): transport.close() console.print(f"[dim]Disconnected from {sftp_host}[/dim]") +STATUS_COLORS = { + "draft": "blue", + "submitted": "yellow", + "in_review": "yellow", + "approved": "green", + "changes_requested": "red", + "confirmed": "cyan", + "in_production": "magenta", + "completed": "green", + "cancelled": "dim", + "archived": "dim", +} + +STATUS_HINTS = { + "draft": "Run `cf push` to upload your design files.", + "submitted": "Your project is awaiting review.", + "in_review": "Your project is being reviewed.", + "approved": "Your project has been approved! Run `cf confirm` to proceed with tapeout.", + "changes_requested": "Changes requested by the review team. See notes above.", + "confirmed": "Your project is confirmed for tapeout.", + "in_production": "Your project is in production.", + "completed": "Fabrication complete.", +} + + +def _show_platform_status(project_data): + """Render a Rich panel with platform project status.""" + status_val = (project_data.get("status") or "unknown").lower() + color = STATUS_COLORS.get(status_val, "white") + + lines = [] + lines.append(f"[bold]Name:[/bold] {project_data.get('name', '—')}") + lines.append(f"[bold]Status:[/bold] [{color}]{status_val.upper()}[/{color}]") + + shuttle_name = project_data.get("shuttle_name") + if shuttle_name: + shuttle_info = shuttle_name + milestones = project_data.get("shuttle_milestones") or {} + tapeout = milestones.get("tapeout_date") or project_data.get("shuttle_tapeout_date") + if tapeout: + shuttle_info += f" (tapeout: {tapeout})" + lines.append(f"[bold]Shuttle:[/bold] {shuttle_info}") + + design_type = project_data.get("design_type") + if design_type: + lines.append(f"[bold]Design Type:[/bold] {design_type}") + + pdk = project_data.get("technology_pdk") or project_data.get("shuttle_technology_pdk") + process = project_data.get("process_node") or project_data.get("shuttle_process_node") + if process or pdk: + lines.append(f"[bold]Process/PDK:[/bold] {process or '—'} / {pdk or '—'}") + + created = project_data.get("created_at", "")[:10] if project_data.get("created_at") else "—" + updated = project_data.get("updated_at", "")[:10] if project_data.get("updated_at") else "—" + lines.append(f"[bold]Created:[/bold] {created} [bold]Updated:[/bold] {updated}") + + gds_hash = project_data.get("gds_hash") + if gds_hash: + lines.append(f"[bold]GDS Hash:[/bold] {gds_hash[:16]}...") + + notes = project_data.get("admin_review_notes") + if notes: + note_color = "red" if status_val == "changes_requested" else "yellow" + lines.append("") + lines.append(f"[{note_color}][bold]Review Notes:[/bold] {notes}[/{note_color}]") + + hint = STATUS_HINTS.get(status_val) + if hint: + lines.append("") + lines.append(f"[dim]{hint}[/dim]") + + portal_url = f"{get_api_url().rstrip('/')}/projects/{project_data.get('id', '')}" + lines.append(f"\n[dim]Portal: {portal_url}[/dim]") + + console.print(Panel("\n".join(lines), title="Platform Project", border_style=color)) + + @main.command('status') @click.option('--sftp-host', default=DEFAULT_SFTP_HOST, show_default=True, help='SFTP server hostname.') @click.option('--sftp-username', required=False, help='SFTP username (defaults to config).') @click.option('--sftp-key', type=click.Path(exists=True, dir_okay=False), help='Path to SFTP private key file (defaults to config).', default=None, show_default=False) -def status(sftp_host, sftp_username, sftp_key): - """Show all projects and outputs for the user on the SFTP server.""" +@click.option('--all', 'show_all', is_flag=True, help='List all platform projects.') +@click.option('--json', 'output_json', is_flag=True, help='Output platform project data as JSON.') +def status(sftp_host, sftp_username, sftp_key, show_all, output_json): + """Show project status (platform + SFTP).""" config = load_user_config() + api_key = config.get("api_key") + + # --- --all: list all platform projects --- + if show_all: + if not api_key: + console.print("[yellow]Not logged in. Run 'cf login' to see platform projects.[/yellow]") + raise click.Abort() + try: + api = PlatformAPI(api_url=get_api_url(), api_key=api_key) + projects = api.list_my_projects() + except PlatformAPIError as e: + console.print(f"[red]{e}[/red]") + raise click.Abort() + + if output_json: + console.print_json(data=projects) + return + + if not projects: + console.print("[yellow]No projects found on the platform.[/yellow]") + return + + table = Table(title="Your Platform Projects") + table.add_column("Name", style="cyan", no_wrap=True) + table.add_column("Shuttle", style="white") + table.add_column("Status", no_wrap=True) + table.add_column("Updated", style="dim") + for p in projects: + s = (p.get("status") or "?").lower() + color = STATUS_COLORS.get(s, "white") + table.add_row( + p.get("name", "?"), + p.get("shuttle_name") or "—", + f"[{color}]{s.upper()}[/{color}]", + (p.get("updated_at") or "")[:10], + ) + console.print(table) + return + + # --- Platform project status panel (if linked) --- + project_root = os.getcwd() + platform_id = get_platform_project_id(project_root) + + if platform_id and api_key: + try: + api = PlatformAPI(api_url=get_api_url(), api_key=api_key) + project_data = api.get_project(platform_id) + if output_json: + console.print_json(data=project_data) + return + _show_platform_status(project_data) + console.print("") + except PlatformAPIError as e: + console.print(f"[yellow]Could not reach the platform API. Showing SFTP status only.[/yellow]") + console.print(f"[dim]{e}[/dim]\n") + elif output_json: + console.print_json(data={"error": "No platform project linked."}) + return + + # --- SFTP status (existing behavior) --- if not sftp_username: sftp_username = config.get("sftp_username") if not sftp_username: + if platform_id and api_key: + return # already showed platform status, SFTP not configured console.print("[red]No SFTP username provided and not found in config. Please run 'cf config' or provide --sftp-username.[/red]") raise click.Abort() if not sftp_key: sftp_key = config.get("sftp_key") - # Always resolve key_path to absolute path if set if sftp_key: key_path = os.path.abspath(os.path.expanduser(sftp_key)) else: @@ -1501,7 +1983,6 @@ def status(sftp_host, sftp_username, sftp_key): console.print(f"[red]Failed to connect to SFTP: {e}[/red]") raise click.Abort() try: - # List projects in incoming/projects/, outgoing/results/, and archive/ incoming_projects_dir = f"incoming/projects" outgoing_results_dir = f"outgoing/results" archive_dir = f"archive" @@ -1520,59 +2001,40 @@ def status(sftp_host, sftp_username, sftp_key): pass try: archived_items = sftp.listdir(archive_dir) - # Filter for project directories and parse timestamps for item in archived_items: if '_' in item and len(item.split('_')) >= 3: - # Try to parse timestamp from format like "serial_example_20250813_150354" parts = item.split('_') if len(parts) >= 3: - # Check if the last two parts look like date and time date_part = parts[-2] time_part = parts[-1] if len(date_part) == 8 and len(time_part) == 6 and date_part.isdigit() and time_part.isdigit(): - # This looks like a timestamped archive - project_name = '_'.join(parts[:-2]) # Everything except date and time + project_name = '_'.join(parts[:-2]) timestamp_str = f"{date_part}_{time_part}" archived_projects.append((project_name, timestamp_str, item)) except Exception: pass - # Create main status table table = Table(title=f"SFTP Status for {sftp_username}") table.add_column("Project Name", style="cyan", no_wrap=True) table.add_column("Has Input", style="yellow") table.add_column("Has Output", style="green") table.add_column("Last Tapeout Run", style="blue") - # Find the most recent archived project (latest tapeout) latest_tapeout = None if archived_projects: - # Sort by timestamp to find the most recent - archived_projects.sort(key=lambda x: x[1], reverse=True) # Sort by timestamp descending + archived_projects.sort(key=lambda x: x[1], reverse=True) latest_tapeout = archived_projects[0] - # Parse timestamp to human-readable format try: - # timestamp format is "20250813_150354" date_part, time_part = latest_tapeout[1].split('_') - year = date_part[:4] - month = date_part[4:6] - day = date_part[6:8] - hour = time_part[:2] - minute = time_part[2:4] - second = time_part[4:6] - - formatted_time = f"{year}-{month}-{day} {hour}:{minute}:{second}" - except: + formatted_time = f"{date_part[:4]}-{date_part[4:6]}-{date_part[6:8]} {time_part[:2]}:{time_part[2:4]}:{time_part[4:6]}" + except Exception: formatted_time = latest_tapeout[1] - # Show only the latest tapeout run - # Check if this project has input and output files has_input = "Yes" if latest_tapeout[0] in projects else "No" has_output = "Yes" if latest_tapeout[0] in results else "No" table.add_row(latest_tapeout[0], has_input, has_output, formatted_time) else: - # No tapeout runs yet, show active projects with their status all_projects = set(projects) | set(results) for proj in sorted(all_projects): has_input = "Yes" if proj in projects else "No" @@ -1585,11 +2047,15 @@ def status(sftp_host, sftp_username, sftp_key): else: console.print("[yellow]No projects or results found on SFTP server.[/yellow]") - # Add informative message about tapeout status - if not archived_projects and all_projects: - console.print("\n[cyan]Note: No tapeout runs have started yet. Your projects are waiting in the queue.[/cyan]") - elif not archived_projects and not all_projects: - console.print("\n[cyan]Note: No projects found and no tapeout runs have started yet.[/cyan]") + if not archived_projects: + all_sftp = set(projects) | set(results) + if all_sftp: + console.print("\n[cyan]Note: No tapeout runs have started yet. Your projects are waiting in the queue.[/cyan]") + elif not all_sftp: + console.print("\n[cyan]Note: No projects found and no tapeout runs have started yet.[/cyan]") + + if not platform_id: + console.print("\n[dim]Tip: Run 'cf push' to register this project on the platform, or 'cf link' to connect to an existing one.[/dim]") finally: if transport: sftp.close() diff --git a/chipfoundry_cli/utils.py b/chipfoundry_cli/utils.py index 8b85c1b..34fb22d 100644 --- a/chipfoundry_cli/utils.py +++ b/chipfoundry_cli/utils.py @@ -411,6 +411,56 @@ def save_user_config(config: dict): config_path.parent.mkdir(parents=True, exist_ok=True) with open(config_path, 'w') as f: toml.dump(config, f) + try: + os.chmod(config_path, 0o600) + except OSError: + pass + + +DEFAULT_API_URL = "https://platform.chipfoundry.io" + + +def get_api_key() -> Optional[str]: + """Read the API key from config.toml, or None if absent.""" + return load_user_config().get("api_key") + + +def get_api_url() -> str: + """Read the API base URL from config.toml, with a sensible default.""" + return load_user_config().get("api_url", DEFAULT_API_URL) + + +def get_platform_project_id(project_root: str) -> Optional[str]: + """Read platform_project_id from .cf/project.json, or None if absent.""" + project_json_path = Path(project_root) / ".cf" / "project.json" + if not project_json_path.exists(): + return None + data = load_project_json(str(project_json_path)) + return data.get("project", {}).get("platform_project_id") + + +def set_platform_project_id(project_root: str, project_id: str): + """Write platform_project_id into .cf/project.json, preserving all existing fields.""" + project_json_path = str(Path(project_root) / ".cf" / "project.json") + if Path(project_json_path).exists(): + data = load_project_json(project_json_path) + else: + data = {"project": {}} + if "project" not in data: + data["project"] = {} + data["project"]["platform_project_id"] = project_id + save_project_json(project_json_path, data) + + +def remove_platform_project_id(project_root: str): + """Remove platform_project_id from .cf/project.json.""" + project_json_path = str(Path(project_root) / ".cf" / "project.json") + if not Path(project_json_path).exists(): + return + data = load_project_json(project_json_path) + if "project" in data and "platform_project_id" in data["project"]: + del data["project"]["platform_project_id"] + save_project_json(project_json_path, data) def open_html_in_browser(html_path: str): """