diff --git a/k_phone/lib/proxy_service.dart b/k_phone/lib/proxy_service.dart index 1923476..f802254 100644 --- a/k_phone/lib/proxy_service.dart +++ b/k_phone/lib/proxy_service.dart @@ -311,7 +311,11 @@ class _ProxyServer { } if (enrollment.hasCredential && _cardCid != null) { - // FIDO2-direct: getAssertion + verify + // FIDO2-direct: getAssertion + local verify. + // Random challenge is intentional here: session login only proves the + // user CAN authenticate (user-presence check). The resulting session token + // is for portal access. Per-request resource binding (challenge = SHA256 + // of url|method|nonce) happens in _handleAuthGetToken, not here. GetAssertionResult assertionResult; try { assertionResult = await getAssertion(_cardCid!, enrollment.credentialDataB64!); diff --git a/tests/test_k_server.py b/tests/test_k_server.py new file mode 100644 index 0000000..bed7dab --- /dev/null +++ b/tests/test_k_server.py @@ -0,0 +1,349 @@ +""" +Unit + round-trip tests for k_server_app._verify_assertion_token. + +Run: + uv run --python 3.12 --with fido2 --with cbor2 --with cryptography \ + python3 -m unittest tests/test_k_server.py + +The unit tests (TestVerifyAssertionToken) only need cbor2 + cryptography. +The round-trip tests (TestVerifyAssertionTokenRoundTrip) also need fido2 +(through CardEmulator) — they are skipped automatically if fido2 is absent. +""" + +from __future__ import annotations + +import base64 +import hashlib +import json +import os +import struct +import sys +import unittest + +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) +sys.path.insert(0, os.path.dirname(__file__)) + +import k_server_app + + +# --------------------------------------------------------------------------- +# Dependency guards +# --------------------------------------------------------------------------- + +try: + import cbor2 # noqa: F401 + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives.asymmetric.ec import ( + ECDSA, + SECP256R1, + generate_private_key, + ) + from cryptography.hazmat.primitives.hashes import SHA256 + HAS_CRYPTO = True +except ImportError: + HAS_CRYPTO = False + +try: + from card_emulator import CardEmulator + HAS_FIDO2 = True +except ImportError: + HAS_FIDO2 = False + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _b64u_encode(b: bytes) -> str: + return base64.urlsafe_b64encode(b).rstrip(b"=").decode() + + +def _b64u_decode(s: str) -> bytes: + padded = s + "=" * ((4 - len(s) % 4) % 4) + return base64.urlsafe_b64decode(padded) + + +# COSE ES256 key layout matching card_emulator._cose_es256 exactly. +def _cose_es256(x: bytes, y: bytes) -> bytes: + return ( + bytes([0xA5, 0x01, 0x02, 0x03, 0x26, 0x20, 0x01, 0x21, 0x58, 0x20]) + + x + + bytes([0x22, 0x58, 0x20]) + + y + ) + + +def _make_bundle(url: str, method: str, nonce: str) -> tuple[str, object]: + """Return (base64url-token, private_key) for a fresh P-256 assertion. + + Mirrors exactly what proxy_service.dart's _handleAuthGetToken produces. + """ + priv = generate_private_key(SECP256R1(), default_backend()) + pub = priv.public_key().public_numbers() + x = pub.x.to_bytes(32, "big") + y = pub.y.to_bytes(32, "big") + + cose_key = _cose_es256(x, y) + + # AttestedCredentialData: aaguid(16) + credIdLen(2) + credId + coseKey + aaguid = bytes.fromhex("1234567890abcdef0123456789abcdef") + cred_id = os.urandom(16) + cred_data = aaguid + struct.pack(">H", len(cred_id)) + cred_id + cose_key + + # Challenge = SHA256(url|method|nonce) — same as proxy_service.dart + challenge_b64u = _b64u_encode(hashlib.sha256(f"{url}|{method}|{nonce}".encode()).digest()) + + cdj = json.dumps( + { + "type": "webauthn.get", + "challenge": challenge_b64u, + "origin": "https://localhost", + "crossOrigin": False, + }, + separators=(",", ":"), + ) + cdj_bytes = cdj.encode() + cdh = hashlib.sha256(cdj_bytes).digest() + + # authData: rpIdHash(32) + flags(1) + signCount(4) + auth_data = hashlib.sha256(b"localhost").digest() + b"\x01" + struct.pack(">I", 1) + + sig = priv.sign(auth_data + cdh, ECDSA(SHA256())) + + bundle = { + "v": 1, + "url": url, + "method": method, + "nonce": nonce, + "authData": _b64u_encode(auth_data), + "sig": _b64u_encode(sig), + "cdj": _b64u_encode(cdj_bytes), + "cred": _b64u_encode(cred_data), + "user": "testuser", + } + return _b64u_encode(json.dumps(bundle, separators=(",", ":")).encode()), priv + + +def _tamper(token: str, key: str, transform) -> str: + bundle = json.loads(_b64u_decode(token)) + bundle[key] = transform(bundle[key]) + return _b64u_encode(json.dumps(bundle, separators=(",", ":")).encode()) + + +# --------------------------------------------------------------------------- +# Group 1 — unit tests (cbor2 + cryptography only) +# --------------------------------------------------------------------------- + +@unittest.skipUnless(HAS_CRYPTO, "cbor2 / cryptography not installed") +class TestVerifyAssertionToken(unittest.TestCase): + def setUp(self): + self.url = "https://127.0.0.1:8780/resource/counter" + self.method = "POST" + self.nonce = "deadbeef01234567" + self.token, _ = _make_bundle(self.url, self.method, self.nonce) + + def _check(self, token=None, path="/resource/counter", method="POST") -> bool: + return k_server_app._verify_assertion_token( + self.token if token is None else token, request_path=path, request_method=method + ) + + def test_valid_token_accepted(self): + self.assertTrue(self._check()) + + def test_wrong_path_rejected(self): + self.assertFalse(self._check(path="/resource/other")) + + def test_wrong_method_rejected(self): + self.assertFalse(self._check(method="GET")) + + def test_method_comparison_case_insensitive(self): + self.assertTrue(self._check(method="post")) + self.assertTrue(self._check(method="POST")) + + def test_tampered_nonce_invalidates_challenge(self): + tampered = _tamper(self.token, "nonce", lambda _: "tampered00000000") + self.assertFalse(self._check(tampered)) + + def test_tampered_signature_rejected(self): + def flip_last_byte(b64: str) -> str: + raw = bytearray(_b64u_decode(b64)) + raw[-1] ^= 0xFF + return _b64u_encode(bytes(raw)) + + tampered = _tamper(self.token, "sig", flip_last_byte) + self.assertFalse(self._check(tampered)) + + def test_wrong_public_key_rejected(self): + other = generate_private_key(SECP256R1(), default_backend()) + pub = other.public_key().public_numbers() + x = pub.x.to_bytes(32, "big") + y = pub.y.to_bytes(32, "big") + new_cose = _cose_es256(x, y) + + def swap_key(b64: str) -> str: + orig = _b64u_decode(b64) + cred_id_len = (orig[16] << 8) | orig[17] + return _b64u_encode(orig[:18 + cred_id_len] + new_cose) + + tampered = _tamper(self.token, "cred", swap_key) + self.assertFalse(self._check(tampered)) + + def test_cross_resource_replay_rejected(self): + # Token bound to /resource/counter must not pass for /resource/admin. + self.assertFalse(self._check(path="/resource/admin")) + + def test_malformed_token_returns_false(self): + self.assertFalse(self._check(token="!!!not-base64!!!")) + + def test_empty_token_returns_false(self): + self.assertFalse(self._check(token="")) + + def test_missing_field_returns_false(self): + bundle = json.loads(_b64u_decode(self.token)) + del bundle["sig"] + truncated = _b64u_encode(json.dumps(bundle, separators=(",", ":")).encode()) + self.assertFalse(self._check(truncated)) + + def test_cdj_wrong_type_rejected(self): + bundle = json.loads(_b64u_decode(self.token)) + cdj = json.loads(_b64u_decode(bundle["cdj"])) + cdj["type"] = "webauthn.create" # wrong type + bundle["cdj"] = _b64u_encode(json.dumps(cdj, separators=(",", ":")).encode()) + tampered = _b64u_encode(json.dumps(bundle, separators=(",", ":")).encode()) + self.assertFalse(self._check(tampered)) + + +# --------------------------------------------------------------------------- +# Group 2 — end-to-end round-trip via CardEmulator (needs fido2) +# --------------------------------------------------------------------------- + +@unittest.skipUnless(HAS_FIDO2, "fido2 not installed") +class TestVerifyAssertionTokenRoundTrip(unittest.TestCase): + """Full round-trip: CardEmulator → assertion bundle → server verification. + + Mirrors the actual k_phone flow: + make_credential (enrollment) → get_assertion (per-request binding) + → bundle as k_server_app.py expects → _verify_assertion_token. + """ + + def _register_and_assert( + self, + emulator: "CardEmulator", + url: str, + method: str, + nonce: str, + ) -> str: + """Return a token string after one register + one assertion.""" + # 1. Register — mirrors makeCredential in fido2_ops.dart + reg_cdh = hashlib.sha256(b"registration-placeholder").digest() + attest = emulator.make_credential( + client_data_hash=reg_cdh, + rp={"id": "localhost", "name": "ChromeCard"}, + user={"id": b"testuid", "name": "alice"}, + key_params=[{"type": "public-key", "alg": -7}], + ) + # AttestedCredentialData = authData[37:] + auth_data_make = bytes(attest.auth_data) + cred_data = auth_data_make[37:] + cred_id_len = (cred_data[16] << 8) | cred_data[17] + cred_id = cred_data[18:18 + cred_id_len] + + # 2. Assert with bound challenge — mirrors _handleAuthGetToken in proxy_service.dart + challenge_b64u = _b64u_encode( + hashlib.sha256(f"{url}|{method}|{nonce}".encode()).digest() + ) + cdj = json.dumps( + { + "type": "webauthn.get", + "challenge": challenge_b64u, + "origin": "https://localhost", + "crossOrigin": False, + }, + separators=(",", ":"), + ) + cdj_bytes = cdj.encode() + assertion = emulator.get_assertion( + rp_id="localhost", + client_data_hash=hashlib.sha256(cdj_bytes).digest(), + allow_list=[{"type": "public-key", "id": cred_id}], + ) + + # 3. Encode bundle — same as proxy_service.dart _handleAuthGetToken + bundle = { + "v": 1, + "url": url, + "method": method, + "nonce": nonce, + "authData": _b64u_encode(bytes(assertion.auth_data)), + "sig": _b64u_encode(bytes(assertion.signature)), + "cdj": _b64u_encode(cdj_bytes), + "cred": _b64u_encode(cred_data), + "user": "alice", + } + return _b64u_encode(json.dumps(bundle, separators=(",", ":")).encode()) + + def test_roundtrip_accepted(self): + emulator = CardEmulator() + token = self._register_and_assert( + emulator, "https://example.com/api/data", "GET", "cafebabe12345678" + ) + self.assertTrue( + k_server_app._verify_assertion_token(token, "/api/data", "GET"), + "valid round-trip token must be accepted", + ) + + def test_roundtrip_wrong_path_rejected(self): + emulator = CardEmulator() + token = self._register_and_assert( + emulator, "https://example.com/api/data", "GET", "aabbccdd11223344" + ) + self.assertFalse( + k_server_app._verify_assertion_token(token, "/api/other", "GET"), + "token for /api/data must not pass for /api/other", + ) + + def test_roundtrip_wrong_method_rejected(self): + emulator = CardEmulator() + token = self._register_and_assert( + emulator, "https://example.com/submit", "POST", "1122334455667788" + ) + self.assertFalse( + k_server_app._verify_assertion_token(token, "/submit", "GET"), + "token for POST must not pass for GET", + ) + + def test_roundtrip_same_token_twice_rejected_after_nonce_tamper(self): + """Changing the nonce in a real assertion bundle breaks verification.""" + emulator = CardEmulator() + token = self._register_and_assert( + emulator, "https://example.com/resource/counter", "POST", "original00000000" + ) + tampered = _tamper(token, "nonce", lambda _: "tampered11111111") + self.assertFalse( + k_server_app._verify_assertion_token(tampered, "/resource/counter", "POST"), + "tampered nonce must break challenge verification", + ) + + def test_roundtrip_replayed_for_different_user_rejected(self): + """Two users register separate credentials; each token is only valid for its own key.""" + em_a = CardEmulator() + em_b = CardEmulator() + url, method, nonce = "https://example.com/protected", "GET", "00112233aabbccdd" + token_a = self._register_and_assert(em_a, url, method, nonce) + token_b = self._register_and_assert(em_b, url, method, nonce) + + # token_a's signature was made with em_a's key — must not verify with em_b's public key. + # Tamper: swap cred (public key) from token_b into token_a's bundle. + bundle_a = json.loads(_b64u_decode(token_a)) + bundle_b = json.loads(_b64u_decode(token_b)) + bundle_a["cred"] = bundle_b["cred"] + cross = _b64u_encode(json.dumps(bundle_a, separators=(",", ":")).encode()) + + self.assertFalse( + k_server_app._verify_assertion_token(cross, "/protected", "GET"), + "cross-user key swap must fail verification", + ) + + +if __name__ == "__main__": + unittest.main(verbosity=2)