#!/usr/bin/env python3 """ Minimal k_proxy service for Phase 5 bring-up. Behavior: - Creates short-lived sessions after a card-backed auth gate. - Reuses valid sessions to access k_server protected counter endpoint. - Supports enrollment, session status, and logout. Notes: - Default runtime still uses the legacy card-presence probe gate. - Experimental direct FIDO2 registration/assertion lives behind `--auth-mode fido2-direct`. - This is still a prototype and not a final production auth design. """ from __future__ import annotations import argparse import base64 import http.client import json import queue import re import secrets import ssl import subprocess import threading import time from dataclasses import dataclass from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from typing import Any from urllib.error import HTTPError, URLError from urllib.parse import urlparse from urllib.request import Request, urlopen from fido2.client import Fido2Client, UserInteraction, verify_rp_id from fido2.hid import CtapHidDevice from fido2.server import Fido2Server from fido2.webauthn import ( AttestedCredentialData, PublicKeyCredentialCreationOptions, PublicKeyCredentialRequestOptions, PublicKeyCredentialRpEntity, PublicKeyCredentialUserEntity, UserVerificationRequirement, ) try: from fido2.client import ClientDataCollector, CollectedClientData except ImportError: ClientDataCollector = None CollectedClientData = None HTML = """ ChromeCard Proxy Portal

ChromeCard Proxy Portal

Primary browser entry point for the current prototype. Browser traffic now targets k_proxy directly. Enrollment, card-backed login, session reuse, counter access, and logout all happen on this TLS endpoint.

Enrollment

Stored username: none
Session active: no

