#!/usr/bin/env python3
"""
Minimal k_proxy service for Phase 5 bring-up.

Behavior:
- Creates short-lived sessions after a card-backed auth gate.
- Reuses valid sessions to access k_server protected counter endpoint.
- Supports enrollment, session status, and logout.

Notes:
- Default runtime still uses the legacy card-presence probe gate.
- Experimental direct FIDO2 registration/assertion lives behind `--auth-mode fido2-direct`.
- This is still a prototype and not a final production auth design.
"""

from __future__ import annotations

import argparse
import base64
import http.client
import json
import queue
import re
import secrets
import ssl
import subprocess
import sys
import threading
import time
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import parse_qs, urlparse
from urllib.request import Request, urlopen

import fido2.features
from fido2.client import Fido2Client, UserInteraction, verify_rp_id
from fido2.ctap2 import Ctap2
from fido2.hid import CtapHidDevice
from fido2.hid.linux import get_descriptor, open_connection
from fido2.server import Fido2Server
from fido2.webauthn import (
    AttestedCredentialData,
    AttestationObject,
    AuthenticatorAssertionResponse,
    AuthenticatorAttestationResponse,
    AuthenticationResponse,
    CollectedClientData,
    PublicKeyCredentialCreationOptions,
    PublicKeyCredentialRequestOptions,
    PublicKeyCredentialRpEntity,
    PublicKeyCredentialUserEntity,
    RegistrationResponse,
    UserVerificationRequirement,
)

if getattr(fido2.features.webauthn_json_mapping, "_enabled", None) is None:
    fido2.features.webauthn_json_mapping.enabled = True


HTML = """ ChromeCard Proxy Portal

ChromeCard Proxy Portal

Primary browser entry point for the current prototype. Browser traffic now targets k_proxy directly. Enrollment, card-backed login, session reuse, counter access, and logout all happen on this TLS endpoint.

Enrollment

Stored username: none
Session active: no

Session Flow


  
"""


@dataclass
class Session:
    """An issued bearer session: who it belongs to and when it expires."""

    username: str
    expires_at: float


@dataclass
class Enrollment:
    """Persisted enrollment record for one username."""

    username: str
    display_name: str | None
    created_at: int
    updated_at: int
    user_id_b64: str | None = None
    credential_data_b64: str | None = None


# 3-32 characters, alphanumeric at both ends. The middle group is mandatory so
# the pattern agrees with the error message raised by normalize_username().
USERNAME_PATTERN = re.compile(r"^[a-z0-9][a-z0-9._-]{1,30}[a-z0-9]$")
AUTH_MODE_PROBE = "probe"
AUTH_MODE_FIDO2_DIRECT = "fido2-direct"


def normalize_username(raw: str) -> str:
    """Return the stripped, lower-cased username.

    Raises:
        ValueError: if the result does not match USERNAME_PATTERN.
    """
    username = raw.strip().lower()
    if not USERNAME_PATTERN.fullmatch(username):
        raise ValueError(
            "username must be 3-32 chars of lowercase letters, digits, dot, underscore, or dash"
        )
    return username


def normalize_display_name(raw: str | None) -> str | None:
    """Return the stripped display name, or None when it is empty.

    Raises:
        ValueError: if the stripped value is longer than 64 characters.
    """
    value = (raw or "").strip()
    if not value:
        return None
    if len(value) > 64:
        raise ValueError("display_name must be 64 characters or fewer")
    return value


