k_card/k_proxy_app.py

1351 lines
48 KiB
Python

#!/usr/bin/env python3
"""
k_proxy — session gateway and card authentication bridge.
Creates short-lived bearer sessions after a card-backed auth gate, then
proxies authenticated requests to k_server. Enrollment metadata and session
state are both process-local; sessions do not survive a restart.
Default auth mode is a lightweight card-presence probe (subprocess call to
fido2_probe.py). Pass --auth-mode fido2-direct for real CTAP2
makeCredential/getAssertion against the attached ChromeCard.
"""
from __future__ import annotations
import argparse
import base64
import http.client
import json
import queue
import re
import secrets
import ssl
import subprocess
import threading
import time
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import urlparse
from urllib.request import Request, urlopen
import fido2.features
from fido2.client import Fido2Client, UserInteraction, verify_rp_id
from fido2.ctap2 import Ctap2
from fido2.hid import CtapHidDevice
from fido2.hid.linux import get_descriptor, open_connection
from fido2.server import Fido2Server
from fido2.webauthn import (
AttestedCredentialData,
AttestationObject,
AuthenticatorAssertionResponse,
AuthenticatorAttestationResponse,
AuthenticationResponse,
CollectedClientData,
PublicKeyCredentialCreationOptions,
PublicKeyCredentialRequestOptions,
PublicKeyCredentialRpEntity,
PublicKeyCredentialUserEntity,
RegistrationResponse,
UserVerificationRequirement,
)
# Turn on python-fido2's ``webauthn_json_mapping`` feature flag, but only when
# its private ``_enabled`` state is still unset (None), so an explicit choice
# made elsewhere is not overridden.
# NOTE(review): the AttributeError is swallowed presumably because some
# python-fido2 releases do not expose this flag — confirm against the
# versions this proxy is deployed with.
try:
    if getattr(fido2.features.webauthn_json_mapping, "_enabled", None) is None:
        fido2.features.webauthn_json_mapping.enabled = True
except AttributeError:
    pass
