390 lines
14 KiB
Python
390 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Local WebAuthn demo server for USB FIDO2 card testing.
|
|
|
|
Purpose:
|
|
- Validate registration and authentication flows with the connected card.
|
|
- Keep setup minimal (Python stdlib only).
|
|
|
|
Security note:
|
|
- This demo does NOT verify attestation or assertion signatures.
|
|
- Use only for local bring-up/testing, not production.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import base64
|
|
import json
|
|
import os
|
|
import secrets
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from urllib.parse import urlparse
|
|
|
|
|
|
def b64u_encode(data: bytes) -> str:
|
|
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
|
|
|
|
|
|
def b64u_decode(data: str) -> bytes:
|
|
pad = "=" * ((4 - len(data) % 4) % 4)
|
|
return base64.urlsafe_b64decode((data + pad).encode("ascii"))
|
|
|
|
|
|
def random_b64u(n: int = 32) -> str:
|
|
return b64u_encode(secrets.token_bytes(n))
|
|
|
|
|
|
class DemoState:
|
|
def __init__(self, db_path: Path, rp_id: str, rp_name: str, origin: str):
|
|
self.db_path = db_path
|
|
self.rp_id = rp_id
|
|
self.rp_name = rp_name
|
|
self.origin = origin
|
|
self.pending_register: dict[str, str] = {}
|
|
self.pending_auth: dict[str, str] = {}
|
|
self.db: dict[str, Any] = self._load_db()
|
|
|
|
def _load_db(self) -> dict[str, Any]:
|
|
if not self.db_path.exists():
|
|
return {"users": {}}
|
|
with self.db_path.open("r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
|
|
def save_db(self) -> None:
|
|
self.db_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with self.db_path.open("w", encoding="utf-8") as f:
|
|
json.dump(self.db, f, indent=2)
|
|
|
|
def get_user(self, username: str) -> dict[str, Any]:
|
|
users = self.db.setdefault("users", {})
|
|
return users.setdefault(username, {"credentials": []})
|
|
|
|
|
|
def html_page() -> str:
|
|
return """<!doctype html>
|
|
<html lang="en">
|
|
<head>
|
|
<meta charset="utf-8" />
|
|
<meta name="viewport" content="width=device-width, initial-scale=1" />
|
|
<title>ChromeCard WebAuthn Local Demo</title>
|
|
<style>
|
|
body { font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, sans-serif; max-width: 860px; margin: 2rem auto; padding: 0 1rem; }
|
|
h1 { margin-bottom: 0.5rem; }
|
|
.row { display: flex; gap: 0.5rem; margin: 0.8rem 0; }
|
|
input { flex: 1; padding: 0.55rem; border: 1px solid #c8c8c8; border-radius: 8px; }
|
|
button { padding: 0.55rem 0.8rem; border: 1px solid #444; border-radius: 8px; background: #fff; cursor: pointer; }
|
|
pre { background: #111; color: #ddd; padding: 1rem; border-radius: 10px; overflow: auto; min-height: 200px; }
|
|
.muted { color: #555; }
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<h1>ChromeCard WebAuthn Demo</h1>
|
|
<p class="muted">Use this page to test local FIDO2 register/login over USB.</p>
|
|
<div class="row">
|
|
<input id="username" value="alice" />
|
|
<button id="registerBtn">Register</button>
|
|
<button id="loginBtn">Login</button>
|
|
</div>
|
|
<pre id="log"></pre>
|
|
<script>
|
|
const log = (obj) => {
|
|
const el = document.getElementById("log");
|
|
const text = typeof obj === "string" ? obj : JSON.stringify(obj, null, 2);
|
|
el.textContent = text + "\\n" + el.textContent;
|
|
};
|
|
|
|
const toB64u = (bytes) => {
|
|
let str = "";
|
|
const arr = new Uint8Array(bytes);
|
|
for (let i = 0; i < arr.length; i++) str += String.fromCharCode(arr[i]);
|
|
return btoa(str).replace(/\\+/g, "-").replace(/\\//g, "_").replace(/=+$/g, "");
|
|
};
|
|
|
|
const fromB64u = (s) => {
|
|
const b64 = s.replace(/-/g, "+").replace(/_/g, "/") + "===".slice((s.length + 3) % 4);
|
|
const bin = atob(b64);
|
|
const out = new Uint8Array(bin.length);
|
|
for (let i = 0; i < bin.length; i++) out[i] = bin.charCodeAt(i);
|
|
return out.buffer;
|
|
};
|
|
|
|
async function postJson(path, body) {
|
|
const resp = await fetch(path, {
|
|
method: "POST",
|
|
headers: {"Content-Type": "application/json"},
|
|
body: JSON.stringify(body),
|
|
});
|
|
const data = await resp.json();
|
|
if (!resp.ok) throw new Error(JSON.stringify(data));
|
|
return data;
|
|
}
|
|
|
|
async function register() {
|
|
const username = document.getElementById("username").value.trim();
|
|
const start = await postJson("/register/start", {username});
|
|
const pk = start.publicKey;
|
|
pk.challenge = fromB64u(pk.challenge);
|
|
pk.user.id = fromB64u(pk.user.id);
|
|
const cred = await navigator.credentials.create({ publicKey: pk });
|
|
const body = {
|
|
username,
|
|
id: cred.id,
|
|
rawId: toB64u(cred.rawId),
|
|
type: cred.type,
|
|
response: {
|
|
clientDataJSON: toB64u(cred.response.clientDataJSON),
|
|
attestationObject: toB64u(cred.response.attestationObject),
|
|
}
|
|
};
|
|
const finish = await postJson("/register/finish", body);
|
|
log({registerResult: finish});
|
|
}
|
|
|
|
async function login() {
|
|
const username = document.getElementById("username").value.trim();
|
|
const start = await postJson("/auth/start", {username});
|
|
const pk = start.publicKey;
|
|
pk.challenge = fromB64u(pk.challenge);
|
|
pk.allowCredentials = pk.allowCredentials.map(c => ({...c, id: fromB64u(c.id)}));
|
|
const assertion = await navigator.credentials.get({ publicKey: pk });
|
|
const body = {
|
|
username,
|
|
id: assertion.id,
|
|
rawId: toB64u(assertion.rawId),
|
|
type: assertion.type,
|
|
response: {
|
|
clientDataJSON: toB64u(assertion.response.clientDataJSON),
|
|
authenticatorData: toB64u(assertion.response.authenticatorData),
|
|
signature: toB64u(assertion.response.signature),
|
|
userHandle: assertion.response.userHandle ? toB64u(assertion.response.userHandle) : null
|
|
}
|
|
};
|
|
const finish = await postJson("/auth/finish", body);
|
|
log({authResult: finish});
|
|
}
|
|
|
|
document.getElementById("registerBtn").addEventListener("click", () => {
|
|
register().catch((e) => log("register error: " + e.message));
|
|
});
|
|
document.getElementById("loginBtn").addEventListener("click", () => {
|
|
login().catch((e) => log("login error: " + e.message));
|
|
});
|
|
</script>
|
|
</body>
|
|
</html>
|
|
"""
|
|
|
|
|
|
class Handler(BaseHTTPRequestHandler):
|
|
state: DemoState
|
|
|
|
def _json(self, status: int, data: dict[str, Any]) -> None:
|
|
body = json.dumps(data).encode("utf-8")
|
|
self.send_response(status)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.send_header("Content-Length", str(len(body)))
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
def _bad(self, message: str, status: int = 400) -> None:
|
|
self._json(status, {"ok": False, "error": message})
|
|
|
|
def _read_json(self) -> dict[str, Any]:
|
|
length = int(self.headers.get("Content-Length", "0"))
|
|
raw = self.rfile.read(length)
|
|
return json.loads(raw.decode("utf-8"))
|
|
|
|
def do_GET(self) -> None: # noqa: N802
|
|
path = urlparse(self.path).path
|
|
if path == "/":
|
|
body = html_page().encode("utf-8")
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "text/html; charset=utf-8")
|
|
self.send_header("Content-Length", str(len(body)))
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
return
|
|
self.send_error(404)
|
|
|
|
def do_POST(self) -> None: # noqa: N802
|
|
path = urlparse(self.path).path
|
|
try:
|
|
data = self._read_json()
|
|
except Exception:
|
|
self._bad("Invalid JSON")
|
|
return
|
|
|
|
if path == "/register/start":
|
|
self._register_start(data)
|
|
return
|
|
if path == "/register/finish":
|
|
self._register_finish(data)
|
|
return
|
|
if path == "/auth/start":
|
|
self._auth_start(data)
|
|
return
|
|
if path == "/auth/finish":
|
|
self._auth_finish(data)
|
|
return
|
|
self.send_error(404)
|
|
|
|
def _register_start(self, data: dict[str, Any]) -> None:
|
|
username = str(data.get("username", "")).strip()
|
|
if not username:
|
|
self._bad("username required")
|
|
return
|
|
|
|
challenge = random_b64u(32)
|
|
user_id = random_b64u(32)
|
|
self.state.pending_register[username] = challenge
|
|
public_key = {
|
|
"rp": {"name": self.state.rp_name, "id": self.state.rp_id},
|
|
"user": {"id": user_id, "name": username, "displayName": username},
|
|
"challenge": challenge,
|
|
"pubKeyCredParams": [{"type": "public-key", "alg": -7}, {"type": "public-key", "alg": -257}],
|
|
"timeout": 60000,
|
|
"attestation": "none",
|
|
"authenticatorSelection": {
|
|
"residentKey": "discouraged",
|
|
"requireResidentKey": False,
|
|
"userVerification": "preferred",
|
|
},
|
|
}
|
|
self._json(200, {"ok": True, "publicKey": public_key})
|
|
|
|
def _register_finish(self, data: dict[str, Any]) -> None:
|
|
username = str(data.get("username", "")).strip()
|
|
expected = self.state.pending_register.get(username)
|
|
if not username or not expected:
|
|
self._bad("no pending registration")
|
|
return
|
|
try:
|
|
client_data_raw = b64u_decode(data["response"]["clientDataJSON"])
|
|
client_data = json.loads(client_data_raw.decode("utf-8"))
|
|
challenge = client_data.get("challenge")
|
|
typ = client_data.get("type")
|
|
origin = client_data.get("origin")
|
|
except Exception:
|
|
self._bad("invalid credential response")
|
|
return
|
|
|
|
if typ != "webauthn.create":
|
|
self._bad("unexpected clientData type")
|
|
return
|
|
if challenge != expected:
|
|
self._bad("challenge mismatch")
|
|
return
|
|
if origin != self.state.origin:
|
|
self._bad(f"origin mismatch: expected {self.state.origin}, got {origin}")
|
|
return
|
|
|
|
raw_id = str(data.get("rawId", ""))
|
|
if not raw_id:
|
|
self._bad("rawId missing")
|
|
return
|
|
|
|
user = self.state.get_user(username)
|
|
creds = user.setdefault("credentials", [])
|
|
if raw_id not in creds:
|
|
creds.append(raw_id)
|
|
self.state.save_db()
|
|
self.state.pending_register.pop(username, None)
|
|
self._json(200, {"ok": True, "username": username, "credential_count": len(creds)})
|
|
|
|
def _auth_start(self, data: dict[str, Any]) -> None:
|
|
username = str(data.get("username", "")).strip()
|
|
if not username:
|
|
self._bad("username required")
|
|
return
|
|
user = self.state.db.get("users", {}).get(username)
|
|
if not user or not user.get("credentials"):
|
|
self._bad("no credentials for user", 404)
|
|
return
|
|
|
|
challenge = random_b64u(32)
|
|
self.state.pending_auth[username] = challenge
|
|
allow_credentials = [{"type": "public-key", "id": cid} for cid in user["credentials"]]
|
|
public_key = {
|
|
"challenge": challenge,
|
|
"rpId": self.state.rp_id,
|
|
"timeout": 60000,
|
|
"userVerification": "preferred",
|
|
"allowCredentials": allow_credentials,
|
|
}
|
|
self._json(200, {"ok": True, "publicKey": public_key})
|
|
|
|
def _auth_finish(self, data: dict[str, Any]) -> None:
|
|
username = str(data.get("username", "")).strip()
|
|
expected = self.state.pending_auth.get(username)
|
|
if not username or not expected:
|
|
self._bad("no pending authentication")
|
|
return
|
|
|
|
user = self.state.db.get("users", {}).get(username, {})
|
|
known = set(user.get("credentials", []))
|
|
raw_id = str(data.get("rawId", ""))
|
|
if raw_id not in known:
|
|
self._bad("unknown credential")
|
|
return
|
|
|
|
try:
|
|
client_data_raw = b64u_decode(data["response"]["clientDataJSON"])
|
|
client_data = json.loads(client_data_raw.decode("utf-8"))
|
|
challenge = client_data.get("challenge")
|
|
typ = client_data.get("type")
|
|
origin = client_data.get("origin")
|
|
except Exception:
|
|
self._bad("invalid assertion response")
|
|
return
|
|
|
|
if typ != "webauthn.get":
|
|
self._bad("unexpected clientData type")
|
|
return
|
|
if challenge != expected:
|
|
self._bad("challenge mismatch")
|
|
return
|
|
if origin != self.state.origin:
|
|
self._bad(f"origin mismatch: expected {self.state.origin}, got {origin}")
|
|
return
|
|
|
|
self.state.pending_auth.pop(username, None)
|
|
self._json(200, {"ok": True, "username": username, "authenticated": True})
|
|
|
|
def log_message(self, format: str, *args: Any) -> None:
|
|
return
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(description="Local WebAuthn demo server")
|
|
parser.add_argument("--host", default="localhost")
|
|
parser.add_argument("--port", type=int, default=8765)
|
|
parser.add_argument("--rp-id", default="localhost")
|
|
parser.add_argument("--rp-name", default="ChromeCard Local Demo")
|
|
parser.add_argument("--origin", default="http://localhost:8765")
|
|
parser.add_argument("--db", default=".webauthn_demo_db.json")
|
|
args = parser.parse_args()
|
|
|
|
db_path = Path(args.db).resolve()
|
|
state = DemoState(db_path, rp_id=args.rp_id, rp_name=args.rp_name, origin=args.origin)
|
|
Handler.state = state
|
|
|
|
server = ThreadingHTTPServer((args.host, args.port), Handler)
|
|
print(f"WebAuthn demo listening on http://{args.host}:{args.port}")
|
|
print(f"RP ID: {args.rp_id}")
|
|
print(f"Origin: {args.origin}")
|
|
print(f"DB: {db_path}")
|
|
try:
|
|
server.serve_forever()
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
server.server_close()
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|