def b64u_encode(data: bytes) -> str:
    """Encode bytes as unpadded URL-safe base64 text."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64u_decode(data: str) -> bytes:
    """Decode URL-safe base64 text, restoring any stripped padding first."""
    pad = "=" * ((4 - len(data) % 4) % 4)
    return base64.urlsafe_b64decode((data + pad).encode("ascii"))


def direct_ctap_key_params() -> list[dict[str, Any]]:
    """Return the public-key algorithm list sent with make_credential."""
    # Match the raw probe's narrower algorithm set. The broader default list from
    # Fido2Server.register_begin was still hitting post-confirmation I/O errors.
    return [
        {"type": "public-key", "alg": -7},
        {"type": "public-key", "alg": -257},
    ]


def direct_ctap_rp(rp: PublicKeyCredentialRpEntity) -> dict[str, Any]:
    """Reduce the relying-party entity to a plain id/name mapping."""
    return {"id": rp.id, "name": rp.name}


def direct_ctap_user(user: PublicKeyCredentialUserEntity) -> dict[str, Any]:
    """Build the plain user mapping passed to make_credential."""
    user_id = user.id
    if isinstance(user_id, bytes):
        # Match the raw probe's ASCII user-id shape rather than sending opaque
        # binary bytes into the card path.
        user_id = user_id.hex().encode("ascii")
    return {
        "id": user_id,
        "name": user.name,
        "displayName": user.display_name or user.name,
    }


def direct_ctap_allow_list(
    creds: list[Any] | None,
) -> list[dict[str, Any]] | None:
    """Convert credential descriptors (objects or dicts) to plain mappings.

    Returns None when *creds* is empty or None.
    """
    if not creds:
        return None
    out: list[dict[str, Any]] = []
    for cred in creds:
        cred_id = getattr(cred, "id", None)
        if cred_id is None and isinstance(cred, dict):
            cred_id = cred.get("id")
        out.append({"type": "public-key", "id": cred_id})
    return out


def enrollment_payload(enrollment: "Enrollment", *, created: bool | None = None) -> dict[str, Any]:
    """Build the JSON response body for an enrollment.

    The ``created`` key is only included when *created* is not None.
    """
    payload: dict[str, Any] = {
        "ok": True,
        "username": enrollment.username,
        "display_name": enrollment.display_name,
        "created_at": enrollment.created_at,
        "updated_at": enrollment.updated_at,
        "has_credential": bool(enrollment.credential_data_b64),
    }
    if created is not None:
        payload["created"] = created
    return payload


class ProxyUserInteraction(UserInteraction):
    """UserInteraction that prints console prompts before deferring to the base class."""

    def prompt_up(self) -> None:
        print("Touch the ChromeCard to continue...", flush=True)
        super().prompt_up()

    def request_pin(self, permissions, rp_id: str | None) -> str | None:
        print("Authenticator PIN is required but not supported by this prototype.", flush=True)
        return super().request_pin(permissions, rp_id)


class ProxyState:
    """Shared state behind the HTTP handler: sessions, enrollments, card and upstream access.

    ``self.lock`` guards ``sessions`` and ``enrollments``; ``self.direct_device_lock``
    (re-entrant) guards the cached hidraw device handle.
    """

    def __init__(
        self,
        session_ttl_s: int,
        auth_mode: str,
        auth_command: str,
        server_base_url: str,
        server_ca_file: str | None,
        server_max_connections: int,
        proxy_token: str,
        enrollment_db: Path,
        rp_id: str,
        rp_name: str,
        origin: str,
        direct_device_path: str,
    ):
        self.session_ttl_s = session_ttl_s
        self.auth_mode = auth_mode
        self.auth_command = auth_command
        self.server_base_url = server_base_url.rstrip("/")
        self.server_ca_file = server_ca_file
        self.proxy_token = proxy_token
        self.enrollment_db = enrollment_db
        self.rp_id = rp_id
        self.origin = origin
        self.direct_device_path = direct_device_path
        self.lock = threading.Lock()
        self.direct_device_lock = threading.RLock()
        self.direct_device: CtapHidDevice | None = None
        self.sessions: dict[str, Session] = {}
        self.enrollments: dict[str, Enrollment] = {}
        self.rp = PublicKeyCredentialRpEntity(id=rp_id, name=rp_name)
        self.fido_server = Fido2Server(self.rp)
        self.client_data_collector = None
        self.upstream = UpstreamPool(
            server_base_url=self.server_base_url,
            server_ca_file=self.server_ca_file,
            max_connections=server_max_connections,
        )
        self._load_enrollments()

    def uses_direct_fido2(self) -> bool:
        return self.auth_mode == AUTH_MODE_FIDO2_DIRECT

    def auth_mode_label(self) -> str:
        return "fido2_assertion" if self.uses_direct_fido2() else "card_presence_probe"

    def _now(self) -> float:
        return time.time()

    def _gc_locked(self) -> None:
        """Drop expired sessions. Caller must hold ``self.lock``."""
        now = self._now()
        dead = [token for token, sess in self.sessions.items() if sess.expires_at <= now]
        for token in dead:
            del self.sessions[token]

    def create_session(self, username: str) -> tuple[str, float]:
        """Create a session for *username*; return ``(token, expires_at)``."""
        token = secrets.token_urlsafe(32)
        now = self._now()
        expires_at = now + self.session_ttl_s
        with self.lock:
            self._gc_locked()
            self.sessions[token] = Session(username=username, expires_at=expires_at)
        return token, expires_at

    def get_session(self, token: str) -> Session | None:
        """Return the live session for *token*, or None if unknown or expired."""
        with self.lock:
            self._gc_locked()
            return self.sessions.get(token)

    def invalidate_session(self, token: str) -> bool:
        """Remove *token*; return True when a session was actually removed."""
        with self.lock:
            return self.sessions.pop(token, None) is not None

    def active_session_count(self) -> int:
        with self.lock:
            self._gc_locked()
            return len(self.sessions)

    def _load_enrollments(self) -> None:
        """Populate ``self.enrollments`` from the JSON database, if it exists.

        Loading stays best-effort: any failure leaves the enrollment table empty,
        but the failure is now reported on stderr instead of being swallowed,
        because the next save will overwrite the file.
        """
        if not self.enrollment_db.exists():
            return
        try:
            payload = json.loads(self.enrollment_db.read_text(encoding="utf-8"))
            users = payload.get("users", [])
            for item in users:
                username = str(item.get("username", "")).strip()
                if not username:
                    continue
                created_at = int(item.get("created_at", item.get("enrolled_at", int(self._now()))))
                updated_at = int(item.get("updated_at", created_at))
                self.enrollments[username] = Enrollment(
                    username=username,
                    display_name=normalize_display_name(item.get("display_name")),
                    created_at=created_at,
                    updated_at=updated_at,
                    user_id_b64=item.get("user_id_b64"),
                    credential_data_b64=item.get("credential_data_b64"),
                )
        except Exception as exc:
            print(
                f"warning: ignoring unreadable enrollment db {self.enrollment_db}: {exc}",
                file=sys.stderr,
                flush=True,
            )
            self.enrollments = {}

    def _save_enrollments_locked(self) -> None:
        """Persist all enrollments as JSON. Caller must hold ``self.lock``."""
        self.enrollment_db.parent.mkdir(parents=True, exist_ok=True)
        users = [
            {
                "username": enrollment.username,
                "display_name": enrollment.display_name,
                "created_at": enrollment.created_at,
                "updated_at": enrollment.updated_at,
                "user_id_b64": enrollment.user_id_b64,
                "credential_data_b64": enrollment.credential_data_b64,
            }
            for enrollment in sorted(self.enrollments.values(), key=lambda item: item.username)
        ]
        # Write to a sibling temp file and rename it over the database so an
        # interrupted write cannot leave a truncated enrollment file behind.
        tmp_path = self.enrollment_db.with_name(self.enrollment_db.name + ".tmp")
        tmp_path.write_text(json.dumps({"users": users}, indent=2) + "\n", encoding="utf-8")
        tmp_path.replace(self.enrollment_db)

    def _new_fido_client(self) -> Fido2Client:
        device = self._get_direct_device()
        # Newer python-fido2 builds accept a custom client-data collector, while the
        # VM-side package still expects an origin string plus verifier callback.
        if self.client_data_collector is not None:
            return Fido2Client(device, self.client_data_collector, ProxyUserInteraction())
        return Fido2Client(device, self.origin, verify=verify_rp_id, user_interaction=ProxyUserInteraction())

    def _open_direct_device(self) -> CtapHidDevice:
        descriptor = get_descriptor(self.direct_device_path)
        return CtapHidDevice(descriptor, open_connection(descriptor))

    def _get_direct_device(self, *, force_reopen: bool = False) -> CtapHidDevice:
        """Return the cached device handle, opening (or re-opening) it on demand."""
        with self.direct_device_lock:
            if force_reopen and self.direct_device is not None:
                try:
                    self.direct_device.close()
                except Exception:
                    # Best-effort close; the handle is discarded either way.
                    pass
                self.direct_device = None
            if self.direct_device is None:
                self.direct_device = self._open_direct_device()
            return self.direct_device

    def _with_direct_ctap2(self, action):
        """Run ``action(Ctap2(device))``, retrying once with a re-opened device.

        Raises the last exception if both attempts fail.
        """
        with self.direct_device_lock:
            last_exc: Exception | None = None
            for reopen in (False, True):
                try:
                    device = self._get_direct_device(force_reopen=reopen)
                    return action(Ctap2(device))
                except Exception as exc:
                    last_exc = exc
                    try:
                        if self.direct_device is not None:
                            self.direct_device.close()
                    except Exception:
                        pass
                    self.direct_device = None
            assert last_exc is not None
            raise last_exc

    def _collect_client_data(
        self,
        request_type: str,
        options: PublicKeyCredentialCreationOptions | PublicKeyCredentialRequestOptions,
    ) -> CollectedClientData:
        """Build client data for *options* after checking its RP ID matches ours.

        Raises:
            RuntimeError: if the requested RP ID differs from ``self.rp_id``.
        """
        requested_rp_id = options.rp.id if isinstance(options, PublicKeyCredentialCreationOptions) else options.rp_id
        if requested_rp_id != self.rp_id:
            raise RuntimeError(f"rp_id mismatch: expected {self.rp_id}, got {requested_rp_id}")
        return CollectedClientData.create(
            type=request_type,
            challenge=options.challenge,
            origin=self.origin,
        )

    def _user_entity(self, username: str, display_name: str | None, user_id: bytes) -> PublicKeyCredentialUserEntity:
        return PublicKeyCredentialUserEntity(
            id=user_id,
            name=username,
            display_name=display_name or username,
        )

    def _register_metadata_only(self, username: str, display_name: str | None) -> Enrollment:
        """Enroll a user without any card credential (probe auth mode).

        Raises:
            ValueError: on an invalid username or display name.
            FileExistsError: if the user is already enrolled.
        """
        canonical = normalize_username(username)
        pretty = normalize_display_name(display_name)
        now = int(self._now())
        with self.lock:
            existing = self.enrollments.get(canonical)
            if existing:
                raise FileExistsError("user already enrolled")
            enrollment = Enrollment(
                username=canonical,
                display_name=pretty,
                created_at=now,
                updated_at=now,
            )
            self.enrollments[canonical] = enrollment
            self._save_enrollments_locked()
            return enrollment

    def _register_direct_fido2(self, username: str, display_name: str | None) -> Enrollment:
        """Enroll a user by creating a credential on the attached card.

        Raises:
            ValueError: on an invalid username or display name.
            FileExistsError: if the user already has a registered credential.
            RuntimeError: if the card operation or its verification fails.
        """
        canonical = normalize_username(username)
        pretty = normalize_display_name(display_name)
        now = int(self._now())
        with self.lock:
            existing = self.enrollments.get(canonical)
            if existing and existing.credential_data_b64:
                raise FileExistsError("user already enrolled")
            user_id = b64u_decode(existing.user_id_b64) if existing and existing.user_id_b64 else secrets.token_bytes(32)
            created_at = existing.created_at if existing else now

        options, state = self.fido_server.register_begin(
            self._user_entity(canonical, pretty, user_id),
            user_verification=UserVerificationRequirement.DISCOURAGED,
        )
        try:
            client_data = self._collect_client_data("webauthn.create", options.public_key)
            attestation = self._with_direct_ctap2(
                lambda ctap2: ctap2.make_credential(
                    client_data_hash=client_data.hash,
                    rp=direct_ctap_rp(options.public_key.rp),
                    user=direct_ctap_user(options.public_key.user),
                    key_params=direct_ctap_key_params(),
                    exclude_list=direct_ctap_allow_list(options.public_key.exclude_credentials),
                    options={"rk": False, "uv": False},
                )
            )
            auth_data = self.fido_server.register_complete(
                state,
                RegistrationResponse(
                    id=attestation.auth_data.credential_data.credential_id,
                    response=AuthenticatorAttestationResponse(
                        client_data=client_data,
                        attestation_object=AttestationObject.create(
                            attestation.fmt,
                            attestation.auth_data,
                            attestation.att_stmt,
                        ),
                    ),
                ),
            )
        except Exception as exc:
            raise RuntimeError(f"card registration failed: {exc}") from exc

        credential_data = auth_data.credential_data
        if credential_data is None:
            raise RuntimeError("card registration returned no credential data")

        enrollment = Enrollment(
            username=canonical,
            display_name=pretty,
            created_at=created_at,
            updated_at=now,
            user_id_b64=b64u_encode(user_id),
            credential_data_b64=b64u_encode(bytes(credential_data)),
        )
        with self.lock:
            # The card operation ran outside the lock, so another request may
            # have stored a credential for this user meanwhile; do not clobber it.
            current = self.enrollments.get(canonical)
            if current and current.credential_data_b64:
                raise FileExistsError("user already enrolled")
            self.enrollments[canonical] = enrollment
            self._save_enrollments_locked()
        return enrollment

    def register_enrollment(self, username: str, display_name: str | None) -> Enrollment:
        """Enroll a user using whichever flow the configured auth mode requires."""
        if self.uses_direct_fido2():
            return self._register_direct_fido2(username, display_name)
        return self._register_metadata_only(username, display_name)

    def update_enrollment(self, username: str, display_name: str | None) -> Enrollment:
        """Change a user's display name.

        Raises:
            ValueError: on an invalid username or display name.
            KeyError: if the user is not enrolled.
        """
        canonical = normalize_username(username)
        pretty = normalize_display_name(display_name)
        now = int(self._now())
        with self.lock:
            existing = self.enrollments.get(canonical)
            if not existing:
                raise KeyError("user not enrolled")
            existing.display_name = pretty
            existing.updated_at = now
            self._save_enrollments_locked()
            return existing

    def delete_enrollment(self, username: str) -> Enrollment:
        """Remove a user's enrollment and every session that belongs to them.

        Raises:
            ValueError: on an invalid username.
            KeyError: if the user is not enrolled.
        """
        canonical = normalize_username(username)
        with self.lock:
            existing = self.enrollments.pop(canonical, None)
            if not existing:
                raise KeyError("user not enrolled")
            dead_tokens = [token for token, sess in self.sessions.items() if sess.username == canonical]
            for token in dead_tokens:
                del self.sessions[token]
            self._save_enrollments_locked()
            return existing

    def list_enrollments(self) -> list[Enrollment]:
        with self.lock:
            return [self.enrollments[key] for key in sorted(self.enrollments)]

    def get_enrollment(self, username: str) -> Enrollment | None:
        """Return the enrollment for *username*, or None if invalid or unknown."""
        try:
            canonical = normalize_username(username)
        except ValueError:
            return None
        with self.lock:
            return self.enrollments.get(canonical)

    def has_enrollment(self, username: str) -> bool:
        return self.get_enrollment(username) is not None

    def _authenticate_with_probe(self) -> tuple[bool, str]:
        """Run the configured probe command; success means exit status 0."""
        try:
            # NOTE(review): shell=True is kept because auth_command is an
            # operator-supplied command line (--auth-command); never build it
            # from request data.
            proc = subprocess.run(
                self.auth_command,
                shell=True,
                capture_output=True,
                text=True,
                timeout=10,
                check=False,
            )
        except Exception as exc:
            return False, f"auth command failed: {exc}"
        if proc.returncode != 0:
            stderr = proc.stderr.strip()
            stdout = proc.stdout.strip()
            details = stderr if stderr else stdout
            return False, details or f"auth command exit code {proc.returncode}"
        return True, "card presence check succeeded"

    def _authenticate_with_direct_fido2(self, username: str) -> tuple[bool, str]:
        """Request an assertion from the card and verify it against the stored credential."""
        enrollment = self.get_enrollment(username)
        if not enrollment:
            return False, "user not enrolled"
        if not enrollment.credential_data_b64:
            return False, "user has no registered credential"
        try:
            credential = AttestedCredentialData(b64u_decode(enrollment.credential_data_b64))
            # Keep UV explicitly discouraged here. On the current card/library stack,
            # asking for stronger UV flows immediately trips PIN/UV capability errors.
            options, state = self.fido_server.authenticate_begin(
                [credential],
                user_verification=UserVerificationRequirement.DISCOURAGED,
            )
            client_data = self._collect_client_data("webauthn.get", options.public_key)
            assertion = self._with_direct_ctap2(
                lambda ctap2: ctap2.get_assertion(
                    rp_id=options.public_key.rp_id,
                    client_data_hash=client_data.hash,
                    allow_list=direct_ctap_allow_list(options.public_key.allow_credentials),
                    options={"up": True, "uv": False},
                )
            )
            response = assertion.assertions[0] if getattr(assertion, "assertions", None) else assertion
            self.fido_server.authenticate_complete(
                state,
                [credential],
                AuthenticationResponse(
                    id=response.credential["id"],
                    response=AuthenticatorAssertionResponse(
                        client_data=client_data,
                        authenticator_data=response.auth_data,
                        signature=response.signature,
                        user_handle=response.user.get("id") if response.user else None,
                    ),
                ),
            )
        except Exception as exc:
            return False, f"assertion verification failed: {exc}"
        return True, "assertion verified"

    def authenticate_with_card(self, username: str) -> tuple[bool, str]:
        """Run the auth gate for the configured mode; return ``(ok, message)``."""
        if not self.uses_direct_fido2():
            return self._authenticate_with_probe()
        return self._authenticate_with_direct_fido2(username)

    def fetch_counter(self) -> tuple[int, dict[str, Any]]:
        """POST to the upstream ``/resource/counter`` endpoint with the proxy token."""
        return self.upstream.request_json(
            path="/resource/counter",
            headers={"X-Proxy-Token": self.proxy_token},
            payload={},
        )


class UpstreamPool:
    """Bounded pool of keep-alive HTTP(S) connections to the upstream server.

    ``capacity`` limits how many connections are checked out at once; ``idle``
    holds connections that were returned in a reusable state.
    """

    def __init__(self, server_base_url: str, server_ca_file: str | None, max_connections: int = 4):
        parsed = urlparse(server_base_url)
        self.scheme = parsed.scheme
        self.host = parsed.hostname or "127.0.0.1"
        self.port = parsed.port or (443 if parsed.scheme == "https" else 80)
        self.base_path = parsed.path.rstrip("/")
        self.server_ca_file = server_ca_file
        self.timeout = 5
        self.max_connections = max_connections
        self.idle: queue.LifoQueue[http.client.HTTPConnection] = queue.LifoQueue()
        self.capacity = threading.BoundedSemaphore(max_connections)

    def _new_connection(self) -> http.client.HTTPConnection:
        if self.scheme == "https":
            context = ssl.create_default_context(cafile=self.server_ca_file)
            return http.client.HTTPSConnection(
                self.host,
                self.port,
                timeout=self.timeout,
                context=context,
            )
        return http.client.HTTPConnection(self.host, self.port, timeout=self.timeout)

    def _acquire(self) -> http.client.HTTPConnection:
        """Take a capacity permit and return an idle or freshly built connection.

        If building the connection fails, the permit is handed back before the
        exception propagates, so a bad CA file cannot exhaust the pool.
        """
        self.capacity.acquire()
        try:
            try:
                return self.idle.get_nowait()
            except queue.Empty:
                return self._new_connection()
        except BaseException:
            self.capacity.release()
            raise

    def _release(self, conn: http.client.HTTPConnection | None, reusable: bool) -> None:
        """Return *conn* to the idle pool or close it, then release the permit."""
        try:
            if conn is not None and reusable:
                self.idle.put(conn)
            elif conn is not None:
                conn.close()
        finally:
            self.capacity.release()

    def request_json(self, path: str, headers: dict[str, str], payload: dict[str, Any]) -> tuple[int, dict[str, Any]]:
        """POST *payload* as JSON and return ``(status, decoded_body)``.

        Transport or setup failures are reported as a synthetic ``502`` result
        rather than raised.
        """
        full_path = f"{self.base_path}{path}" if self.base_path else path
        try:
            conn = self._acquire()
        except Exception as exc:
            return 502, {"ok": False, "error": f"server unavailable: {exc}"}
        reusable = False
        try:
            body = json.dumps(payload).encode("utf-8")
            req_headers = {"Content-Type": "application/json", **headers}
            conn.request("POST", full_path, body=body, headers=req_headers)
            resp = conn.getresponse()
            raw = resp.read()
            # Only pool the connection when the server will keep it open.
            reusable = not resp.will_close
            try:
                data = json.loads(raw.decode("utf-8")) if raw else {}
            except Exception:
                data = {"ok": False, "error": f"server http error {resp.status}"}
            return resp.status, data
        except (http.client.HTTPException, OSError) as exc:
            # ssl.SSLError is an OSError subclass, so TLS failures land here too.
            return 502, {"ok": False, "error": f"server unavailable: {exc}"}
        except Exception as exc:
            return 502, {"ok": False, "error": f"server call failed: {exc}"}
        finally:
            self._release(conn, reusable)


class Handler(BaseHTTPRequestHandler):
    """HTTP request handler exposing the enrollment, session and counter endpoints."""

    state: ProxyState
    protocol_version = "HTTP/1.1"
    # Upper bound on accepted request bodies; every endpoint takes a tiny JSON object.
    max_body_bytes = 64 * 1024

    def _json(self, status: int, payload: dict[str, Any]) -> None:
        body = json.dumps(payload).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _html(self, body: str) -> None:
        data = body.encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def _content_length(self) -> int:
        """Return the declared body length after validating it.

        Raises:
            ValueError: if the header is not an integer, is negative, or exceeds
                ``max_body_bytes``. The connection is then marked for closing
                because the unread body would desynchronise keep-alive framing.
        """
        try:
            length = int(self.headers.get("Content-Length", "0"))
        except (TypeError, ValueError):
            self.close_connection = True
            raise ValueError("invalid Content-Length") from None
        if length < 0 or length > self.max_body_bytes:
            self.close_connection = True
            raise ValueError("unsupported Content-Length")
        return length

    def _read_json(self) -> dict[str, Any]:
        """Read and decode the request body as a JSON object.

        Returns an empty dict for an empty body.

        Raises:
            ValueError: on a bad Content-Length, undecodable JSON, or a JSON
                value that is not an object.
        """
        length = self._content_length()
        raw = self.rfile.read(length)
        if not raw:
            return {}
        data = json.loads(raw.decode("utf-8"))
        if not isinstance(data, dict):
            raise ValueError("json body must be an object")
        return data

    def _read_json_or_reject(self) -> dict[str, Any] | None:
        """Return the JSON body, or send a 400 response and return None."""
        try:
            return self._read_json()
        except Exception:
            # Broad on purpose: any failure to obtain a body is a client error here.
            self._json(400, {"ok": False, "error": "invalid json"})
            return None

    def _discard_request_body(self) -> None:
        """Consume a body the endpoint does not use so keep-alive framing stays intact."""
        try:
            length = self._content_length()
        except ValueError:
            # Connection is already flagged to close; nothing safe to read.
            return
        if length > 0:
            self.rfile.read(length)

    def _bearer_token(self) -> str | None:
        value = self.headers.get("Authorization", "")
        if not value.startswith("Bearer "):
            return None
        token = value[7:].strip()
        return token or None

    def _require_session(self) -> tuple[str, Session] | None:
        """Return ``(token, session)``, or send a 401 response and return None."""
        token = self._bearer_token()
        if not token:
            self._json(401, {"ok": False, "error": "missing bearer token"})
            return None
        session = self.state.get_session(token)
        if not session:
            self._json(401, {"ok": False, "error": "invalid or expired session"})
            return None
        return token, session

    def do_GET(self) -> None:  # noqa: N802
        path = urlparse(self.path).path
        if path == "/":
            self._html(HTML)
            return
        if path == "/health":
            self._json(
                200,
                {
                    "ok": True,
                    "service": "k_proxy",
                    "active_sessions": self.state.active_session_count(),
                    "time": int(time.time()),
                },
            )
            return
        if path.startswith("/enroll/status"):
            self._enroll_status()
            return
        if path == "/enroll/list":
            self._enroll_list()
            return
        self.send_error(404)

    def do_POST(self) -> None:  # noqa: N802
        path = urlparse(self.path).path
        if path == "/session/login":
            self._session_login()
            return
        if path == "/enroll/register":
            self._enroll_register()
            return
        if path == "/enroll/update":
            self._enroll_update()
            return
        if path == "/enroll/delete":
            self._enroll_delete()
            return
        if path == "/session/status":
            self._session_status()
            return
        if path == "/session/logout":
            self._session_logout()
            return
        if path == "/resource/counter":
            self._resource_counter()
            return
        self.send_error(404)

    def _session_login(self) -> None:
        data = self._read_json_or_reject()
        if data is None:
            return
        try:
            username = normalize_username(str(data.get("username", "")))
        except ValueError as exc:
            self._json(400, {"ok": False, "error": str(exc)})
            return

        if not self.state.has_enrollment(username):
            self._json(403, {"ok": False, "error": "user not enrolled", "username": username})
            return

        ok, message = self.state.authenticate_with_card(username)
        if not ok:
            self._json(401, {"ok": False, "error": "card auth failed", "details": message})
            return

        token, expires_at = self.state.create_session(username)
        self._json(
            200,
            {
                "ok": True,
                "username": username,
                "session_token": token,
                "expires_at": int(expires_at),
                "ttl_seconds": self.state.session_ttl_s,
                "auth_mode": self.state.auth_mode_label(),
            },
        )

    def _enroll_register(self) -> None:
        data = self._read_json_or_reject()
        if data is None:
            return
        try:
            enrollment = self.state.register_enrollment(
                str(data.get("username", "")),
                data.get("display_name"),
            )
        except ValueError as exc:
            self._json(400, {"ok": False, "error": str(exc)})
            return
        except FileExistsError:
            self._json(409, {"ok": False, "error": "user already enrolled"})
            return
        except RuntimeError as exc:
            self._json(401, {"ok": False, "error": str(exc)})
            return
        self._json(200, enrollment_payload(enrollment, created=enrollment.created_at == enrollment.updated_at))

    def _enroll_update(self) -> None:
        data = self._read_json_or_reject()
        if data is None:
            return
        try:
            enrollment = self.state.update_enrollment(
                str(data.get("username", "")),
                data.get("display_name"),
            )
        except ValueError as exc:
            self._json(400, {"ok": False, "error": str(exc)})
            return
        except KeyError:
            self._json(404, {"ok": False, "error": "user not enrolled"})
            return
        self._json(200, enrollment_payload(enrollment))

    def _enroll_delete(self) -> None:
        data = self._read_json_or_reject()
        if data is None:
            return
        try:
            enrollment = self.state.delete_enrollment(str(data.get("username", "")))
        except ValueError as exc:
            self._json(400, {"ok": False, "error": str(exc)})
            return
        except KeyError:
            self._json(404, {"ok": False, "error": "user not enrolled"})
            return
        self._json(200, {"ok": True, "username": enrollment.username, "deleted": True})

    def _enroll_status(self) -> None:
        # parse_qs URL-decodes values; as before, the last occurrence of a
        # repeated key wins and a missing or blank username is rejected.
        query = parse_qs(urlparse(self.path).query, keep_blank_values=True)
        values = query.get("username")
        username = values[-1].strip() if values else ""
        if not username:
            self._json(400, {"ok": False, "error": "username query required"})
            return
        enrollment = self.state.get_enrollment(username)
        if not enrollment:
            self._json(404, {"ok": False, "error": "user not enrolled", "username": username})
            return
        self._json(200, enrollment_payload(enrollment))

    def _enroll_list(self) -> None:
        users = [enrollment_payload(item) for item in self.state.list_enrollments()]
        self._json(200, {"ok": True, "users": users})

    def _session_status(self) -> None:
        self._discard_request_body()
        got = self._require_session()
        if not got:
            return
        _, session = got
        self._json(
            200,
            {
                "ok": True,
                "username": session.username,
                "expires_at": int(session.expires_at),
                "seconds_remaining": max(0, int(session.expires_at - time.time())),
            },
        )

    def _session_logout(self) -> None:
        self._discard_request_body()
        token = self._bearer_token()
        if not token:
            self._json(401, {"ok": False, "error": "missing bearer token"})
            return
        removed = self.state.invalidate_session(token)
        self._json(200, {"ok": True, "invalidated": removed})

    def _resource_counter(self) -> None:
        self._discard_request_body()
        got = self._require_session()
        if not got:
            return
        _, session = got
        status, upstream = self.state.fetch_counter()
        if status != 200:
            self._json(status, {"ok": False, "error": "upstream failed", "upstream": upstream})
            return
        self._json(
            200,
            {
                "ok": True,
                "username": session.username,
                "session_reused": True,
                "upstream": upstream,
            },
        )


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse and validate the k_proxy command line.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        The parsed namespace.

    Raises:
        SystemExit: via ``parser.error`` when ``--session-ttl`` is not positive
            or ``--server-max-connections`` is below 1, and for any argparse
            usage error.
    """
    parser = argparse.ArgumentParser(description="Run k_proxy session gateway")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=8770)
    parser.add_argument("--tls-certfile", help="PEM certificate chain for HTTPS listener")
    parser.add_argument("--tls-keyfile", help="PEM private key for HTTPS listener")
    parser.add_argument("--session-ttl", type=int, default=300, help="Session TTL in seconds")
    parser.add_argument(
        "--auth-mode",
        choices=(AUTH_MODE_PROBE, AUTH_MODE_FIDO2_DIRECT),
        default=AUTH_MODE_PROBE,
        help="Session auth mode: legacy card-presence probe or experimental direct FIDO2 registration/assertion",
    )
    parser.add_argument(
        "--auth-command",
        default="python3 /home/user/chromecard/fido2_probe.py --json",
        help="Command used for legacy probe auth mode",
    )
    parser.add_argument(
        "--rp-id",
        default="localhost",
        help="Relying party ID used for direct card-backed registration and assertion verification",
    )
    parser.add_argument(
        "--rp-name",
        default="ChromeCard Proxy",
        help="Relying party name used for direct card-backed registration",
    )
    parser.add_argument(
        "--origin",
        default="https://localhost",
        help="Synthetic origin used by the local FIDO2 client when talking directly to the attached card",
    )
    parser.add_argument(
        "--server-base-url",
        default="http://127.0.0.1:8780",
        help="Base URL for k_server",
    )
    parser.add_argument(
        "--server-ca-file",
        help="CA certificate used to verify HTTPS certificate presented by k_server",
    )
    parser.add_argument(
        "--server-max-connections",
        type=int,
        default=1,
        help="Maximum concurrent pooled upstream connections from k_proxy to k_server",
    )
    parser.add_argument(
        "--proxy-token",
        default="dev-proxy-token",
        help="Shared token to authorize requests to k_server",
    )
    parser.add_argument(
        "--enrollment-db",
        default="/home/user/chromecard/k_proxy_enrollments.json",
        help="JSON file used to persist enrolled usernames for the prototype",
    )
    parser.add_argument(
        "--direct-device-path",
        default="/dev/hidraw0",
        help="Explicit hidraw path used for direct FIDO2 mode",
    )
    args = parser.parse_args(argv)
    # A non-positive TTL would hand out sessions that are already expired.
    if args.session_ttl <= 0:
        parser.error("--session-ttl must be a positive number of seconds")
    # The pool size seeds a semaphore; zero would block every upstream request.
    if args.server_max_connections < 1:
        parser.error("--server-max-connections must be at least 1")
    return args