HTML = """<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>ChromeCard Proxy Portal</title>
<style>
:root {
--bg: #f1eee8;
--panel: #fffdf8;
--ink: #171615;
--muted: #645f56;
--line: #d6cbb9;
--accent: #0c6a60;
--accent-2: #8e5b2d;
}
* { box-sizing: border-box; }
body {
margin: 0;
font-family: "Iowan Old Style", "Palatino Linotype", serif;
color: var(--ink);
background:
radial-gradient(circle at top right, rgba(12,106,96,0.12), transparent 32%),
radial-gradient(circle at left center, rgba(142,91,45,0.10), transparent 28%),
linear-gradient(180deg, #faf7f0 0%, var(--bg) 100%);
}
main {
max-width: 900px;
margin: 0 auto;
padding: 32px 20px 56px;
}
.hero, .card {
background: var(--panel);
border: 1px solid var(--line);
box-shadow: 0 16px 34px rgba(49, 38, 21, 0.08);
}
.hero {
padding: 24px;
margin-bottom: 20px;
}
h1 {
margin: 0 0 10px;
font-size: clamp(2rem, 4vw, 3.5rem);
line-height: 0.95;
letter-spacing: -0.04em;
}
.subtitle {
margin: 0;
color: var(--muted);
max-width: 64ch;
}
.grid {
display: grid;
gap: 18px;
grid-template-columns: repeat(auto-fit, minmax(260px, 1fr));
}
.card { padding: 18px; }
.card h2 {
margin: 0 0 12px;
font-size: 1.15rem;
}
label {
display: block;
margin-bottom: 8px;
font-size: 0.92rem;
color: var(--muted);
}
input {
width: 100%;
padding: 10px 12px;
border: 1px solid var(--line);
background: #fff;
font: inherit;
color: var(--ink);
}
.actions {
display: flex;
flex-wrap: wrap;
gap: 10px;
margin-top: 14px;
}
button {
border: 0;
padding: 10px 14px;
font: inherit;
color: #fff;
background: var(--accent);
cursor: pointer;
}
button.secondary { background: var(--accent-2); }
.status {
display: grid;
gap: 8px;
margin-top: 14px;
color: var(--muted);
}
pre {
margin: 18px 0 0;
min-height: 300px;
padding: 16px;
overflow: auto;
border: 1px solid var(--line);
background: #141210;
color: #efe6d8;
font-family: "SFMono-Regular", Consolas, monospace;
font-size: 0.9rem;
line-height: 1.45;
}
</style>
</head>
<body>
<main>
<section class="hero">
<h1>ChromeCard Proxy Portal</h1>
<p class="subtitle">
Primary browser entry point for the current prototype. Browser traffic now targets k_proxy directly.
Enrollment, card-backed login, session reuse, counter access, and logout all happen on this TLS endpoint.
</p>
</section>
<section class="grid">
<div class="card">
<h2>Enrollment</h2>
<label for="username">Username</label>
<input id="username" placeholder="alice" autocomplete="off">
<label for="displayName">Display Name</label>
<input id="displayName" placeholder="Alice Example" autocomplete="off">
<div class="actions">
<button id="enrollBtn">Enroll User</button>
<button id="updateBtn" class="secondary">Update User</button>
<button id="deleteBtn" class="secondary">Delete User</button>
<button id="checkBtn" class="secondary">Check Enrollment</button>
<button id="listBtn" class="secondary">List Users</button>
</div>
<div class="status">
<div>Stored username: <strong id="storedUser">none</strong></div>
<div>Session active: <strong id="sessionActive">no</strong></div>
</div>
</div>
<div class="card">
<h2>Session Flow</h2>
<div class="actions">
<button id="loginBtn">Login</button>
<button id="statusBtn" class="secondary">Status</button>
<button id="counterBtn">Counter</button>
<button id="logoutBtn" class="secondary">Logout</button>
</div>
</div>
</section>
<pre id="log"></pre>
</main>
<script>
const USER_KEY = "chromecard.proxy.username";
const TOKEN_KEY = "chromecard.proxy.session_token";
const EXP_KEY = "chromecard.proxy.expires_at";
const logNode = document.getElementById("log");
const usernameNode = document.getElementById("username");
const displayNameNode = document.getElementById("displayName");
const storedUserNode = document.getElementById("storedUser");
const sessionActiveNode = document.getElementById("sessionActive");
function getStoredUser() { return localStorage.getItem(USER_KEY) || ""; }
function getStoredToken() { return localStorage.getItem(TOKEN_KEY) || ""; }
function syncState() {
const user = getStoredUser();
storedUserNode.textContent = user || "none";
sessionActiveNode.textContent = getStoredToken() ? "yes" : "no";
if (user && !usernameNode.value) {
usernameNode.value = user;
}
}
function log(message, payload) {
const stamp = new Date().toLocaleTimeString();
let line = `[${stamp}] ${message}`;
if (payload !== undefined) {
line += "\\n" + JSON.stringify(payload, null, 2);
}
logNode.textContent = line + "\\n\\n" + logNode.textContent;
}
async function jsonRequest(method, path, payload, withToken = false) {
const headers = {"Content-Type": "application/json"};
if (withToken && getStoredToken()) {
headers["Authorization"] = "Bearer " + getStoredToken();
}
const resp = await fetch(path, {
method,
headers,
body: payload === undefined ? undefined : JSON.stringify(payload)
});
const data = await resp.json();
if (!resp.ok) {
throw new Error(JSON.stringify(data));
}
return data;
}
document.getElementById("enrollBtn").addEventListener("click", async () => {
try {
const username = usernameNode.value.trim();
const display_name = displayNameNode.value.trim();
const data = await jsonRequest("POST", "/enroll/register", {username, display_name});
localStorage.setItem(USER_KEY, username);
syncState();
log("Enrollment updated", data);
} catch (err) {
log("Enrollment failed", {error: err.message});
}
});
document.getElementById("checkBtn").addEventListener("click", async () => {
try {
const username = usernameNode.value.trim() || getStoredUser();
const resp = await fetch("/enroll/status?username=" + encodeURIComponent(username));
const data = await resp.json();
if (!resp.ok) throw new Error(JSON.stringify(data));
log("Enrollment status", data);
if (data.display_name) {
displayNameNode.value = data.display_name;
}
} catch (err) {
log("Enrollment status failed", {error: err.message});
}
});
document.getElementById("updateBtn").addEventListener("click", async () => {
try {
const username = usernameNode.value.trim() || getStoredUser();
const display_name = displayNameNode.value.trim();
const data = await jsonRequest("POST", "/enroll/update", {username, display_name});
localStorage.setItem(USER_KEY, username);
syncState();
log("Enrollment updated", data);
} catch (err) {
log("Enrollment update failed", {error: err.message});
}
});
document.getElementById("deleteBtn").addEventListener("click", async () => {
try {
const username = usernameNode.value.trim() || getStoredUser();
const data = await jsonRequest("POST", "/enroll/delete", {username});
if (getStoredUser() === username) {
localStorage.removeItem(USER_KEY);
localStorage.removeItem(TOKEN_KEY);
localStorage.removeItem(EXP_KEY);
}
displayNameNode.value = "";
syncState();
log("Enrollment deleted", data);
} catch (err) {
log("Enrollment delete failed", {error: err.message});
}
});
document.getElementById("listBtn").addEventListener("click", async () => {
try {
const resp = await fetch("/enroll/list");
const data = await resp.json();
if (!resp.ok) throw new Error(JSON.stringify(data));
log("Enrollment list", data);
} catch (err) {
log("Enrollment list failed", {error: err.message});
}
});
document.getElementById("loginBtn").addEventListener("click", async () => {
try {
const username = usernameNode.value.trim() || getStoredUser();
const data = await jsonRequest("POST", "/session/login", {username});
localStorage.setItem(USER_KEY, username);
localStorage.setItem(TOKEN_KEY, data.session_token || "");
localStorage.setItem(EXP_KEY, String(data.expires_at || ""));
syncState();
log("Login ok", data);
} catch (err) {
log("Login failed", {error: err.message});
}
});
document.getElementById("statusBtn").addEventListener("click", async () => {
try {
const data = await jsonRequest("POST", "/session/status", {}, true);
log("Session status", data);
} catch (err) {
log("Status failed", {error: err.message});
}
});
document.getElementById("counterBtn").addEventListener("click", async () => {
try {
const data = await jsonRequest("POST", "/resource/counter", {}, true);
log("Counter response", data);
} catch (err) {
log("Counter failed", {error: err.message});
}
});
document.getElementById("logoutBtn").addEventListener("click", async () => {
try {
const data = await jsonRequest("POST", "/session/logout", {}, true);
localStorage.removeItem(TOKEN_KEY);
localStorage.removeItem(EXP_KEY);
syncState();
log("Logout response", data);
} catch (err) {
log("Logout failed", {error: err.message});
}
});
syncState();
</script>
</body>
</html>
"""
@dataclass
class Session:
    # One bearer session held in the in-memory session table.
    # Canonical username the session was issued for.
    username: str
    # Expiry as a time.time() timestamp; the session is dropped once
    # expires_at <= now.
    expires_at: float
@dataclass
class Enrollment:
    # One enrolled user as stored in the enrollment JSON file.
    # Canonical (normalized) username; also the key in the enrollment table.
    username: str
    # Optional human-readable name; None when not provided.
    display_name: str | None
    # Creation and last-modification times as integer epoch seconds.
    created_at: int
    updated_at: int
    # Unpadded URL-safe base64 of the user id used during card registration;
    # None for metadata-only enrollments.
    user_id_b64: str | None = None
    # Unpadded URL-safe base64 of the attested credential data returned by the
    # card; None until a credential has been registered.
    credential_data_b64: str | None = None
# 3-32 characters: must start and end with a lowercase letter or digit; dot,
# underscore and dash are allowed in between. The previous pattern wrapped the
# tail in an optional group, which also accepted 1-character usernames and
# contradicted the documented 3-32 rule.
USERNAME_PATTERN = re.compile(r"^[a-z0-9][a-z0-9._-]{1,30}[a-z0-9]$")
AUTH_MODE_PROBE = "probe"
AUTH_MODE_FIDO2_DIRECT = "fido2-direct"


def normalize_username(raw: str) -> str:
    """Return the canonical form of *raw* (trimmed, lower-cased).

    Raises:
        ValueError: if the canonical form does not match USERNAME_PATTERN.
    """
    username = raw.strip().lower()
    if not USERNAME_PATTERN.fullmatch(username):
        raise ValueError(
            "username must be 3-32 chars of lowercase letters, digits, dot, underscore, or dash"
        )
    return username
def normalize_display_name(raw: str | None) -> str | None:
    """Return a trimmed display name, or None when none was supplied.

    The value comes straight from request JSON, so it may not be a string at
    all. A non-string value is rejected with ValueError (which the handlers
    turn into a 400) instead of failing with AttributeError on ``.strip()``.

    Raises:
        ValueError: if *raw* is a non-empty non-string, or longer than 64
            characters after trimming.
    """
    if not raw:
        # None, "" and other falsy values all mean "no display name".
        return None
    if not isinstance(raw, str):
        raise ValueError("display_name must be a string")
    value = raw.strip()
    if not value:
        return None
    if len(value) > 64:
        raise ValueError("display_name must be 64 characters or fewer")
    return value
def b64u_encode(data: bytes) -> str:
    """Encode *data* as URL-safe base64 text with the ``=`` padding removed."""
    padded = base64.urlsafe_b64encode(data).decode("ascii")
    return padded.rstrip("=")
def b64u_decode(data: str) -> bytes:
    """Decode URL-safe base64 text that may have had its padding stripped."""
    # Restore the 0-3 '=' characters needed to reach a multiple of four.
    missing = -len(data) % 4
    restored = data + "=" * missing
    return base64.urlsafe_b64decode(restored.encode("ascii"))
def direct_ctap_key_params() -> list[dict[str, Any]]:
    """Return the public-key algorithm list sent with makeCredential."""
    # Match the raw probe's narrower algorithm set. The broader default list from
    # Fido2Server.register_begin was still hitting post-confirmation I/O errors.
    algorithms = (-7, -257)
    return [{"type": "public-key", "alg": alg} for alg in algorithms]
def direct_ctap_rp(rp: PublicKeyCredentialRpEntity) -> dict[str, Any]:
    """Reduce a relying-party entity to the plain ``id``/``name`` mapping."""
    rp_fields = ("id", "name")
    return {field: getattr(rp, field) for field in rp_fields}
def direct_ctap_user(user: PublicKeyCredentialUserEntity) -> dict[str, Any]:
    """Build the plain user mapping passed to the card's makeCredential."""
    raw_id = user.id
    # Match the raw probe's ASCII user-id shape rather than sending opaque
    # binary bytes into the card path.
    ctap_id = raw_id.hex().encode("ascii") if isinstance(raw_id, bytes) else raw_id
    shown_name = user.display_name or user.name
    return {"id": ctap_id, "name": user.name, "displayName": shown_name}
def direct_ctap_allow_list(
    creds: list[Any] | None,
) -> list[dict[str, Any]] | None:
    """Convert credential descriptors to plain ``public-key`` mappings.

    Each entry may be an object with an ``id`` attribute or a dict with an
    ``"id"`` key. Returns None when *creds* is None or empty.
    """
    if not creds:
        return None

    def _credential_id(entry: Any) -> Any:
        # Attribute access first; fall back to the mapping key for dict entries.
        found = getattr(entry, "id", None)
        if found is None and isinstance(entry, dict):
            found = entry.get("id")
        return found

    return [{"type": "public-key", "id": _credential_id(entry)} for entry in creds]
def enrollment_payload(enrollment: "Enrollment", *, created: bool | None = None) -> dict[str, Any]:
    """Build the JSON response body describing *enrollment*.

    The stored credential itself is never included, only whether one exists.
    A ``created`` key is added only when *created* is not None.
    """
    has_credential = bool(enrollment.credential_data_b64)
    body: dict[str, Any] = dict(
        ok=True,
        username=enrollment.username,
        display_name=enrollment.display_name,
        created_at=enrollment.created_at,
        updated_at=enrollment.updated_at,
        has_credential=has_credential,
    )
    if created is None:
        return body
    body["created"] = created
    return body
class ProxyUserInteraction(UserInteraction):
    # Console-facing interaction hooks: each hook prints an operator hint to
    # stdout and then defers to the base UserInteraction implementation.

    def prompt_up(self) -> None:
        # Tell the operator a touch is expected before delegating.
        print("Touch the ChromeCard to continue...", flush=True)
        super().prompt_up()

    def request_pin(self, permissions, rp_id: str | None) -> str | None:
        # PIN entry is not implemented here; announce that and return whatever
        # the base class returns.
        print("Authenticator PIN is required but not supported by this prototype.", flush=True)
        return super().request_pin(permissions, rp_id)
