#!/usr/bin/env python3
"""
k_client_portal — browser-facing portal running in k_client.
Serves the single-page UI and thin API shim that delegates every auth and
resource operation to k_proxy over the localhost-forwarded TLS endpoint.
Persists one preferred username locally; all session and enrollment state
lives in k_proxy.
"""
from __future__ import annotations
import argparse
import json
import ssl
import threading
import time
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import urlparse
from urllib.request import Request, urlopen
HTML = """
ChromeCard Client Flow
ChromeCard Client Flow
This page runs in `k_client` and drives the real split-VM flow:
register a user, ask the card in `k_proxy` for approval, and then call
the protected counter on `k_server` only if auth succeeds.
Browser: k_clientCard: k_proxyResource: k_server
Registration: press yes on the card to enroll.
Login: press yes to allow the identity check, or
no to deny it. If login is denied, this page will
show that `k_server` was not called.
1
Register user
Creates or refreshes the enrolled identity in `k_proxy`.
2
Authenticate with the card
`k_proxy` asks the card for approval. Press `yes` to continue or `no` to reject.
3
Call `k_server`
The protected counter is only reached when login created a valid session.
Client State
Enrolled user: unknown
Session: unknown
Expires: unknown
Registered Users
Loading users...
Flow Result
No flow run yet.
Event Log
"""
@dataclass
class EnrollmentRecord:
username: str
class ClientState:
def __init__(
self,
proxy_base_url: str,
proxy_ca_file: str | None,
enroll_db: Path,
interactive_timeout_s: float = 90.0,
default_timeout_s: float = 10.0,
):
self.proxy_base_url = proxy_base_url.rstrip("/")
self.proxy_ca_file = proxy_ca_file
self.enroll_db = enroll_db
# Registration and login both require a physical card touch, which can
# take up to ~60 s in practice; 90 s gives a generous margin.
self.interactive_timeout_s = interactive_timeout_s
self.default_timeout_s = default_timeout_s
self.lock = threading.Lock()
self.preferred_enrollment: EnrollmentRecord | None = None
self.session_token: str | None = None
self.session_expires_at: int | None = None
# Build the TLS context once; creating it on every request is expensive
# and the CA file doesn't change at runtime.
self._ssl_ctx: ssl.SSLContext | None = (
ssl.create_default_context(cafile=self.proxy_ca_file)
if proxy_base_url.startswith("https://")
else None
)
self._load_preferred_enrollment()
def _ssl_context(self) -> ssl.SSLContext | None:
return self._ssl_ctx
def _proxy_json(
self,
method: str,
path: str,
payload: dict[str, Any] | None = None,
*,
timeout_s: float | None = None,
) -> tuple[int, dict[str, Any]]:
req = Request(f"{self.proxy_base_url}{path}", method=method)
req.add_header("Content-Type", "application/json")
token = self.get_session_token()
if token:
req.add_header("Authorization", f"Bearer {token}")
body = json.dumps(payload or {}).encode("utf-8")
try:
with urlopen(
req,
data=body,
timeout=timeout_s or self.default_timeout_s,
context=self._ssl_context(),
) as resp:
return resp.status, json.loads(resp.read().decode("utf-8"))
except HTTPError as exc:
try:
return exc.code, json.loads(exc.read().decode("utf-8"))
except Exception:
return exc.code, {"ok": False, "error": f"proxy http error {exc.code}"}
except URLError as exc:
return 502, {"ok": False, "error": f"proxy unavailable: {exc.reason}"}
except Exception as exc:
return 502, {"ok": False, "error": f"proxy call failed: {exc}"}
def _load_preferred_enrollment(self) -> None:
if not self.enroll_db.exists():
return
try:
data = json.loads(self.enroll_db.read_text())
username = str(data.get("username", "")).strip()
if username:
self.preferred_enrollment = EnrollmentRecord(username=username)
except Exception:
self.preferred_enrollment = None
def _save_preferred_enrollment_locked(self) -> None:
self.enroll_db.parent.mkdir(parents=True, exist_ok=True)
payload = {"username": self.preferred_enrollment.username if self.preferred_enrollment else None}
self.enroll_db.write_text(json.dumps(payload, indent=2) + "\n")
def enroll(self, username: str) -> dict[str, Any]:
username = username.strip()
if not username:
return {"ok": False, "error": "username required"}
# Best-effort: invalidate any active session on k_proxy before re-enrolling.
# The new credential will differ from what the old session was issued for.
with self.lock:
old_token = self.session_token
if old_token:
self._proxy_json("POST", "/session/logout")
status, data = self._proxy_json(
"POST",
"/enroll/register",
{"username": username},
timeout_s=self.interactive_timeout_s,
)
if status != 200:
return data
with self.lock:
self.preferred_enrollment = EnrollmentRecord(username=username)
self._save_preferred_enrollment_locked()
self.session_token = None
self.session_expires_at = None
return {
"ok": True,
"enrolled_username": username,
"proxy_enrollment": data,
}
def list_enrollments(self) -> tuple[int, dict[str, Any]]:
return self._proxy_json("GET", "/enroll/list")
def delete_enrollment(self, username: str) -> tuple[int, dict[str, Any]]:
username = username.strip()
if not username:
return 400, {"ok": False, "error": "username required"}
status, data = self._proxy_json("POST", "/enroll/delete", {"username": username})
if status == 200:
with self.lock:
if self.preferred_enrollment and self.preferred_enrollment.username == username:
self.preferred_enrollment = None
self._save_preferred_enrollment_locked()
self.session_token = None
self.session_expires_at = None
return status, data
def snapshot(self) -> dict[str, Any]:
with self.lock:
return {
"ok": True,
"enrolled_username": self.preferred_enrollment.username if self.preferred_enrollment else None,
"session_active": bool(self.session_token),
"session_expires_at": self.session_expires_at,
"proxy_base_url": self.proxy_base_url,
}
def get_session_token(self) -> str | None:
with self.lock:
return self.session_token
def login(self, username: str | None = None) -> tuple[int, dict[str, Any]]:
requested = (username or "").strip()
with self.lock:
if requested:
username = requested
elif self.preferred_enrollment:
username = self.preferred_enrollment.username
else:
return 400, {"ok": False, "error": "no enrolled user"}
status, data = self._proxy_json(
"POST",
"/session/login",
{"username": username},
timeout_s=self.interactive_timeout_s,
)
if status == 200 and data.get("session_token"):
with self.lock:
self.preferred_enrollment = EnrollmentRecord(username=username)
self._save_preferred_enrollment_locked()
self.session_token = data["session_token"]
self.session_expires_at = int(data.get("expires_at", 0)) or None
return status, data
def status(self) -> tuple[int, dict[str, Any]]:
return self._proxy_json("POST", "/session/status")
def counter(self) -> tuple[int, dict[str, Any]]:
return self._proxy_json("POST", "/resource/counter")
def logout(self) -> tuple[int, dict[str, Any]]:
status, data = self._proxy_json("POST", "/session/logout")
if status == 200:
with self.lock:
self.session_token = None
self.session_expires_at = None
return status, data
class Handler(BaseHTTPRequestHandler):
state: ClientState
def _json(self, status: int, payload: dict[str, Any]) -> None:
body = json.dumps(payload).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _html(self, body: str) -> None:
data = body.encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
def _read_json(self) -> dict[str, Any]:
length = int(self.headers.get("Content-Length", "0"))
raw = self.rfile.read(length)
if not raw:
return {}
return json.loads(raw.decode("utf-8"))
def _require_json(self) -> dict[str, Any] | None:
# Returns None and sends 400 when the body is unparseable; the caller
# should return immediately without sending a second response.
try:
return self._read_json()
except Exception:
self._json(400, {"ok": False, "error": "invalid json"})
return None
def do_GET(self) -> None: # noqa: N802
path = urlparse(self.path).path
if path == "/":
self._html(HTML)
return
if path == "/health":
self._json(200, {"ok": True, "service": "k_client_portal", "time": int(time.time())})
return
if path == "/api/client/state":
self._json(200, self.state.snapshot())
return
if path == "/api/enrollments":
status, data = self.state.list_enrollments()
self._json(status, data)
return
self.send_error(404)
def do_POST(self) -> None: # noqa: N802
path = urlparse(self.path).path
if path == "/api/enroll":
data = self._require_json()
if data is None:
return
result = self.state.enroll(str(data.get("username", "")))
self._json(200 if result.get("ok") else 400, result)
return
if path == "/api/login":
data = self._require_json()
if data is None:
return
status, data = self.state.login(str(data.get("username", "")))
self._json(status, data)
return
if path == "/api/enroll/delete":
data = self._require_json()
if data is None:
return
status, data = self.state.delete_enrollment(str(data.get("username", "")))
self._json(status, data)
return
if path == "/api/status":
status, data = self.state.status()
self._json(status, data)
return
if path == "/api/resource/counter":
status, data = self.state.counter()
self._json(status, data)
return
if path == "/api/logout":
status, data = self.state.logout()
self._json(status, data)
return
self.send_error(404)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Run browser-facing client portal in k_client")
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=8766)
parser.add_argument("--proxy-base-url", default="https://127.0.0.1:9771")
parser.add_argument("--proxy-ca-file", help="CA certificate used to verify k_proxy HTTPS certificate")
parser.add_argument("--enroll-db", default="/home/user/chromecard/k_client_enrollment.json")
return parser.parse_args()
def main() -> int:
args = parse_args()
if args.proxy_base_url.startswith("https://") and not args.proxy_ca_file:
raise SystemExit("--proxy-ca-file is required when --proxy-base-url uses https")
Handler.state = ClientState(
proxy_base_url=args.proxy_base_url,
proxy_ca_file=args.proxy_ca_file,
enroll_db=Path(args.enroll_db),
)
server = ThreadingHTTPServer((args.host, args.port), Handler)
print(f"k_client_portal listening on http://{args.host}:{args.port}")
server.serve_forever()
return 0
if __name__ == "__main__":
raise SystemExit(main())