#!/usr/bin/env python3 """ k_client_portal — browser-facing portal running in k_client. Serves the single-page UI and thin API shim that delegates every auth and resource operation to k_proxy over the localhost-forwarded TLS endpoint. Persists one preferred username locally; all session and enrollment state lives in k_proxy. """ from __future__ import annotations import argparse import json import ssl import threading import time from dataclasses import dataclass from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from typing import Any from urllib.error import HTTPError, URLError from urllib.parse import urlparse from urllib.request import Request, urlopen HTML = """ ChromeCard Client Flow

ChromeCard Client Flow

This page runs in `k_client` and drives the real split-VM flow: register a user, ask the card in `k_proxy` for approval, and then call the protected counter on `k_server` only if auth succeeds.

Browser: k_client Card: k_proxy Resource: k_server
Registration: press yes on the card to enroll. Login: press yes to allow the identity check, or no to deny it. If login is denied, this page will show that `k_server` was not called.
1
Register user
Creates or refreshes the enrolled identity in `k_proxy`.
2
Authenticate with the card
`k_proxy` asks the card for approval. Press `yes` to continue or `no` to reject.
3
Call `k_server`
The protected counter is only reached when login created a valid session.

Client State

Enrolled user: unknown
Session: unknown
Expires: unknown

Registered Users

Loading users...

Flow Result

No flow run yet.

Event Log


      
""" @dataclass class EnrollmentRecord: username: str class ClientState: def __init__( self, proxy_base_url: str, proxy_ca_file: str | None, enroll_db: Path, interactive_timeout_s: float = 90.0, default_timeout_s: float = 10.0, ): self.proxy_base_url = proxy_base_url.rstrip("/") self.proxy_ca_file = proxy_ca_file self.enroll_db = enroll_db # Registration and login both require a physical card touch, which can # take up to ~60 s in practice; 90 s gives a generous margin. self.interactive_timeout_s = interactive_timeout_s self.default_timeout_s = default_timeout_s self.lock = threading.Lock() self.preferred_enrollment: EnrollmentRecord | None = None self.session_token: str | None = None self.session_expires_at: int | None = None # Build the TLS context once; creating it on every request is expensive # and the CA file doesn't change at runtime. self._ssl_ctx: ssl.SSLContext | None = ( ssl.create_default_context(cafile=self.proxy_ca_file) if proxy_base_url.startswith("https://") else None ) self._load_preferred_enrollment() def _ssl_context(self) -> ssl.SSLContext | None: return self._ssl_ctx def _proxy_json( self, method: str, path: str, payload: dict[str, Any] | None = None, *, timeout_s: float | None = None, ) -> tuple[int, dict[str, Any]]: req = Request(f"{self.proxy_base_url}{path}", method=method) req.add_header("Content-Type", "application/json") token = self.get_session_token() if token: req.add_header("Authorization", f"Bearer {token}") body = json.dumps(payload or {}).encode("utf-8") try: with urlopen( req, data=body, timeout=timeout_s or self.default_timeout_s, context=self._ssl_context(), ) as resp: return resp.status, json.loads(resp.read().decode("utf-8")) except HTTPError as exc: try: return exc.code, json.loads(exc.read().decode("utf-8")) except Exception: return exc.code, {"ok": False, "error": f"proxy http error {exc.code}"} except URLError as exc: return 502, {"ok": False, "error": f"proxy unavailable: {exc.reason}"} except Exception as exc: return 502, {"ok": False, "error": f"proxy call failed: {exc}"} def _load_preferred_enrollment(self) -> None: if not self.enroll_db.exists(): return try: data = json.loads(self.enroll_db.read_text()) username = str(data.get("username", "")).strip() if username: self.preferred_enrollment = EnrollmentRecord(username=username) except Exception: self.preferred_enrollment = None def _save_preferred_enrollment_locked(self) -> None: self.enroll_db.parent.mkdir(parents=True, exist_ok=True) payload = {"username": self.preferred_enrollment.username if self.preferred_enrollment else None} self.enroll_db.write_text(json.dumps(payload, indent=2) + "\n") def enroll(self, username: str) -> dict[str, Any]: username = username.strip() if not username: return {"ok": False, "error": "username required"} # Best-effort: invalidate any active session on k_proxy before re-enrolling. # The new credential will differ from what the old session was issued for. with self.lock: old_token = self.session_token if old_token: self._proxy_json("POST", "/session/logout") status, data = self._proxy_json( "POST", "/enroll/register", {"username": username}, timeout_s=self.interactive_timeout_s, ) if status != 200: return data with self.lock: self.preferred_enrollment = EnrollmentRecord(username=username) self._save_preferred_enrollment_locked() self.session_token = None self.session_expires_at = None return { "ok": True, "enrolled_username": username, "proxy_enrollment": data, } def list_enrollments(self) -> tuple[int, dict[str, Any]]: return self._proxy_json("GET", "/enroll/list") def delete_enrollment(self, username: str) -> tuple[int, dict[str, Any]]: username = username.strip() if not username: return 400, {"ok": False, "error": "username required"} status, data = self._proxy_json("POST", "/enroll/delete", {"username": username}) if status == 200: with self.lock: if self.preferred_enrollment and self.preferred_enrollment.username == username: self.preferred_enrollment = None self._save_preferred_enrollment_locked() self.session_token = None self.session_expires_at = None return status, data def snapshot(self) -> dict[str, Any]: with self.lock: return { "ok": True, "enrolled_username": self.preferred_enrollment.username if self.preferred_enrollment else None, "session_active": bool(self.session_token), "session_expires_at": self.session_expires_at, "proxy_base_url": self.proxy_base_url, } def get_session_token(self) -> str | None: with self.lock: return self.session_token def login(self, username: str | None = None) -> tuple[int, dict[str, Any]]: requested = (username or "").strip() with self.lock: if requested: username = requested elif self.preferred_enrollment: username = self.preferred_enrollment.username else: return 400, {"ok": False, "error": "no enrolled user"} status, data = self._proxy_json( "POST", "/session/login", {"username": username}, timeout_s=self.interactive_timeout_s, ) if status == 200 and data.get("session_token"): with self.lock: self.preferred_enrollment = EnrollmentRecord(username=username) self._save_preferred_enrollment_locked() self.session_token = data["session_token"] self.session_expires_at = int(data.get("expires_at", 0)) or None return status, data def status(self) -> tuple[int, dict[str, Any]]: return self._proxy_json("POST", "/session/status") def counter(self) -> tuple[int, dict[str, Any]]: return self._proxy_json("POST", "/resource/counter") def logout(self) -> tuple[int, dict[str, Any]]: status, data = self._proxy_json("POST", "/session/logout") if status == 200: with self.lock: self.session_token = None self.session_expires_at = None return status, data class Handler(BaseHTTPRequestHandler): state: ClientState def _json(self, status: int, payload: dict[str, Any]) -> None: body = json.dumps(payload).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def _html(self, body: str) -> None: data = body.encode("utf-8") self.send_response(200) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Content-Length", str(len(data))) self.end_headers() self.wfile.write(data) def _read_json(self) -> dict[str, Any]: length = int(self.headers.get("Content-Length", "0")) raw = self.rfile.read(length) if not raw: return {} return json.loads(raw.decode("utf-8")) def _require_json(self) -> dict[str, Any] | None: # Returns None and sends 400 when the body is unparseable; the caller # should return immediately without sending a second response. try: return self._read_json() except Exception: self._json(400, {"ok": False, "error": "invalid json"}) return None def do_GET(self) -> None: # noqa: N802 path = urlparse(self.path).path if path == "/": self._html(HTML) return if path == "/health": self._json(200, {"ok": True, "service": "k_client_portal", "time": int(time.time())}) return if path == "/api/client/state": self._json(200, self.state.snapshot()) return if path == "/api/enrollments": status, data = self.state.list_enrollments() self._json(status, data) return self.send_error(404) def do_POST(self) -> None: # noqa: N802 path = urlparse(self.path).path if path == "/api/enroll": data = self._require_json() if data is None: return result = self.state.enroll(str(data.get("username", ""))) self._json(200 if result.get("ok") else 400, result) return if path == "/api/login": data = self._require_json() if data is None: return status, data = self.state.login(str(data.get("username", ""))) self._json(status, data) return if path == "/api/enroll/delete": data = self._require_json() if data is None: return status, data = self.state.delete_enrollment(str(data.get("username", ""))) self._json(status, data) return if path == "/api/status": status, data = self.state.status() self._json(status, data) return if path == "/api/resource/counter": status, data = self.state.counter() self._json(status, data) return if path == "/api/logout": status, data = self.state.logout() self._json(status, data) return self.send_error(404) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Run browser-facing client portal in k_client") parser.add_argument("--host", default="127.0.0.1") parser.add_argument("--port", type=int, default=8766) parser.add_argument("--proxy-base-url", default="https://127.0.0.1:9771") parser.add_argument("--proxy-ca-file", help="CA certificate used to verify k_proxy HTTPS certificate") parser.add_argument("--enroll-db", default="/home/user/chromecard/k_client_enrollment.json") return parser.parse_args() def main() -> int: args = parse_args() if args.proxy_base_url.startswith("https://") and not args.proxy_ca_file: raise SystemExit("--proxy-ca-file is required when --proxy-base-url uses https") Handler.state = ClientState( proxy_base_url=args.proxy_base_url, proxy_ca_file=args.proxy_ca_file, enroll_db=Path(args.enroll_db), ) server = ThreadingHTTPServer((args.host, args.port), Handler) print(f"k_client_portal listening on http://{args.host}:{args.port}") server.serve_forever() return 0 if __name__ == "__main__": raise SystemExit(main())