class ProxyState:
    """Shared state behind the request handlers.

    Owns the in-memory session table, the enrollment records (persisted as
    JSON at ``enrollment_db``), the cached hidraw device handle used for
    direct FIDO2 operations, and the upstream connection pool.

    ``self.lock`` is taken around every access to ``sessions`` and
    ``enrollments``; ``self.direct_device_lock`` is taken around every use of
    the cached device handle.
    """

    def __init__(
        self,
        session_ttl_s: int,
        auth_mode: str,
        auth_command: str,
        server_base_url: str,
        server_ca_file: str | None,
        server_max_connections: int,
        proxy_token: str,
        enrollment_db: Path,
        rp_id: str,
        rp_name: str,
        origin: str,
        direct_device_path: str,
    ):
        self.session_ttl_s = session_ttl_s
        self.auth_mode = auth_mode
        self.auth_command = auth_command
        self.server_base_url = server_base_url.rstrip("/")
        self.server_ca_file = server_ca_file
        self.proxy_token = proxy_token
        self.enrollment_db = enrollment_db
        self.rp_id = rp_id
        self.origin = origin
        self.direct_device_path = direct_device_path
        self.direct_device_configured_path = direct_device_path
        self.direct_device_active_path: str | None = None
        self.lock = threading.Lock()
        self.direct_device_lock = threading.RLock()
        self.direct_device: CtapHidDevice | None = None
        self.sessions: dict[str, Session] = {}
        self.enrollments: dict[str, Enrollment] = {}
        self.rp = PublicKeyCredentialRpEntity(id=rp_id, name=rp_name)
        self.fido_server = Fido2Server(self.rp)
        self.client_data_collector = None
        self.upstream = UpstreamPool(
            server_base_url=self.server_base_url,
            server_ca_file=self.server_ca_file,
            max_connections=server_max_connections,
        )
        self._load_enrollments()

    # ------------------------------------------------------------------ modes

    def uses_direct_fido2(self) -> bool:
        """Return True when the proxy talks CTAP2 to the card directly."""
        return self.auth_mode == AUTH_MODE_FIDO2_DIRECT

    def auth_mode_label(self) -> str:
        """Return the auth-mode name reported in login responses."""
        return "fido2_assertion" if self.uses_direct_fido2() else "card_presence_probe"

    def _now(self) -> float:
        return time.time()

    # --------------------------------------------------------------- sessions

    def _gc_locked(self) -> None:
        """Drop expired sessions. Caller must hold self.lock."""
        now = self._now()
        dead = [token for token, sess in self.sessions.items() if sess.expires_at <= now]
        for token in dead:
            del self.sessions[token]

    def create_session(self, username: str) -> tuple[str, float]:
        """Create a session for *username*; return ``(token, expires_at)``."""
        token = secrets.token_urlsafe(32)
        now = self._now()
        expires_at = now + self.session_ttl_s
        with self.lock:
            self._gc_locked()
            self.sessions[token] = Session(username=username, expires_at=expires_at)
        return token, expires_at

    def get_session(self, token: str) -> Session | None:
        """Return the live session for *token*, or None if unknown or expired."""
        with self.lock:
            self._gc_locked()
            return self.sessions.get(token)

    def invalidate_session(self, token: str) -> bool:
        """Remove *token*; return True if a session was actually removed."""
        with self.lock:
            return self.sessions.pop(token, None) is not None

    def active_session_count(self) -> int:
        """Return the number of unexpired sessions."""
        with self.lock:
            self._gc_locked()
            return len(self.sessions)

    # ------------------------------------------------------------ persistence

    def _load_enrollments(self) -> None:
        """Populate ``self.enrollments`` from the JSON file, if it exists.

        Any failure (unreadable file, malformed JSON, invalid record) leaves
        the enrollment table empty, as before, but is now reported on stdout
        instead of being discarded silently.
        """
        if not self.enrollment_db.exists():
            return
        loaded: dict[str, Enrollment] = {}
        try:
            payload = json.loads(self.enrollment_db.read_text(encoding="utf-8"))
            for item in payload.get("users", []):
                username = str(item.get("username", "")).strip()
                if not username:
                    continue
                # "enrolled_at" is accepted as a fallback key for the creation time.
                created_at = int(item.get("created_at", item.get("enrolled_at", int(self._now()))))
                updated_at = int(item.get("updated_at", created_at))
                loaded[username] = Enrollment(
                    username=username,
                    display_name=normalize_display_name(item.get("display_name")),
                    created_at=created_at,
                    updated_at=updated_at,
                    user_id_b64=item.get("user_id_b64"),
                    credential_data_b64=item.get("credential_data_b64"),
                )
        except Exception as exc:
            # Best effort: start with no enrollments rather than refusing to run,
            # but say so — the next save will overwrite the unreadable file.
            print(
                f"warning: ignoring unreadable enrollment db {self.enrollment_db}: {exc}",
                flush=True,
            )
            loaded = {}
        self.enrollments = loaded

    def _save_enrollments_locked(self) -> None:
        """Write all enrollments to disk. Caller must hold self.lock.

        The JSON is written to a sibling temporary file and then renamed over
        the target, so an interrupted write cannot leave a truncated database.
        """
        self.enrollment_db.parent.mkdir(parents=True, exist_ok=True)
        users = [
            {
                "username": enrollment.username,
                "display_name": enrollment.display_name,
                "created_at": enrollment.created_at,
                "updated_at": enrollment.updated_at,
                "user_id_b64": enrollment.user_id_b64,
                "credential_data_b64": enrollment.credential_data_b64,
            }
            for enrollment in sorted(self.enrollments.values(), key=lambda item: item.username)
        ]
        tmp_path = self.enrollment_db.with_name(self.enrollment_db.name + ".tmp")
        tmp_path.write_text(json.dumps({"users": users}, indent=2) + "\n", encoding="utf-8")
        tmp_path.replace(self.enrollment_db)

    # ---------------------------------------------------------- direct device

    def _new_fido_client(self) -> Fido2Client:
        """Build a Fido2Client bound to the cached direct device."""
        device = self._get_direct_device()
        # Newer python-fido2 builds accept a custom client-data collector, while the
        # VM-side package still expects an origin string plus verifier callback.
        if self.client_data_collector is not None:
            return Fido2Client(device, self.client_data_collector, ProxyUserInteraction())
        return Fido2Client(device, self.origin, verify=verify_rp_id, user_interaction=ProxyUserInteraction())

    def _direct_device_candidates(self) -> list[str]:
        """Return hidraw paths to try: the configured one first, then /dev/hidraw*."""
        configured = str(self.direct_device_configured_path).strip()
        candidates: list[str] = []
        if configured:
            candidates.append(configured)
        for path in sorted(Path("/dev").glob("hidraw*")):
            as_text = str(path)
            if as_text not in candidates:
                candidates.append(as_text)
        return candidates

    def _open_direct_device(self) -> CtapHidDevice:
        """Open the first candidate hidraw node that works as a CTAPHID device.

        Raises:
            FileNotFoundError: when there are no candidates at all.
            Exception: the error from the last candidate when every one failed.
        """
        last_exc: Exception | None = None
        for candidate in self._direct_device_candidates():
            try:
                descriptor = get_descriptor(candidate)
                connection = open_connection(descriptor)
                try:
                    device = CtapHidDevice(descriptor, connection)
                except Exception:
                    # Do not leak the open hidraw handle when device setup fails.
                    try:
                        connection.close()
                    except Exception:
                        pass
                    raise
            except Exception as exc:
                # USB re-enumeration can leave stale hidraw paths behind, and some sibling
                # nodes are vendor interfaces that are not readable to the normal user.
                # Skip those and keep probing for a usable CTAPHID node.
                last_exc = exc
                continue
            self.direct_device_active_path = candidate
            return device
        if last_exc is None:
            raise FileNotFoundError(f"no hidraw devices available for direct auth (configured {self.direct_device_path})")
        raise last_exc

    def _get_direct_device(self, *, force_reopen: bool = False) -> CtapHidDevice:
        """Return the cached device, opening (or reopening) it when needed."""
        with self.direct_device_lock:
            if force_reopen and self.direct_device is not None:
                self._drop_direct_device_locked()
            if self.direct_device is None:
                self.direct_device = self._open_direct_device()
            return self.direct_device

    def _drop_direct_device_locked(self) -> None:
        """Close and forget the cached device. Caller must hold direct_device_lock."""
        try:
            if self.direct_device is not None:
                self.direct_device.close()
        except Exception:
            # Closing is best effort; the handle is discarded either way.
            pass
        self.direct_device = None
        self.direct_device_active_path = None

    def _drop_direct_device(self) -> None:
        with self.direct_device_lock:
            self._drop_direct_device_locked()

    def _with_direct_ctap2(self, action):
        """Run ``action(Ctap2(device))``, retrying once on a freshly opened device.

        The first attempt reuses the cached handle; if it fails (e.g. the card
        was briefly removed or the CTAPHID channel desynchronised), the handle
        is dropped and one more attempt is made. The second failure is
        propagated to the caller.
        """
        with self.direct_device_lock:
            try:
                return action(Ctap2(self._get_direct_device()))
            except Exception:
                self._drop_direct_device_locked()
            try:
                return action(Ctap2(self._get_direct_device(force_reopen=True)))
            except Exception:
                self._drop_direct_device_locked()
                raise

    def _collect_client_data(
        self,
        request_type: str,
        options: PublicKeyCredentialCreationOptions | PublicKeyCredentialRequestOptions,
    ) -> CollectedClientData:
        """Build client data for *options*, refusing a foreign relying-party id."""
        requested_rp_id = options.rp.id if isinstance(options, PublicKeyCredentialCreationOptions) else options.rp_id
        if requested_rp_id != self.rp_id:
            raise RuntimeError(f"rp_id mismatch: expected {self.rp_id}, got {requested_rp_id}")
        return CollectedClientData.create(
            type=request_type,
            challenge=options.challenge,
            origin=self.origin,
        )

    def _user_entity(self, username: str, display_name: str | None, user_id: bytes) -> PublicKeyCredentialUserEntity:
        return PublicKeyCredentialUserEntity(
            id=user_id,
            name=username,
            display_name=display_name or username,
        )

    # ------------------------------------------------------------- enrollment

    def _register_metadata_only(self, username: str, display_name: str | None) -> Enrollment:
        """Enroll a user without touching the card (probe mode).

        Raises:
            ValueError: invalid username or display name.
            FileExistsError: the user is already enrolled.
        """
        canonical = normalize_username(username)
        pretty = normalize_display_name(display_name)
        now = int(self._now())
        with self.lock:
            existing = self.enrollments.get(canonical)
            if existing:
                raise FileExistsError("user already enrolled")
            enrollment = Enrollment(
                username=canonical,
                display_name=pretty,
                created_at=now,
                updated_at=now,
            )
            self.enrollments[canonical] = enrollment
            self._save_enrollments_locked()
        return enrollment

    def _register_direct_fido2(self, username: str, display_name: str | None) -> Enrollment:
        """Enroll a user by creating a credential on the card.

        An existing enrollment without a credential is upgraded in place,
        keeping its user id and creation time.

        Raises:
            ValueError: invalid username or display name.
            FileExistsError: the user already has a credential.
            RuntimeError: the card operation or its verification failed.
        """
        canonical = normalize_username(username)
        pretty = normalize_display_name(display_name)
        now = int(self._now())
        with self.lock:
            existing = self.enrollments.get(canonical)
            if existing and existing.credential_data_b64:
                raise FileExistsError("user already enrolled")
            user_id = b64u_decode(existing.user_id_b64) if existing and existing.user_id_b64 else secrets.token_bytes(32)
            created_at = existing.created_at if existing else now
        options, state = self.fido_server.register_begin(
            self._user_entity(canonical, pretty, user_id),
            user_verification=UserVerificationRequirement.DISCOURAGED,
        )
        try:
            client_data = self._collect_client_data("webauthn.create", options.public_key)
            attestation = self._with_direct_ctap2(
                lambda ctap2: ctap2.make_credential(
                    client_data_hash=client_data.hash,
                    rp=direct_ctap_rp(options.public_key.rp),
                    user=direct_ctap_user(options.public_key.user),
                    key_params=direct_ctap_key_params(),
                    exclude_list=direct_ctap_allow_list(options.public_key.exclude_credentials),
                    options={"rk": False, "uv": False},
                )
            )
            auth_data = self.fido_server.register_complete(
                state,
                RegistrationResponse(
                    id=attestation.auth_data.credential_data.credential_id,
                    response=AuthenticatorAttestationResponse(
                        client_data=client_data,
                        attestation_object=AttestationObject.create(
                            attestation.fmt,
                            attestation.auth_data,
                            attestation.att_stmt,
                        ),
                    ),
                ),
            )
        except Exception as exc:
            raise RuntimeError(f"card registration failed: {exc}") from exc
        credential_data = auth_data.credential_data
        if credential_data is None:
            raise RuntimeError("card registration returned no credential data")
        enrollment = Enrollment(
            username=canonical,
            display_name=pretty,
            created_at=created_at,
            updated_at=now,
            user_id_b64=b64u_encode(user_id),
            credential_data_b64=b64u_encode(bytes(credential_data)),
        )
        with self.lock:
            # The lock was released during the card interaction, so re-check:
            # a concurrent registration for the same user must not be silently
            # overwritten by this one.
            current = self.enrollments.get(canonical)
            if current and current.credential_data_b64:
                raise FileExistsError("user already enrolled")
            self.enrollments[canonical] = enrollment
            self._save_enrollments_locked()
        # Freshly reopen for later assertion flow; some cards do not like immediate
        # reuse of the same hidraw handle across makeCredential -> getAssertion.
        self._drop_direct_device()
        return enrollment

    def register_enrollment(self, username: str, display_name: str | None) -> Enrollment:
        """Enroll a user using whichever flow the configured auth mode requires."""
        if self.uses_direct_fido2():
            return self._register_direct_fido2(username, display_name)
        return self._register_metadata_only(username, display_name)

    def update_enrollment(self, username: str, display_name: str | None) -> Enrollment:
        """Replace a user's display name.

        Raises:
            ValueError: invalid username or display name.
            KeyError: the user is not enrolled.
        """
        canonical = normalize_username(username)
        pretty = normalize_display_name(display_name)
        now = int(self._now())
        with self.lock:
            existing = self.enrollments.get(canonical)
            if not existing:
                raise KeyError("user not enrolled")
            existing.display_name = pretty
            existing.updated_at = now
            self._save_enrollments_locked()
            return existing

    def delete_enrollment(self, username: str) -> Enrollment:
        """Remove a user and every session issued to them.

        Raises:
            ValueError: invalid username.
            KeyError: the user is not enrolled.
        """
        canonical = normalize_username(username)
        with self.lock:
            existing = self.enrollments.pop(canonical, None)
            if not existing:
                raise KeyError("user not enrolled")
            dead_tokens = [token for token, sess in self.sessions.items() if sess.username == canonical]
            for token in dead_tokens:
                del self.sessions[token]
            self._save_enrollments_locked()
            return existing

    def list_enrollments(self) -> list[Enrollment]:
        """Return all enrollments ordered by username."""
        with self.lock:
            return [self.enrollments[key] for key in sorted(self.enrollments)]

    def get_enrollment(self, username: str) -> Enrollment | None:
        """Return the enrollment for *username*, or None if invalid or unknown."""
        try:
            canonical = normalize_username(username)
        except ValueError:
            return None
        with self.lock:
            return self.enrollments.get(canonical)

    def has_enrollment(self, username: str) -> bool:
        return self.get_enrollment(username) is not None

    # --------------------------------------------------------- authentication

    def _authenticate_with_probe(self) -> tuple[bool, str]:
        """Run the configured probe command; success means exit status 0."""
        try:
            # NOTE(review): the command string is executed through the shell.
            # That is only safe while auth_command comes from trusted
            # configuration and never from request data — confirm at the call site.
            proc = subprocess.run(
                self.auth_command,
                shell=True,
                capture_output=True,
                text=True,
                timeout=10,
                check=False,
            )
        except Exception as exc:
            return False, f"auth command failed: {exc}"
        if proc.returncode != 0:
            stderr = proc.stderr.strip()
            stdout = proc.stdout.strip()
            details = stderr if stderr else stdout
            return False, details or f"auth command exit code {proc.returncode}"
        return True, "card presence check succeeded"

    def _authenticate_with_direct_fido2(self, username: str) -> tuple[bool, str]:
        """Verify a getAssertion from the card against the stored credential."""
        enrollment = self.get_enrollment(username)
        if not enrollment:
            return False, "user not enrolled"
        if not enrollment.credential_data_b64:
            return False, "user has no registered credential"
        try:
            # Start assertion from a fresh device open rather than reusing the
            # post-registration handle, which has been flaky on this stack.
            self._drop_direct_device()
            credential = AttestedCredentialData(b64u_decode(enrollment.credential_data_b64))
            # Keep UV explicitly discouraged here. On the current card/library stack,
            # asking for stronger UV flows immediately trips PIN/UV capability errors.
            options, state = self.fido_server.authenticate_begin(
                [credential],
                user_verification=UserVerificationRequirement.DISCOURAGED,
            )
            client_data = self._collect_client_data("webauthn.get", options.public_key)
            assertion = self._with_direct_ctap2(
                lambda ctap2: ctap2.get_assertion(
                    rp_id=options.public_key.rp_id,
                    client_data_hash=client_data.hash,
                    allow_list=direct_ctap_allow_list(options.public_key.allow_credentials),
                    options={"up": True, "uv": False},
                )
            )
            response = assertion.assertions[0] if getattr(assertion, "assertions", None) else assertion
            # An authenticator may omit the credential descriptor when the allow
            # list held exactly one entry; fall back to the id that was requested.
            descriptor = response.credential
            credential_id = descriptor["id"] if descriptor else credential.credential_id
            self.fido_server.authenticate_complete(
                state,
                [credential],
                AuthenticationResponse(
                    id=credential_id,
                    response=AuthenticatorAssertionResponse(
                        client_data=client_data,
                        authenticator_data=response.auth_data,
                        signature=response.signature,
                        user_handle=response.user.get("id") if response.user else None,
                    ),
                ),
            )
        except Exception as exc:
            return False, f"assertion verification failed: {exc}"
        return True, "assertion verified"

    def authenticate_with_card(self, username: str) -> tuple[bool, str]:
        """Authenticate *username*; return ``(ok, human-readable detail)``."""
        if not self.uses_direct_fido2():
            return self._authenticate_with_probe()
        return self._authenticate_with_direct_fido2(username)

    # ---------------------------------------------------------------- upstream

    def fetch_counter(self) -> tuple[int, dict[str, Any]]:
        """POST to the upstream counter resource; return ``(status, json body)``."""
        return self.upstream.request_json(
            path="/resource/counter",
            headers={"X-Proxy-Token": self.proxy_token},
            payload={},
        )
class UpstreamPool:
    """Bounded pool of keep-alive HTTP(S) connections to the upstream server.

    At most ``max_connections`` requests are in flight at once; idle
    connections are kept on a LIFO queue and reused when the server allows it.
    """

    def __init__(self, server_base_url: str, server_ca_file: str | None, max_connections: int = 4):
        parsed = urlparse(server_base_url)
        self.scheme = parsed.scheme
        self.host = parsed.hostname or "127.0.0.1"
        self.port = parsed.port or (443 if parsed.scheme == "https" else 80)
        self.base_path = parsed.path.rstrip("/")
        self.server_ca_file = server_ca_file
        self.timeout = 5
        self.max_connections = max_connections
        self.idle: queue.LifoQueue[http.client.HTTPConnection] = queue.LifoQueue()
        self.capacity = threading.BoundedSemaphore(max_connections)

    def _new_connection(self) -> http.client.HTTPConnection:
        """Create an unconnected HTTP or HTTPS connection object."""
        if self.scheme == "https":
            context = ssl.create_default_context(cafile=self.server_ca_file)
            return http.client.HTTPSConnection(
                self.host,
                self.port,
                timeout=self.timeout,
                context=context,
            )
        return http.client.HTTPConnection(self.host, self.port, timeout=self.timeout)

    def _acquire(self) -> http.client.HTTPConnection:
        """Take a capacity permit and return an idle or new connection.

        If creating the connection fails (e.g. the CA file cannot be loaded),
        the permit is returned before the error propagates; otherwise every
        such failure would permanently shrink the pool.
        """
        self.capacity.acquire()
        try:
            try:
                return self.idle.get_nowait()
            except queue.Empty:
                return self._new_connection()
        except BaseException:
            self.capacity.release()
            raise

    def _release(self, conn: http.client.HTTPConnection | None, reusable: bool) -> None:
        """Return *conn* to the idle queue or close it, then free the permit."""
        try:
            if conn is not None and reusable:
                self.idle.put(conn)
            elif conn is not None:
                conn.close()
        finally:
            self.capacity.release()

    def request_json(self, path: str, headers: dict[str, str], payload: dict[str, Any]) -> tuple[int, dict[str, Any]]:
        """POST *payload* as JSON and return ``(status, decoded body)``.

        Transport failures never raise; they are reported as a 502 with an
        ``error`` message so the caller can relay them to the client.
        """
        try:
            conn = self._acquire()
        except Exception as exc:
            # No connection could be set up at all; the permit has already
            # been returned by _acquire.
            return 502, {"ok": False, "error": f"server unavailable: {exc}"}
        reusable = False
        full_path = f"{self.base_path}{path}" if self.base_path else path
        try:
            body = json.dumps(payload).encode("utf-8")
            req_headers = {"Content-Type": "application/json", **headers}
            conn.request("POST", full_path, body=body, headers=req_headers)
            resp = conn.getresponse()
            raw = resp.read()
            # will_close is set by the server when it intends to close the connection
            # after this response; reusing such a connection would hit an EOF.
            reusable = not resp.will_close
            try:
                data = json.loads(raw.decode("utf-8")) if raw else {}
            except Exception:
                data = {"ok": False, "error": f"server http error {resp.status}"}
            return resp.status, data
        except (http.client.HTTPException, OSError, ssl.SSLError) as exc:
            return 502, {"ok": False, "error": f"server unavailable: {exc}"}
        except Exception as exc:
            return 502, {"ok": False, "error": f"server call failed: {exc}"}
        finally:
            self._release(conn, reusable)
