Add k_server assertion verification tests + clarify session login comment
tests/test_k_server.py:
- TestVerifyAssertionToken (12 tests): unit tests using raw P-256 keys —
valid accept, wrong path/method, tampered nonce/signature/key, cross-
resource replay, malformed/empty token, wrong cdj type, missing field.
- TestVerifyAssertionTokenRoundTrip (5 tests): end-to-end via CardEmulator
— register, getAssertion with bound challenge, build bundle as k_phone
does, verify on server. Tests include wrong path/method and cross-user
key swap. Skipped automatically if fido2 is not installed.
All 17 pass.
proxy_service.dart: add comment to _handleSessionLogin explaining why
random challenge is correct there (user-presence proof for portal session,
not per-request resource binding).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
3bc47deb27
commit
6f08c7eed4
|
|
@ -311,7 +311,11 @@ class _ProxyServer {
|
|||
}
|
||||
|
||||
if (enrollment.hasCredential && _cardCid != null) {
|
||||
// FIDO2-direct: getAssertion + verify
|
||||
// FIDO2-direct: getAssertion + local verify.
|
||||
// Random challenge is intentional here: session login only proves the
|
||||
// user CAN authenticate (user-presence check). The resulting session token
|
||||
// is for portal access. Per-request resource binding (challenge = SHA256
|
||||
// of url|method|nonce) happens in _handleAuthGetToken, not here.
|
||||
GetAssertionResult assertionResult;
|
||||
try {
|
||||
assertionResult = await getAssertion(_cardCid!, enrollment.credentialDataB64!);
|
||||
|
|
|
|||
|
|
@ -0,0 +1,349 @@
|
|||
"""
|
||||
Unit + round-trip tests for k_server_app._verify_assertion_token.
|
||||
|
||||
Run:
|
||||
uv run --python 3.12 --with fido2 --with cbor2 --with cryptography \
|
||||
python3 -m unittest tests/test_k_server.py
|
||||
|
||||
The unit tests (TestVerifyAssertionToken) only need cbor2 + cryptography.
|
||||
The round-trip tests (TestVerifyAssertionTokenRoundTrip) also need fido2
|
||||
(through CardEmulator) — they are skipped automatically if fido2 is absent.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import struct
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
||||
sys.path.insert(0, os.path.dirname(__file__))
|
||||
|
||||
import k_server_app
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dependency guards
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
try:
|
||||
import cbor2 # noqa: F401
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.asymmetric.ec import (
|
||||
ECDSA,
|
||||
SECP256R1,
|
||||
generate_private_key,
|
||||
)
|
||||
from cryptography.hazmat.primitives.hashes import SHA256
|
||||
HAS_CRYPTO = True
|
||||
except ImportError:
|
||||
HAS_CRYPTO = False
|
||||
|
||||
try:
|
||||
from card_emulator import CardEmulator
|
||||
HAS_FIDO2 = True
|
||||
except ImportError:
|
||||
HAS_FIDO2 = False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _b64u_encode(b: bytes) -> str:
|
||||
return base64.urlsafe_b64encode(b).rstrip(b"=").decode()
|
||||
|
||||
|
||||
def _b64u_decode(s: str) -> bytes:
|
||||
padded = s + "=" * ((4 - len(s) % 4) % 4)
|
||||
return base64.urlsafe_b64decode(padded)
|
||||
|
||||
|
||||
# COSE ES256 key layout matching card_emulator._cose_es256 exactly.
|
||||
def _cose_es256(x: bytes, y: bytes) -> bytes:
|
||||
return (
|
||||
bytes([0xA5, 0x01, 0x02, 0x03, 0x26, 0x20, 0x01, 0x21, 0x58, 0x20])
|
||||
+ x
|
||||
+ bytes([0x22, 0x58, 0x20])
|
||||
+ y
|
||||
)
|
||||
|
||||
|
||||
def _make_bundle(url: str, method: str, nonce: str) -> tuple[str, object]:
|
||||
"""Return (base64url-token, private_key) for a fresh P-256 assertion.
|
||||
|
||||
Mirrors exactly what proxy_service.dart's _handleAuthGetToken produces.
|
||||
"""
|
||||
priv = generate_private_key(SECP256R1(), default_backend())
|
||||
pub = priv.public_key().public_numbers()
|
||||
x = pub.x.to_bytes(32, "big")
|
||||
y = pub.y.to_bytes(32, "big")
|
||||
|
||||
cose_key = _cose_es256(x, y)
|
||||
|
||||
# AttestedCredentialData: aaguid(16) + credIdLen(2) + credId + coseKey
|
||||
aaguid = bytes.fromhex("1234567890abcdef0123456789abcdef")
|
||||
cred_id = os.urandom(16)
|
||||
cred_data = aaguid + struct.pack(">H", len(cred_id)) + cred_id + cose_key
|
||||
|
||||
# Challenge = SHA256(url|method|nonce) — same as proxy_service.dart
|
||||
challenge_b64u = _b64u_encode(hashlib.sha256(f"{url}|{method}|{nonce}".encode()).digest())
|
||||
|
||||
cdj = json.dumps(
|
||||
{
|
||||
"type": "webauthn.get",
|
||||
"challenge": challenge_b64u,
|
||||
"origin": "https://localhost",
|
||||
"crossOrigin": False,
|
||||
},
|
||||
separators=(",", ":"),
|
||||
)
|
||||
cdj_bytes = cdj.encode()
|
||||
cdh = hashlib.sha256(cdj_bytes).digest()
|
||||
|
||||
# authData: rpIdHash(32) + flags(1) + signCount(4)
|
||||
auth_data = hashlib.sha256(b"localhost").digest() + b"\x01" + struct.pack(">I", 1)
|
||||
|
||||
sig = priv.sign(auth_data + cdh, ECDSA(SHA256()))
|
||||
|
||||
bundle = {
|
||||
"v": 1,
|
||||
"url": url,
|
||||
"method": method,
|
||||
"nonce": nonce,
|
||||
"authData": _b64u_encode(auth_data),
|
||||
"sig": _b64u_encode(sig),
|
||||
"cdj": _b64u_encode(cdj_bytes),
|
||||
"cred": _b64u_encode(cred_data),
|
||||
"user": "testuser",
|
||||
}
|
||||
return _b64u_encode(json.dumps(bundle, separators=(",", ":")).encode()), priv
|
||||
|
||||
|
||||
def _tamper(token: str, key: str, transform) -> str:
|
||||
bundle = json.loads(_b64u_decode(token))
|
||||
bundle[key] = transform(bundle[key])
|
||||
return _b64u_encode(json.dumps(bundle, separators=(",", ":")).encode())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Group 1 — unit tests (cbor2 + cryptography only)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@unittest.skipUnless(HAS_CRYPTO, "cbor2 / cryptography not installed")
|
||||
class TestVerifyAssertionToken(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.url = "https://127.0.0.1:8780/resource/counter"
|
||||
self.method = "POST"
|
||||
self.nonce = "deadbeef01234567"
|
||||
self.token, _ = _make_bundle(self.url, self.method, self.nonce)
|
||||
|
||||
def _check(self, token=None, path="/resource/counter", method="POST") -> bool:
|
||||
return k_server_app._verify_assertion_token(
|
||||
self.token if token is None else token, request_path=path, request_method=method
|
||||
)
|
||||
|
||||
def test_valid_token_accepted(self):
|
||||
self.assertTrue(self._check())
|
||||
|
||||
def test_wrong_path_rejected(self):
|
||||
self.assertFalse(self._check(path="/resource/other"))
|
||||
|
||||
def test_wrong_method_rejected(self):
|
||||
self.assertFalse(self._check(method="GET"))
|
||||
|
||||
def test_method_comparison_case_insensitive(self):
|
||||
self.assertTrue(self._check(method="post"))
|
||||
self.assertTrue(self._check(method="POST"))
|
||||
|
||||
def test_tampered_nonce_invalidates_challenge(self):
|
||||
tampered = _tamper(self.token, "nonce", lambda _: "tampered00000000")
|
||||
self.assertFalse(self._check(tampered))
|
||||
|
||||
def test_tampered_signature_rejected(self):
|
||||
def flip_last_byte(b64: str) -> str:
|
||||
raw = bytearray(_b64u_decode(b64))
|
||||
raw[-1] ^= 0xFF
|
||||
return _b64u_encode(bytes(raw))
|
||||
|
||||
tampered = _tamper(self.token, "sig", flip_last_byte)
|
||||
self.assertFalse(self._check(tampered))
|
||||
|
||||
def test_wrong_public_key_rejected(self):
|
||||
other = generate_private_key(SECP256R1(), default_backend())
|
||||
pub = other.public_key().public_numbers()
|
||||
x = pub.x.to_bytes(32, "big")
|
||||
y = pub.y.to_bytes(32, "big")
|
||||
new_cose = _cose_es256(x, y)
|
||||
|
||||
def swap_key(b64: str) -> str:
|
||||
orig = _b64u_decode(b64)
|
||||
cred_id_len = (orig[16] << 8) | orig[17]
|
||||
return _b64u_encode(orig[:18 + cred_id_len] + new_cose)
|
||||
|
||||
tampered = _tamper(self.token, "cred", swap_key)
|
||||
self.assertFalse(self._check(tampered))
|
||||
|
||||
def test_cross_resource_replay_rejected(self):
|
||||
# Token bound to /resource/counter must not pass for /resource/admin.
|
||||
self.assertFalse(self._check(path="/resource/admin"))
|
||||
|
||||
def test_malformed_token_returns_false(self):
|
||||
self.assertFalse(self._check(token="!!!not-base64!!!"))
|
||||
|
||||
def test_empty_token_returns_false(self):
|
||||
self.assertFalse(self._check(token=""))
|
||||
|
||||
def test_missing_field_returns_false(self):
|
||||
bundle = json.loads(_b64u_decode(self.token))
|
||||
del bundle["sig"]
|
||||
truncated = _b64u_encode(json.dumps(bundle, separators=(",", ":")).encode())
|
||||
self.assertFalse(self._check(truncated))
|
||||
|
||||
def test_cdj_wrong_type_rejected(self):
|
||||
bundle = json.loads(_b64u_decode(self.token))
|
||||
cdj = json.loads(_b64u_decode(bundle["cdj"]))
|
||||
cdj["type"] = "webauthn.create" # wrong type
|
||||
bundle["cdj"] = _b64u_encode(json.dumps(cdj, separators=(",", ":")).encode())
|
||||
tampered = _b64u_encode(json.dumps(bundle, separators=(",", ":")).encode())
|
||||
self.assertFalse(self._check(tampered))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Group 2 — end-to-end round-trip via CardEmulator (needs fido2)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@unittest.skipUnless(HAS_FIDO2, "fido2 not installed")
|
||||
class TestVerifyAssertionTokenRoundTrip(unittest.TestCase):
|
||||
"""Full round-trip: CardEmulator → assertion bundle → server verification.
|
||||
|
||||
Mirrors the actual k_phone flow:
|
||||
make_credential (enrollment) → get_assertion (per-request binding)
|
||||
→ bundle as k_server_app.py expects → _verify_assertion_token.
|
||||
"""
|
||||
|
||||
def _register_and_assert(
|
||||
self,
|
||||
emulator: "CardEmulator",
|
||||
url: str,
|
||||
method: str,
|
||||
nonce: str,
|
||||
) -> str:
|
||||
"""Return a token string after one register + one assertion."""
|
||||
# 1. Register — mirrors makeCredential in fido2_ops.dart
|
||||
reg_cdh = hashlib.sha256(b"registration-placeholder").digest()
|
||||
attest = emulator.make_credential(
|
||||
client_data_hash=reg_cdh,
|
||||
rp={"id": "localhost", "name": "ChromeCard"},
|
||||
user={"id": b"testuid", "name": "alice"},
|
||||
key_params=[{"type": "public-key", "alg": -7}],
|
||||
)
|
||||
# AttestedCredentialData = authData[37:]
|
||||
auth_data_make = bytes(attest.auth_data)
|
||||
cred_data = auth_data_make[37:]
|
||||
cred_id_len = (cred_data[16] << 8) | cred_data[17]
|
||||
cred_id = cred_data[18:18 + cred_id_len]
|
||||
|
||||
# 2. Assert with bound challenge — mirrors _handleAuthGetToken in proxy_service.dart
|
||||
challenge_b64u = _b64u_encode(
|
||||
hashlib.sha256(f"{url}|{method}|{nonce}".encode()).digest()
|
||||
)
|
||||
cdj = json.dumps(
|
||||
{
|
||||
"type": "webauthn.get",
|
||||
"challenge": challenge_b64u,
|
||||
"origin": "https://localhost",
|
||||
"crossOrigin": False,
|
||||
},
|
||||
separators=(",", ":"),
|
||||
)
|
||||
cdj_bytes = cdj.encode()
|
||||
assertion = emulator.get_assertion(
|
||||
rp_id="localhost",
|
||||
client_data_hash=hashlib.sha256(cdj_bytes).digest(),
|
||||
allow_list=[{"type": "public-key", "id": cred_id}],
|
||||
)
|
||||
|
||||
# 3. Encode bundle — same as proxy_service.dart _handleAuthGetToken
|
||||
bundle = {
|
||||
"v": 1,
|
||||
"url": url,
|
||||
"method": method,
|
||||
"nonce": nonce,
|
||||
"authData": _b64u_encode(bytes(assertion.auth_data)),
|
||||
"sig": _b64u_encode(bytes(assertion.signature)),
|
||||
"cdj": _b64u_encode(cdj_bytes),
|
||||
"cred": _b64u_encode(cred_data),
|
||||
"user": "alice",
|
||||
}
|
||||
return _b64u_encode(json.dumps(bundle, separators=(",", ":")).encode())
|
||||
|
||||
def test_roundtrip_accepted(self):
|
||||
emulator = CardEmulator()
|
||||
token = self._register_and_assert(
|
||||
emulator, "https://example.com/api/data", "GET", "cafebabe12345678"
|
||||
)
|
||||
self.assertTrue(
|
||||
k_server_app._verify_assertion_token(token, "/api/data", "GET"),
|
||||
"valid round-trip token must be accepted",
|
||||
)
|
||||
|
||||
def test_roundtrip_wrong_path_rejected(self):
|
||||
emulator = CardEmulator()
|
||||
token = self._register_and_assert(
|
||||
emulator, "https://example.com/api/data", "GET", "aabbccdd11223344"
|
||||
)
|
||||
self.assertFalse(
|
||||
k_server_app._verify_assertion_token(token, "/api/other", "GET"),
|
||||
"token for /api/data must not pass for /api/other",
|
||||
)
|
||||
|
||||
def test_roundtrip_wrong_method_rejected(self):
|
||||
emulator = CardEmulator()
|
||||
token = self._register_and_assert(
|
||||
emulator, "https://example.com/submit", "POST", "1122334455667788"
|
||||
)
|
||||
self.assertFalse(
|
||||
k_server_app._verify_assertion_token(token, "/submit", "GET"),
|
||||
"token for POST must not pass for GET",
|
||||
)
|
||||
|
||||
def test_roundtrip_same_token_twice_rejected_after_nonce_tamper(self):
|
||||
"""Changing the nonce in a real assertion bundle breaks verification."""
|
||||
emulator = CardEmulator()
|
||||
token = self._register_and_assert(
|
||||
emulator, "https://example.com/resource/counter", "POST", "original00000000"
|
||||
)
|
||||
tampered = _tamper(token, "nonce", lambda _: "tampered11111111")
|
||||
self.assertFalse(
|
||||
k_server_app._verify_assertion_token(tampered, "/resource/counter", "POST"),
|
||||
"tampered nonce must break challenge verification",
|
||||
)
|
||||
|
||||
def test_roundtrip_replayed_for_different_user_rejected(self):
|
||||
"""Two users register separate credentials; each token is only valid for its own key."""
|
||||
em_a = CardEmulator()
|
||||
em_b = CardEmulator()
|
||||
url, method, nonce = "https://example.com/protected", "GET", "00112233aabbccdd"
|
||||
token_a = self._register_and_assert(em_a, url, method, nonce)
|
||||
token_b = self._register_and_assert(em_b, url, method, nonce)
|
||||
|
||||
# token_a's signature was made with em_a's key — must not verify with em_b's public key.
|
||||
# Tamper: swap cred (public key) from token_b into token_a's bundle.
|
||||
bundle_a = json.loads(_b64u_decode(token_a))
|
||||
bundle_b = json.loads(_b64u_decode(token_b))
|
||||
bundle_a["cred"] = bundle_b["cred"]
|
||||
cross = _b64u_encode(json.dumps(bundle_a, separators=(",", ":")).encode())
|
||||
|
||||
self.assertFalse(
|
||||
k_server_app._verify_assertion_token(cross, "/protected", "GET"),
|
||||
"cross-user key swap must fail verification",
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main(verbosity=2)
|
||||
Loading…
Reference in New Issue