#!/usr/bin/env python3
"""
Minimal k_proxy service for Phase 5 bring-up.
Behavior:
- Creates short-lived sessions after a card-backed auth gate.
- Reuses valid sessions to access k_server protected counter endpoint.
- Supports enrollment, session status, and logout.
Notes:
- Default runtime still uses the legacy card-presence probe gate.
- Experimental direct FIDO2 registration/assertion lives behind `--auth-mode fido2-direct`.
- This is still a prototype and not a final production auth design.
"""
from __future__ import annotations
import argparse
import base64
import http.client
import json
import queue
import re
import secrets
import ssl
import subprocess
import threading
import time
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import urlparse
from urllib.request import Request, urlopen
import fido2.features
from fido2.client import Fido2Client, UserInteraction, verify_rp_id
from fido2.ctap2 import Ctap2
from fido2.hid import CtapHidDevice
from fido2.hid.linux import get_descriptor, open_connection
from fido2.server import Fido2Server
from fido2.webauthn import (
AttestedCredentialData,
AttestationObject,
AuthenticatorAssertionResponse,
AuthenticatorAttestationResponse,
AuthenticationResponse,
CollectedClientData,
PublicKeyCredentialCreationOptions,
PublicKeyCredentialRequestOptions,
PublicKeyCredentialRpEntity,
PublicKeyCredentialUserEntity,
RegistrationResponse,
UserVerificationRequirement,
)
# Turn on python-fido2's WebAuthn JSON mapping feature, but only while the private
# `_enabled` attribute is missing or None (presumably meaning no explicit choice has
# been made yet — TODO confirm against the installed python-fido2 version).
if getattr(fido2.features.webauthn_json_mapping, "_enabled", None) is None:
    fido2.features.webauthn_json_mapping.enabled = True
# Page body served verbatim (as text/html) by the handler for `GET /`.
HTML = """
ChromeCard Proxy Portal
ChromeCard Proxy Portal
Primary browser entry point for the current prototype. Browser traffic now targets k_proxy directly.
Enrollment, card-backed login, session reuse, counter access, and logout all happen on this TLS endpoint.
Enrollment
Stored username: none
Session active: no
Session Flow
"""
@dataclass
class Session:
    """In-memory record kept for one issued session token."""

    # Canonical username the session was created for.
    username: str
    # Expiry as a `time.time()` timestamp; the session is purged once it is reached.
    expires_at: float
@dataclass
class Enrollment:
    """One enrolled user as kept in memory and persisted to the enrollment JSON file."""

    # Canonical (normalized) username; also the key in the enrollment table.
    username: str
    # Optional human-readable name; None when not provided.
    display_name: str | None
    # Creation and last-modification times, stored as integer seconds.
    created_at: int
    updated_at: int
    # base64url-encoded user handle; only set by the direct FIDO2 registration path.
    user_id_b64: str | None = None
    # base64url-encoded credential data; only set by the direct FIDO2 registration path.
    credential_data_b64: str | None = None
# Canonical usernames are 3-32 characters of lowercase letters, digits, dot,
# underscore, or dash, and must start and end with a letter or digit. The middle
# group is mandatory so that one-character names are rejected, matching the
# "3-32 chars" contract reported to callers.
USERNAME_PATTERN = re.compile(r"^[a-z0-9][a-z0-9._-]{1,30}[a-z0-9]$")
AUTH_MODE_PROBE = "probe"
AUTH_MODE_FIDO2_DIRECT = "fido2-direct"


def normalize_username(raw: str) -> str:
    """Return the canonical form of *raw*: stripped and lower-cased.

    Raises:
        ValueError: if the canonical form is not 3-32 characters drawn from
            lowercase letters, digits, dot, underscore, or dash, or does not
            start and end with a letter or digit.
    """
    username = raw.strip().lower()
    if not USERNAME_PATTERN.fullmatch(username):
        raise ValueError(
            "username must be 3-32 chars of lowercase letters, digits, dot, underscore, or dash"
        )
    return username
def normalize_display_name(raw: str | None) -> str | None:
    """Return *raw* stripped of surrounding whitespace, or None when it is empty.

    Any falsy input (None, "", and other falsy values) is treated as "no display
    name" and yields None.

    Raises:
        ValueError: if *raw* is a truthy non-string (for example a number or list
            decoded from JSON), or if the stripped value is longer than 64
            characters.
    """
    if not raw:
        return None
    # Values come straight from decoded JSON, so they may not be strings at all.
    # Report that as a validation error instead of failing on `.strip()`.
    if not isinstance(raw, str):
        raise ValueError("display_name must be a string")
    value = raw.strip()
    if not value:
        return None
    if len(value) > 64:
        raise ValueError("display_name must be 64 characters or fewer")
    return value
def b64u_encode(data: bytes) -> str:
    """Encode *data* as URL-safe base64 text with the trailing `=` padding removed."""
    padded_text = base64.urlsafe_b64encode(data).decode("ascii")
    return padded_text.rstrip("=")
