Compare commits
7 Commits
docs-maint
...
main
| Author | SHA1 | Date |
|---|---|---|
|
|
35c40985dd | |
|
|
56132528fe | |
|
|
23c37f4590 | |
|
|
9d6da53b8f | |
|
|
855b4175bc | |
|
|
2cf44e97df | |
|
|
e7212b49a0 |
39
Setup.md
39
Setup.md
|
|
@ -1,6 +1,6 @@
|
||||||
# Setup
|
# Setup
|
||||||
|
|
||||||
Last updated: 2026-04-26
|
Last updated: 2026-04-27
|
||||||
|
|
||||||
This is a living setup/status file for the local ChromeCard workspace at `/home/user/chromecard`.
|
This is a living setup/status file for the local ChromeCard workspace at `/home/user/chromecard`.
|
||||||
Update this file whenever environment status or verified behavior changes.
|
Update this file whenever environment status or verified behavior changes.
|
||||||
|
|
@ -611,6 +611,23 @@ Session note (2026-04-25, direct FIDO2 auth attempt):
|
||||||
- the deployed `k_proxy` service was restored to default `probe` mode
|
- the deployed `k_proxy` service was restored to default `probe` mode
|
||||||
- verified `alice` login still works afterward, so the validated Phase 5 baseline remains intact
|
- verified `alice` login still works afterward, so the validated Phase 5 baseline remains intact
|
||||||
|
|
||||||
|
Session note (2026-04-27, fido2-direct end-to-end browser validation):
|
||||||
|
- Deployed all three services (k_server, k_proxy, k_client_portal) in split-VM chain via SSH/SCP.
|
||||||
|
- k_proxy restarted with --auth-mode fido2-direct.
|
||||||
|
- Full browser flow verified from k_client at http://127.0.0.1:8766 with real card:
|
||||||
|
- Register: makeCredential triggered on card, button press confirmed.
|
||||||
|
- Login: getAssertion triggered on card, button press confirmed.
|
||||||
|
- Counter: k_server returned incremented value.
|
||||||
|
- Logout: session correctly invalidated.
|
||||||
|
- Confirmed: probe mode showed stale directtest enrollment (no credential_data_b64) from earlier session; that is expected.
|
||||||
|
- Bug found and fixed: clicking Register after Login cleared the client-side session token but left the server-side session alive; fix adds a best-effort /session/logout call to k_proxy before re-enrolling.
|
||||||
|
- Current deployed service state:
|
||||||
|
- k_server: https://127.0.0.1:8780, TLS, proxy-token dev-proxy-token
|
||||||
|
- k_proxy: https://127.0.0.1:8771, TLS, --auth-mode fido2-direct, upstream https://127.0.0.1:9780
|
||||||
|
- k_client: http://127.0.0.1:8766, proxy-base-url https://127.0.0.1:9771
|
||||||
|
- Forwards: k_proxy 9780->k_server:8780, k_client 9771->k_proxy:8771
|
||||||
|
- Unit test suite added: tests/test_k_proxy.py (100 tests, all passing, run locally with python3 -m unittest tests/test_k_proxy.py).
|
||||||
|
|
||||||
Session note (2026-04-26, markdown maintenance re-scan):
|
Session note (2026-04-26, markdown maintenance re-scan):
|
||||||
- Re-read the maintained workspace markdown set:
|
- Re-read the maintained workspace markdown set:
|
||||||
- `/home/user/chromecard/Setup.md`
|
- `/home/user/chromecard/Setup.md`
|
||||||
|
|
@ -631,6 +648,26 @@ Session note (2026-04-26, markdown maintenance re-scan):
|
||||||
- direct FIDO2 enrollment/login support exists in code and is documented as an optional follow-up path, not the default deployed runtime
|
- direct FIDO2 enrollment/login support exists in code and is documented as an optional follow-up path, not the default deployed runtime
|
||||||
- the main unresolved engineering limit is still the higher-fan-out Qubes forwarding ceiling on the browser-facing path, not basic chain bring-up
|
- the main unresolved engineering limit is still the higher-fan-out Qubes forwarding ceiling on the browser-facing path, not basic chain bring-up
|
||||||
|
|
||||||
|
Session note (2026-04-27, card emulator and bug fixes):
|
||||||
|
- Added software emulator of the ChromeCard FIDO2 authenticator:
|
||||||
|
- `/home/user/chromecard/tests/card_emulator.py`
|
||||||
|
- implements `make_credential` and `get_assertion` with real P-256 cryptography
|
||||||
|
- in-memory credential store keyed by credential ID (matching firmware layout)
|
||||||
|
- auth_data byte layout and COSE key encoding mirror `fido_make_cred.c` / `fido_get_assertion.c` exactly
|
||||||
|
- `user_confirms=True/False` parameter simulates the card's Yes/No confirmation dialog
|
||||||
|
- `refusing()` method returns a wrapper that forces `user_confirms=False` for integration test paths
|
||||||
|
- `forget_user(username)` simulates card-side credential removal
|
||||||
|
- module docstring is the usage guide
|
||||||
|
- Fixed two bugs in `k_proxy_app.py` that were silently breaking fido2-direct mode:
|
||||||
|
- `RegistrationResponse(id=..., ...)` → `RegistrationResponse(raw_id=..., ...)` (fido2 2.2.0 API)
|
||||||
|
- `AuthenticationResponse(id=..., ...)` → `AuthenticationResponse(raw_id=..., ...)` (same)
|
||||||
|
- both calls raised `TypeError` at runtime, caught by the surrounding `except`, so register and
|
||||||
|
authenticate in fido2-direct mode always returned failure without any visible error
|
||||||
|
- Extended test suite: 22 new tests across `TestCardEmulatorUnit` and `TestCardEmulatorIntegration`
|
||||||
|
- covers: register, authenticate, user-says-no (register and auth), forget, two-user isolation,
|
||||||
|
sign-count monotonicity, wrong RP rejection, empty allow-list rejection
|
||||||
|
- total test count is now 122, all passing locally without card or VMs
|
||||||
|
|
||||||
## Known FIDO2 Transport Boundary
|
## Known FIDO2 Transport Boundary
|
||||||
|
|
||||||
- FIDO2 on this firmware is handled via USB HID (CTAPHID), not Wi-Fi/BLE/MQTT.
|
- FIDO2 on this firmware is handled via USB HID (CTAPHID), not Wi-Fi/BLE/MQTT.
|
||||||
|
|
|
||||||
28
Workplan.md
28
Workplan.md
|
|
@ -1,6 +1,6 @@
|
||||||
# Workplan
|
# Workplan
|
||||||
|
|
||||||
Last updated: 2026-04-26
|
Last updated: 2026-04-27
|
||||||
|
|
||||||
This is the execution plan for making ChromeCard FIDO2 development and validation reproducible on this machine.
|
This is the execution plan for making ChromeCard FIDO2 development and validation reproducible on this machine.
|
||||||
|
|
||||||
|
|
@ -549,12 +549,26 @@ Exit criteria:
|
||||||
|
|
||||||
## Current Next Step
|
## Current Next Step
|
||||||
|
|
||||||
- Treat the default HTTPS split-VM chain as the stable baseline and keep validating it with `/home/user/chromecard/phase5_chain_regression.sh`.
|
Status (2026-04-27):
|
||||||
- Push the next engineering cycle toward Phase 6.5 limits:
|
- fido2-direct mode confirmed working end-to-end with real card via browser on k_client.
|
||||||
- reproduce and narrow the `~10` in-flight request ceiling on the browser-facing `k_client -> k_proxy` Qubes forward
|
- Full register → login → counter → logout flow verified with physical card button presses.
|
||||||
- separate Qubes forwarding churn from app-level issues with targeted concurrency probes and log capture
|
- Bug fixed: ClientState.enroll() now calls /session/logout on k_proxy before re-enrolling.
|
||||||
- In parallel, decide whether `--auth-mode fido2-direct` is ready to become the default deployed path or should remain an optional/operator mode.
|
- All three service files refactored and re-deployed.
|
||||||
- Keep the regression helpers as the fast check that transport, auth, session reuse, and counter semantics still hold after each change.
|
- Added CardEmulator: software emulator of the ChromeCard FIDO2 authenticator for use in tests.
|
||||||
|
- real P-256 crypto; auth_data layout mirrors firmware exactly
|
||||||
|
- user_confirms=True/False simulates card Yes/No; refusing() wrapper for integration test paths
|
||||||
|
- forget_user() simulates card-side key removal
|
||||||
|
- module docstring in tests/card_emulator.py is the usage guide
|
||||||
|
- Fixed two silent fido2-direct bugs: RegistrationResponse and AuthenticationResponse were both
|
||||||
|
constructed with id= instead of raw_id=; all direct-mode register/authenticate calls were failing.
|
||||||
|
- Test suite now at 122 tests (was 100), all passing locally without card or VMs.
|
||||||
|
|
||||||
|
Phase status (2026-04-27):
|
||||||
|
- Phase 6.5 (concurrency): deferred. Ceiling (~10 in-flight) is acceptable until multi-card use cases arrive.
|
||||||
|
- Phase 7 (firmware build/flash): blocked on Chrome Roads (card vendor). No local action until that discussion concludes.
|
||||||
|
- Phase 9 (phone integration): awaiting go-ahead. When approved: Flutter app (iOS + Android) replaces k_proxy; FIDO2 over WiFi to card; depends on Phase 7 firmware capability.
|
||||||
|
|
||||||
|
No active engineering work is unblocked at this time. Resume when Chrome Roads responds or Phase 9 is approved.
|
||||||
|
|
||||||
Status (2026-04-26, markdown maintenance):
|
Status (2026-04-26, markdown maintenance):
|
||||||
- Re-scanned `Setup.md`, `Workplan.md`, and `PHASE5_RUNBOOK.md` against the current workspace files.
|
- Re-scanned `Setup.md`, `Workplan.md`, and `PHASE5_RUNBOOK.md` against the current workspace files.
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,11 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
"""
|
||||||
Minimal browser-facing client portal for Phase 6 bring-up.
|
k_client_portal — browser-facing portal running in k_client.
|
||||||
|
|
||||||
This runs in k_client, keeps a local preferred username, and talks to k_proxy
|
Serves the single-page UI and thin API shim that delegates every auth and
|
||||||
over the localhost-forwarded TLS endpoint.
|
resource operation to k_proxy over the localhost-forwarded TLS endpoint.
|
||||||
|
Persists one preferred username locally; all session and enrollment state
|
||||||
|
lives in k_proxy.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
@ -561,18 +563,25 @@ class ClientState:
|
||||||
self.proxy_base_url = proxy_base_url.rstrip("/")
|
self.proxy_base_url = proxy_base_url.rstrip("/")
|
||||||
self.proxy_ca_file = proxy_ca_file
|
self.proxy_ca_file = proxy_ca_file
|
||||||
self.enroll_db = enroll_db
|
self.enroll_db = enroll_db
|
||||||
|
# Registration and login both require a physical card touch, which can
|
||||||
|
# take up to ~60 s in practice; 90 s gives a generous margin.
|
||||||
self.interactive_timeout_s = interactive_timeout_s
|
self.interactive_timeout_s = interactive_timeout_s
|
||||||
self.default_timeout_s = default_timeout_s
|
self.default_timeout_s = default_timeout_s
|
||||||
self.lock = threading.Lock()
|
self.lock = threading.Lock()
|
||||||
self.preferred_enrollment: EnrollmentRecord | None = None
|
self.preferred_enrollment: EnrollmentRecord | None = None
|
||||||
self.session_token: str | None = None
|
self.session_token: str | None = None
|
||||||
self.session_expires_at: int | None = None
|
self.session_expires_at: int | None = None
|
||||||
|
# Build the TLS context once; creating it on every request is expensive
|
||||||
|
# and the CA file doesn't change at runtime.
|
||||||
|
self._ssl_ctx: ssl.SSLContext | None = (
|
||||||
|
ssl.create_default_context(cafile=self.proxy_ca_file)
|
||||||
|
if proxy_base_url.startswith("https://")
|
||||||
|
else None
|
||||||
|
)
|
||||||
self._load_preferred_enrollment()
|
self._load_preferred_enrollment()
|
||||||
|
|
||||||
def _ssl_context(self):
|
def _ssl_context(self) -> ssl.SSLContext | None:
|
||||||
if self.proxy_base_url.startswith("https://"):
|
return self._ssl_ctx
|
||||||
return ssl.create_default_context(cafile=self.proxy_ca_file)
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _proxy_json(
|
def _proxy_json(
|
||||||
self,
|
self,
|
||||||
|
|
@ -626,6 +635,12 @@ class ClientState:
|
||||||
username = username.strip()
|
username = username.strip()
|
||||||
if not username:
|
if not username:
|
||||||
return {"ok": False, "error": "username required"}
|
return {"ok": False, "error": "username required"}
|
||||||
|
# Best-effort: invalidate any active session on k_proxy before re-enrolling.
|
||||||
|
# The new credential will differ from what the old session was issued for.
|
||||||
|
with self.lock:
|
||||||
|
old_token = self.session_token
|
||||||
|
if old_token:
|
||||||
|
self._proxy_json("POST", "/session/logout")
|
||||||
status, data = self._proxy_json(
|
status, data = self._proxy_json(
|
||||||
"POST",
|
"POST",
|
||||||
"/enroll/register",
|
"/enroll/register",
|
||||||
|
|
@ -741,6 +756,15 @@ class Handler(BaseHTTPRequestHandler):
|
||||||
return {}
|
return {}
|
||||||
return json.loads(raw.decode("utf-8"))
|
return json.loads(raw.decode("utf-8"))
|
||||||
|
|
||||||
|
def _require_json(self) -> dict[str, Any] | None:
|
||||||
|
# Returns None and sends 400 when the body is unparseable; the caller
|
||||||
|
# should return immediately without sending a second response.
|
||||||
|
try:
|
||||||
|
return self._read_json()
|
||||||
|
except Exception:
|
||||||
|
self._json(400, {"ok": False, "error": "invalid json"})
|
||||||
|
return None
|
||||||
|
|
||||||
def do_GET(self) -> None: # noqa: N802
|
def do_GET(self) -> None: # noqa: N802
|
||||||
path = urlparse(self.path).path
|
path = urlparse(self.path).path
|
||||||
if path == "/":
|
if path == "/":
|
||||||
|
|
@ -761,28 +785,22 @@ class Handler(BaseHTTPRequestHandler):
|
||||||
def do_POST(self) -> None: # noqa: N802
|
def do_POST(self) -> None: # noqa: N802
|
||||||
path = urlparse(self.path).path
|
path = urlparse(self.path).path
|
||||||
if path == "/api/enroll":
|
if path == "/api/enroll":
|
||||||
try:
|
data = self._require_json()
|
||||||
data = self._read_json()
|
if data is None:
|
||||||
except Exception:
|
|
||||||
self._json(400, {"ok": False, "error": "invalid json"})
|
|
||||||
return
|
return
|
||||||
result = self.state.enroll(str(data.get("username", "")))
|
result = self.state.enroll(str(data.get("username", "")))
|
||||||
self._json(200 if result.get("ok") else 400, result)
|
self._json(200 if result.get("ok") else 400, result)
|
||||||
return
|
return
|
||||||
if path == "/api/login":
|
if path == "/api/login":
|
||||||
try:
|
data = self._require_json()
|
||||||
data = self._read_json()
|
if data is None:
|
||||||
except Exception:
|
|
||||||
self._json(400, {"ok": False, "error": "invalid json"})
|
|
||||||
return
|
return
|
||||||
status, data = self.state.login(str(data.get("username", "")))
|
status, data = self.state.login(str(data.get("username", "")))
|
||||||
self._json(status, data)
|
self._json(status, data)
|
||||||
return
|
return
|
||||||
if path == "/api/enroll/delete":
|
if path == "/api/enroll/delete":
|
||||||
try:
|
data = self._require_json()
|
||||||
data = self._read_json()
|
if data is None:
|
||||||
except Exception:
|
|
||||||
self._json(400, {"ok": False, "error": "invalid json"})
|
|
||||||
return
|
return
|
||||||
status, data = self.state.delete_enrollment(str(data.get("username", "")))
|
status, data = self.state.delete_enrollment(str(data.get("username", "")))
|
||||||
self._json(status, data)
|
self._json(status, data)
|
||||||
|
|
|
||||||
|
|
@ -1,16 +1,14 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
"""
|
||||||
Minimal k_proxy service for Phase 5 bring-up.
|
k_proxy — session gateway and card authentication bridge.
|
||||||
|
|
||||||
Behavior:
|
Creates short-lived bearer sessions after a card-backed auth gate, then
|
||||||
- Creates short-lived sessions after a card-backed auth gate.
|
proxies authenticated requests to k_server. Enrollment metadata and session
|
||||||
- Reuses valid sessions to access k_server protected counter endpoint.
|
state are both process-local; sessions do not survive a restart.
|
||||||
- Supports enrollment, session status, and logout.
|
|
||||||
|
|
||||||
Notes:
|
Default auth mode is a lightweight card-presence probe (subprocess call to
|
||||||
- Default runtime still uses the legacy card-presence probe gate.
|
fido2_probe.py). Pass --auth-mode fido2-direct for real CTAP2
|
||||||
- Experimental direct FIDO2 registration/assertion lives behind `--auth-mode fido2-direct`.
|
makeCredential/getAssertion against the attached ChromeCard.
|
||||||
- This is still a prototype and not a final production auth design.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
@ -55,8 +53,11 @@ from fido2.webauthn import (
|
||||||
UserVerificationRequirement,
|
UserVerificationRequirement,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
if getattr(fido2.features.webauthn_json_mapping, "_enabled", None) is None:
|
if getattr(fido2.features.webauthn_json_mapping, "_enabled", None) is None:
|
||||||
fido2.features.webauthn_json_mapping.enabled = True
|
fido2.features.webauthn_json_mapping.enabled = True
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
HTML = """<!doctype html>
|
HTML = """<!doctype html>
|
||||||
|
|
@ -543,6 +544,7 @@ class ProxyState:
|
||||||
return time.time()
|
return time.time()
|
||||||
|
|
||||||
def _gc_locked(self) -> None:
|
def _gc_locked(self) -> None:
|
||||||
|
# Caller must hold self.lock.
|
||||||
now = self._now()
|
now = self._now()
|
||||||
dead = [token for token, sess in self.sessions.items() if sess.expires_at <= now]
|
dead = [token for token, sess in self.sessions.items() if sess.expires_at <= now]
|
||||||
for token in dead:
|
for token in dead:
|
||||||
|
|
@ -671,6 +673,9 @@ class ProxyState:
|
||||||
self._drop_direct_device_locked()
|
self._drop_direct_device_locked()
|
||||||
|
|
||||||
def _with_direct_ctap2(self, action):
|
def _with_direct_ctap2(self, action):
|
||||||
|
# First attempt reuses the cached handle; if it fails (e.g. the card was
|
||||||
|
# briefly removed or the CTAPHID channel desynchronised), we reopen once
|
||||||
|
# and retry before propagating the error.
|
||||||
with self.direct_device_lock:
|
with self.direct_device_lock:
|
||||||
last_exc: Exception | None = None
|
last_exc: Exception | None = None
|
||||||
for reopen in (False, True):
|
for reopen in (False, True):
|
||||||
|
|
@ -752,7 +757,7 @@ class ProxyState:
|
||||||
auth_data = self.fido_server.register_complete(
|
auth_data = self.fido_server.register_complete(
|
||||||
state,
|
state,
|
||||||
RegistrationResponse(
|
RegistrationResponse(
|
||||||
id=attestation.auth_data.credential_data.credential_id,
|
raw_id=attestation.auth_data.credential_data.credential_id,
|
||||||
response=AuthenticatorAttestationResponse(
|
response=AuthenticatorAttestationResponse(
|
||||||
client_data=client_data,
|
client_data=client_data,
|
||||||
attestation_object=AttestationObject.create(
|
attestation_object=AttestationObject.create(
|
||||||
|
|
@ -883,7 +888,7 @@ class ProxyState:
|
||||||
state,
|
state,
|
||||||
[credential],
|
[credential],
|
||||||
AuthenticationResponse(
|
AuthenticationResponse(
|
||||||
id=response.credential["id"],
|
raw_id=response.credential["id"],
|
||||||
response=AuthenticatorAssertionResponse(
|
response=AuthenticatorAssertionResponse(
|
||||||
client_data=client_data,
|
client_data=client_data,
|
||||||
authenticator_data=response.auth_data,
|
authenticator_data=response.auth_data,
|
||||||
|
|
@ -959,6 +964,8 @@ class UpstreamPool:
|
||||||
conn.request("POST", full_path, body=body, headers=req_headers)
|
conn.request("POST", full_path, body=body, headers=req_headers)
|
||||||
resp = conn.getresponse()
|
resp = conn.getresponse()
|
||||||
raw = resp.read()
|
raw = resp.read()
|
||||||
|
# will_close is set by the server when it intends to close the connection
|
||||||
|
# after this response; reusing such a connection would hit an EOF.
|
||||||
reusable = not resp.will_close
|
reusable = not resp.will_close
|
||||||
try:
|
try:
|
||||||
data = json.loads(raw.decode("utf-8")) if raw else {}
|
data = json.loads(raw.decode("utf-8")) if raw else {}
|
||||||
|
|
@ -1001,10 +1008,20 @@ class Handler(BaseHTTPRequestHandler):
|
||||||
return json.loads(raw.decode("utf-8"))
|
return json.loads(raw.decode("utf-8"))
|
||||||
|
|
||||||
def _discard_request_body(self) -> None:
|
def _discard_request_body(self) -> None:
|
||||||
|
# HTTP/1.1 keep-alive: body must be consumed before the response is sent.
|
||||||
length = int(self.headers.get("Content-Length", "0"))
|
length = int(self.headers.get("Content-Length", "0"))
|
||||||
if length > 0:
|
if length > 0:
|
||||||
self.rfile.read(length)
|
self.rfile.read(length)
|
||||||
|
|
||||||
|
def _require_json(self) -> dict[str, Any] | None:
|
||||||
|
# Returns None and sends 400 when the body is unparseable; callers must
|
||||||
|
# return immediately without sending a second response.
|
||||||
|
try:
|
||||||
|
return self._read_json()
|
||||||
|
except Exception:
|
||||||
|
self._json(400, {"ok": False, "error": "invalid json"})
|
||||||
|
return None
|
||||||
|
|
||||||
def _bearer_token(self) -> str | None:
|
def _bearer_token(self) -> str | None:
|
||||||
value = self.headers.get("Authorization", "")
|
value = self.headers.get("Authorization", "")
|
||||||
if not value.startswith("Bearer "):
|
if not value.startswith("Bearer "):
|
||||||
|
|
@ -1013,6 +1030,8 @@ class Handler(BaseHTTPRequestHandler):
|
||||||
return token or None
|
return token or None
|
||||||
|
|
||||||
def _require_session(self) -> tuple[str, Session] | None:
|
def _require_session(self) -> tuple[str, Session] | None:
|
||||||
|
# Returns None when auth fails; the 401 has already been sent, so callers
|
||||||
|
# must return immediately without writing a second response.
|
||||||
token = self._bearer_token()
|
token = self._bearer_token()
|
||||||
if not token:
|
if not token:
|
||||||
self._json(401, {"ok": False, "error": "missing bearer token"})
|
self._json(401, {"ok": False, "error": "missing bearer token"})
|
||||||
|
|
@ -1073,10 +1092,8 @@ class Handler(BaseHTTPRequestHandler):
|
||||||
self.send_error(404)
|
self.send_error(404)
|
||||||
|
|
||||||
def _session_login(self) -> None:
|
def _session_login(self) -> None:
|
||||||
try:
|
data = self._require_json()
|
||||||
data = self._read_json()
|
if data is None:
|
||||||
except Exception:
|
|
||||||
self._json(400, {"ok": False, "error": "invalid json"})
|
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
@ -1107,10 +1124,8 @@ class Handler(BaseHTTPRequestHandler):
|
||||||
)
|
)
|
||||||
|
|
||||||
def _enroll_register(self) -> None:
|
def _enroll_register(self) -> None:
|
||||||
try:
|
data = self._require_json()
|
||||||
data = self._read_json()
|
if data is None:
|
||||||
except Exception:
|
|
||||||
self._json(400, {"ok": False, "error": "invalid json"})
|
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
@ -1131,10 +1146,8 @@ class Handler(BaseHTTPRequestHandler):
|
||||||
self._json(200, enrollment_payload(enrollment, created=enrollment.created_at == enrollment.updated_at))
|
self._json(200, enrollment_payload(enrollment, created=enrollment.created_at == enrollment.updated_at))
|
||||||
|
|
||||||
def _enroll_update(self) -> None:
|
def _enroll_update(self) -> None:
|
||||||
try:
|
data = self._require_json()
|
||||||
data = self._read_json()
|
if data is None:
|
||||||
except Exception:
|
|
||||||
self._json(400, {"ok": False, "error": "invalid json"})
|
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
enrollment = self.state.update_enrollment(
|
enrollment = self.state.update_enrollment(
|
||||||
|
|
@ -1150,10 +1163,8 @@ class Handler(BaseHTTPRequestHandler):
|
||||||
self._json(200, enrollment_payload(enrollment))
|
self._json(200, enrollment_payload(enrollment))
|
||||||
|
|
||||||
def _enroll_delete(self) -> None:
|
def _enroll_delete(self) -> None:
|
||||||
try:
|
data = self._require_json()
|
||||||
data = self._read_json()
|
if data is None:
|
||||||
except Exception:
|
|
||||||
self._json(400, {"ok": False, "error": "invalid json"})
|
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
enrollment = self.state.delete_enrollment(str(data.get("username", "")))
|
enrollment = self.state.delete_enrollment(str(data.get("username", "")))
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,10 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
"""
|
||||||
Minimal k_server service for Phase 5/5.5 bring-up.
|
k_server — protected resource backend.
|
||||||
|
|
||||||
Behavior:
|
Exposes a monotonic counter behind a shared proxy token. Only k_proxy
|
||||||
- Exposes a protected monotonic counter endpoint.
|
is expected to reach this service; k_client should have no direct path.
|
||||||
- Accepts only requests from k_proxy via a shared proxy token header.
|
All state is process-local and resets on restart.
|
||||||
- Uses thread-safe counter increments.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
@ -21,6 +20,7 @@ from urllib.parse import urlparse
|
||||||
|
|
||||||
|
|
||||||
class ServerState:
|
class ServerState:
|
||||||
|
# All state is process-local; a restart resets the counter to zero.
|
||||||
def __init__(self, proxy_token: str):
|
def __init__(self, proxy_token: str):
|
||||||
self.proxy_token = proxy_token
|
self.proxy_token = proxy_token
|
||||||
self.counter = 0
|
self.counter = 0
|
||||||
|
|
@ -45,6 +45,8 @@ class Handler(BaseHTTPRequestHandler):
|
||||||
self.wfile.write(body)
|
self.wfile.write(body)
|
||||||
|
|
||||||
def _discard_request_body(self) -> None:
|
def _discard_request_body(self) -> None:
|
||||||
|
# HTTP/1.1 keep-alive: the connection is reused, so the body must be fully
|
||||||
|
# consumed before we send the response, even for endpoints that ignore it.
|
||||||
length = int(self.headers.get("Content-Length", "0"))
|
length = int(self.headers.get("Content-Length", "0"))
|
||||||
if length > 0:
|
if length > 0:
|
||||||
self.rfile.read(length)
|
self.rfile.read(length)
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,339 @@
|
||||||
|
"""
|
||||||
|
CardEmulator — software emulator of the ChromeCard FIDO2 authenticator
|
||||||
|
======================================================================
|
||||||
|
|
||||||
|
What it is
|
||||||
|
----------
|
||||||
|
CardEmulator is a drop-in replacement for the physical ChromeCard in tests.
|
||||||
|
It implements the two Ctap2 methods that k_proxy_app calls in fido2-direct
|
||||||
|
mode — make_credential (registration) and get_assertion (authentication) —
|
||||||
|
using real P-256 cryptography and an in-memory credential store.
|
||||||
|
|
||||||
|
The auth_data layout and COSE key encoding mirror the firmware exactly
|
||||||
|
(see fido_make_cred.c and fido_get_assertion.c), so fido2.server's
|
||||||
|
register_complete and authenticate_complete accept the emulator's responses
|
||||||
|
without any extra patching.
|
||||||
|
|
||||||
|
|
||||||
|
Wiring the emulator into a ProxyState test
|
||||||
|
------------------------------------------
|
||||||
|
Two patches are needed: one to replace _with_direct_ctap2 so it hands the
|
||||||
|
emulator to the lambda instead of opening a real HID device, and one to
|
||||||
|
suppress _drop_direct_device which would otherwise try to close a real handle.
|
||||||
|
|
||||||
|
A convenience helper for this is provided in test_k_proxy.py:
|
||||||
|
|
||||||
|
def _patch_emulator(state, emulator):
|
||||||
|
return patch.multiple(
|
||||||
|
state,
|
||||||
|
_with_direct_ctap2=lambda fn: fn(emulator),
|
||||||
|
_drop_direct_device=lambda: None,
|
||||||
|
)
|
||||||
|
|
||||||
|
Typical test setup:
|
||||||
|
|
||||||
|
from card_emulator import CardEmulator
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
emulator = CardEmulator()
|
||||||
|
state = _make_state(tmp_path, auth_mode=AUTH_MODE_FIDO2_DIRECT)
|
||||||
|
|
||||||
|
with _patch_emulator(state, emulator):
|
||||||
|
enrollment = state.register_enrollment("alice", "Alice")
|
||||||
|
ok, msg = state.authenticate_with_card("alice") # True, "assertion verified"
|
||||||
|
|
||||||
|
|
||||||
|
User confirmation (Yes / No on the card)
|
||||||
|
-----------------------------------------
|
||||||
|
Both make_credential and get_assertion accept a `user_confirms` keyword
|
||||||
|
argument (default True). Setting it to False raises
|
||||||
|
CtapError(OPERATION_DENIED), exactly as the firmware does when the user taps
|
||||||
|
No on the card's confirmation dialog.
|
||||||
|
|
||||||
|
Direct calls — pass the flag explicitly:
|
||||||
|
|
||||||
|
attest = emulator.make_credential(
|
||||||
|
client_data_hash=..., rp=..., user=..., key_params=...,
|
||||||
|
user_confirms=False,
|
||||||
|
) # raises CtapError(OPERATION_DENIED)
|
||||||
|
|
||||||
|
Integration tests through _with_direct_ctap2 — the lambda that ProxyState
|
||||||
|
builds cannot inject user_confirms, so use refusing() instead. It returns
|
||||||
|
a thin wrapper whose methods forward to the emulator with user_confirms=False:
|
||||||
|
|
||||||
|
with _patch_emulator(state, emulator.refusing()):
|
||||||
|
ok, msg = state.authenticate_with_card("alice") # False
|
||||||
|
# msg contains "assertion verification failed: CTAP error: OPERATION_DENIED"
|
||||||
|
|
||||||
|
with _patch_emulator(state, emulator.refusing()):
|
||||||
|
with self.assertRaises(RuntimeError):
|
||||||
|
state.register_enrollment("bob", None)
|
||||||
|
# raises RuntimeError("card registration failed: ...")
|
||||||
|
|
||||||
|
refusing() shares the same credential store as the parent emulator, so
|
||||||
|
credentials registered before or after the call are visible to both.
|
||||||
|
|
||||||
|
|
||||||
|
Simulating card-side credential removal
|
||||||
|
----------------------------------------
|
||||||
|
forget_user(username) removes all credentials for that user from the
|
||||||
|
emulator's store and returns the count removed. Use it to simulate a
|
||||||
|
factory reset or a deliberate key deletion:
|
||||||
|
|
||||||
|
emulator.forget_user("alice")
|
||||||
|
ok, msg = state.authenticate_with_card("alice") # False
|
||||||
|
|
||||||
|
|
||||||
|
API summary
|
||||||
|
-----------
|
||||||
|
CardEmulator()
|
||||||
|
Create a new emulator with an empty credential store.
|
||||||
|
|
||||||
|
make_credential(client_data_hash, rp, user, key_params, *, user_confirms=True)
|
||||||
|
Simulate CTAP2 makeCredential. Returns AttestationResponse.
|
||||||
|
|
||||||
|
get_assertion(rp_id, client_data_hash, allow_list, *, user_confirms=True)
|
||||||
|
Simulate CTAP2 getAssertion. Returns AssertionResponse.
|
||||||
|
Raises CtapError(NO_CREDENTIALS) if no matching credential is found.
|
||||||
|
|
||||||
|
refusing() -> _RefusingView
|
||||||
|
Return a wrapper that forces user_confirms=False on every call.
|
||||||
|
|
||||||
|
forget_user(username) -> int
|
||||||
|
Remove all credentials for username. Returns count removed.
|
||||||
|
|
||||||
|
credential_count() -> int
|
||||||
|
Total credentials currently in the store.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
import os
|
||||||
|
import struct
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any, Mapping
|
||||||
|
|
||||||
|
from cryptography.hazmat.primitives import hashes
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import ec
|
||||||
|
from fido2.ctap import CtapError
|
||||||
|
from fido2.ctap2 import AssertionResponse, AttestationResponse
|
||||||
|
from fido2.webauthn import AuthenticatorData
|
||||||
|
|
||||||
|
# AAGUID from fido_make_cred.c
|
||||||
|
_AAGUID = bytes([
|
||||||
|
0x12, 0x34, 0x56, 0x78, 0x90, 0xAB, 0xCD, 0xEF,
|
||||||
|
0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF,
|
||||||
|
])
|
||||||
|
|
||||||
|
|
||||||
|
def _cose_es256(x: bytes, y: bytes) -> bytes:
|
||||||
|
"""CBOR-encoded COSE ES256 public key, byte-for-byte as the firmware builds it."""
|
||||||
|
return (
|
||||||
|
bytes([0xA5]) # map(5)
|
||||||
|
+ bytes([0x01, 0x02]) # kty: 2 (EC2)
|
||||||
|
+ bytes([0x03, 0x26]) # alg: -7 (ES256)
|
||||||
|
+ bytes([0x20, 0x01]) # crv (-1): 1 (P-256)
|
||||||
|
+ bytes([0x21, 0x58, 0x20]) + x # x (-2): bstr(32)
|
||||||
|
+ bytes([0x22, 0x58, 0x20]) + y # y (-3): bstr(32)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class _Credential:
|
||||||
|
private_key: ec.EllipticCurvePrivateKey
|
||||||
|
rp_id_hash: bytes
|
||||||
|
user_id: bytes
|
||||||
|
username: str
|
||||||
|
sign_count: int = 0
|
||||||
|
|
||||||
|
|
||||||
|
class CardEmulator:
|
||||||
|
"""In-process emulator of a ChromeCard FIDO2 authenticator.
|
||||||
|
|
||||||
|
Implements make_credential and get_assertion with the same signatures as
|
||||||
|
fido2.ctap2.Ctap2, plus forget_user() for test teardown.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self._creds: dict[bytes, _Credential] = {}
|
||||||
|
|
||||||
|
# ── CTAP2 interface ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def make_credential(
|
||||||
|
self,
|
||||||
|
client_data_hash: bytes,
|
||||||
|
rp: Mapping[str, Any],
|
||||||
|
user: Mapping[str, Any],
|
||||||
|
key_params: list[Mapping[str, Any]],
|
||||||
|
exclude_list: list[Mapping[str, Any]] | None = None,
|
||||||
|
extensions: Mapping[str, Any] | None = None,
|
||||||
|
options: Mapping[str, Any] | None = None,
|
||||||
|
*,
|
||||||
|
user_confirms: bool = True,
|
||||||
|
**_: Any,
|
||||||
|
) -> AttestationResponse:
|
||||||
|
"""Simulate makeCredential.
|
||||||
|
|
||||||
|
When user_confirms is False the call raises CtapError(OPERATION_DENIED),
|
||||||
|
mirroring the firmware's response when the user taps No on the card.
|
||||||
|
Otherwise a real P-256 keypair is generated, stored, and returned as a
|
||||||
|
fmt='none' AttestationResponse with a valid COSE ES256 public key.
|
||||||
|
"""
|
||||||
|
if not user_confirms:
|
||||||
|
raise CtapError(CtapError.ERR.OPERATION_DENIED)
|
||||||
|
|
||||||
|
rp_id: str = rp["id"]
|
||||||
|
rp_id_hash = hashlib.sha256(rp_id.encode()).digest()
|
||||||
|
|
||||||
|
priv = ec.generate_private_key(ec.SECP256R1())
|
||||||
|
pub_nums = priv.public_key().public_numbers()
|
||||||
|
x = pub_nums.x.to_bytes(32, "big")
|
||||||
|
y = pub_nums.y.to_bytes(32, "big")
|
||||||
|
|
||||||
|
credential_id = os.urandom(32)
|
||||||
|
|
||||||
|
raw_user_id: bytes = user.get("id", b"") # type: ignore[assignment]
|
||||||
|
if isinstance(raw_user_id, str):
|
||||||
|
raw_user_id = raw_user_id.encode()
|
||||||
|
|
||||||
|
cred = _Credential(
|
||||||
|
private_key=priv,
|
||||||
|
rp_id_hash=rp_id_hash,
|
||||||
|
user_id=raw_user_id,
|
||||||
|
username=user.get("name", ""), # type: ignore[arg-type]
|
||||||
|
)
|
||||||
|
|
||||||
|
# authData layout matches fido_make_cred.c build_make_credential_response():
|
||||||
|
# rpIdHash(32) | flags(1) | signCount(4) | aaguid(16)
|
||||||
|
# | credIdLen(2) | credId(32) | coseKey(77)
|
||||||
|
auth_data_bytes = (
|
||||||
|
rp_id_hash
|
||||||
|
+ bytes([0x41]) # flags: UP=1, AT=1
|
||||||
|
+ struct.pack(">I", cred.sign_count)
|
||||||
|
+ _AAGUID
|
||||||
|
+ struct.pack(">H", len(credential_id))
|
||||||
|
+ credential_id
|
||||||
|
+ _cose_es256(x, y)
|
||||||
|
)
|
||||||
|
cred.sign_count += 1
|
||||||
|
self._creds[credential_id] = cred
|
||||||
|
|
||||||
|
return AttestationResponse(
|
||||||
|
fmt="none",
|
||||||
|
auth_data=AuthenticatorData(auth_data_bytes),
|
||||||
|
att_stmt={},
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_assertion(
|
||||||
|
self,
|
||||||
|
rp_id: str,
|
||||||
|
client_data_hash: bytes,
|
||||||
|
allow_list: list[Mapping[str, Any]] | None = None,
|
||||||
|
extensions: Mapping[str, Any] | None = None,
|
||||||
|
options: Mapping[str, Any] | None = None,
|
||||||
|
*,
|
||||||
|
user_confirms: bool = True,
|
||||||
|
**_: Any,
|
||||||
|
) -> AssertionResponse:
|
||||||
|
"""Simulate getAssertion.
|
||||||
|
|
||||||
|
When user_confirms is False raises CtapError(OPERATION_DENIED).
|
||||||
|
Otherwise finds the credential, builds authData (37 bytes, no AT flag),
|
||||||
|
signs authData || clientDataHash with ECDSA-SHA256 (DER), and returns
|
||||||
|
an AssertionResponse — byte-for-byte compatible with the firmware output.
|
||||||
|
"""
|
||||||
|
if not user_confirms:
|
||||||
|
raise CtapError(CtapError.ERR.OPERATION_DENIED)
|
||||||
|
|
||||||
|
rp_id_hash = hashlib.sha256(rp_id.encode()).digest()
|
||||||
|
|
||||||
|
if not allow_list:
|
||||||
|
raise CtapError(CtapError.ERR.NO_CREDENTIALS)
|
||||||
|
|
||||||
|
cred_id: bytes | None = None
|
||||||
|
cred: _Credential | None = None
|
||||||
|
for desc in allow_list:
|
||||||
|
cid: bytes = desc["id"] if isinstance(desc, dict) else getattr(desc, "id")
|
||||||
|
entry = self._creds.get(cid)
|
||||||
|
if entry is not None and entry.rp_id_hash == rp_id_hash:
|
||||||
|
cred_id = cid
|
||||||
|
cred = entry
|
||||||
|
break
|
||||||
|
|
||||||
|
if cred is None or cred_id is None:
|
||||||
|
raise CtapError(CtapError.ERR.NO_CREDENTIALS)
|
||||||
|
|
||||||
|
# authData layout matches fido_get_assertion.c build_get_assertion_response():
|
||||||
|
# rpIdHash(32) | flags(1) | signCount(4)
|
||||||
|
auth_data_bytes = (
|
||||||
|
rp_id_hash
|
||||||
|
+ bytes([0x01]) # flags: UP=1
|
||||||
|
+ struct.pack(">I", cred.sign_count)
|
||||||
|
)
|
||||||
|
cred.sign_count += 1
|
||||||
|
|
||||||
|
# Signature over authData || clientDataHash, DER-encoded.
|
||||||
|
# Matches drv_crypto_sign_hash_DER() in the firmware.
|
||||||
|
sig = cred.private_key.sign(
|
||||||
|
auth_data_bytes + client_data_hash,
|
||||||
|
ec.ECDSA(hashes.SHA256()),
|
||||||
|
)
|
||||||
|
|
||||||
|
user_field: dict[str, Any] | None = {"id": cred.user_id} if cred.user_id else None
|
||||||
|
|
||||||
|
return AssertionResponse(
|
||||||
|
credential={"id": cred_id, "type": "public-key"},
|
||||||
|
auth_data=AuthenticatorData(auth_data_bytes),
|
||||||
|
signature=sig,
|
||||||
|
user=user_field,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── test helpers ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def refusing(self) -> "_RefusingView":
|
||||||
|
"""Return a view of this emulator that always declines confirmation.
|
||||||
|
|
||||||
|
Use this when the test goes through _with_direct_ctap2, which calls
|
||||||
|
make_credential / get_assertion via a lambda and cannot inject
|
||||||
|
user_confirms directly:
|
||||||
|
|
||||||
|
with patch.object(state, "_with_direct_ctap2",
|
||||||
|
side_effect=lambda fn: fn(emulator.refusing())):
|
||||||
|
ok, msg = state.authenticate_with_card("alice")
|
||||||
|
self.assertFalse(ok)
|
||||||
|
"""
|
||||||
|
return _RefusingView(self)
|
||||||
|
|
||||||
|
def forget_user(self, username: str) -> int:
|
||||||
|
"""Remove all credentials for *username* from the emulator store.
|
||||||
|
|
||||||
|
Returns the number of credentials removed. Use this to simulate a
|
||||||
|
card-side credential deletion (factory reset, or deliberate removal).
|
||||||
|
"""
|
||||||
|
to_delete = [cid for cid, e in self._creds.items() if e.username == username]
|
||||||
|
for cid in to_delete:
|
||||||
|
del self._creds[cid]
|
||||||
|
return len(to_delete)
|
||||||
|
|
||||||
|
def credential_count(self) -> int:
|
||||||
|
"""Total number of credentials currently in the emulator store."""
|
||||||
|
return len(self._creds)
|
||||||
|
|
||||||
|
|
||||||
|
class _RefusingView:
|
||||||
|
"""Thin proxy returned by CardEmulator.refusing().
|
||||||
|
|
||||||
|
Forwards make_credential and get_assertion to the underlying emulator
|
||||||
|
with user_confirms=False, so every call raises OPERATION_DENIED.
|
||||||
|
The credential store is shared with the parent emulator.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, emulator: CardEmulator) -> None:
|
||||||
|
self._emulator = emulator
|
||||||
|
|
||||||
|
def make_credential(self, **kwargs: Any) -> AttestationResponse:
|
||||||
|
return self._emulator.make_credential(**kwargs, user_confirms=False)
|
||||||
|
|
||||||
|
def get_assertion(self, **kwargs: Any) -> AssertionResponse:
|
||||||
|
return self._emulator.get_assertion(**kwargs, user_confirms=False)
|
||||||
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue