Compare commits
No commits in common. "main" and "docs-maintenance" have entirely different histories.
main
...
docs-maint
39
Setup.md
39
Setup.md
|
|
@ -1,6 +1,6 @@
|
||||||
# Setup
|
# Setup
|
||||||
|
|
||||||
Last updated: 2026-04-27
|
Last updated: 2026-04-26
|
||||||
|
|
||||||
This is a living setup/status file for the local ChromeCard workspace at `/home/user/chromecard`.
|
This is a living setup/status file for the local ChromeCard workspace at `/home/user/chromecard`.
|
||||||
Update this file whenever environment status or verified behavior changes.
|
Update this file whenever environment status or verified behavior changes.
|
||||||
|
|
@ -611,23 +611,6 @@ Session note (2026-04-25, direct FIDO2 auth attempt):
|
||||||
- the deployed `k_proxy` service was restored to default `probe` mode
|
- the deployed `k_proxy` service was restored to default `probe` mode
|
||||||
- verified `alice` login still works afterward, so the validated Phase 5 baseline remains intact
|
- verified `alice` login still works afterward, so the validated Phase 5 baseline remains intact
|
||||||
|
|
||||||
Session note (2026-04-27, fido2-direct end-to-end browser validation):
|
|
||||||
- Deployed all three services (k_server, k_proxy, k_client_portal) in split-VM chain via SSH/SCP.
|
|
||||||
- k_proxy restarted with --auth-mode fido2-direct.
|
|
||||||
- Full browser flow verified from k_client at http://127.0.0.1:8766 with real card:
|
|
||||||
- Register: makeCredential triggered on card, button press confirmed.
|
|
||||||
- Login: getAssertion triggered on card, button press confirmed.
|
|
||||||
- Counter: k_server returned incremented value.
|
|
||||||
- Logout: session correctly invalidated.
|
|
||||||
- Confirmed: probe mode showed stale directtest enrollment (no credential_data_b64) from earlier session; that is expected.
|
|
||||||
- Bug found and fixed: clicking Register after Login cleared the client-side session token but left the server-side session alive; fix adds a best-effort /session/logout call to k_proxy before re-enrolling.
|
|
||||||
- Current deployed service state:
|
|
||||||
- k_server: https://127.0.0.1:8780, TLS, proxy-token dev-proxy-token
|
|
||||||
- k_proxy: https://127.0.0.1:8771, TLS, --auth-mode fido2-direct, upstream https://127.0.0.1:9780
|
|
||||||
- k_client: http://127.0.0.1:8766, proxy-base-url https://127.0.0.1:9771
|
|
||||||
- Forwards: k_proxy 9780->k_server:8780, k_client 9771->k_proxy:8771
|
|
||||||
- Unit test suite added: tests/test_k_proxy.py (100 tests, all passing, run locally with python3 -m unittest tests/test_k_proxy.py).
|
|
||||||
|
|
||||||
Session note (2026-04-26, markdown maintenance re-scan):
|
Session note (2026-04-26, markdown maintenance re-scan):
|
||||||
- Re-read the maintained workspace markdown set:
|
- Re-read the maintained workspace markdown set:
|
||||||
- `/home/user/chromecard/Setup.md`
|
- `/home/user/chromecard/Setup.md`
|
||||||
|
|
@ -648,26 +631,6 @@ Session note (2026-04-26, markdown maintenance re-scan):
|
||||||
- direct FIDO2 enrollment/login support exists in code and is documented as an optional follow-up path, not the default deployed runtime
|
- direct FIDO2 enrollment/login support exists in code and is documented as an optional follow-up path, not the default deployed runtime
|
||||||
- the main unresolved engineering limit is still the higher-fan-out Qubes forwarding ceiling on the browser-facing path, not basic chain bring-up
|
- the main unresolved engineering limit is still the higher-fan-out Qubes forwarding ceiling on the browser-facing path, not basic chain bring-up
|
||||||
|
|
||||||
Session note (2026-04-27, card emulator and bug fixes):
|
|
||||||
- Added software emulator of the ChromeCard FIDO2 authenticator:
|
|
||||||
- `/home/user/chromecard/tests/card_emulator.py`
|
|
||||||
- implements `make_credential` and `get_assertion` with real P-256 cryptography
|
|
||||||
- in-memory credential store keyed by credential ID (matching firmware layout)
|
|
||||||
- auth_data byte layout and COSE key encoding mirror `fido_make_cred.c` / `fido_get_assertion.c` exactly
|
|
||||||
- `user_confirms=True/False` parameter simulates the card's Yes/No confirmation dialog
|
|
||||||
- `refusing()` method returns a wrapper that forces `user_confirms=False` for integration test paths
|
|
||||||
- `forget_user(username)` simulates card-side credential removal
|
|
||||||
- module docstring is the usage guide
|
|
||||||
- Fixed two bugs in `k_proxy_app.py` that were silently breaking fido2-direct mode:
|
|
||||||
- `RegistrationResponse(id=..., ...)` → `RegistrationResponse(raw_id=..., ...)` (fido2 2.2.0 API)
|
|
||||||
- `AuthenticationResponse(id=..., ...)` → `AuthenticationResponse(raw_id=..., ...)` (same)
|
|
||||||
- both calls raised `TypeError` at runtime, caught by the surrounding `except`, so register and
|
|
||||||
authenticate in fido2-direct mode always returned failure without any visible error
|
|
||||||
- Extended test suite: 22 new tests across `TestCardEmulatorUnit` and `TestCardEmulatorIntegration`
|
|
||||||
- covers: register, authenticate, user-says-no (register and auth), forget, two-user isolation,
|
|
||||||
sign-count monotonicity, wrong RP rejection, empty allow-list rejection
|
|
||||||
- total test count is now 122, all passing locally without card or VMs
|
|
||||||
|
|
||||||
## Known FIDO2 Transport Boundary
|
## Known FIDO2 Transport Boundary
|
||||||
|
|
||||||
- FIDO2 on this firmware is handled via USB HID (CTAPHID), not Wi-Fi/BLE/MQTT.
|
- FIDO2 on this firmware is handled via USB HID (CTAPHID), not Wi-Fi/BLE/MQTT.
|
||||||
|
|
|
||||||
28
Workplan.md
28
Workplan.md
|
|
@ -1,6 +1,6 @@
|
||||||
# Workplan
|
# Workplan
|
||||||
|
|
||||||
Last updated: 2026-04-27
|
Last updated: 2026-04-26
|
||||||
|
|
||||||
This is the execution plan for making ChromeCard FIDO2 development and validation reproducible on this machine.
|
This is the execution plan for making ChromeCard FIDO2 development and validation reproducible on this machine.
|
||||||
|
|
||||||
|
|
@ -549,26 +549,12 @@ Exit criteria:
|
||||||
|
|
||||||
## Current Next Step
|
## Current Next Step
|
||||||
|
|
||||||
Status (2026-04-27):
|
- Treat the default HTTPS split-VM chain as the stable baseline and keep validating it with `/home/user/chromecard/phase5_chain_regression.sh`.
|
||||||
- fido2-direct mode confirmed working end-to-end with real card via browser on k_client.
|
- Push the next engineering cycle toward Phase 6.5 limits:
|
||||||
- Full register → login → counter → logout flow verified with physical card button presses.
|
- reproduce and narrow the `~10` in-flight request ceiling on the browser-facing `k_client -> k_proxy` Qubes forward
|
||||||
- Bug fixed: ClientState.enroll() now calls /session/logout on k_proxy before re-enrolling.
|
- separate Qubes forwarding churn from app-level issues with targeted concurrency probes and log capture
|
||||||
- All three service files refactored and re-deployed.
|
- In parallel, decide whether `--auth-mode fido2-direct` is ready to become the default deployed path or should remain an optional/operator mode.
|
||||||
- Added CardEmulator: software emulator of the ChromeCard FIDO2 authenticator for use in tests.
|
- Keep the regression helpers as the fast check that transport, auth, session reuse, and counter semantics still hold after each change.
|
||||||
- real P-256 crypto; auth_data layout mirrors firmware exactly
|
|
||||||
- user_confirms=True/False simulates card Yes/No; refusing() wrapper for integration test paths
|
|
||||||
- forget_user() simulates card-side key removal
|
|
||||||
- module docstring in tests/card_emulator.py is the usage guide
|
|
||||||
- Fixed two silent fido2-direct bugs: RegistrationResponse and AuthenticationResponse were both
|
|
||||||
constructed with id= instead of raw_id=; all direct-mode register/authenticate calls were failing.
|
|
||||||
- Test suite now at 122 tests (was 100), all passing locally without card or VMs.
|
|
||||||
|
|
||||||
Phase status (2026-04-27):
|
|
||||||
- Phase 6.5 (concurrency): deferred. Ceiling (~10 in-flight) is acceptable until multi-card use cases arrive.
|
|
||||||
- Phase 7 (firmware build/flash): blocked on Chrome Roads (card vendor). No local action until that discussion concludes.
|
|
||||||
- Phase 9 (phone integration): awaiting go-ahead. When approved: Flutter app (iOS + Android) replaces k_proxy; FIDO2 over WiFi to card; depends on Phase 7 firmware capability.
|
|
||||||
|
|
||||||
No active engineering work is unblocked at this time. Resume when Chrome Roads responds or Phase 9 is approved.
|
|
||||||
|
|
||||||
Status (2026-04-26, markdown maintenance):
|
Status (2026-04-26, markdown maintenance):
|
||||||
- Re-scanned `Setup.md`, `Workplan.md`, and `PHASE5_RUNBOOK.md` against the current workspace files.
|
- Re-scanned `Setup.md`, `Workplan.md`, and `PHASE5_RUNBOOK.md` against the current workspace files.
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,9 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
"""
|
||||||
k_client_portal — browser-facing portal running in k_client.
|
Minimal browser-facing client portal for Phase 6 bring-up.
|
||||||
|
|
||||||
Serves the single-page UI and thin API shim that delegates every auth and
|
This runs in k_client, keeps a local preferred username, and talks to k_proxy
|
||||||
resource operation to k_proxy over the localhost-forwarded TLS endpoint.
|
over the localhost-forwarded TLS endpoint.
|
||||||
Persists one preferred username locally; all session and enrollment state
|
|
||||||
lives in k_proxy.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
@ -563,25 +561,18 @@ class ClientState:
|
||||||
self.proxy_base_url = proxy_base_url.rstrip("/")
|
self.proxy_base_url = proxy_base_url.rstrip("/")
|
||||||
self.proxy_ca_file = proxy_ca_file
|
self.proxy_ca_file = proxy_ca_file
|
||||||
self.enroll_db = enroll_db
|
self.enroll_db = enroll_db
|
||||||
# Registration and login both require a physical card touch, which can
|
|
||||||
# take up to ~60 s in practice; 90 s gives a generous margin.
|
|
||||||
self.interactive_timeout_s = interactive_timeout_s
|
self.interactive_timeout_s = interactive_timeout_s
|
||||||
self.default_timeout_s = default_timeout_s
|
self.default_timeout_s = default_timeout_s
|
||||||
self.lock = threading.Lock()
|
self.lock = threading.Lock()
|
||||||
self.preferred_enrollment: EnrollmentRecord | None = None
|
self.preferred_enrollment: EnrollmentRecord | None = None
|
||||||
self.session_token: str | None = None
|
self.session_token: str | None = None
|
||||||
self.session_expires_at: int | None = None
|
self.session_expires_at: int | None = None
|
||||||
# Build the TLS context once; creating it on every request is expensive
|
|
||||||
# and the CA file doesn't change at runtime.
|
|
||||||
self._ssl_ctx: ssl.SSLContext | None = (
|
|
||||||
ssl.create_default_context(cafile=self.proxy_ca_file)
|
|
||||||
if proxy_base_url.startswith("https://")
|
|
||||||
else None
|
|
||||||
)
|
|
||||||
self._load_preferred_enrollment()
|
self._load_preferred_enrollment()
|
||||||
|
|
||||||
def _ssl_context(self) -> ssl.SSLContext | None:
|
def _ssl_context(self):
|
||||||
return self._ssl_ctx
|
if self.proxy_base_url.startswith("https://"):
|
||||||
|
return ssl.create_default_context(cafile=self.proxy_ca_file)
|
||||||
|
return None
|
||||||
|
|
||||||
def _proxy_json(
|
def _proxy_json(
|
||||||
self,
|
self,
|
||||||
|
|
@ -635,12 +626,6 @@ class ClientState:
|
||||||
username = username.strip()
|
username = username.strip()
|
||||||
if not username:
|
if not username:
|
||||||
return {"ok": False, "error": "username required"}
|
return {"ok": False, "error": "username required"}
|
||||||
# Best-effort: invalidate any active session on k_proxy before re-enrolling.
|
|
||||||
# The new credential will differ from what the old session was issued for.
|
|
||||||
with self.lock:
|
|
||||||
old_token = self.session_token
|
|
||||||
if old_token:
|
|
||||||
self._proxy_json("POST", "/session/logout")
|
|
||||||
status, data = self._proxy_json(
|
status, data = self._proxy_json(
|
||||||
"POST",
|
"POST",
|
||||||
"/enroll/register",
|
"/enroll/register",
|
||||||
|
|
@ -756,15 +741,6 @@ class Handler(BaseHTTPRequestHandler):
|
||||||
return {}
|
return {}
|
||||||
return json.loads(raw.decode("utf-8"))
|
return json.loads(raw.decode("utf-8"))
|
||||||
|
|
||||||
def _require_json(self) -> dict[str, Any] | None:
|
|
||||||
# Returns None and sends 400 when the body is unparseable; the caller
|
|
||||||
# should return immediately without sending a second response.
|
|
||||||
try:
|
|
||||||
return self._read_json()
|
|
||||||
except Exception:
|
|
||||||
self._json(400, {"ok": False, "error": "invalid json"})
|
|
||||||
return None
|
|
||||||
|
|
||||||
def do_GET(self) -> None: # noqa: N802
|
def do_GET(self) -> None: # noqa: N802
|
||||||
path = urlparse(self.path).path
|
path = urlparse(self.path).path
|
||||||
if path == "/":
|
if path == "/":
|
||||||
|
|
@ -785,22 +761,28 @@ class Handler(BaseHTTPRequestHandler):
|
||||||
def do_POST(self) -> None: # noqa: N802
|
def do_POST(self) -> None: # noqa: N802
|
||||||
path = urlparse(self.path).path
|
path = urlparse(self.path).path
|
||||||
if path == "/api/enroll":
|
if path == "/api/enroll":
|
||||||
data = self._require_json()
|
try:
|
||||||
if data is None:
|
data = self._read_json()
|
||||||
|
except Exception:
|
||||||
|
self._json(400, {"ok": False, "error": "invalid json"})
|
||||||
return
|
return
|
||||||
result = self.state.enroll(str(data.get("username", "")))
|
result = self.state.enroll(str(data.get("username", "")))
|
||||||
self._json(200 if result.get("ok") else 400, result)
|
self._json(200 if result.get("ok") else 400, result)
|
||||||
return
|
return
|
||||||
if path == "/api/login":
|
if path == "/api/login":
|
||||||
data = self._require_json()
|
try:
|
||||||
if data is None:
|
data = self._read_json()
|
||||||
|
except Exception:
|
||||||
|
self._json(400, {"ok": False, "error": "invalid json"})
|
||||||
return
|
return
|
||||||
status, data = self.state.login(str(data.get("username", "")))
|
status, data = self.state.login(str(data.get("username", "")))
|
||||||
self._json(status, data)
|
self._json(status, data)
|
||||||
return
|
return
|
||||||
if path == "/api/enroll/delete":
|
if path == "/api/enroll/delete":
|
||||||
data = self._require_json()
|
try:
|
||||||
if data is None:
|
data = self._read_json()
|
||||||
|
except Exception:
|
||||||
|
self._json(400, {"ok": False, "error": "invalid json"})
|
||||||
return
|
return
|
||||||
status, data = self.state.delete_enrollment(str(data.get("username", "")))
|
status, data = self.state.delete_enrollment(str(data.get("username", "")))
|
||||||
self._json(status, data)
|
self._json(status, data)
|
||||||
|
|
|
||||||
|
|
@ -1,14 +1,16 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
"""
|
||||||
k_proxy — session gateway and card authentication bridge.
|
Minimal k_proxy service for Phase 5 bring-up.
|
||||||
|
|
||||||
Creates short-lived bearer sessions after a card-backed auth gate, then
|
Behavior:
|
||||||
proxies authenticated requests to k_server. Enrollment metadata and session
|
- Creates short-lived sessions after a card-backed auth gate.
|
||||||
state are both process-local; sessions do not survive a restart.
|
- Reuses valid sessions to access k_server protected counter endpoint.
|
||||||
|
- Supports enrollment, session status, and logout.
|
||||||
|
|
||||||
Default auth mode is a lightweight card-presence probe (subprocess call to
|
Notes:
|
||||||
fido2_probe.py). Pass --auth-mode fido2-direct for real CTAP2
|
- Default runtime still uses the legacy card-presence probe gate.
|
||||||
makeCredential/getAssertion against the attached ChromeCard.
|
- Experimental direct FIDO2 registration/assertion lives behind `--auth-mode fido2-direct`.
|
||||||
|
- This is still a prototype and not a final production auth design.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
@ -53,11 +55,8 @@ from fido2.webauthn import (
|
||||||
UserVerificationRequirement,
|
UserVerificationRequirement,
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
if getattr(fido2.features.webauthn_json_mapping, "_enabled", None) is None:
|
||||||
if getattr(fido2.features.webauthn_json_mapping, "_enabled", None) is None:
|
|
||||||
fido2.features.webauthn_json_mapping.enabled = True
|
fido2.features.webauthn_json_mapping.enabled = True
|
||||||
except AttributeError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
HTML = """<!doctype html>
|
HTML = """<!doctype html>
|
||||||
|
|
@ -544,7 +543,6 @@ class ProxyState:
|
||||||
return time.time()
|
return time.time()
|
||||||
|
|
||||||
def _gc_locked(self) -> None:
|
def _gc_locked(self) -> None:
|
||||||
# Caller must hold self.lock.
|
|
||||||
now = self._now()
|
now = self._now()
|
||||||
dead = [token for token, sess in self.sessions.items() if sess.expires_at <= now]
|
dead = [token for token, sess in self.sessions.items() if sess.expires_at <= now]
|
||||||
for token in dead:
|
for token in dead:
|
||||||
|
|
@ -673,9 +671,6 @@ class ProxyState:
|
||||||
self._drop_direct_device_locked()
|
self._drop_direct_device_locked()
|
||||||
|
|
||||||
def _with_direct_ctap2(self, action):
|
def _with_direct_ctap2(self, action):
|
||||||
# First attempt reuses the cached handle; if it fails (e.g. the card was
|
|
||||||
# briefly removed or the CTAPHID channel desynchronised), we reopen once
|
|
||||||
# and retry before propagating the error.
|
|
||||||
with self.direct_device_lock:
|
with self.direct_device_lock:
|
||||||
last_exc: Exception | None = None
|
last_exc: Exception | None = None
|
||||||
for reopen in (False, True):
|
for reopen in (False, True):
|
||||||
|
|
@ -757,7 +752,7 @@ class ProxyState:
|
||||||
auth_data = self.fido_server.register_complete(
|
auth_data = self.fido_server.register_complete(
|
||||||
state,
|
state,
|
||||||
RegistrationResponse(
|
RegistrationResponse(
|
||||||
raw_id=attestation.auth_data.credential_data.credential_id,
|
id=attestation.auth_data.credential_data.credential_id,
|
||||||
response=AuthenticatorAttestationResponse(
|
response=AuthenticatorAttestationResponse(
|
||||||
client_data=client_data,
|
client_data=client_data,
|
||||||
attestation_object=AttestationObject.create(
|
attestation_object=AttestationObject.create(
|
||||||
|
|
@ -888,7 +883,7 @@ class ProxyState:
|
||||||
state,
|
state,
|
||||||
[credential],
|
[credential],
|
||||||
AuthenticationResponse(
|
AuthenticationResponse(
|
||||||
raw_id=response.credential["id"],
|
id=response.credential["id"],
|
||||||
response=AuthenticatorAssertionResponse(
|
response=AuthenticatorAssertionResponse(
|
||||||
client_data=client_data,
|
client_data=client_data,
|
||||||
authenticator_data=response.auth_data,
|
authenticator_data=response.auth_data,
|
||||||
|
|
@ -964,8 +959,6 @@ class UpstreamPool:
|
||||||
conn.request("POST", full_path, body=body, headers=req_headers)
|
conn.request("POST", full_path, body=body, headers=req_headers)
|
||||||
resp = conn.getresponse()
|
resp = conn.getresponse()
|
||||||
raw = resp.read()
|
raw = resp.read()
|
||||||
# will_close is set by the server when it intends to close the connection
|
|
||||||
# after this response; reusing such a connection would hit an EOF.
|
|
||||||
reusable = not resp.will_close
|
reusable = not resp.will_close
|
||||||
try:
|
try:
|
||||||
data = json.loads(raw.decode("utf-8")) if raw else {}
|
data = json.loads(raw.decode("utf-8")) if raw else {}
|
||||||
|
|
@ -1008,20 +1001,10 @@ class Handler(BaseHTTPRequestHandler):
|
||||||
return json.loads(raw.decode("utf-8"))
|
return json.loads(raw.decode("utf-8"))
|
||||||
|
|
||||||
def _discard_request_body(self) -> None:
|
def _discard_request_body(self) -> None:
|
||||||
# HTTP/1.1 keep-alive: body must be consumed before the response is sent.
|
|
||||||
length = int(self.headers.get("Content-Length", "0"))
|
length = int(self.headers.get("Content-Length", "0"))
|
||||||
if length > 0:
|
if length > 0:
|
||||||
self.rfile.read(length)
|
self.rfile.read(length)
|
||||||
|
|
||||||
def _require_json(self) -> dict[str, Any] | None:
|
|
||||||
# Returns None and sends 400 when the body is unparseable; callers must
|
|
||||||
# return immediately without sending a second response.
|
|
||||||
try:
|
|
||||||
return self._read_json()
|
|
||||||
except Exception:
|
|
||||||
self._json(400, {"ok": False, "error": "invalid json"})
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _bearer_token(self) -> str | None:
|
def _bearer_token(self) -> str | None:
|
||||||
value = self.headers.get("Authorization", "")
|
value = self.headers.get("Authorization", "")
|
||||||
if not value.startswith("Bearer "):
|
if not value.startswith("Bearer "):
|
||||||
|
|
@ -1030,8 +1013,6 @@ class Handler(BaseHTTPRequestHandler):
|
||||||
return token or None
|
return token or None
|
||||||
|
|
||||||
def _require_session(self) -> tuple[str, Session] | None:
|
def _require_session(self) -> tuple[str, Session] | None:
|
||||||
# Returns None when auth fails; the 401 has already been sent, so callers
|
|
||||||
# must return immediately without writing a second response.
|
|
||||||
token = self._bearer_token()
|
token = self._bearer_token()
|
||||||
if not token:
|
if not token:
|
||||||
self._json(401, {"ok": False, "error": "missing bearer token"})
|
self._json(401, {"ok": False, "error": "missing bearer token"})
|
||||||
|
|
@ -1092,8 +1073,10 @@ class Handler(BaseHTTPRequestHandler):
|
||||||
self.send_error(404)
|
self.send_error(404)
|
||||||
|
|
||||||
def _session_login(self) -> None:
|
def _session_login(self) -> None:
|
||||||
data = self._require_json()
|
try:
|
||||||
if data is None:
|
data = self._read_json()
|
||||||
|
except Exception:
|
||||||
|
self._json(400, {"ok": False, "error": "invalid json"})
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
@ -1124,8 +1107,10 @@ class Handler(BaseHTTPRequestHandler):
|
||||||
)
|
)
|
||||||
|
|
||||||
def _enroll_register(self) -> None:
|
def _enroll_register(self) -> None:
|
||||||
data = self._require_json()
|
try:
|
||||||
if data is None:
|
data = self._read_json()
|
||||||
|
except Exception:
|
||||||
|
self._json(400, {"ok": False, "error": "invalid json"})
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
@ -1146,8 +1131,10 @@ class Handler(BaseHTTPRequestHandler):
|
||||||
self._json(200, enrollment_payload(enrollment, created=enrollment.created_at == enrollment.updated_at))
|
self._json(200, enrollment_payload(enrollment, created=enrollment.created_at == enrollment.updated_at))
|
||||||
|
|
||||||
def _enroll_update(self) -> None:
|
def _enroll_update(self) -> None:
|
||||||
data = self._require_json()
|
try:
|
||||||
if data is None:
|
data = self._read_json()
|
||||||
|
except Exception:
|
||||||
|
self._json(400, {"ok": False, "error": "invalid json"})
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
enrollment = self.state.update_enrollment(
|
enrollment = self.state.update_enrollment(
|
||||||
|
|
@ -1163,8 +1150,10 @@ class Handler(BaseHTTPRequestHandler):
|
||||||
self._json(200, enrollment_payload(enrollment))
|
self._json(200, enrollment_payload(enrollment))
|
||||||
|
|
||||||
def _enroll_delete(self) -> None:
|
def _enroll_delete(self) -> None:
|
||||||
data = self._require_json()
|
try:
|
||||||
if data is None:
|
data = self._read_json()
|
||||||
|
except Exception:
|
||||||
|
self._json(400, {"ok": False, "error": "invalid json"})
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
enrollment = self.state.delete_enrollment(str(data.get("username", "")))
|
enrollment = self.state.delete_enrollment(str(data.get("username", "")))
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,11 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
"""
|
||||||
k_server — protected resource backend.
|
Minimal k_server service for Phase 5/5.5 bring-up.
|
||||||
|
|
||||||
Exposes a monotonic counter behind a shared proxy token. Only k_proxy
|
Behavior:
|
||||||
is expected to reach this service; k_client should have no direct path.
|
- Exposes a protected monotonic counter endpoint.
|
||||||
All state is process-local and resets on restart.
|
- Accepts only requests from k_proxy via a shared proxy token header.
|
||||||
|
- Uses thread-safe counter increments.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
@ -20,7 +21,6 @@ from urllib.parse import urlparse
|
||||||
|
|
||||||
|
|
||||||
class ServerState:
|
class ServerState:
|
||||||
# All state is process-local; a restart resets the counter to zero.
|
|
||||||
def __init__(self, proxy_token: str):
|
def __init__(self, proxy_token: str):
|
||||||
self.proxy_token = proxy_token
|
self.proxy_token = proxy_token
|
||||||
self.counter = 0
|
self.counter = 0
|
||||||
|
|
@ -45,8 +45,6 @@ class Handler(BaseHTTPRequestHandler):
|
||||||
self.wfile.write(body)
|
self.wfile.write(body)
|
||||||
|
|
||||||
def _discard_request_body(self) -> None:
|
def _discard_request_body(self) -> None:
|
||||||
# HTTP/1.1 keep-alive: the connection is reused, so the body must be fully
|
|
||||||
# consumed before we send the response, even for endpoints that ignore it.
|
|
||||||
length = int(self.headers.get("Content-Length", "0"))
|
length = int(self.headers.get("Content-Length", "0"))
|
||||||
if length > 0:
|
if length > 0:
|
||||||
self.rfile.read(length)
|
self.rfile.read(length)
|
||||||
|
|
|
||||||
|
|
@ -1,339 +0,0 @@
|
||||||
"""
|
|
||||||
CardEmulator — software emulator of the ChromeCard FIDO2 authenticator
|
|
||||||
======================================================================
|
|
||||||
|
|
||||||
What it is
|
|
||||||
----------
|
|
||||||
CardEmulator is a drop-in replacement for the physical ChromeCard in tests.
|
|
||||||
It implements the two Ctap2 methods that k_proxy_app calls in fido2-direct
|
|
||||||
mode — make_credential (registration) and get_assertion (authentication) —
|
|
||||||
using real P-256 cryptography and an in-memory credential store.
|
|
||||||
|
|
||||||
The auth_data layout and COSE key encoding mirror the firmware exactly
|
|
||||||
(see fido_make_cred.c and fido_get_assertion.c), so fido2.server's
|
|
||||||
register_complete and authenticate_complete accept the emulator's responses
|
|
||||||
without any extra patching.
|
|
||||||
|
|
||||||
|
|
||||||
Wiring the emulator into a ProxyState test
|
|
||||||
------------------------------------------
|
|
||||||
Two patches are needed: one to replace _with_direct_ctap2 so it hands the
|
|
||||||
emulator to the lambda instead of opening a real HID device, and one to
|
|
||||||
suppress _drop_direct_device which would otherwise try to close a real handle.
|
|
||||||
|
|
||||||
A convenience helper for this is provided in test_k_proxy.py:
|
|
||||||
|
|
||||||
def _patch_emulator(state, emulator):
|
|
||||||
return patch.multiple(
|
|
||||||
state,
|
|
||||||
_with_direct_ctap2=lambda fn: fn(emulator),
|
|
||||||
_drop_direct_device=lambda: None,
|
|
||||||
)
|
|
||||||
|
|
||||||
Typical test setup:
|
|
||||||
|
|
||||||
from card_emulator import CardEmulator
|
|
||||||
from unittest.mock import patch
|
|
||||||
|
|
||||||
emulator = CardEmulator()
|
|
||||||
state = _make_state(tmp_path, auth_mode=AUTH_MODE_FIDO2_DIRECT)
|
|
||||||
|
|
||||||
with _patch_emulator(state, emulator):
|
|
||||||
enrollment = state.register_enrollment("alice", "Alice")
|
|
||||||
ok, msg = state.authenticate_with_card("alice") # True, "assertion verified"
|
|
||||||
|
|
||||||
|
|
||||||
User confirmation (Yes / No on the card)
|
|
||||||
-----------------------------------------
|
|
||||||
Both make_credential and get_assertion accept a `user_confirms` keyword
|
|
||||||
argument (default True). Setting it to False raises
|
|
||||||
CtapError(OPERATION_DENIED), exactly as the firmware does when the user taps
|
|
||||||
No on the card's confirmation dialog.
|
|
||||||
|
|
||||||
Direct calls — pass the flag explicitly:
|
|
||||||
|
|
||||||
attest = emulator.make_credential(
|
|
||||||
client_data_hash=..., rp=..., user=..., key_params=...,
|
|
||||||
user_confirms=False,
|
|
||||||
) # raises CtapError(OPERATION_DENIED)
|
|
||||||
|
|
||||||
Integration tests through _with_direct_ctap2 — the lambda that ProxyState
|
|
||||||
builds cannot inject user_confirms, so use refusing() instead. It returns
|
|
||||||
a thin wrapper whose methods forward to the emulator with user_confirms=False:
|
|
||||||
|
|
||||||
with _patch_emulator(state, emulator.refusing()):
|
|
||||||
ok, msg = state.authenticate_with_card("alice") # False
|
|
||||||
# msg contains "assertion verification failed: CTAP error: OPERATION_DENIED"
|
|
||||||
|
|
||||||
with _patch_emulator(state, emulator.refusing()):
|
|
||||||
with self.assertRaises(RuntimeError):
|
|
||||||
state.register_enrollment("bob", None)
|
|
||||||
# raises RuntimeError("card registration failed: ...")
|
|
||||||
|
|
||||||
refusing() shares the same credential store as the parent emulator, so
|
|
||||||
credentials registered before or after the call are visible to both.
|
|
||||||
|
|
||||||
|
|
||||||
Simulating card-side credential removal
|
|
||||||
----------------------------------------
|
|
||||||
forget_user(username) removes all credentials for that user from the
|
|
||||||
emulator's store and returns the count removed. Use it to simulate a
|
|
||||||
factory reset or a deliberate key deletion:
|
|
||||||
|
|
||||||
emulator.forget_user("alice")
|
|
||||||
ok, msg = state.authenticate_with_card("alice") # False
|
|
||||||
|
|
||||||
|
|
||||||
API summary
|
|
||||||
-----------
|
|
||||||
CardEmulator()
|
|
||||||
Create a new emulator with an empty credential store.
|
|
||||||
|
|
||||||
make_credential(client_data_hash, rp, user, key_params, *, user_confirms=True)
|
|
||||||
Simulate CTAP2 makeCredential. Returns AttestationResponse.
|
|
||||||
|
|
||||||
get_assertion(rp_id, client_data_hash, allow_list, *, user_confirms=True)
|
|
||||||
Simulate CTAP2 getAssertion. Returns AssertionResponse.
|
|
||||||
Raises CtapError(NO_CREDENTIALS) if no matching credential is found.
|
|
||||||
|
|
||||||
refusing() -> _RefusingView
|
|
||||||
Return a wrapper that forces user_confirms=False on every call.
|
|
||||||
|
|
||||||
forget_user(username) -> int
|
|
||||||
Remove all credentials for username. Returns count removed.
|
|
||||||
|
|
||||||
credential_count() -> int
|
|
||||||
Total credentials currently in the store.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import hashlib
|
|
||||||
import os
|
|
||||||
import struct
|
|
||||||
from dataclasses import dataclass
|
|
||||||
from typing import Any, Mapping
|
|
||||||
|
|
||||||
from cryptography.hazmat.primitives import hashes
|
|
||||||
from cryptography.hazmat.primitives.asymmetric import ec
|
|
||||||
from fido2.ctap import CtapError
|
|
||||||
from fido2.ctap2 import AssertionResponse, AttestationResponse
|
|
||||||
from fido2.webauthn import AuthenticatorData
|
|
||||||
|
|
||||||
# AAGUID from fido_make_cred.c
|
|
||||||
_AAGUID = bytes([
|
|
||||||
0x12, 0x34, 0x56, 0x78, 0x90, 0xAB, 0xCD, 0xEF,
|
|
||||||
0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF,
|
|
||||||
])
|
|
||||||
|
|
||||||
|
|
||||||
def _cose_es256(x: bytes, y: bytes) -> bytes:
|
|
||||||
"""CBOR-encoded COSE ES256 public key, byte-for-byte as the firmware builds it."""
|
|
||||||
return (
|
|
||||||
bytes([0xA5]) # map(5)
|
|
||||||
+ bytes([0x01, 0x02]) # kty: 2 (EC2)
|
|
||||||
+ bytes([0x03, 0x26]) # alg: -7 (ES256)
|
|
||||||
+ bytes([0x20, 0x01]) # crv (-1): 1 (P-256)
|
|
||||||
+ bytes([0x21, 0x58, 0x20]) + x # x (-2): bstr(32)
|
|
||||||
+ bytes([0x22, 0x58, 0x20]) + y # y (-3): bstr(32)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class _Credential:
|
|
||||||
private_key: ec.EllipticCurvePrivateKey
|
|
||||||
rp_id_hash: bytes
|
|
||||||
user_id: bytes
|
|
||||||
username: str
|
|
||||||
sign_count: int = 0
|
|
||||||
|
|
||||||
|
|
||||||
class CardEmulator:
|
|
||||||
"""In-process emulator of a ChromeCard FIDO2 authenticator.
|
|
||||||
|
|
||||||
Implements make_credential and get_assertion with the same signatures as
|
|
||||||
fido2.ctap2.Ctap2, plus forget_user() for test teardown.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self) -> None:
|
|
||||||
self._creds: dict[bytes, _Credential] = {}
|
|
||||||
|
|
||||||
# ── CTAP2 interface ───────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def make_credential(
|
|
||||||
self,
|
|
||||||
client_data_hash: bytes,
|
|
||||||
rp: Mapping[str, Any],
|
|
||||||
user: Mapping[str, Any],
|
|
||||||
key_params: list[Mapping[str, Any]],
|
|
||||||
exclude_list: list[Mapping[str, Any]] | None = None,
|
|
||||||
extensions: Mapping[str, Any] | None = None,
|
|
||||||
options: Mapping[str, Any] | None = None,
|
|
||||||
*,
|
|
||||||
user_confirms: bool = True,
|
|
||||||
**_: Any,
|
|
||||||
) -> AttestationResponse:
|
|
||||||
"""Simulate makeCredential.
|
|
||||||
|
|
||||||
When user_confirms is False the call raises CtapError(OPERATION_DENIED),
|
|
||||||
mirroring the firmware's response when the user taps No on the card.
|
|
||||||
Otherwise a real P-256 keypair is generated, stored, and returned as a
|
|
||||||
fmt='none' AttestationResponse with a valid COSE ES256 public key.
|
|
||||||
"""
|
|
||||||
if not user_confirms:
|
|
||||||
raise CtapError(CtapError.ERR.OPERATION_DENIED)
|
|
||||||
|
|
||||||
rp_id: str = rp["id"]
|
|
||||||
rp_id_hash = hashlib.sha256(rp_id.encode()).digest()
|
|
||||||
|
|
||||||
priv = ec.generate_private_key(ec.SECP256R1())
|
|
||||||
pub_nums = priv.public_key().public_numbers()
|
|
||||||
x = pub_nums.x.to_bytes(32, "big")
|
|
||||||
y = pub_nums.y.to_bytes(32, "big")
|
|
||||||
|
|
||||||
credential_id = os.urandom(32)
|
|
||||||
|
|
||||||
raw_user_id: bytes = user.get("id", b"") # type: ignore[assignment]
|
|
||||||
if isinstance(raw_user_id, str):
|
|
||||||
raw_user_id = raw_user_id.encode()
|
|
||||||
|
|
||||||
cred = _Credential(
|
|
||||||
private_key=priv,
|
|
||||||
rp_id_hash=rp_id_hash,
|
|
||||||
user_id=raw_user_id,
|
|
||||||
username=user.get("name", ""), # type: ignore[arg-type]
|
|
||||||
)
|
|
||||||
|
|
||||||
# authData layout matches fido_make_cred.c build_make_credential_response():
|
|
||||||
# rpIdHash(32) | flags(1) | signCount(4) | aaguid(16)
|
|
||||||
# | credIdLen(2) | credId(32) | coseKey(77)
|
|
||||||
auth_data_bytes = (
|
|
||||||
rp_id_hash
|
|
||||||
+ bytes([0x41]) # flags: UP=1, AT=1
|
|
||||||
+ struct.pack(">I", cred.sign_count)
|
|
||||||
+ _AAGUID
|
|
||||||
+ struct.pack(">H", len(credential_id))
|
|
||||||
+ credential_id
|
|
||||||
+ _cose_es256(x, y)
|
|
||||||
)
|
|
||||||
cred.sign_count += 1
|
|
||||||
self._creds[credential_id] = cred
|
|
||||||
|
|
||||||
return AttestationResponse(
|
|
||||||
fmt="none",
|
|
||||||
auth_data=AuthenticatorData(auth_data_bytes),
|
|
||||||
att_stmt={},
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_assertion(
|
|
||||||
self,
|
|
||||||
rp_id: str,
|
|
||||||
client_data_hash: bytes,
|
|
||||||
allow_list: list[Mapping[str, Any]] | None = None,
|
|
||||||
extensions: Mapping[str, Any] | None = None,
|
|
||||||
options: Mapping[str, Any] | None = None,
|
|
||||||
*,
|
|
||||||
user_confirms: bool = True,
|
|
||||||
**_: Any,
|
|
||||||
) -> AssertionResponse:
|
|
||||||
"""Simulate getAssertion.
|
|
||||||
|
|
||||||
When user_confirms is False raises CtapError(OPERATION_DENIED).
|
|
||||||
Otherwise finds the credential, builds authData (37 bytes, no AT flag),
|
|
||||||
signs authData || clientDataHash with ECDSA-SHA256 (DER), and returns
|
|
||||||
an AssertionResponse — byte-for-byte compatible with the firmware output.
|
|
||||||
"""
|
|
||||||
if not user_confirms:
|
|
||||||
raise CtapError(CtapError.ERR.OPERATION_DENIED)
|
|
||||||
|
|
||||||
rp_id_hash = hashlib.sha256(rp_id.encode()).digest()
|
|
||||||
|
|
||||||
if not allow_list:
|
|
||||||
raise CtapError(CtapError.ERR.NO_CREDENTIALS)
|
|
||||||
|
|
||||||
cred_id: bytes | None = None
|
|
||||||
cred: _Credential | None = None
|
|
||||||
for desc in allow_list:
|
|
||||||
cid: bytes = desc["id"] if isinstance(desc, dict) else getattr(desc, "id")
|
|
||||||
entry = self._creds.get(cid)
|
|
||||||
if entry is not None and entry.rp_id_hash == rp_id_hash:
|
|
||||||
cred_id = cid
|
|
||||||
cred = entry
|
|
||||||
break
|
|
||||||
|
|
||||||
if cred is None or cred_id is None:
|
|
||||||
raise CtapError(CtapError.ERR.NO_CREDENTIALS)
|
|
||||||
|
|
||||||
# authData layout matches fido_get_assertion.c build_get_assertion_response():
|
|
||||||
# rpIdHash(32) | flags(1) | signCount(4)
|
|
||||||
auth_data_bytes = (
|
|
||||||
rp_id_hash
|
|
||||||
+ bytes([0x01]) # flags: UP=1
|
|
||||||
+ struct.pack(">I", cred.sign_count)
|
|
||||||
)
|
|
||||||
cred.sign_count += 1
|
|
||||||
|
|
||||||
# Signature over authData || clientDataHash, DER-encoded.
|
|
||||||
# Matches drv_crypto_sign_hash_DER() in the firmware.
|
|
||||||
sig = cred.private_key.sign(
|
|
||||||
auth_data_bytes + client_data_hash,
|
|
||||||
ec.ECDSA(hashes.SHA256()),
|
|
||||||
)
|
|
||||||
|
|
||||||
user_field: dict[str, Any] | None = {"id": cred.user_id} if cred.user_id else None
|
|
||||||
|
|
||||||
return AssertionResponse(
|
|
||||||
credential={"id": cred_id, "type": "public-key"},
|
|
||||||
auth_data=AuthenticatorData(auth_data_bytes),
|
|
||||||
signature=sig,
|
|
||||||
user=user_field,
|
|
||||||
)
|
|
||||||
|
|
||||||
# ── test helpers ──────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def refusing(self) -> "_RefusingView":
|
|
||||||
"""Return a view of this emulator that always declines confirmation.
|
|
||||||
|
|
||||||
Use this when the test goes through _with_direct_ctap2, which calls
|
|
||||||
make_credential / get_assertion via a lambda and cannot inject
|
|
||||||
user_confirms directly:
|
|
||||||
|
|
||||||
with patch.object(state, "_with_direct_ctap2",
|
|
||||||
side_effect=lambda fn: fn(emulator.refusing())):
|
|
||||||
ok, msg = state.authenticate_with_card("alice")
|
|
||||||
self.assertFalse(ok)
|
|
||||||
"""
|
|
||||||
return _RefusingView(self)
|
|
||||||
|
|
||||||
def forget_user(self, username: str) -> int:
|
|
||||||
"""Remove all credentials for *username* from the emulator store.
|
|
||||||
|
|
||||||
Returns the number of credentials removed. Use this to simulate a
|
|
||||||
card-side credential deletion (factory reset, or deliberate removal).
|
|
||||||
"""
|
|
||||||
to_delete = [cid for cid, e in self._creds.items() if e.username == username]
|
|
||||||
for cid in to_delete:
|
|
||||||
del self._creds[cid]
|
|
||||||
return len(to_delete)
|
|
||||||
|
|
||||||
def credential_count(self) -> int:
|
|
||||||
"""Total number of credentials currently in the emulator store."""
|
|
||||||
return len(self._creds)
|
|
||||||
|
|
||||||
|
|
||||||
class _RefusingView:
|
|
||||||
"""Thin proxy returned by CardEmulator.refusing().
|
|
||||||
|
|
||||||
Forwards make_credential and get_assertion to the underlying emulator
|
|
||||||
with user_confirms=False, so every call raises OPERATION_DENIED.
|
|
||||||
The credential store is shared with the parent emulator.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, emulator: CardEmulator) -> None:
|
|
||||||
self._emulator = emulator
|
|
||||||
|
|
||||||
def make_credential(self, **kwargs: Any) -> AttestationResponse:
|
|
||||||
return self._emulator.make_credential(**kwargs, user_confirms=False)
|
|
||||||
|
|
||||||
def get_assertion(self, **kwargs: Any) -> AssertionResponse:
|
|
||||||
return self._emulator.get_assertion(**kwargs, user_confirms=False)
|
|
||||||
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue