#!/usr/bin/env python3 """ Local WebAuthn demo server for USB FIDO2 card testing. Purpose: - Validate registration and authentication flows with the connected card. - Keep setup minimal (Python stdlib only). Security note: - This demo does NOT verify attestation or assertion signatures. - Use only for local bring-up/testing, not production. """ from __future__ import annotations import argparse import base64 import json import os import secrets from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from typing import Any from urllib.parse import urlparse def b64u_encode(data: bytes) -> str: return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii") def b64u_decode(data: str) -> bytes: pad = "=" * ((4 - len(data) % 4) % 4) return base64.urlsafe_b64decode((data + pad).encode("ascii")) def random_b64u(n: int = 32) -> str: return b64u_encode(secrets.token_bytes(n)) class DemoState: def __init__(self, db_path: Path, rp_id: str, rp_name: str, origin: str): self.db_path = db_path self.rp_id = rp_id self.rp_name = rp_name self.origin = origin self.pending_register: dict[str, str] = {} self.pending_auth: dict[str, str] = {} self.db: dict[str, Any] = self._load_db() def _load_db(self) -> dict[str, Any]: if not self.db_path.exists(): return {"users": {}} with self.db_path.open("r", encoding="utf-8") as f: return json.load(f) def save_db(self) -> None: self.db_path.parent.mkdir(parents=True, exist_ok=True) with self.db_path.open("w", encoding="utf-8") as f: json.dump(self.db, f, indent=2) def get_user(self, username: str) -> dict[str, Any]: users = self.db.setdefault("users", {}) return users.setdefault(username, {"credentials": []}) def html_page() -> str: return """ ChromeCard WebAuthn Local Demo

ChromeCard WebAuthn Demo

Use this page to test local FIDO2 register/login over USB.


  


"""


class Handler(BaseHTTPRequestHandler):
    state: DemoState

    def _json(self, status: int, data: dict[str, Any]) -> None:
        body = json.dumps(data).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _bad(self, message: str, status: int = 400) -> None:
        self._json(status, {"ok": False, "error": message})

    def _read_json(self) -> dict[str, Any]:
        length = int(self.headers.get("Content-Length", "0"))
        raw = self.rfile.read(length)
        return json.loads(raw.decode("utf-8"))

    def do_GET(self) -> None:  # noqa: N802
        path = urlparse(self.path).path
        if path == "/":
            body = html_page().encode("utf-8")
            self.send_response(200)
            self.send_header("Content-Type", "text/html; charset=utf-8")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
            return
        self.send_error(404)

    def do_POST(self) -> None:  # noqa: N802
        path = urlparse(self.path).path
        try:
            data = self._read_json()
        except Exception:
            self._bad("Invalid JSON")
            return

        if path == "/register/start":
            self._register_start(data)
            return
        if path == "/register/finish":
            self._register_finish(data)
            return
        if path == "/auth/start":
            self._auth_start(data)
            return
        if path == "/auth/finish":
            self._auth_finish(data)
            return
        self.send_error(404)

    def _register_start(self, data: dict[str, Any]) -> None:
        username = str(data.get("username", "")).strip()
        if not username:
            self._bad("username required")
            return

        challenge = random_b64u(32)
        user_id = random_b64u(32)
        self.state.pending_register[username] = challenge
        public_key = {
            "rp": {"name": self.state.rp_name, "id": self.state.rp_id},
            "user": {"id": user_id, "name": username, "displayName": username},
            "challenge": challenge,
            "pubKeyCredParams": [{"type": "public-key", "alg": -7}, {"type": "public-key", "alg": -257}],
            "timeout": 60000,
            "attestation": "none",
            "authenticatorSelection": {
                "residentKey": "discouraged",
                "requireResidentKey": False,
                "userVerification": "preferred",
            },
        }
        self._json(200, {"ok": True, "publicKey": public_key})

    def _register_finish(self, data: dict[str, Any]) -> None:
        username = str(data.get("username", "")).strip()
        expected = self.state.pending_register.get(username)
        if not username or not expected:
            self._bad("no pending registration")
            return
        try:
            client_data_raw = b64u_decode(data["response"]["clientDataJSON"])
            client_data = json.loads(client_data_raw.decode("utf-8"))
            challenge = client_data.get("challenge")
            typ = client_data.get("type")
            origin = client_data.get("origin")
        except Exception:
            self._bad("invalid credential response")
            return

        if typ != "webauthn.create":
            self._bad("unexpected clientData type")
            return
        if challenge != expected:
            self._bad("challenge mismatch")
            return
        if origin != self.state.origin:
            self._bad(f"origin mismatch: expected {self.state.origin}, got {origin}")
            return

        raw_id = str(data.get("rawId", ""))
        if not raw_id:
            self._bad("rawId missing")
            return

        user = self.state.get_user(username)
        creds = user.setdefault("credentials", [])
        if raw_id not in creds:
            creds.append(raw_id)
        self.state.save_db()
        self.state.pending_register.pop(username, None)
        self._json(200, {"ok": True, "username": username, "credential_count": len(creds)})

    def _auth_start(self, data: dict[str, Any]) -> None:
        username = str(data.get("username", "")).strip()
        if not username:
            self._bad("username required")
            return
        user = self.state.db.get("users", {}).get(username)
        if not user or not user.get("credentials"):
            self._bad("no credentials for user", 404)
            return

        challenge = random_b64u(32)
        self.state.pending_auth[username] = challenge
        allow_credentials = [{"type": "public-key", "id": cid} for cid in user["credentials"]]
        public_key = {
            "challenge": challenge,
            "rpId": self.state.rp_id,
            "timeout": 60000,
            "userVerification": "preferred",
            "allowCredentials": allow_credentials,
        }
        self._json(200, {"ok": True, "publicKey": public_key})

    def _auth_finish(self, data: dict[str, Any]) -> None:
        username = str(data.get("username", "")).strip()
        expected = self.state.pending_auth.get(username)
        if not username or not expected:
            self._bad("no pending authentication")
            return

        user = self.state.db.get("users", {}).get(username, {})
        known = set(user.get("credentials", []))
        raw_id = str(data.get("rawId", ""))
        if raw_id not in known:
            self._bad("unknown credential")
            return

        try:
            client_data_raw = b64u_decode(data["response"]["clientDataJSON"])
            client_data = json.loads(client_data_raw.decode("utf-8"))
            challenge = client_data.get("challenge")
            typ = client_data.get("type")
            origin = client_data.get("origin")
        except Exception:
            self._bad("invalid assertion response")
            return

        if typ != "webauthn.get":
            self._bad("unexpected clientData type")
            return
        if challenge != expected:
            self._bad("challenge mismatch")
            return
        if origin != self.state.origin:
            self._bad(f"origin mismatch: expected {self.state.origin}, got {origin}")
            return

        self.state.pending_auth.pop(username, None)
        self._json(200, {"ok": True, "username": username, "authenticated": True})

    def log_message(self, format: str, *args: Any) -> None:
        return


def main() -> int:
    parser = argparse.ArgumentParser(description="Local WebAuthn demo server")
    parser.add_argument("--host", default="localhost")
    parser.add_argument("--port", type=int, default=8765)
    parser.add_argument("--rp-id", default="localhost")
    parser.add_argument("--rp-name", default="ChromeCard Local Demo")
    parser.add_argument("--origin", default="http://localhost:8765")
    parser.add_argument("--db", default=".webauthn_demo_db.json")
    args = parser.parse_args()

    db_path = Path(args.db).resolve()
    state = DemoState(db_path, rp_id=args.rp_id, rp_name=args.rp_name, origin=args.origin)
    Handler.state = state

    server = ThreadingHTTPServer((args.host, args.port), Handler)
    print(f"WebAuthn demo listening on http://{args.host}:{args.port}")
    print(f"RP ID: {args.rp_id}")
    print(f"Origin: {args.origin}")
    print(f"DB: {db_path}")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        server.server_close()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())