class Handler(BaseHTTPRequestHandler):
state: ProxyState
protocol_version = "HTTP/1.1"
def _json(self, status: int, payload: dict[str, Any]) -> None:
body = json.dumps(payload).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _html(self, body: str) -> None:
data = body.encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
def _read_json(self) -> dict[str, Any]:
length = int(self.headers.get("Content-Length", "0"))
raw = self.rfile.read(length)
if not raw:
return {}
return json.loads(raw.decode("utf-8"))
def _discard_request_body(self) -> None:
# HTTP/1.1 keep-alive: body must be consumed before the response is sent.
length = int(self.headers.get("Content-Length", "0"))
if length > 0:
self.rfile.read(length)
def _require_json(self) -> dict[str, Any] | None:
# Returns None and sends 400 when the body is unparseable; callers must
# return immediately without sending a second response.
try:
return self._read_json()
except Exception:
self._json(400, {"ok": False, "error": "invalid json"})
return None
def _bearer_token(self) -> str | None:
value = self.headers.get("Authorization", "")
if not value.startswith("Bearer "):
return None
token = value[7:].strip()
return token or None
def _require_session(self) -> tuple[str, Session] | None:
# Returns None when auth fails; the 401 has already been sent, so callers
# must return immediately without writing a second response.
token = self._bearer_token()
if not token:
self._json(401, {"ok": False, "error": "missing bearer token"})
return None
session = self.state.get_session(token)
if not session:
self._json(401, {"ok": False, "error": "invalid or expired session"})
return None
return token, session
def do_GET(self) -> None:  # noqa: N802
    """Route GET requests: portal page, health check and enrollment queries.

    Any other path is answered with a 404.
    """
    route = urlparse(self.path).path
    if route == "/":
        self._html(HTML)
    elif route == "/health":
        health = {
            "ok": True,
            "service": "k_proxy",
            "active_sessions": self.state.active_session_count(),
            "time": int(time.time()),
        }
        self._json(200, health)
    elif route.startswith("/enroll/status"):
        # Prefix match: everything beginning with /enroll/status goes to the
        # status handler.
        self._enroll_status()
    elif route == "/enroll/list":
        self._enroll_list()
    else:
        self.send_error(404)