def main() -> int:
    """Build the proxy state, start the HTTP(S) listener, and serve until interrupted.

    Returns:
        0 after the server loop ends (for example on Ctrl-C).

    Raises:
        SystemExit: when only one of the TLS cert/key options is given, or when
            an https upstream is configured without ``--server-ca-file``.
    """
    args = parse_args()
    if bool(args.tls_certfile) != bool(args.tls_keyfile):
        raise SystemExit("Both --tls-certfile and --tls-keyfile are required to enable HTTPS")
    if args.server_base_url.startswith("https://") and not args.server_ca_file:
        raise SystemExit("--server-ca-file is required when --server-base-url uses https")

    state = ProxyState(
        session_ttl_s=args.session_ttl,
        auth_mode=args.auth_mode,
        auth_command=args.auth_command,
        server_base_url=args.server_base_url,
        server_ca_file=args.server_ca_file,
        server_max_connections=args.server_max_connections,
        proxy_token=args.proxy_token,
        enrollment_db=Path(args.enrollment_db),
        rp_id=args.rp_id,
        rp_name=args.rp_name,
        origin=args.origin,
        direct_device_path=args.direct_device_path,
    )
    Handler.state = state
    server = ThreadingHTTPServer((args.host, args.port), Handler)
    scheme = "http"
    if args.tls_certfile:
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.load_cert_chain(certfile=args.tls_certfile, keyfile=args.tls_keyfile)
        server.socket = context.wrap_socket(server.socket, server_side=True)
        scheme = "https"
    print(f"k_proxy listening on {scheme}://{args.host}:{args.port}")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the prototype; exit without a traceback.
        print("k_proxy shutting down", flush=True)
    finally:
        # Release the listening socket even when the loop ends abnormally.
        server.server_close()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())