335 lines
12 KiB
Python
335 lines
12 KiB
Python
"""
Unit + round-trip tests for k_server_app._verify_assertion_token.

Run:
    uv run --python 3.12 --with fido2 --with cbor2 --with cryptography \
        python3 -m unittest tests/test_k_server.py

The unit tests (TestVerifyAssertionToken) only need cbor2 + cryptography.
The round-trip tests (TestVerifyAssertionTokenRoundTrip) also need fido2
(through CardEmulator) — they are skipped automatically if fido2 is absent.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import struct
|
|
import sys
|
|
import unittest
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
|
sys.path.insert(0, os.path.dirname(__file__))
|
|
|
|
import k_server_app
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Dependency guards
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Optional dependency probe: HAS_CRYPTO records whether cbor2 and the
# cryptography primitives used by the helpers below could all be imported.
# Test classes consult the flag to skip themselves instead of erroring.
try:
    import cbor2  # noqa: F401
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.asymmetric.ec import (
        ECDSA,
        SECP256R1,
        generate_private_key,
    )
    from cryptography.hazmat.primitives.hashes import SHA256
except ImportError:
    HAS_CRYPTO = False
else:
    HAS_CRYPTO = True
|
|
|
|
# Optional dependency probe: HAS_FIDO2 records whether CardEmulator could be
# imported (the module docstring notes that it is what pulls in fido2).
try:
    from card_emulator import CardEmulator
except ImportError:
    HAS_FIDO2 = False
else:
    HAS_FIDO2 = True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _b64u_encode(b: bytes) -> str:
|
|
return base64.urlsafe_b64encode(b).rstrip(b"=").decode()
|
|
|
|
|
|
def _b64u_decode(s: str) -> bytes:
|
|
padded = s + "=" * ((4 - len(s) % 4) % 4)
|
|
return base64.urlsafe_b64decode(padded)
|
|
|
|
|
|
# COSE ES256 key layout matching card_emulator._cose_es256 exactly.
|
|
def _cose_es256(x: bytes, y: bytes) -> bytes:
|
|
return (
|
|
bytes([0xA5, 0x01, 0x02, 0x03, 0x26, 0x20, 0x01, 0x21, 0x58, 0x20])
|
|
+ x
|
|
+ bytes([0x22, 0x58, 0x20])
|
|
+ y
|
|
)
|
|
|
|
|
|
def _make_bundle(host: str, nonce: str) -> tuple[str, object]:
    """Build a freshly signed assertion bundle for *host* and *nonce*.

    A new P-256 key pair and a random 16-byte credential id are generated on
    every call, so two calls never return the same token.

    According to the original author's note, the bundle mirrors what
    proxy_service.dart's _handleAuthGetToken produces (not verifiable from
    this file).

    Args:
        host: Host name bound into the challenge and stored in the bundle.
        nonce: Nonce bound into the challenge and stored in the bundle.

    Returns:
        ``(token, private_key)`` where *token* is the base64url encoding of
        the compact-JSON bundle and *private_key* is the key that signed it.
    """
    signing_key = generate_private_key(SECP256R1(), default_backend())
    numbers = signing_key.public_key().public_numbers()
    cose_key = _cose_es256(
        numbers.x.to_bytes(32, "big"),
        numbers.y.to_bytes(32, "big"),
    )

    # AttestedCredentialData: aaguid(16) + credIdLen(2) + credId + coseKey
    credential_id = os.urandom(16)
    cred_data = b"".join(
        (
            bytes.fromhex("1234567890abcdef0123456789abcdef"),
            struct.pack(">H", len(credential_id)),
            credential_id,
            cose_key,
        )
    )

    # Challenge = SHA256(host|nonce) — same as proxy_service.dart
    challenge_digest = hashlib.sha256(f"{host}|{nonce}".encode()).digest()
    client_data = {
        "type": "webauthn.get",
        "challenge": _b64u_encode(challenge_digest),
        "origin": "https://localhost",
        "crossOrigin": False,
    }
    cdj_bytes = json.dumps(client_data, separators=(",", ":")).encode()

    # authData: rpIdHash(32) + flags(1) + signCount(4)
    rp_id_hash = hashlib.sha256(b"localhost").digest()
    auth_data = rp_id_hash + b"\x01" + struct.pack(">I", 1)

    # The signature covers authData followed by SHA-256 of the client data JSON.
    signed_payload = auth_data + hashlib.sha256(cdj_bytes).digest()
    signature = signing_key.sign(signed_payload, ECDSA(SHA256()))

    bundle = {
        "v": 1,
        "host": host,
        "nonce": nonce,
        "authData": _b64u_encode(auth_data),
        "sig": _b64u_encode(signature),
        "cdj": _b64u_encode(cdj_bytes),
        "cred": _b64u_encode(cred_data),
        "user": "testuser",
    }
    token = _b64u_encode(json.dumps(bundle, separators=(",", ":")).encode())
    return token, signing_key
|
|
|
|
|
|
def _tamper(token: str, key: str, transform) -> str:
    """Return a re-encoded copy of *token* with one bundle field rewritten.

    The token is decoded to its JSON bundle, ``bundle[key]`` is replaced by
    ``transform(bundle[key])``, and the result is serialized as compact JSON
    and base64url-encoded again. A missing *key* raises ``KeyError``.
    """
    fields = json.loads(_b64u_decode(token))
    fields[key] = transform(fields[key])
    compact_json = json.dumps(fields, separators=(",", ":"))
    return _b64u_encode(compact_json.encode())
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Group 1 — unit tests (cbor2 + cryptography only)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@unittest.skipUnless(HAS_CRYPTO, "cbor2 / cryptography not installed")
class TestVerifyAssertionToken(unittest.TestCase):
    """Checks ``k_server_app._verify_assertion_token`` against locally signed bundles.

    Every test starts from a fresh token minted by ``_make_bundle`` and either
    submits it unchanged or alters exactly one part and expects rejection.
    """

    def setUp(self):
        """Mint a validly signed token for a fixed host/nonce pair."""
        self.host = "127.0.0.1"
        self.nonce = "deadbeef01234567"
        self.token = _make_bundle(self.host, self.nonce)[0]

    def _check(self, token=None) -> bool:
        """Verify *token*, falling back to the setUp token when it is None."""
        candidate = self.token if token is None else token
        return k_server_app._verify_assertion_token(candidate)

    @staticmethod
    def _pack(payload: dict) -> str:
        """Serialize *payload* as compact JSON and base64url-encode it."""
        return _b64u_encode(json.dumps(payload, separators=(",", ":")).encode())

    def test_valid_token_accepted(self):
        self.assertTrue(self._check())

    def test_token_valid_for_any_path_on_host(self):
        # Domain-level binding: the same token covers all paths on the host.
        # The verifier is called with the token only, so no path is involved.
        self.assertTrue(self._check())

    def test_tampered_nonce_invalidates_challenge(self):
        forged = _tamper(self.token, "nonce", lambda _: "tampered00000000")
        self.assertFalse(self._check(forged))

    def test_tampered_host_invalidates_challenge(self):
        forged = _tamper(self.token, "host", lambda _: "attacker.com")
        self.assertFalse(self._check(forged))

    def test_tampered_signature_rejected(self):
        def invert_final_byte(encoded: str) -> str:
            # Flip every bit of the signature's last byte.
            signature = _b64u_decode(encoded)
            return _b64u_encode(signature[:-1] + bytes([signature[-1] ^ 0xFF]))

        forged = _tamper(self.token, "sig", invert_final_byte)
        self.assertFalse(self._check(forged))

    def test_wrong_public_key_rejected(self):
        stranger = generate_private_key(SECP256R1(), default_backend())
        numbers = stranger.public_key().public_numbers()
        foreign_cose = _cose_es256(
            numbers.x.to_bytes(32, "big"),
            numbers.y.to_bytes(32, "big"),
        )

        def replace_key(encoded: str) -> str:
            # Keep aaguid(16) + credIdLen(2) + credId, swap the trailing key.
            raw = _b64u_decode(encoded)
            id_length = int.from_bytes(raw[16:18], "big")
            return _b64u_encode(raw[:18 + id_length] + foreign_cose)

        forged = _tamper(self.token, "cred", replace_key)
        self.assertFalse(self._check(forged))

    def test_malformed_token_returns_false(self):
        self.assertFalse(self._check(token="!!!not-base64!!!"))

    def test_empty_token_returns_false(self):
        self.assertFalse(self._check(token=""))

    def test_missing_field_returns_false(self):
        fields = json.loads(_b64u_decode(self.token))
        fields.pop("sig")
        self.assertFalse(self._check(self._pack(fields)))

    def test_cdj_wrong_type_rejected(self):
        fields = json.loads(_b64u_decode(self.token))
        client_data = json.loads(_b64u_decode(fields["cdj"]))
        client_data["type"] = "webauthn.create"  # wrong type
        fields["cdj"] = self._pack(client_data)
        self.assertFalse(self._check(self._pack(fields)))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Group 2 — end-to-end round-trip via CardEmulator (needs fido2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@unittest.skipUnless(HAS_CRYPTO, "cbor2 / cryptography not installed")
@unittest.skipUnless(HAS_FIDO2, "fido2 not installed")
class TestVerifyAssertionTokenRoundTrip(unittest.TestCase):
    """Full round-trip: CardEmulator → assertion bundle → server verification.

    Mirrors the actual k_phone flow:
      make_credential (enrollment) → get_assertion (per-request binding)
      → bundle as k_server_app.py expects → _verify_assertion_token.

    The class is skipped unless both optional dependency sets are importable.
    These tests call the same verifier that the unit tests guard behind
    HAS_CRYPTO, so running with fido2 present but cbor2/cryptography missing
    would produce failures rather than skips.
    """

    @staticmethod
    def _verify(token: str) -> bool:
        """Run the server-side verifier on *token*."""
        return k_server_app._verify_assertion_token(token)

    def _register_and_assert(
        self,
        emulator: "CardEmulator",
        host: str,
        nonce: str,
    ) -> str:
        """Return a token string after one register + one assertion.

        Args:
            emulator: Emulator that enrolls the credential and signs with it.
            host: Host name bound into the challenge and stored in the bundle.
            nonce: Nonce bound into the challenge and stored in the bundle.

        Returns:
            The base64url encoding of the compact-JSON assertion bundle.
        """
        # 1. Register — mirrors makeCredential in fido2_ops.dart
        reg_cdh = hashlib.sha256(b"registration-placeholder").digest()
        attest = emulator.make_credential(
            client_data_hash=reg_cdh,
            rp={"id": "localhost", "name": "ChromeCard"},
            user={"id": b"testuid", "name": "alice"},
            key_params=[{"type": "public-key", "alg": -7}],
        )
        # AttestedCredentialData = authData[37:]
        # Layout: aaguid(16) + credIdLen(2, big-endian) + credId + coseKey
        cred_data = bytes(attest.auth_data)[37:]
        cred_id_len = int.from_bytes(cred_data[16:18], "big")
        cred_id = cred_data[18:18 + cred_id_len]

        # 2. Assert with domain-level challenge — mirrors _handleAuthGetToken
        #    in proxy_service.dart
        challenge_b64u = _b64u_encode(
            hashlib.sha256(f"{host}|{nonce}".encode()).digest()
        )
        cdj_bytes = json.dumps(
            {
                "type": "webauthn.get",
                "challenge": challenge_b64u,
                "origin": "https://localhost",
                "crossOrigin": False,
            },
            separators=(",", ":"),
        ).encode()
        assertion = emulator.get_assertion(
            rp_id="localhost",
            client_data_hash=hashlib.sha256(cdj_bytes).digest(),
            allow_list=[{"type": "public-key", "id": cred_id}],
        )

        # 3. Encode bundle — same as proxy_service.dart _handleAuthGetToken
        bundle = {
            "v": 1,
            "host": host,
            "nonce": nonce,
            "authData": _b64u_encode(bytes(assertion.auth_data)),
            "sig": _b64u_encode(bytes(assertion.signature)),
            "cdj": _b64u_encode(cdj_bytes),
            "cred": _b64u_encode(cred_data),
            "user": "alice",
        }
        return _b64u_encode(json.dumps(bundle, separators=(",", ":")).encode())

    def test_roundtrip_accepted(self):
        """An untouched register + assert token verifies."""
        token = self._register_and_assert(
            CardEmulator(), "example.com", "cafebabe12345678"
        )
        self.assertTrue(
            self._verify(token),
            "valid round-trip token must be accepted",
        )

    def test_roundtrip_valid_for_any_path(self):
        """Domain-level token: accepted regardless of which path is requested."""
        token = self._register_and_assert(
            CardEmulator(), "example.com", "aabbccdd11223344"
        )
        self.assertTrue(
            self._verify(token),
            "domain-level token must be accepted for any path on the host",
        )

    def test_roundtrip_same_token_tampered_nonce_rejected(self):
        """Changing the nonce in a real assertion bundle breaks verification."""
        token = self._register_and_assert(
            CardEmulator(), "example.com", "original00000000"
        )
        tampered = _tamper(token, "nonce", lambda _: "tampered11111111")
        self.assertFalse(
            self._verify(tampered),
            "tampered nonce must break challenge verification",
        )

    def test_roundtrip_tampered_host_rejected(self):
        """Changing the host in the bundle breaks challenge verification."""
        token = self._register_and_assert(
            CardEmulator(), "example.com", "deadbeef00112233"
        )
        tampered = _tamper(token, "host", lambda _: "attacker.com")
        self.assertFalse(
            self._verify(tampered),
            "tampered host must break challenge verification",
        )

    def test_roundtrip_replayed_for_different_user_rejected(self):
        """Two users register separate credentials; each token is only valid for its own key."""
        host, nonce = "example.com", "00112233aabbccdd"
        token_a = self._register_and_assert(CardEmulator(), host, nonce)
        token_b = self._register_and_assert(CardEmulator(), host, nonce)

        # token_a's signature was made with the first emulator's key — it must
        # not verify against the second emulator's public key.
        # Tamper: swap cred (public key) from token_b into token_a's bundle.
        foreign_cred = json.loads(_b64u_decode(token_b))["cred"]
        cross = _tamper(token_a, "cred", lambda _: foreign_cred)

        self.assertFalse(
            self._verify(cross),
            "cross-user key swap must fail verification",
        )
|
|
|
|
|
|
# Run this module's tests with verbose output when executed directly.
if __name__ == "__main__":
    unittest.main(verbosity=2)
|