k_card/webauthn_local_demo.py

390 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Local WebAuthn demo server for USB FIDO2 card testing.
Purpose:
- Validate registration and authentication flows with the connected card.
- Keep setup minimal (Python stdlib only).
Security note:
- This demo does NOT verify attestation or assertion signatures.
- Use only for local bring-up/testing, not production.
"""
from __future__ import annotations
import argparse
import base64
import json
import os
import secrets
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
def b64u_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
def b64u_decode(data: str) -> bytes:
pad = "=" * ((4 - len(data) % 4) % 4)
return base64.urlsafe_b64decode((data + pad).encode("ascii"))
def random_b64u(n: int = 32) -> str:
return b64u_encode(secrets.token_bytes(n))
class DemoState:
def __init__(self, db_path: Path, rp_id: str, rp_name: str, origin: str):
self.db_path = db_path
self.rp_id = rp_id
self.rp_name = rp_name
self.origin = origin
self.pending_register: dict[str, str] = {}
self.pending_auth: dict[str, str] = {}
self.db: dict[str, Any] = self._load_db()
def _load_db(self) -> dict[str, Any]:
if not self.db_path.exists():
return {"users": {}}
with self.db_path.open("r", encoding="utf-8") as f:
return json.load(f)
def save_db(self) -> None:
self.db_path.parent.mkdir(parents=True, exist_ok=True)
with self.db_path.open("w", encoding="utf-8") as f:
json.dump(self.db, f, indent=2)
def get_user(self, username: str) -> dict[str, Any]:
users = self.db.setdefault("users", {})
return users.setdefault(username, {"credentials": []})
def html_page() -> str:
return """<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>ChromeCard WebAuthn Local Demo</title>
<style>
body { font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, sans-serif; max-width: 860px; margin: 2rem auto; padding: 0 1rem; }
h1 { margin-bottom: 0.5rem; }
.row { display: flex; gap: 0.5rem; margin: 0.8rem 0; }
input { flex: 1; padding: 0.55rem; border: 1px solid #c8c8c8; border-radius: 8px; }
button { padding: 0.55rem 0.8rem; border: 1px solid #444; border-radius: 8px; background: #fff; cursor: pointer; }
pre { background: #111; color: #ddd; padding: 1rem; border-radius: 10px; overflow: auto; min-height: 200px; }
.muted { color: #555; }
</style>
</head>
<body>
<h1>ChromeCard WebAuthn Demo</h1>
<p class="muted">Use this page to test local FIDO2 register/login over USB.</p>
<div class="row">
<input id="username" value="alice" />
<button id="registerBtn">Register</button>
<button id="loginBtn">Login</button>
</div>
<pre id="log"></pre>
<script>
const log = (obj) => {
const el = document.getElementById("log");
const text = typeof obj === "string" ? obj : JSON.stringify(obj, null, 2);
el.textContent = text + "\\n" + el.textContent;
};
const toB64u = (bytes) => {
let str = "";
const arr = new Uint8Array(bytes);
for (let i = 0; i < arr.length; i++) str += String.fromCharCode(arr[i]);
return btoa(str).replace(/\\+/g, "-").replace(/\\//g, "_").replace(/=+$/g, "");
};
const fromB64u = (s) => {
const b64 = s.replace(/-/g, "+").replace(/_/g, "/") + "===".slice((s.length + 3) % 4);
const bin = atob(b64);
const out = new Uint8Array(bin.length);
for (let i = 0; i < bin.length; i++) out[i] = bin.charCodeAt(i);
return out.buffer;
};
async function postJson(path, body) {
const resp = await fetch(path, {
method: "POST",
headers: {"Content-Type": "application/json"},
body: JSON.stringify(body),
});
const data = await resp.json();
if (!resp.ok) throw new Error(JSON.stringify(data));
return data;
}
async function register() {
const username = document.getElementById("username").value.trim();
const start = await postJson("/register/start", {username});
const pk = start.publicKey;
pk.challenge = fromB64u(pk.challenge);
pk.user.id = fromB64u(pk.user.id);
const cred = await navigator.credentials.create({ publicKey: pk });
const body = {
username,
id: cred.id,
rawId: toB64u(cred.rawId),
type: cred.type,
response: {
clientDataJSON: toB64u(cred.response.clientDataJSON),
attestationObject: toB64u(cred.response.attestationObject),
}
};
const finish = await postJson("/register/finish", body);
log({registerResult: finish});
}
async function login() {
const username = document.getElementById("username").value.trim();
const start = await postJson("/auth/start", {username});
const pk = start.publicKey;
pk.challenge = fromB64u(pk.challenge);
pk.allowCredentials = pk.allowCredentials.map(c => ({...c, id: fromB64u(c.id)}));
const assertion = await navigator.credentials.get({ publicKey: pk });
const body = {
username,
id: assertion.id,
rawId: toB64u(assertion.rawId),
type: assertion.type,
response: {
clientDataJSON: toB64u(assertion.response.clientDataJSON),
authenticatorData: toB64u(assertion.response.authenticatorData),
signature: toB64u(assertion.response.signature),
userHandle: assertion.response.userHandle ? toB64u(assertion.response.userHandle) : null
}
};
const finish = await postJson("/auth/finish", body);
log({authResult: finish});
}
document.getElementById("registerBtn").addEventListener("click", () => {
register().catch((e) => log("register error: " + e.message));
});
document.getElementById("loginBtn").addEventListener("click", () => {
login().catch((e) => log("login error: " + e.message));
});
</script>
</body>
</html>
"""
class Handler(BaseHTTPRequestHandler):
state: DemoState
def _json(self, status: int, data: dict[str, Any]) -> None:
body = json.dumps(data).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _bad(self, message: str, status: int = 400) -> None:
self._json(status, {"ok": False, "error": message})
def _read_json(self) -> dict[str, Any]:
length = int(self.headers.get("Content-Length", "0"))
raw = self.rfile.read(length)
return json.loads(raw.decode("utf-8"))
def do_GET(self) -> None: # noqa: N802
path = urlparse(self.path).path
if path == "/":
body = html_page().encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
return
self.send_error(404)
def do_POST(self) -> None: # noqa: N802
path = urlparse(self.path).path
try:
data = self._read_json()
except Exception:
self._bad("Invalid JSON")
return
if path == "/register/start":
self._register_start(data)
return
if path == "/register/finish":
self._register_finish(data)
return
if path == "/auth/start":
self._auth_start(data)
return
if path == "/auth/finish":
self._auth_finish(data)
return
self.send_error(404)
def _register_start(self, data: dict[str, Any]) -> None:
username = str(data.get("username", "")).strip()
if not username:
self._bad("username required")
return
challenge = random_b64u(32)
user_id = random_b64u(32)
self.state.pending_register[username] = challenge
public_key = {
"rp": {"name": self.state.rp_name, "id": self.state.rp_id},
"user": {"id": user_id, "name": username, "displayName": username},
"challenge": challenge,
"pubKeyCredParams": [{"type": "public-key", "alg": -7}, {"type": "public-key", "alg": -257}],
"timeout": 60000,
"attestation": "none",
"authenticatorSelection": {
"residentKey": "discouraged",
"requireResidentKey": False,
"userVerification": "preferred",
},
}
self._json(200, {"ok": True, "publicKey": public_key})
def _register_finish(self, data: dict[str, Any]) -> None:
username = str(data.get("username", "")).strip()
expected = self.state.pending_register.get(username)
if not username or not expected:
self._bad("no pending registration")
return
try:
client_data_raw = b64u_decode(data["response"]["clientDataJSON"])
client_data = json.loads(client_data_raw.decode("utf-8"))
challenge = client_data.get("challenge")
typ = client_data.get("type")
origin = client_data.get("origin")
except Exception:
self._bad("invalid credential response")
return
if typ != "webauthn.create":
self._bad("unexpected clientData type")
return
if challenge != expected:
self._bad("challenge mismatch")
return
if origin != self.state.origin:
self._bad(f"origin mismatch: expected {self.state.origin}, got {origin}")
return
raw_id = str(data.get("rawId", ""))
if not raw_id:
self._bad("rawId missing")
return
user = self.state.get_user(username)
creds = user.setdefault("credentials", [])
if raw_id not in creds:
creds.append(raw_id)
self.state.save_db()
self.state.pending_register.pop(username, None)
self._json(200, {"ok": True, "username": username, "credential_count": len(creds)})
def _auth_start(self, data: dict[str, Any]) -> None:
username = str(data.get("username", "")).strip()
if not username:
self._bad("username required")
return
user = self.state.db.get("users", {}).get(username)
if not user or not user.get("credentials"):
self._bad("no credentials for user", 404)
return
challenge = random_b64u(32)
self.state.pending_auth[username] = challenge
allow_credentials = [{"type": "public-key", "id": cid} for cid in user["credentials"]]
public_key = {
"challenge": challenge,
"rpId": self.state.rp_id,
"timeout": 60000,
"userVerification": "preferred",
"allowCredentials": allow_credentials,
}
self._json(200, {"ok": True, "publicKey": public_key})
def _auth_finish(self, data: dict[str, Any]) -> None:
username = str(data.get("username", "")).strip()
expected = self.state.pending_auth.get(username)
if not username or not expected:
self._bad("no pending authentication")
return
user = self.state.db.get("users", {}).get(username, {})
known = set(user.get("credentials", []))
raw_id = str(data.get("rawId", ""))
if raw_id not in known:
self._bad("unknown credential")
return
try:
client_data_raw = b64u_decode(data["response"]["clientDataJSON"])
client_data = json.loads(client_data_raw.decode("utf-8"))
challenge = client_data.get("challenge")
typ = client_data.get("type")
origin = client_data.get("origin")
except Exception:
self._bad("invalid assertion response")
return
if typ != "webauthn.get":
self._bad("unexpected clientData type")
return
if challenge != expected:
self._bad("challenge mismatch")
return
if origin != self.state.origin:
self._bad(f"origin mismatch: expected {self.state.origin}, got {origin}")
return
self.state.pending_auth.pop(username, None)
self._json(200, {"ok": True, "username": username, "authenticated": True})
def log_message(self, format: str, *args: Any) -> None:
return
def main() -> int:
parser = argparse.ArgumentParser(description="Local WebAuthn demo server")
parser.add_argument("--host", default="localhost")
parser.add_argument("--port", type=int, default=8765)
parser.add_argument("--rp-id", default="localhost")
parser.add_argument("--rp-name", default="ChromeCard Local Demo")
parser.add_argument("--origin", default="http://localhost:8765")
parser.add_argument("--db", default=".webauthn_demo_db.json")
args = parser.parse_args()
db_path = Path(args.db).resolve()
state = DemoState(db_path, rp_id=args.rp_id, rp_name=args.rp_name, origin=args.origin)
Handler.state = state
server = ThreadingHTTPServer((args.host, args.port), Handler)
print(f"WebAuthn demo listening on http://{args.host}:{args.port}")
print(f"RP ID: {args.rp_id}")
print(f"Origin: {args.origin}")
print(f"DB: {db_path}")
try:
server.serve_forever()
except KeyboardInterrupt:
pass
finally:
server.server_close()
return 0
if __name__ == "__main__":
raise SystemExit(main())