def b64u_decode(data: str) -> bytes:
    """Decode URL-safe base64 text, restoring any `=` padding that was stripped."""
    # Number of '=' characters needed to reach a multiple of four.
    missing = -len(data) % 4
    padded = data + "=" * missing
    return base64.urlsafe_b64decode(padded.encode("ascii"))
def direct_ctap_key_params() -> list[dict[str, Any]]:
    """Return the public-key algorithm parameters offered for direct registration."""
    # Deliberately the same narrow algorithm set the raw probe uses: the broader
    # default list from Fido2Server.register_begin was still hitting
    # post-confirmation I/O errors.
    algorithm_ids = (-7, -257)
    return [{"type": "public-key", "alg": alg} for alg in algorithm_ids]
def direct_ctap_rp(rp: PublicKeyCredentialRpEntity) -> dict[str, Any]:
    """Build the plain `{"id", "name"}` mapping for *rp* used in the direct CTAP call."""
    return dict(id=rp.id, name=rp.name)
def direct_ctap_user(user: PublicKeyCredentialUserEntity) -> dict[str, Any]:
    """Build the plain user mapping for *user* used in the direct CTAP call.

    A `bytes` user id is replaced by the ASCII bytes of its hex representation;
    any other id value is passed through untouched. `displayName` falls back to
    the user's name when no display name is set.
    """
    raw_id = user.id
    # Mirror the raw probe's ASCII user-id shape instead of pushing opaque binary
    # bytes down the card path.
    wire_id = raw_id.hex().encode("ascii") if isinstance(raw_id, bytes) else raw_id
    shown_as = user.display_name or user.name
    return {"id": wire_id, "name": user.name, "displayName": shown_as}
def direct_ctap_allow_list(
    creds: list[Any] | None,
) -> list[dict[str, Any]] | None:
    """Convert credential descriptors into plain `public-key` mappings.

    Returns None when *creds* is None or empty. Each entry's id is read from its
    `id` attribute, falling back to the `"id"` key when the entry is a dict; an
    entry with neither still produces a descriptor whose id is None.
    """
    if not creds:
        return None

    def _credential_id(entry: Any) -> Any:
        found = getattr(entry, "id", None)
        if found is None and isinstance(entry, dict):
            found = entry.get("id")
        return found

    return [{"type": "public-key", "id": _credential_id(entry)} for entry in creds]
def enrollment_payload(enrollment: "Enrollment", *, created: bool | None = None) -> dict[str, Any]:
    """Build the JSON response body describing *enrollment*.

    The stored credential itself is never included; only a `has_credential`
    boolean is reported. A `created` key is appended only when *created* is not
    None.
    """
    body: dict[str, Any] = dict(
        ok=True,
        username=enrollment.username,
        display_name=enrollment.display_name,
        created_at=enrollment.created_at,
        updated_at=enrollment.updated_at,
        has_credential=bool(enrollment.credential_data_b64),
    )
    if created is None:
        return body
    body["created"] = created
    return body
class ProxyUserInteraction(UserInteraction):
    """User-interaction hooks that print a console notice, then defer to the base class."""

    def prompt_up(self) -> None:
        """Announce that a card touch is needed, then run the base-class prompt."""
        notice = "Touch the ChromeCard to continue..."
        print(notice, flush=True)
        super().prompt_up()

    def request_pin(self, permissions, rp_id: str | None) -> str | None:
        """Announce that PIN entry is unsupported and return the base-class result."""
        notice = "Authenticator PIN is required but not supported by this prototype."
        print(notice, flush=True)
        return super().request_pin(permissions, rp_id)
class ProxyState:
def __init__(
self,
session_ttl_s: int,
auth_mode: str,
auth_command: str,
server_base_url: str,
server_ca_file: str | None,
server_max_connections: int,
proxy_token: str,
enrollment_db: Path,
rp_id: str,
rp_name: str,
origin: str,
direct_device_path: str,
):
self.session_ttl_s = session_ttl_s
self.auth_mode = auth_mode
self.auth_command = auth_command
self.server_base_url = server_base_url.rstrip("/")
self.server_ca_file = server_ca_file
self.proxy_token = proxy_token
self.enrollment_db = enrollment_db
self.rp_id = rp_id
self.origin = origin
self.direct_device_path = direct_device_path
self.direct_device_configured_path = direct_device_path
self.direct_device_active_path: str | None = None
self.lock = threading.Lock()
self.direct_device_lock = threading.RLock()
self.direct_device: CtapHidDevice | None = None
self.sessions: dict[str, Session] = {}
self.enrollments: dict[str, Enrollment] = {}
self.rp = PublicKeyCredentialRpEntity(id=rp_id, name=rp_name)
self.fido_server = Fido2Server(self.rp)
self.client_data_collector = None
self.upstream = UpstreamPool(
server_base_url=self.server_base_url,
server_ca_file=self.server_ca_file,
max_connections=server_max_connections,
)
self._load_enrollments()
def uses_direct_fido2(self) -> bool:
return self.auth_mode == AUTH_MODE_FIDO2_DIRECT
def auth_mode_label(self) -> str:
return "fido2_assertion" if self.uses_direct_fido2() else "card_presence_probe"
def _now(self) -> float:
return time.time()
def _gc_locked(self) -> None:
now = self._now()
dead = [token for token, sess in self.sessions.items() if sess.expires_at <= now]
for token in dead:
del self.sessions[token]
def create_session(self, username: str) -> tuple[str, float]:
token = secrets.token_urlsafe(32)
now = self._now()
expires_at = now + self.session_ttl_s
with self.lock:
self._gc_locked()
self.sessions[token] = Session(username=username, expires_at=expires_at)
return token, expires_at
def get_session(self, token: str) -> Session | None:
with self.lock:
self._gc_locked()
return self.sessions.get(token)
def invalidate_session(self, token: str) -> bool:
with self.lock:
return self.sessions.pop(token, None) is not None
def active_session_count(self) -> int:
with self.lock:
self._gc_locked()
return len(self.sessions)
def _load_enrollments(self) -> None:
if not self.enrollment_db.exists():
return
try:
payload = json.loads(self.enrollment_db.read_text())
users = payload.get("users", [])
for item in users:
username = str(item.get("username", "")).strip()
if not username:
continue
created_at = int(item.get("created_at", item.get("enrolled_at", int(self._now()))))
updated_at = int(item.get("updated_at", created_at))
self.enrollments[username] = Enrollment(
username=username,
display_name=normalize_display_name(item.get("display_name")),
created_at=created_at,
updated_at=updated_at,
user_id_b64=item.get("user_id_b64"),
credential_data_b64=item.get("credential_data_b64"),
)
except Exception:
self.enrollments = {}
def _save_enrollments_locked(self) -> None:
self.enrollment_db.parent.mkdir(parents=True, exist_ok=True)
users = [
{
"username": enrollment.username,
"display_name": enrollment.display_name,
"created_at": enrollment.created_at,
"updated_at": enrollment.updated_at,
"user_id_b64": enrollment.user_id_b64,
"credential_data_b64": enrollment.credential_data_b64,
}
for enrollment in sorted(self.enrollments.values(), key=lambda item: item.username)
]
self.enrollment_db.write_text(json.dumps({"users": users}, indent=2) + "\n")
def _new_fido_client(self) -> Fido2Client:
device = self._get_direct_device()
# Newer python-fido2 builds accept a custom client-data collector, while the
# VM-side package still expects an origin string plus verifier callback.
if self.client_data_collector is not None:
return Fido2Client(device, self.client_data_collector, ProxyUserInteraction())
return Fido2Client(device, self.origin, verify=verify_rp_id, user_interaction=ProxyUserInteraction())
def _direct_device_candidates(self) -> list[str]:
configured = str(self.direct_device_configured_path).strip()
candidates: list[str] = []
if configured:
candidates.append(configured)
for path in sorted(Path("/dev").glob("hidraw*")):
as_text = str(path)
if as_text not in candidates:
candidates.append(as_text)
return candidates
def _open_direct_device(self) -> CtapHidDevice:
last_exc: Exception | None = None
recoverable: tuple[type[Exception], ...] = (FileNotFoundError, PermissionError)
for candidate in self._direct_device_candidates():
try:
descriptor = get_descriptor(candidate)
device = CtapHidDevice(descriptor, open_connection(descriptor))
self.direct_device_active_path = candidate
return device
except Exception as exc:
# USB re-enumeration can leave stale hidraw paths behind, and some sibling
# nodes are vendor interfaces that are not readable to the normal user.
# Skip those and keep probing for a usable CTAPHID node.
if isinstance(exc, recoverable):
last_exc = exc
continue
last_exc = exc
if last_exc is None:
raise FileNotFoundError(f"no hidraw devices available for direct auth (configured {self.direct_device_path})")
raise last_exc
def _get_direct_device(self, *, force_reopen: bool = False) -> CtapHidDevice:
with self.direct_device_lock:
if force_reopen and self.direct_device is not None:
self._drop_direct_device_locked()
if self.direct_device is None:
self.direct_device = self._open_direct_device()
return self.direct_device
def _drop_direct_device_locked(self) -> None:
try:
if self.direct_device is not None:
self.direct_device.close()
except Exception:
pass
self.direct_device = None
self.direct_device_active_path = None
def _drop_direct_device(self) -> None:
with self.direct_device_lock:
self._drop_direct_device_locked()
def _with_direct_ctap2(self, action):
with self.direct_device_lock:
last_exc: Exception | None = None
for reopen in (False, True):
try:
device = self._get_direct_device(force_reopen=reopen)
return action(Ctap2(device))
except Exception as exc:
last_exc = exc
self._drop_direct_device_locked()
assert last_exc is not None
raise last_exc
def _collect_client_data(
self,
request_type: str,
options: PublicKeyCredentialCreationOptions | PublicKeyCredentialRequestOptions,
) -> CollectedClientData:
requested_rp_id = options.rp.id if isinstance(options, PublicKeyCredentialCreationOptions) else options.rp_id
if requested_rp_id != self.rp_id:
raise RuntimeError(f"rp_id mismatch: expected {self.rp_id}, got {requested_rp_id}")
return CollectedClientData.create(
type=request_type,
challenge=options.challenge,
origin=self.origin,
)
def _user_entity(self, username: str, display_name: str | None, user_id: bytes) -> PublicKeyCredentialUserEntity:
return PublicKeyCredentialUserEntity(
id=user_id,
name=username,
display_name=display_name or username,
)
def _register_metadata_only(self, username: str, display_name: str | None) -> Enrollment:
canonical = normalize_username(username)
pretty = normalize_display_name(display_name)
now = int(self._now())
with self.lock:
existing = self.enrollments.get(canonical)
if existing:
raise FileExistsError("user already enrolled")
enrollment = Enrollment(
username=canonical,
display_name=pretty,
created_at=now,
updated_at=now,
)
self.enrollments[canonical] = enrollment
self._save_enrollments_locked()
return enrollment
def _register_direct_fido2(self, username: str, display_name: str | None) -> Enrollment:
canonical = normalize_username(username)
pretty = normalize_display_name(display_name)
now = int(self._now())
with self.lock:
existing = self.enrollments.get(canonical)
if existing and existing.credential_data_b64:
raise FileExistsError("user already enrolled")
user_id = b64u_decode(existing.user_id_b64) if existing and existing.user_id_b64 else secrets.token_bytes(32)
created_at = existing.created_at if existing else now
options, state = self.fido_server.register_begin(
self._user_entity(canonical, pretty, user_id),
user_verification=UserVerificationRequirement.DISCOURAGED,
)
try:
client_data = self._collect_client_data("webauthn.create", options.public_key)
attestation = self._with_direct_ctap2(
lambda ctap2: ctap2.make_credential(
client_data_hash=client_data.hash,
rp=direct_ctap_rp(options.public_key.rp),
user=direct_ctap_user(options.public_key.user),
key_params=direct_ctap_key_params(),
exclude_list=direct_ctap_allow_list(options.public_key.exclude_credentials),
options={"rk": False, "uv": False},
)
)
auth_data = self.fido_server.register_complete(
state,
RegistrationResponse(
id=attestation.auth_data.credential_data.credential_id,
response=AuthenticatorAttestationResponse(
client_data=client_data,
attestation_object=AttestationObject.create(
attestation.fmt,
attestation.auth_data,
attestation.att_stmt,
),
),
),
)
except Exception as exc:
raise RuntimeError(f"card registration failed: {exc}") from exc
credential_data = auth_data.credential_data
if credential_data is None:
raise RuntimeError("card registration returned no credential data")
enrollment = Enrollment(
username=canonical,
display_name=pretty,
created_at=created_at,
updated_at=now,
user_id_b64=b64u_encode(user_id),
credential_data_b64=b64u_encode(bytes(credential_data)),
)
with self.lock:
self.enrollments[canonical] = enrollment
self._save_enrollments_locked()
# Freshly reopen for later assertion flow; some cards do not like immediate
# reuse of the same hidraw handle across makeCredential -> getAssertion.
self._drop_direct_device()
return enrollment
def register_enrollment(self, username: str, display_name: str | None) -> Enrollment:
if self.uses_direct_fido2():
return self._register_direct_fido2(username, display_name)
return self._register_metadata_only(username, display_name)
def update_enrollment(self, username: str, display_name: str | None) -> Enrollment:
canonical = normalize_username(username)
pretty = normalize_display_name(display_name)
now = int(self._now())
with self.lock:
existing = self.enrollments.get(canonical)
if not existing:
raise KeyError("user not enrolled")
existing.display_name = pretty
existing.updated_at = now
self._save_enrollments_locked()
return existing
def delete_enrollment(self, username: str) -> Enrollment:
canonical = normalize_username(username)
with self.lock:
existing = self.enrollments.pop(canonical, None)
if not existing:
raise KeyError("user not enrolled")
dead_tokens = [token for token, sess in self.sessions.items() if sess.username == canonical]
for token in dead_tokens:
del self.sessions[token]
self._save_enrollments_locked()
return existing
def list_enrollments(self) -> list[Enrollment]:
with self.lock:
return [self.enrollments[key] for key in sorted(self.enrollments)]
def get_enrollment(self, username: str) -> Enrollment | None:
try:
canonical = normalize_username(username)
except ValueError:
return None
with self.lock:
return self.enrollments.get(canonical)
def has_enrollment(self, username: str) -> bool:
return self.get_enrollment(username) is not None
def _authenticate_with_probe(self) -> tuple[bool, str]:
try:
proc = subprocess.run(
self.auth_command,
shell=True,
capture_output=True,
text=True,
timeout=10,
check=False,
)
except Exception as exc:
return False, f"auth command failed: {exc}"
if proc.returncode != 0:
stderr = proc.stderr.strip()
stdout = proc.stdout.strip()
details = stderr if stderr else stdout
return False, details or f"auth command exit code {proc.returncode}"
return True, "card presence check succeeded"
def _authenticate_with_direct_fido2(self, username: str) -> tuple[bool, str]:
enrollment = self.get_enrollment(username)
if not enrollment:
return False, "user not enrolled"
if not enrollment.credential_data_b64:
return False, "user has no registered credential"
try:
# Start assertion from a fresh device open rather than reusing the
# post-registration handle, which has been flaky on this stack.
self._drop_direct_device()
credential = AttestedCredentialData(b64u_decode(enrollment.credential_data_b64))
# Keep UV explicitly discouraged here. On the current card/library stack,
# asking for stronger UV flows immediately trips PIN/UV capability errors.
options, state = self.fido_server.authenticate_begin(
[credential],
user_verification=UserVerificationRequirement.DISCOURAGED,
)
client_data = self._collect_client_data("webauthn.get", options.public_key)
assertion = self._with_direct_ctap2(
lambda ctap2: ctap2.get_assertion(
rp_id=options.public_key.rp_id,
client_data_hash=client_data.hash,
allow_list=direct_ctap_allow_list(options.public_key.allow_credentials),
options={"up": True, "uv": False},
)
)
response = assertion.assertions[0] if getattr(assertion, "assertions", None) else assertion
self.fido_server.authenticate_complete(
state,
[credential],
AuthenticationResponse(
id=response.credential["id"],
response=AuthenticatorAssertionResponse(
client_data=client_data,
authenticator_data=response.auth_data,
signature=response.signature,
user_handle=response.user.get("id") if response.user else None,
),
),
)
except Exception as exc:
return False, f"assertion verification failed: {exc}"
return True, "assertion verified"
def authenticate_with_card(self, username: str) -> tuple[bool, str]:
if not self.uses_direct_fido2():
return self._authenticate_with_probe()
return self._authenticate_with_direct_fido2(username)
def fetch_counter(self) -> tuple[int, dict[str, Any]]:
return self.upstream.request_json(
path="/resource/counter",
headers={"X-Proxy-Token": self.proxy_token},
payload={},
)
class UpstreamPool:
    """Small bounded pool of keep-alive HTTP(S) connections to k_server.

    At most `max_connections` requests are in flight at once; further callers
    block until a permit is released. Idle connections are reused most-recent
    first.
    """

    def __init__(self, server_base_url: str, server_ca_file: str | None, max_connections: int = 4):
        """Parse *server_base_url* and set up the idle queue and capacity semaphore.

        Raises:
            ValueError: if *max_connections* is less than 1 (a zero-permit pool
                would block every request forever).
        """
        if max_connections < 1:
            raise ValueError("max_connections must be at least 1")
        parsed = urlparse(server_base_url)
        self.scheme = parsed.scheme
        self.host = parsed.hostname or "127.0.0.1"
        self.port = parsed.port or (443 if parsed.scheme == "https" else 80)
        self.base_path = parsed.path.rstrip("/")
        self.server_ca_file = server_ca_file
        self.timeout = 5
        self.max_connections = max_connections
        self.idle: queue.LifoQueue[http.client.HTTPConnection] = queue.LifoQueue()
        self.capacity = threading.BoundedSemaphore(max_connections)

    def _new_connection(self) -> http.client.HTTPConnection:
        """Create an unconnected HTTP or HTTPS connection object for the upstream."""
        if self.scheme == "https":
            context = ssl.create_default_context(cafile=self.server_ca_file)
            return http.client.HTTPSConnection(
                self.host,
                self.port,
                timeout=self.timeout,
                context=context,
            )
        return http.client.HTTPConnection(self.host, self.port, timeout=self.timeout)

    def _acquire(self) -> http.client.HTTPConnection:
        """Take a capacity permit and return an idle or new connection.

        If building a new connection fails, the permit is returned before the
        error propagates so that the pool does not lose capacity.
        """
        self.capacity.acquire()
        try:
            try:
                return self.idle.get_nowait()
            except queue.Empty:
                return self._new_connection()
        except BaseException:
            self.capacity.release()
            raise

    def _release(self, conn: http.client.HTTPConnection | None, reusable: bool) -> None:
        """Return *conn* to the idle queue, or close it, then release the permit."""
        try:
            if conn is not None and reusable:
                self.idle.put(conn)
            elif conn is not None:
                conn.close()
        finally:
            self.capacity.release()

    def request_json(self, path: str, headers: dict[str, str], payload: dict[str, Any]) -> tuple[int, dict[str, Any]]:
        """POST *payload* as JSON to *path* and return `(status, decoded_body)`.

        Never raises for transport problems: any failure to obtain a connection
        or complete the exchange is reported as status 502 with an error body.
        A response body that is not valid JSON is replaced by an error body
        carrying the upstream status. An empty body decodes to `{}`.
        """
        full_path = f"{self.base_path}{path}" if self.base_path else path
        try:
            conn = self._acquire()
        except OSError as exc:
            # ssl.SSLError is an OSError subclass, so TLS setup failures land here too.
            return 502, {"ok": False, "error": f"server unavailable: {exc}"}
        except Exception as exc:
            return 502, {"ok": False, "error": f"server call failed: {exc}"}
        reusable = False
        try:
            body = json.dumps(payload).encode("utf-8")
            req_headers = {"Content-Type": "application/json", **headers}
            conn.request("POST", full_path, body=body, headers=req_headers)
            resp = conn.getresponse()
            raw = resp.read()
            # Only keep the connection when the server did not ask to close it.
            reusable = not resp.will_close
            try:
                data = json.loads(raw.decode("utf-8")) if raw else {}
            except Exception:
                data = {"ok": False, "error": f"server http error {resp.status}"}
            return resp.status, data
        except (http.client.HTTPException, OSError, ssl.SSLError) as exc:
            return 502, {"ok": False, "error": f"server unavailable: {exc}"}
        except Exception as exc:
            return 502, {"ok": False, "error": f"server call failed: {exc}"}
        finally:
            self._release(conn, reusable)
class Handler(BaseHTTPRequestHandler):
    """HTTP request handler exposing the proxy's enrollment, session, and counter routes."""

    # Shared proxy state; assigned on the class before the server starts.
    state: ProxyState
    protocol_version = "HTTP/1.1"
    # Upper bound on accepted request bodies; every JSON payload here is tiny.
    MAX_BODY_BYTES = 64 * 1024

    def _json(self, status: int, payload: dict[str, Any]) -> None:
        """Send *payload* as a JSON response with the given status."""
        body = json.dumps(payload).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        if getattr(self, "close_connection", False):
            # Tell the client the connection will not be reused after this response.
            self.send_header("Connection", "close")
        self.end_headers()
        self.wfile.write(body)

    def _html(self, body: str) -> None:
        """Send *body* as a 200 HTML response."""
        data = body.encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def _content_length(self) -> int:
        """Return the validated request body length.

        Raises:
            ValueError: if Content-Length is not an integer, is negative, or
                exceeds MAX_BODY_BYTES. The connection is then marked to be
                closed, because the unread body makes its framing unusable.
        """
        raw = self.headers.get("Content-Length", "0")
        try:
            length = int(raw)
        except (TypeError, ValueError):
            length = -1
        if length < 0 or length > self.MAX_BODY_BYTES:
            self.close_connection = True
            raise ValueError("invalid or oversized Content-Length")
        return length

    def _read_json(self) -> dict[str, Any]:
        """Read and decode the request body as a JSON object.

        An empty body yields `{}`.

        Raises:
            ValueError: for a bad Content-Length, undecodable or malformed JSON,
                or a JSON value that is not an object.
        """
        length = self._content_length()
        raw = self.rfile.read(length) if length else b""
        if not raw:
            return {}
        data = json.loads(raw.decode("utf-8"))
        if not isinstance(data, dict):
            # Handlers call `.get` on the result, so only objects are acceptable.
            raise ValueError("JSON body must be an object")
        return data

    def _read_json_or_400(self) -> dict[str, Any] | None:
        """Return the decoded JSON body, or send a 400 response and return None."""
        try:
            return self._read_json()
        except Exception:
            self._json(400, {"ok": False, "error": "invalid json"})
            return None

    def _discard_request_body(self) -> None:
        """Consume the request body so a keep-alive connection stays in sync."""
        try:
            length = self._content_length()
        except ValueError:
            # Nothing sensible to read; the connection is already marked to close.
            return
        if length > 0:
            self.rfile.read(length)

    def _bearer_token(self) -> str | None:
        """Return the token from an `Authorization: Bearer ...` header, or None."""
        value = self.headers.get("Authorization", "")
        if not value.startswith("Bearer "):
            return None
        token = value[7:].strip()
        return token or None

    def _require_session(self) -> tuple[str, Session] | None:
        """Return `(token, session)` for the caller, or send a 401 and return None."""
        token = self._bearer_token()
        if not token:
            self._json(401, {"ok": False, "error": "missing bearer token"})
            return None
        session = self.state.get_session(token)
        if not session:
            self._json(401, {"ok": False, "error": "invalid or expired session"})
            return None
        return token, session

    def do_GET(self) -> None:  # noqa: N802
        """Route GET requests; unknown paths get a 404."""
        path = urlparse(self.path).path
        if path == "/":
            self._html(HTML)
            return
        if path == "/health":
            self._json(
                200,
                {
                    "ok": True,
                    "service": "k_proxy",
                    "active_sessions": self.state.active_session_count(),
                    "time": int(time.time()),
                },
            )
            return
        if path.startswith("/enroll/status"):
            self._enroll_status()
            return
        if path == "/enroll/list":
            self._enroll_list()
            return
        self.send_error(404)

    def do_POST(self) -> None:  # noqa: N802
        """Route POST requests; unknown paths get a 404."""
        routes = {
            "/session/login": self._session_login,
            "/enroll/register": self._enroll_register,
            "/enroll/update": self._enroll_update,
            "/enroll/delete": self._enroll_delete,
            "/session/status": self._session_status,
            "/session/logout": self._session_logout,
            "/resource/counter": self._resource_counter,
        }
        route = routes.get(urlparse(self.path).path)
        if route is None:
            self.send_error(404)
            return
        route()

    def _session_login(self) -> None:
        """Authenticate an enrolled user with the card and issue a session token."""
        data = self._read_json_or_400()
        if data is None:
            return
        try:
            username = normalize_username(str(data.get("username", "")))
        except ValueError as exc:
            self._json(400, {"ok": False, "error": str(exc)})
            return
        if not self.state.has_enrollment(username):
            self._json(403, {"ok": False, "error": "user not enrolled", "username": username})
            return
        ok, message = self.state.authenticate_with_card(username)
        if not ok:
            self._json(401, {"ok": False, "error": "card auth failed", "details": message})
            return
        token, expires_at = self.state.create_session(username)
        self._json(
            200,
            {
                "ok": True,
                "username": username,
                "session_token": token,
                "expires_at": int(expires_at),
                "ttl_seconds": self.state.session_ttl_s,
                "auth_mode": self.state.auth_mode_label(),
            },
        )

    def _enroll_register(self) -> None:
        """Enroll a user: 400 invalid input, 409 already enrolled, 401 card failure."""
        data = self._read_json_or_400()
        if data is None:
            return
        try:
            enrollment = self.state.register_enrollment(
                str(data.get("username", "")),
                data.get("display_name"),
            )
        except ValueError as exc:
            self._json(400, {"ok": False, "error": str(exc)})
            return
        except FileExistsError:
            self._json(409, {"ok": False, "error": "user already enrolled"})
            return
        except RuntimeError as exc:
            self._json(401, {"ok": False, "error": str(exc)})
            return
        self._json(200, enrollment_payload(enrollment, created=enrollment.created_at == enrollment.updated_at))

    def _enroll_update(self) -> None:
        """Update an enrolled user's display name: 400 invalid input, 404 unknown user."""
        data = self._read_json_or_400()
        if data is None:
            return
        try:
            enrollment = self.state.update_enrollment(
                str(data.get("username", "")),
                data.get("display_name"),
            )
        except ValueError as exc:
            self._json(400, {"ok": False, "error": str(exc)})
            return
        except KeyError:
            self._json(404, {"ok": False, "error": "user not enrolled"})
            return
        self._json(200, enrollment_payload(enrollment))

    def _enroll_delete(self) -> None:
        """Delete an enrolled user: 400 invalid input, 404 unknown user."""
        data = self._read_json_or_400()
        if data is None:
            return
        try:
            enrollment = self.state.delete_enrollment(str(data.get("username", "")))
        except ValueError as exc:
            self._json(400, {"ok": False, "error": str(exc)})
            return
        except KeyError:
            self._json(404, {"ok": False, "error": "user not enrolled"})
            return
        self._json(200, {"ok": True, "username": enrollment.username, "deleted": True})

    def _enroll_status(self) -> None:
        """Report the enrollment named by the `username` query parameter."""
        from urllib.parse import parse_qs

        # parse_qs percent-decodes values; blank values are dropped, which the
        # check below then reports as a missing parameter. The last occurrence
        # of a repeated parameter wins.
        values = parse_qs(urlparse(self.path).query).get("username", [])
        username = values[-1].strip() if values else ""
        if not username:
            self._json(400, {"ok": False, "error": "username query required"})
            return
        enrollment = self.state.get_enrollment(username)
        if not enrollment:
            self._json(404, {"ok": False, "error": "user not enrolled", "username": username})
            return
        self._json(200, enrollment_payload(enrollment))

    def _enroll_list(self) -> None:
        """Return every enrollment."""
        users = [enrollment_payload(item) for item in self.state.list_enrollments()]
        self._json(200, {"ok": True, "users": users})

    def _session_status(self) -> None:
        """Report the caller's session owner and remaining lifetime."""
        self._discard_request_body()
        got = self._require_session()
        if not got:
            return
        _, session = got
        self._json(
            200,
            {
                "ok": True,
                "username": session.username,
                "expires_at": int(session.expires_at),
                "seconds_remaining": max(0, int(session.expires_at - time.time())),
            },
        )

    def _session_logout(self) -> None:
        """Invalidate the caller's token; succeeds even when the token is unknown."""
        self._discard_request_body()
        token = self._bearer_token()
        if not token:
            self._json(401, {"ok": False, "error": "missing bearer token"})
            return
        removed = self.state.invalidate_session(token)
        self._json(200, {"ok": True, "invalidated": removed})

    def _resource_counter(self) -> None:
        """Fetch the protected counter from k_server on behalf of a valid session."""
        self._discard_request_body()
        got = self._require_session()
        if not got:
            return
        _, session = got
        status, upstream = self.state.fetch_counter()
        if status != 200:
            # Pass the upstream status through together with its body.
            self._json(status, {"ok": False, "error": "upstream failed", "upstream": upstream})
            return
        self._json(
            200,
            {
                "ok": True,
                "username": session.username,
                "session_reused": True,
                "upstream": upstream,
            },
        )
def parse_args() -> argparse.Namespace:
    """Define the k_proxy command-line options and parse them from `sys.argv`."""
    cli = argparse.ArgumentParser(description="Run k_proxy session gateway")
    option = cli.add_argument

    # Listener.
    option("--host", default="127.0.0.1")
    option("--port", type=int, default=8770)
    option("--tls-certfile", help="PEM certificate chain for HTTPS listener")
    option("--tls-keyfile", help="PEM private key for HTTPS listener")

    # Sessions and authentication.
    option("--session-ttl", type=int, default=300, help="Session TTL in seconds")
    option(
        "--auth-mode",
        choices=(AUTH_MODE_PROBE, AUTH_MODE_FIDO2_DIRECT),
        default=AUTH_MODE_PROBE,
        help="Session auth mode: legacy card-presence probe or experimental direct FIDO2 registration/assertion",
    )
    option(
        "--auth-command",
        default="python3 /home/user/chromecard/fido2_probe.py --json",
        help="Command used for legacy probe auth mode",
    )
    option(
        "--rp-id",
        default="localhost",
        help="Relying party ID used for direct card-backed registration and assertion verification",
    )
    option(
        "--rp-name",
        default="ChromeCard Proxy",
        help="Relying party name used for direct card-backed registration",
    )
    option(
        "--origin",
        default="https://localhost",
        help="Synthetic origin used by the local FIDO2 client when talking directly to the attached card",
    )

    # Upstream k_server.
    option(
        "--server-base-url",
        default="http://127.0.0.1:8780",
        help="Base URL for k_server",
    )
    option(
        "--server-ca-file",
        help="CA certificate used to verify HTTPS certificate presented by k_server",
    )
    option(
        "--server-max-connections",
        type=int,
        default=1,
        help="Maximum concurrent pooled upstream connections from k_proxy to k_server",
    )
    option(
        "--proxy-token",
        default="dev-proxy-token",
        help="Shared token to authorize requests to k_server",
    )

    # Local storage and device selection.
    option(
        "--enrollment-db",
        default="/home/user/chromecard/k_proxy_enrollments.json",
        help="JSON file used to persist enrolled usernames for the prototype",
    )
    option(
        "--direct-device-path",
        default="/dev/hidraw0",
        help="Explicit hidraw path used for direct FIDO2 mode",
    )
    return cli.parse_args()
def main() -> int:
    """Parse options, build the shared state, and serve until interrupted.

    Returns 0 after a clean shutdown (including Ctrl-C).

    Raises:
        SystemExit: when the TLS options are incomplete, an HTTPS upstream is
            configured without a CA file, or a numeric option is out of range.
    """
    args = parse_args()
    if bool(args.tls_certfile) != bool(args.tls_keyfile):
        raise SystemExit("Both --tls-certfile and --tls-keyfile are required to enable HTTPS")
    if args.server_base_url.startswith("https://") and not args.server_ca_file:
        raise SystemExit("--server-ca-file is required when --server-base-url uses https")
    # A non-positive TTL would create sessions that are already expired.
    if args.session_ttl <= 0:
        raise SystemExit("--session-ttl must be a positive number of seconds")
    # The value sizes the upstream pool's semaphore; zero permits would block
    # every upstream call forever.
    if args.server_max_connections < 1:
        raise SystemExit("--server-max-connections must be at least 1")
    state = ProxyState(
        session_ttl_s=args.session_ttl,
        auth_mode=args.auth_mode,
        auth_command=args.auth_command,
        server_base_url=args.server_base_url,
        server_ca_file=args.server_ca_file,
        server_max_connections=args.server_max_connections,
        proxy_token=args.proxy_token,
        enrollment_db=Path(args.enrollment_db),
        rp_id=args.rp_id,
        rp_name=args.rp_name,
        origin=args.origin,
        direct_device_path=args.direct_device_path,
    )
    Handler.state = state
    server = ThreadingHTTPServer((args.host, args.port), Handler)
    try:
        scheme = "http"
        if args.tls_certfile:
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
            context.load_cert_chain(certfile=args.tls_certfile, keyfile=args.tls_keyfile)
            # NOTE(review): wrapping the listening socket presumably runs each TLS
            # handshake inside accept(), on the serving thread — confirm whether a
            # slow client can stall new connections.
            server.socket = context.wrap_socket(server.socket, server_side=True)
            scheme = "https"
        print(f"k_proxy listening on {scheme}://{args.host}:{args.port}", flush=True)
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the prototype; exit without a traceback.
        print("k_proxy shutting down", flush=True)
    finally:
        # Always release the listening socket.
        server.server_close()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())