def do_POST(self) -> None:  # noqa: N802
    """Route POST requests to the matching handler method, or reply 404."""
    post_routes = {
        "/session/login": "_session_login",
        "/enroll/register": "_enroll_register",
        "/enroll/update": "_enroll_update",
        "/enroll/delete": "_enroll_delete",
        "/session/status": "_session_status",
        "/session/logout": "_session_logout",
        "/resource/counter": "_resource_counter",
    }
    handler_name = post_routes.get(urlparse(self.path).path)
    if handler_name is None:
        self.send_error(404)
        return
    getattr(self, handler_name)()
def _session_login(self) -> None:
data = self._require_json()
if data is None:
return
try:
username = normalize_username(str(data.get("username", "")))
except ValueError as exc:
self._json(400, {"ok": False, "error": str(exc)})
return
if not self.state.has_enrollment(username):
self._json(403, {"ok": False, "error": "user not enrolled", "username": username})
return
ok, message = self.state.authenticate_with_card(username)
if not ok:
self._json(401, {"ok": False, "error": "card auth failed", "details": message})
return
token, expires_at = self.state.create_session(username)
self._json(
200,
{
"ok": True,
"username": username,
"session_token": token,
"expires_at": int(expires_at),
"ttl_seconds": self.state.session_ttl_s,
"auth_mode": self.state.auth_mode_label(),
},
)
def _enroll_register(self) -> None:
data = self._require_json()
if data is None:
return
try:
enrollment = self.state.register_enrollment(
str(data.get("username", "")),
data.get("display_name"),
)
except ValueError as exc:
self._json(400, {"ok": False, "error": str(exc)})
return
except FileExistsError:
self._json(409, {"ok": False, "error": "user already enrolled"})
return
except RuntimeError as exc:
self._json(401, {"ok": False, "error": str(exc)})
return
self._json(200, enrollment_payload(enrollment, created=enrollment.created_at == enrollment.updated_at))
def _enroll_update(self) -> None:
data = self._require_json()
if data is None:
return
try:
enrollment = self.state.update_enrollment(
str(data.get("username", "")),
data.get("display_name"),
)
except ValueError as exc:
self._json(400, {"ok": False, "error": str(exc)})
return
except KeyError:
self._json(404, {"ok": False, "error": "user not enrolled"})
return
self._json(200, enrollment_payload(enrollment))
def _enroll_delete(self) -> None:
data = self._require_json()
if data is None:
return
try:
enrollment = self.state.delete_enrollment(str(data.get("username", "")))
except ValueError as exc:
self._json(400, {"ok": False, "error": str(exc)})
return
except KeyError:
self._json(404, {"ok": False, "error": "user not enrolled"})
return
self._json(200, {"ok": True, "username": enrollment.username, "deleted": True})
def _enroll_status(self) -> None:
parsed = urlparse(self.path)
query = {}
if parsed.query:
for chunk in parsed.query.split("&"):
if "=" not in chunk:
continue
key, value = chunk.split("=", 1)
query[key] = value
username = query.get("username", "").strip()
if not username:
self._json(400, {"ok": False, "error": "username query required"})
return
enrollment = self.state.get_enrollment(username)
if not enrollment:
self._json(404, {"ok": False, "error": "user not enrolled", "username": username})
return
self._json(200, enrollment_payload(enrollment))
def _enroll_list(self) -> None:
users = [enrollment_payload(item) for item in self.state.list_enrollments()]
self._json(200, {"ok": True, "users": users})
def _session_status(self) -> None:
self._discard_request_body()
got = self._require_session()
if not got:
return
_, session = got
self._json(
200,
{
"ok": True,
"username": session.username,
"expires_at": int(session.expires_at),
"seconds_remaining": max(0, int(session.expires_at - time.time())),
},
)
def _session_logout(self) -> None:
self._discard_request_body()
token = self._bearer_token()
if not token:
self._json(401, {"ok": False, "error": "missing bearer token"})
return
removed = self.state.invalidate_session(token)
self._json(200, {"ok": True, "invalidated": removed})
def _resource_counter(self) -> None:
self._discard_request_body()
got = self._require_session()
if not got:
return
_, session = got
status, upstream = self.state.fetch_counter()
if status != 200:
self._json(status, {"ok": False, "error": "upstream failed", "upstream": upstream})
return
self._json(
200,
{
"ok": True,
"username": session.username,
"session_reused": True,
"upstream": upstream,
},
)
def parse_args() -> argparse.Namespace:
    """Parse the k_proxy command line from ``sys.argv``.

    Options are declared as (flag, keyword-arguments) pairs and registered in
    order, so ``--help`` lists them in the sequence written below.

    Returns:
        The populated ``argparse.Namespace``.
    """
    option_table = (
        ("--host", {"default": "127.0.0.1"}),
        ("--port", {"type": int, "default": 8770}),
        ("--tls-certfile", {"help": "PEM certificate chain for HTTPS listener"}),
        ("--tls-keyfile", {"help": "PEM private key for HTTPS listener"}),
        ("--session-ttl", {"type": int, "default": 300, "help": "Session TTL in seconds"}),
        (
            "--auth-mode",
            {
                "choices": (AUTH_MODE_PROBE, AUTH_MODE_FIDO2_DIRECT),
                "default": AUTH_MODE_PROBE,
                "help": "Session auth mode: legacy card-presence probe or experimental direct FIDO2 registration/assertion",
            },
        ),
        (
            "--auth-command",
            {
                "default": "python3 /home/user/chromecard/fido2_probe.py --json",
                "help": "Command used for legacy probe auth mode",
            },
        ),
        (
            "--rp-id",
            {
                "default": "localhost",
                "help": "Relying party ID used for direct card-backed registration and assertion verification",
            },
        ),
        (
            "--rp-name",
            {
                "default": "ChromeCard Proxy",
                "help": "Relying party name used for direct card-backed registration",
            },
        ),
        (
            "--origin",
            {
                "default": "https://localhost",
                "help": "Synthetic origin used by the local FIDO2 client when talking directly to the attached card",
            },
        ),
        (
            "--server-base-url",
            {
                "default": "http://127.0.0.1:8780",
                "help": "Base URL for k_server",
            },
        ),
        (
            "--server-ca-file",
            {"help": "CA certificate used to verify HTTPS certificate presented by k_server"},
        ),
        (
            "--server-max-connections",
            {
                "type": int,
                "default": 1,
                "help": "Maximum concurrent pooled upstream connections from k_proxy to k_server",
            },
        ),
        (
            "--proxy-token",
            {
                "default": "dev-proxy-token",
                "help": "Shared token to authorize requests to k_server",
            },
        ),
        (
            "--enrollment-db",
            {
                "default": "/home/user/chromecard/k_proxy_enrollments.json",
                "help": "JSON file used to persist enrolled usernames for the prototype",
            },
        ),
        (
            "--direct-device-path",
            {
                "default": "/dev/hidraw0",
                "help": "Explicit hidraw path used for direct FIDO2 mode",
            },
        ),
    )
    cli = argparse.ArgumentParser(description="Run k_proxy session gateway")
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
def main() -> int:
    """Run the k_proxy HTTP(S) listener until it is stopped.

    Parses the command line, builds the shared ``ProxyState``, attaches it to
    ``Handler`` and serves requests on ``--host``/``--port``. When both TLS
    options are given the listening socket is wrapped for HTTPS.

    Returns:
        0 once the serve loop has ended (including after Ctrl-C).

    Raises:
        SystemExit: if only one of ``--tls-certfile``/``--tls-keyfile`` is
            given, or if ``--server-base-url`` is https without
            ``--server-ca-file``.
    """
    args = parse_args()
    # The != on the two booleans is an XOR: exactly one TLS option is an error.
    if bool(args.tls_certfile) != bool(args.tls_keyfile):
        raise SystemExit("Both --tls-certfile and --tls-keyfile are required to enable HTTPS")
    if args.server_base_url.startswith("https://") and not args.server_ca_file:
        raise SystemExit("--server-ca-file is required when --server-base-url uses https")
    state = ProxyState(
        session_ttl_s=args.session_ttl,
        auth_mode=args.auth_mode,
        auth_command=args.auth_command,
        server_base_url=args.server_base_url,
        server_ca_file=args.server_ca_file,
        server_max_connections=args.server_max_connections,
        proxy_token=args.proxy_token,
        enrollment_db=Path(args.enrollment_db),
        rp_id=args.rp_id,
        rp_name=args.rp_name,
        origin=args.origin,
        direct_device_path=args.direct_device_path,
    )
    # The state is shared by every request through a class attribute.
    Handler.state = state
    server = ThreadingHTTPServer((args.host, args.port), Handler)
    try:
        scheme = "http"
        if args.tls_certfile:
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
            context.load_cert_chain(certfile=args.tls_certfile, keyfile=args.tls_keyfile)
            server.socket = context.wrap_socket(server.socket, server_side=True)
            scheme = "https"
        print(f"k_proxy listening on {scheme}://{args.host}:{args.port}")
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the gateway; exit without a traceback.
        pass
    finally:
        # Always release the listening socket, including when TLS setup fails.
        server.server_close()
    return 0
if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the exit status.
    exit_status = main()
    raise SystemExit(exit_status)