Session Flow


  
""" @dataclass class Session: username: str expires_at: float @dataclass class Enrollment: username: str display_name: str | None created_at: int updated_at: int user_id_b64: str | None = None credential_data_b64: str | None = None USERNAME_PATTERN = re.compile(r"^[a-z0-9](?:[a-z0-9._-]{1,30}[a-z0-9])?$") AUTH_MODE_PROBE = "probe" AUTH_MODE_FIDO2_DIRECT = "fido2-direct" def normalize_username(raw: str) -> str: username = raw.strip().lower() if not USERNAME_PATTERN.fullmatch(username): raise ValueError( "username must be 3-32 chars of lowercase letters, digits, dot, underscore, or dash" ) return username def normalize_display_name(raw: str | None) -> str | None: value = (raw or "").strip() if not value: return None if len(value) > 64: raise ValueError("display_name must be 64 characters or fewer") return value def b64u_encode(data: bytes) -> str: return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii") def b64u_decode(data: str) -> bytes: pad = "=" * ((4 - len(data) % 4) % 4) return base64.urlsafe_b64decode((data + pad).encode("ascii")) def enrollment_payload(enrollment: "Enrollment", *, created: bool | None = None) -> dict[str, Any]: payload: dict[str, Any] = { "ok": True, "username": enrollment.username, "display_name": enrollment.display_name, "created_at": enrollment.created_at, "updated_at": enrollment.updated_at, "has_credential": bool(enrollment.credential_data_b64), } if created is not None: payload["created"] = created return payload if ClientDataCollector is not None and CollectedClientData is not None: class ProxyClientDataCollector(ClientDataCollector): def __init__(self, origin: str, rp_id: str): if not verify_rp_id(rp_id, origin): raise ValueError(f"origin {origin!r} is not valid for rp_id {rp_id!r}") self.origin = origin self.rp_id = rp_id def collect_client_data( self, options: PublicKeyCredentialCreationOptions | PublicKeyCredentialRequestOptions, ) -> tuple[CollectedClientData, str]: if isinstance(options, PublicKeyCredentialCreationOptions): request_type = "webauthn.create" requested_rp_id = options.rp.id challenge = options.challenge elif isinstance(options, PublicKeyCredentialRequestOptions): request_type = "webauthn.get" requested_rp_id = options.rp_id challenge = options.challenge else: raise TypeError(f"unsupported options type: {type(options)!r}") if requested_rp_id != self.rp_id: raise ValueError(f"rp_id mismatch: expected {self.rp_id}, got {requested_rp_id}") return CollectedClientData.create( type=request_type, challenge=challenge, origin=self.origin, ), self.rp_id else: ProxyClientDataCollector = None class ProxyUserInteraction(UserInteraction): def prompt_up(self) -> None: print("Touch the ChromeCard to continue...", flush=True) super().prompt_up() def request_pin(self, permissions, rp_id: str | None) -> str | None: print("Authenticator PIN is required but not supported by this prototype.", flush=True) return super().request_pin(permissions, rp_id) class ProxyState: def __init__( self, session_ttl_s: int, auth_mode: str, auth_command: str, server_base_url: str, server_ca_file: str | None, server_max_connections: int, proxy_token: str, enrollment_db: Path, rp_id: str, rp_name: str, origin: str, ): self.session_ttl_s = session_ttl_s self.auth_mode = auth_mode self.auth_command = auth_command self.server_base_url = server_base_url.rstrip("/") self.server_ca_file = server_ca_file self.proxy_token = proxy_token self.enrollment_db = enrollment_db self.rp_id = rp_id self.origin = origin self.lock = threading.Lock() self.sessions: dict[str, Session] = {} self.enrollments: dict[str, Enrollment] = {} self.rp = PublicKeyCredentialRpEntity(id=rp_id, name=rp_name) self.fido_server = Fido2Server(self.rp) self.client_data_collector = ( ProxyClientDataCollector(origin=origin, rp_id=rp_id) if ProxyClientDataCollector else None ) self.upstream = UpstreamPool( server_base_url=self.server_base_url, server_ca_file=self.server_ca_file, max_connections=server_max_connections, ) self._load_enrollments() def uses_direct_fido2(self) -> bool: return self.auth_mode == AUTH_MODE_FIDO2_DIRECT def auth_mode_label(self) -> str: return "fido2_assertion" if self.uses_direct_fido2() else "card_presence_probe" def _now(self) -> float: return time.time() def _gc_locked(self) -> None: now = self._now() dead = [token for token, sess in self.sessions.items() if sess.expires_at <= now] for token in dead: del self.sessions[token] def create_session(self, username: str) -> tuple[str, float]: token = secrets.token_urlsafe(32) now = self._now() expires_at = now + self.session_ttl_s with self.lock: self._gc_locked() self.sessions[token] = Session(username=username, expires_at=expires_at) return token, expires_at def get_session(self, token: str) -> Session | None: with self.lock: self._gc_locked() return self.sessions.get(token) def invalidate_session(self, token: str) -> bool: with self.lock: return self.sessions.pop(token, None) is not None def active_session_count(self) -> int: with self.lock: self._gc_locked() return len(self.sessions) def _load_enrollments(self) -> None: if not self.enrollment_db.exists(): return try: payload = json.loads(self.enrollment_db.read_text()) users = payload.get("users", []) for item in users: username = str(item.get("username", "")).strip() if not username: continue created_at = int(item.get("created_at", item.get("enrolled_at", int(self._now())))) updated_at = int(item.get("updated_at", created_at)) self.enrollments[username] = Enrollment( username=username, display_name=normalize_display_name(item.get("display_name")), created_at=created_at, updated_at=updated_at, user_id_b64=item.get("user_id_b64"), credential_data_b64=item.get("credential_data_b64"), ) except Exception: self.enrollments = {} def _save_enrollments_locked(self) -> None: self.enrollment_db.parent.mkdir(parents=True, exist_ok=True) users = [ { "username": enrollment.username, "display_name": enrollment.display_name, "created_at": enrollment.created_at, "updated_at": enrollment.updated_at, "user_id_b64": enrollment.user_id_b64, "credential_data_b64": enrollment.credential_data_b64, } for enrollment in sorted(self.enrollments.values(), key=lambda item: item.username) ] self.enrollment_db.write_text(json.dumps({"users": users}, indent=2) + "\n") def _new_fido_client(self) -> Fido2Client: try: device = next(CtapHidDevice.list_devices()) except StopIteration as exc: raise RuntimeError("no CTAP HID devices found") from exc # Newer python-fido2 builds accept a custom client-data collector, while the # VM-side package still expects an origin string plus verifier callback. if self.client_data_collector is not None: return Fido2Client(device, self.client_data_collector, ProxyUserInteraction()) return Fido2Client(device, self.origin, verify=verify_rp_id, user_interaction=ProxyUserInteraction()) def _user_entity(self, username: str, display_name: str | None, user_id: bytes) -> PublicKeyCredentialUserEntity: return PublicKeyCredentialUserEntity( id=user_id, name=username, display_name=display_name or username, ) def _register_metadata_only(self, username: str, display_name: str | None) -> Enrollment: canonical = normalize_username(username) pretty = normalize_display_name(display_name) now = int(self._now()) with self.lock: existing = self.enrollments.get(canonical) if existing: raise FileExistsError("user already enrolled") enrollment = Enrollment( username=canonical, display_name=pretty, created_at=now, updated_at=now, ) self.enrollments[canonical] = enrollment self._save_enrollments_locked() return enrollment def _register_direct_fido2(self, username: str, display_name: str | None) -> Enrollment: canonical = normalize_username(username) pretty = normalize_display_name(display_name) now = int(self._now()) with self.lock: existing = self.enrollments.get(canonical) if existing and existing.credential_data_b64: raise FileExistsError("user already enrolled") user_id = b64u_decode(existing.user_id_b64) if existing and existing.user_id_b64 else secrets.token_bytes(32) created_at = existing.created_at if existing else now options, state = self.fido_server.register_begin( self._user_entity(canonical, pretty, user_id), user_verification=UserVerificationRequirement.DISCOURAGED, ) try: auth_data = self.fido_server.register_complete( state, self._new_fido_client().make_credential(options.public_key), ) except Exception as exc: raise RuntimeError(f"card registration failed: {exc}") from exc credential_data = auth_data.credential_data if credential_data is None: raise RuntimeError("card registration returned no credential data") enrollment = Enrollment( username=canonical, display_name=pretty, created_at=created_at, updated_at=now, user_id_b64=b64u_encode(user_id), credential_data_b64=b64u_encode(bytes(credential_data)), ) with self.lock: self.enrollments[canonical] = enrollment self._save_enrollments_locked() return enrollment def register_enrollment(self, username: str, display_name: str | None) -> Enrollment: if self.uses_direct_fido2(): return self._register_direct_fido2(username, display_name) return self._register_metadata_only(username, display_name) def update_enrollment(self, username: str, display_name: str | None) -> Enrollment: canonical = normalize_username(username) pretty = normalize_display_name(display_name) now = int(self._now()) with self.lock: existing = self.enrollments.get(canonical) if not existing: raise KeyError("user not enrolled") existing.display_name = pretty existing.updated_at = now self._save_enrollments_locked() return existing def delete_enrollment(self, username: str) -> Enrollment: canonical = normalize_username(username) with self.lock: existing = self.enrollments.pop(canonical, None) if not existing: raise KeyError("user not enrolled") dead_tokens = [token for token, sess in self.sessions.items() if sess.username == canonical] for token in dead_tokens: del self.sessions[token] self._save_enrollments_locked() return existing def list_enrollments(self) -> list[Enrollment]: with self.lock: return [self.enrollments[key] for key in sorted(self.enrollments)] def get_enrollment(self, username: str) -> Enrollment | None: try: canonical = normalize_username(username) except ValueError: return None with self.lock: return self.enrollments.get(canonical) def has_enrollment(self, username: str) -> bool: return self.get_enrollment(username) is not None def _authenticate_with_probe(self) -> tuple[bool, str]: try: proc = subprocess.run( self.auth_command, shell=True, capture_output=True, text=True, timeout=10, check=False, ) except Exception as exc: return False, f"auth command failed: {exc}" if proc.returncode != 0: stderr = proc.stderr.strip() stdout = proc.stdout.strip() details = stderr if stderr else stdout return False, details or f"auth command exit code {proc.returncode}" return True, "card presence check succeeded" def _authenticate_with_direct_fido2(self, username: str) -> tuple[bool, str]: enrollment = self.get_enrollment(username) if not enrollment: return False, "user not enrolled" if not enrollment.credential_data_b64: return False, "user has no registered credential" try: credential = AttestedCredentialData(b64u_decode(enrollment.credential_data_b64)) # Keep UV explicitly discouraged here. On the current card/library stack, # asking for stronger UV flows immediately trips PIN/UV capability errors. options, state = self.fido_server.authenticate_begin( [credential], user_verification=UserVerificationRequirement.DISCOURAGED, ) selection = self._new_fido_client().get_assertion(options.public_key) assertion = selection.get_response(0) self.fido_server.authenticate_complete(state, [credential], assertion) except Exception as exc: return False, f"assertion verification failed: {exc}" return True, "assertion verified" def authenticate_with_card(self, username: str) -> tuple[bool, str]: if not self.uses_direct_fido2(): return self._authenticate_with_probe() return self._authenticate_with_direct_fido2(username) def fetch_counter(self) -> tuple[int, dict[str, Any]]: return self.upstream.request_json( path="/resource/counter", headers={"X-Proxy-Token": self.proxy_token}, payload={}, ) class UpstreamPool: def __init__(self, server_base_url: str, server_ca_file: str | None, max_connections: int = 4): parsed = urlparse(server_base_url) self.scheme = parsed.scheme self.host = parsed.hostname or "127.0.0.1" self.port = parsed.port or (443 if parsed.scheme == "https" else 80) self.base_path = parsed.path.rstrip("/") self.server_ca_file = server_ca_file self.timeout = 5 self.max_connections = max_connections self.idle: queue.LifoQueue[http.client.HTTPConnection] = queue.LifoQueue() self.capacity = threading.BoundedSemaphore(max_connections) def _new_connection(self) -> http.client.HTTPConnection: if self.scheme == "https": context = ssl.create_default_context(cafile=self.server_ca_file) return http.client.HTTPSConnection( self.host, self.port, timeout=self.timeout, context=context, ) return http.client.HTTPConnection(self.host, self.port, timeout=self.timeout) def _acquire(self) -> http.client.HTTPConnection: self.capacity.acquire() try: return self.idle.get_nowait() except queue.Empty: return self._new_connection() def _release(self, conn: http.client.HTTPConnection | None, reusable: bool) -> None: try: if conn is not None and reusable: self.idle.put(conn) elif conn is not None: conn.close() finally: self.capacity.release() def request_json(self, path: str, headers: dict[str, str], payload: dict[str, Any]) -> tuple[int, dict[str, Any]]: conn = self._acquire() reusable = False full_path = f"{self.base_path}{path}" if self.base_path else path try: body = json.dumps(payload).encode("utf-8") req_headers = {"Content-Type": "application/json", **headers} conn.request("POST", full_path, body=body, headers=req_headers) resp = conn.getresponse() raw = resp.read() reusable = not resp.will_close try: data = json.loads(raw.decode("utf-8")) if raw else {} except Exception: data = {"ok": False, "error": f"server http error {resp.status}"} return resp.status, data except (http.client.HTTPException, OSError, ssl.SSLError) as exc: return 502, {"ok": False, "error": f"server unavailable: {exc}"} except Exception as exc: return 502, {"ok": False, "error": f"server call failed: {exc}"} finally: self._release(conn, reusable) class Handler(BaseHTTPRequestHandler): state: ProxyState protocol_version = "HTTP/1.1" def _json(self, status: int, payload: dict[str, Any]) -> None: body = json.dumps(payload).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def _html(self, body: str) -> None: data = body.encode("utf-8") self.send_response(200) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Content-Length", str(len(data))) self.end_headers() self.wfile.write(data) def _read_json(self) -> dict[str, Any]: length = int(self.headers.get("Content-Length", "0")) raw = self.rfile.read(length) if not raw: return {} return json.loads(raw.decode("utf-8")) def _discard_request_body(self) -> None: length = int(self.headers.get("Content-Length", "0")) if length > 0: self.rfile.read(length) def _bearer_token(self) -> str | None: value = self.headers.get("Authorization", "") if not value.startswith("Bearer "): return None token = value[7:].strip() return token or None def _require_session(self) -> tuple[str, Session] | None: token = self._bearer_token() if not token: self._json(401, {"ok": False, "error": "missing bearer token"}) return None session = self.state.get_session(token) if not session: self._json(401, {"ok": False, "error": "invalid or expired session"}) return None return token, session def do_GET(self) -> None: # noqa: N802 path = urlparse(self.path).path if path == "/": self._html(HTML) return if path == "/health": self._json( 200, { "ok": True, "service": "k_proxy", "active_sessions": self.state.active_session_count(), "time": int(time.time()), }, ) return if path.startswith("/enroll/status"): self._enroll_status() return if path == "/enroll/list": self._enroll_list() return self.send_error(404) def do_POST(self) -> None: # noqa: N802 path = urlparse(self.path).path if path == "/session/login": self._session_login() return if path == "/enroll/register": self._enroll_register() return if path == "/enroll/update": self._enroll_update() return if path == "/enroll/delete": self._enroll_delete() return if path == "/session/status": self._session_status() return if path == "/session/logout": self._session_logout() return if path == "/resource/counter": self._resource_counter() return self.send_error(404) def _session_login(self) -> None: try: data = self._read_json() except Exception: self._json(400, {"ok": False, "error": "invalid json"}) return try: username = normalize_username(str(data.get("username", ""))) except ValueError as exc: self._json(400, {"ok": False, "error": str(exc)}) return if not self.state.has_enrollment(username): self._json(403, {"ok": False, "error": "user not enrolled", "username": username}) return ok, message = self.state.authenticate_with_card(username) if not ok: self._json(401, {"ok": False, "error": "card auth failed", "details": message}) return token, expires_at = self.state.create_session(username) self._json( 200, { "ok": True, "username": username, "session_token": token, "expires_at": int(expires_at), "ttl_seconds": self.state.session_ttl_s, "auth_mode": self.state.auth_mode_label(), }, ) def _enroll_register(self) -> None: try: data = self._read_json() except Exception: self._json(400, {"ok": False, "error": "invalid json"}) return try: enrollment = self.state.register_enrollment( str(data.get("username", "")), data.get("display_name"), ) except ValueError as exc: self._json(400, {"ok": False, "error": str(exc)}) return except FileExistsError: self._json(409, {"ok": False, "error": "user already enrolled"}) return except RuntimeError as exc: self._json(401, {"ok": False, "error": str(exc)}) return self._json(200, enrollment_payload(enrollment, created=enrollment.created_at == enrollment.updated_at)) def _enroll_update(self) -> None: try: data = self._read_json() except Exception: self._json(400, {"ok": False, "error": "invalid json"}) return try: enrollment = self.state.update_enrollment( str(data.get("username", "")), data.get("display_name"), ) except ValueError as exc: self._json(400, {"ok": False, "error": str(exc)}) return except KeyError: self._json(404, {"ok": False, "error": "user not enrolled"}) return self._json(200, enrollment_payload(enrollment)) def _enroll_delete(self) -> None: try: data = self._read_json() except Exception: self._json(400, {"ok": False, "error": "invalid json"}) return try: enrollment = self.state.delete_enrollment(str(data.get("username", ""))) except ValueError as exc: self._json(400, {"ok": False, "error": str(exc)}) return except KeyError: self._json(404, {"ok": False, "error": "user not enrolled"}) return self._json(200, {"ok": True, "username": enrollment.username, "deleted": True}) def _enroll_status(self) -> None: parsed = urlparse(self.path) query = {} if parsed.query: for chunk in parsed.query.split("&"): if "=" not in chunk: continue key, value = chunk.split("=", 1) query[key] = value username = query.get("username", "").strip() if not username: self._json(400, {"ok": False, "error": "username query required"}) return enrollment = self.state.get_enrollment(username) if not enrollment: self._json(404, {"ok": False, "error": "user not enrolled", "username": username}) return self._json(200, enrollment_payload(enrollment)) def _enroll_list(self) -> None: users = [enrollment_payload(item) for item in self.state.list_enrollments()] self._json(200, {"ok": True, "users": users}) def _session_status(self) -> None: self._discard_request_body() got = self._require_session() if not got: return _, session = got self._json( 200, { "ok": True, "username": session.username, "expires_at": int(session.expires_at), "seconds_remaining": max(0, int(session.expires_at - time.time())), }, ) def _session_logout(self) -> None: self._discard_request_body() token = self._bearer_token() if not token: self._json(401, {"ok": False, "error": "missing bearer token"}) return removed = self.state.invalidate_session(token) self._json(200, {"ok": True, "invalidated": removed}) def _resource_counter(self) -> None: self._discard_request_body() got = self._require_session() if not got: return _, session = got status, upstream = self.state.fetch_counter() if status != 200: self._json(status, {"ok": False, "error": "upstream failed", "upstream": upstream}) return self._json( 200, { "ok": True, "username": session.username, "session_reused": True, "upstream": upstream, }, ) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Run k_proxy session gateway") parser.add_argument("--host", default="127.0.0.1") parser.add_argument("--port", type=int, default=8770) parser.add_argument("--tls-certfile", help="PEM certificate chain for HTTPS listener") parser.add_argument("--tls-keyfile", help="PEM private key for HTTPS listener") parser.add_argument("--session-ttl", type=int, default=300, help="Session TTL in seconds") parser.add_argument( "--auth-mode", choices=(AUTH_MODE_PROBE, AUTH_MODE_FIDO2_DIRECT), default=AUTH_MODE_PROBE, help="Session auth mode: legacy card-presence probe or experimental direct FIDO2 registration/assertion", ) parser.add_argument( "--auth-command", default="python3 /home/user/chromecard/fido2_probe.py --json", help="Command used for legacy probe auth mode", ) parser.add_argument( "--rp-id", default="localhost", help="Relying party ID used for direct card-backed registration and assertion verification", ) parser.add_argument( "--rp-name", default="ChromeCard Proxy", help="Relying party name used for direct card-backed registration", ) parser.add_argument( "--origin", default="https://localhost", help="Synthetic origin used by the local FIDO2 client when talking directly to the attached card", ) parser.add_argument( "--server-base-url", default="http://127.0.0.1:8780", help="Base URL for k_server", ) parser.add_argument( "--server-ca-file", help="CA certificate used to verify HTTPS certificate presented by k_server", ) parser.add_argument( "--server-max-connections", type=int, default=1, help="Maximum concurrent pooled upstream connections from k_proxy to k_server", ) parser.add_argument( "--proxy-token", default="dev-proxy-token", help="Shared token to authorize requests to k_server", ) parser.add_argument( "--enrollment-db", default="/home/user/chromecard/k_proxy_enrollments.json", help="JSON file used to persist enrolled usernames for the prototype", ) return parser.parse_args() def main() -> int: args = parse_args() if bool(args.tls_certfile) != bool(args.tls_keyfile): raise SystemExit("Both --tls-certfile and --tls-keyfile are required to enable HTTPS") if args.server_base_url.startswith("https://") and not args.server_ca_file: raise SystemExit("--server-ca-file is required when --server-base-url uses https") state = ProxyState( session_ttl_s=args.session_ttl, auth_mode=args.auth_mode, auth_command=args.auth_command, server_base_url=args.server_base_url, server_ca_file=args.server_ca_file, server_max_connections=args.server_max_connections, proxy_token=args.proxy_token, enrollment_db=Path(args.enrollment_db), rp_id=args.rp_id, rp_name=args.rp_name, origin=args.origin, ) Handler.state = state server = ThreadingHTTPServer((args.host, args.port), Handler) scheme = "http" if args.tls_certfile: context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) context.load_cert_chain(certfile=args.tls_certfile, keyfile=args.tls_keyfile) server.socket = context.wrap_socket(server.socket, server_side=True) scheme = "https" print(f"k_proxy listening on {scheme}://{args.host}:{args.port}") server.serve_forever() return 0 if __name__ == "__main__": raise SystemExit(main())