Compare commits
2 Commits
139698cab5
...
592af0c314
| Author | SHA1 | Date |
|---|---|---|
|
|
592af0c314 | |
|
|
4b719a0846 |
|
|
@ -406,7 +406,7 @@ class FilterProxy {
|
||||||
) async {
|
) async {
|
||||||
String token;
|
String token;
|
||||||
try {
|
try {
|
||||||
token = await _getAuthToken(uri, method);
|
token = await _getAuthToken(uri);
|
||||||
} catch (_) {
|
} catch (_) {
|
||||||
_deny(client, sub, 407, 'Proxy Authentication Required');
|
_deny(client, sub, 407, 'Proxy Authentication Required');
|
||||||
return;
|
return;
|
||||||
|
|
@ -450,14 +450,13 @@ class FilterProxy {
|
||||||
await _forwardHttpRequest(client, sub, upstream, out.toString(), remainder, contentLength);
|
await _forwardHttpRequest(client, sub, upstream, out.toString(), remainder, contentLength);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Calls POST /auth/get-token on Component 2 with per-request binding and
|
// Calls POST /auth/get-token on Component 2 with domain-level binding and
|
||||||
// returns the bearer token (a self-contained FIDO2 assertion bundle).
|
// returns the bearer token (a self-contained FIDO2 assertion bundle).
|
||||||
// Throws if card is unavailable or Component 2 is unreachable.
|
// Throws if card is unavailable or Component 2 is unreachable.
|
||||||
Future<String> _getAuthToken(Uri uri, String method) async {
|
Future<String> _getAuthToken(Uri uri) async {
|
||||||
final nonce = _secureHex(16);
|
final nonce = _secureHex(16);
|
||||||
final payload = utf8.encode(jsonEncode({
|
final payload = utf8.encode(jsonEncode({
|
||||||
'url': uri.toString(),
|
'host': uri.host,
|
||||||
'method': method,
|
|
||||||
'nonce': nonce,
|
'nonce': nonce,
|
||||||
}));
|
}));
|
||||||
final httpClient = HttpClient()
|
final httpClient = HttpClient()
|
||||||
|
|
|
||||||
|
|
@ -448,11 +448,11 @@ class _ProxyServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
// Auth token endpoint (v2 architecture — per-request token binding)
|
// Auth token endpoint (v2 architecture — domain-level token binding)
|
||||||
//
|
//
|
||||||
// Component 1 (filter_proxy) and Component 3 (Go binary) call this with
|
// Component 1 (filter_proxy) and Component 3 (Go binary) call this with
|
||||||
// {url, method, nonce} for each gated request. A fresh FIDO2 assertion is
|
// {host, nonce} for each gated request. A fresh FIDO2 assertion is
|
||||||
// produced with challenge = SHA256(url|method|nonce). The self-contained
|
// produced with challenge = SHA256(host|nonce). The self-contained
|
||||||
// assertion bundle is returned as a base64url Bearer token the server can
|
// assertion bundle is returned as a base64url Bearer token the server can
|
||||||
// verify without calling back to this service.
|
// verify without calling back to this service.
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
|
@ -461,12 +461,11 @@ class _ProxyServer {
|
||||||
final body = await _readJson(req);
|
final body = await _readJson(req);
|
||||||
if (body == null) return;
|
if (body == null) return;
|
||||||
|
|
||||||
final url = body['url'] as String? ?? '';
|
final host = body['host'] as String? ?? '';
|
||||||
final method = body['method'] as String? ?? '';
|
|
||||||
final nonce = body['nonce'] as String? ?? '';
|
final nonce = body['nonce'] as String? ?? '';
|
||||||
|
|
||||||
if (url.isEmpty || method.isEmpty || nonce.isEmpty) {
|
if (host.isEmpty || nonce.isEmpty) {
|
||||||
await _send(req.response, 400, {'ok': false, 'error': 'url, method, nonce required'});
|
await _send(req.response, 400, {'ok': false, 'error': 'host, nonce required'});
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -487,9 +486,9 @@ class _ProxyServer {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Challenge = SHA256(url | "|" | method | "|" | nonce)
|
// Challenge = SHA256(host | "|" | nonce)
|
||||||
final challenge = Uint8List.fromList(
|
final challenge = Uint8List.fromList(
|
||||||
sha256.convert(utf8.encode('$url|$method|$nonce')).bytes,
|
sha256.convert(utf8.encode('$host|$nonce')).bytes,
|
||||||
);
|
);
|
||||||
|
|
||||||
GetAssertionResult assertionResult;
|
GetAssertionResult assertionResult;
|
||||||
|
|
@ -503,8 +502,7 @@ class _ProxyServer {
|
||||||
// Self-contained bundle the server can verify without calling back to the phone.
|
// Self-contained bundle the server can verify without calling back to the phone.
|
||||||
final bundleJson = jsonEncode({
|
final bundleJson = jsonEncode({
|
||||||
'v': 1,
|
'v': 1,
|
||||||
'url': url,
|
'host': host,
|
||||||
'method': method,
|
|
||||||
'nonce': nonce,
|
'nonce': nonce,
|
||||||
'authData': base64Url.encode(assertionResult.authData).replaceAll('=', ''),
|
'authData': base64Url.encode(assertionResult.authData).replaceAll('=', ''),
|
||||||
'sig': base64Url.encode(assertionResult.signature).replaceAll('=', ''),
|
'sig': base64Url.encode(assertionResult.signature).replaceAll('=', ''),
|
||||||
|
|
|
||||||
|
|
@ -250,7 +250,7 @@ void main() {
|
||||||
expect(req.uri.path, '/auth/get-token');
|
expect(req.uri.path, '/auth/get-token');
|
||||||
});
|
});
|
||||||
|
|
||||||
test('gated host: /auth/get-token body carries url, method, nonce', () async {
|
test('gated host: /auth/get-token body carries host, nonce', () async {
|
||||||
await _round(
|
await _round(
|
||||||
proxy.port,
|
proxy.port,
|
||||||
'GET http://127.0.0.1:${endpoint.port}/api?x=1 HTTP/1.1\r\n'
|
'GET http://127.0.0.1:${endpoint.port}/api?x=1 HTTP/1.1\r\n'
|
||||||
|
|
@ -259,13 +259,14 @@ void main() {
|
||||||
final (:req, :rawBody) = await comp2TokenReq.future.timeout(_kTimeout);
|
final (:req, :rawBody) = await comp2TokenReq.future.timeout(_kTimeout);
|
||||||
expect(req.uri.path, '/auth/get-token');
|
expect(req.uri.path, '/auth/get-token');
|
||||||
final body = jsonDecode(rawBody) as Map<String, dynamic>;
|
final body = jsonDecode(rawBody) as Map<String, dynamic>;
|
||||||
expect(body['url'], contains('/api?x=1'));
|
expect(body['host'], '127.0.0.1');
|
||||||
expect(body['method'], 'GET');
|
expect(body.containsKey('url'), isFalse);
|
||||||
|
expect(body.containsKey('method'), isFalse);
|
||||||
expect(body['nonce'], isA<String>());
|
expect(body['nonce'], isA<String>());
|
||||||
expect((body['nonce'] as String).length, greaterThan(8));
|
expect((body['nonce'] as String).length, greaterThan(8));
|
||||||
});
|
});
|
||||||
|
|
||||||
test('gated POST: /auth/get-token body carries method POST', () async {
|
test('gated POST: /auth/get-token body carries host, nonce (no method)', () async {
|
||||||
const postBody = '{"key":"val"}';
|
const postBody = '{"key":"val"}';
|
||||||
await _round(
|
await _round(
|
||||||
proxy.port,
|
proxy.port,
|
||||||
|
|
@ -278,8 +279,9 @@ void main() {
|
||||||
final (:req, :rawBody) = await comp2TokenReq.future.timeout(_kTimeout);
|
final (:req, :rawBody) = await comp2TokenReq.future.timeout(_kTimeout);
|
||||||
expect(req.uri.path, '/auth/get-token');
|
expect(req.uri.path, '/auth/get-token');
|
||||||
final body = jsonDecode(rawBody) as Map<String, dynamic>;
|
final body = jsonDecode(rawBody) as Map<String, dynamic>;
|
||||||
expect(body['method'], 'POST');
|
expect(body['host'], '127.0.0.1');
|
||||||
expect(body['url'], contains('/submit'));
|
expect(body.containsKey('method'), isFalse);
|
||||||
|
expect(body['nonce'], isA<String>());
|
||||||
});
|
});
|
||||||
|
|
||||||
test('gated host: request goes directly to endpoint with Bearer token', () async {
|
test('gated host: request goes directly to endpoint with Bearer token', () async {
|
||||||
|
|
|
||||||
|
|
@ -26,19 +26,21 @@ def _b64u_decode(s: str) -> bytes:
|
||||||
return base64.urlsafe_b64decode(padded)
|
return base64.urlsafe_b64decode(padded)
|
||||||
|
|
||||||
|
|
||||||
def _verify_assertion_token(token: str, request_path: str, request_method: str) -> bool:
|
def _verify_assertion_token(token: str, expected_host: str) -> bool:
|
||||||
"""Verify a base64url-encoded FIDO2 per-request assertion bundle.
|
"""Verify a base64url-encoded FIDO2 domain-level assertion bundle.
|
||||||
|
|
||||||
Bundle fields (JSON, then base64url-encoded):
|
Bundle fields (JSON, then base64url-encoded):
|
||||||
v version (1)
|
v version (1)
|
||||||
url full URL used to derive the challenge
|
host hostname used to derive the challenge
|
||||||
method HTTP method used to derive the challenge
|
|
||||||
nonce random hex nonce used to derive the challenge
|
nonce random hex nonce used to derive the challenge
|
||||||
authData base64url authenticator data
|
authData base64url authenticator data
|
||||||
sig base64url ECDSA signature
|
sig base64url ECDSA signature
|
||||||
cdj base64url clientDataJson bytes
|
cdj base64url clientDataJson bytes
|
||||||
cred base64url AttestedCredentialData (aaguid+credIdLen+credId+coseKey)
|
cred base64url AttestedCredentialData (aaguid+credIdLen+credId+coseKey)
|
||||||
user enrolled username (informational)
|
user enrolled username (informational)
|
||||||
|
|
||||||
|
expected_host must match bundle["host"] exactly (case-insensitive) to prevent
|
||||||
|
cross-server replay: a token issued for server-a must not pass on server-b.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
import cbor2
|
import cbor2
|
||||||
|
|
@ -53,19 +55,14 @@ def _verify_assertion_token(token: str, request_path: str, request_method: str)
|
||||||
|
|
||||||
bundle = json.loads(_b64u_decode(token).decode("utf-8"))
|
bundle = json.loads(_b64u_decode(token).decode("utf-8"))
|
||||||
|
|
||||||
# Path and method must match the actual request.
|
host = bundle["host"]
|
||||||
bundle_path = urlparse(bundle["url"]).path
|
|
||||||
if bundle_path != request_path:
|
|
||||||
return False
|
|
||||||
if bundle["method"].upper() != request_method.upper():
|
|
||||||
return False
|
|
||||||
|
|
||||||
url = bundle["url"]
|
|
||||||
method = bundle["method"]
|
|
||||||
nonce = bundle["nonce"]
|
nonce = bundle["nonce"]
|
||||||
|
|
||||||
# Verify challenge claim: challenge == b64u(SHA256(url|method|nonce))
|
if host.lower() != expected_host.lower():
|
||||||
binding = f"{url}|{method}|{nonce}".encode()
|
return False
|
||||||
|
|
||||||
|
# Verify challenge claim: challenge == b64u(SHA256(host|nonce))
|
||||||
|
binding = f"{host}|{nonce}".encode()
|
||||||
expected_challenge = base64.urlsafe_b64encode(hashlib.sha256(binding).digest()).rstrip(b"=").decode()
|
expected_challenge = base64.urlsafe_b64encode(hashlib.sha256(binding).digest()).rstrip(b"=").decode()
|
||||||
|
|
||||||
cdj_bytes = _b64u_decode(bundle["cdj"])
|
cdj_bytes = _b64u_decode(bundle["cdj"])
|
||||||
|
|
@ -103,8 +100,9 @@ def _verify_assertion_token(token: str, request_path: str, request_method: str)
|
||||||
|
|
||||||
class ServerState:
|
class ServerState:
|
||||||
# All state is process-local; a restart resets the counter to zero.
|
# All state is process-local; a restart resets the counter to zero.
|
||||||
def __init__(self, proxy_token: str):
|
def __init__(self, proxy_token: str, protected_host: str = "127.0.0.1"):
|
||||||
self.proxy_token = proxy_token
|
self.proxy_token = proxy_token
|
||||||
|
self.protected_host = protected_host
|
||||||
self.counter = 0
|
self.counter = 0
|
||||||
self.lock = threading.Lock()
|
self.lock = threading.Lock()
|
||||||
|
|
||||||
|
|
@ -139,11 +137,7 @@ class Handler(BaseHTTPRequestHandler):
|
||||||
return True
|
return True
|
||||||
auth = self.headers.get("Authorization", "")
|
auth = self.headers.get("Authorization", "")
|
||||||
if auth.startswith("Bearer "):
|
if auth.startswith("Bearer "):
|
||||||
return _verify_assertion_token(
|
return _verify_assertion_token(auth[7:].strip(), self.state.protected_host)
|
||||||
auth[7:].strip(),
|
|
||||||
request_path=urlparse(self.path).path,
|
|
||||||
request_method=self.command,
|
|
||||||
)
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def do_GET(self) -> None: # noqa: N802
|
def do_GET(self) -> None: # noqa: N802
|
||||||
|
|
@ -193,6 +187,11 @@ def parse_args() -> argparse.Namespace:
|
||||||
default="dev-proxy-token",
|
default="dev-proxy-token",
|
||||||
help="Shared token expected in X-Proxy-Token from k_proxy",
|
help="Shared token expected in X-Proxy-Token from k_proxy",
|
||||||
)
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--protected-host",
|
||||||
|
default="127.0.0.1",
|
||||||
|
help="Hostname this server protects; Bearer tokens must be issued for this host",
|
||||||
|
)
|
||||||
return parser.parse_args()
|
return parser.parse_args()
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -201,7 +200,7 @@ def main() -> int:
|
||||||
if bool(args.tls_certfile) != bool(args.tls_keyfile):
|
if bool(args.tls_certfile) != bool(args.tls_keyfile):
|
||||||
raise SystemExit("Both --tls-certfile and --tls-keyfile are required to enable HTTPS")
|
raise SystemExit("Both --tls-certfile and --tls-keyfile are required to enable HTTPS")
|
||||||
|
|
||||||
state = ServerState(proxy_token=args.proxy_token)
|
state = ServerState(proxy_token=args.proxy_token, protected_host=args.protected_host)
|
||||||
Handler.state = state
|
Handler.state = state
|
||||||
server = ThreadingHTTPServer((args.host, args.port), Handler)
|
server = ThreadingHTTPServer((args.host, args.port), Handler)
|
||||||
scheme = "http"
|
scheme = "http"
|
||||||
|
|
|
||||||
|
|
@ -73,7 +73,7 @@ def _cose_es256(x: bytes, y: bytes) -> bytes:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _make_bundle(url: str, method: str, nonce: str) -> tuple[str, object]:
|
def _make_bundle(host: str, nonce: str) -> tuple[str, object]:
|
||||||
"""Return (base64url-token, private_key) for a fresh P-256 assertion.
|
"""Return (base64url-token, private_key) for a fresh P-256 assertion.
|
||||||
|
|
||||||
Mirrors exactly what proxy_service.dart's _handleAuthGetToken produces.
|
Mirrors exactly what proxy_service.dart's _handleAuthGetToken produces.
|
||||||
|
|
@ -90,8 +90,8 @@ def _make_bundle(url: str, method: str, nonce: str) -> tuple[str, object]:
|
||||||
cred_id = os.urandom(16)
|
cred_id = os.urandom(16)
|
||||||
cred_data = aaguid + struct.pack(">H", len(cred_id)) + cred_id + cose_key
|
cred_data = aaguid + struct.pack(">H", len(cred_id)) + cred_id + cose_key
|
||||||
|
|
||||||
# Challenge = SHA256(url|method|nonce) — same as proxy_service.dart
|
# Challenge = SHA256(host|nonce) — same as proxy_service.dart
|
||||||
challenge_b64u = _b64u_encode(hashlib.sha256(f"{url}|{method}|{nonce}".encode()).digest())
|
challenge_b64u = _b64u_encode(hashlib.sha256(f"{host}|{nonce}".encode()).digest())
|
||||||
|
|
||||||
cdj = json.dumps(
|
cdj = json.dumps(
|
||||||
{
|
{
|
||||||
|
|
@ -112,8 +112,7 @@ def _make_bundle(url: str, method: str, nonce: str) -> tuple[str, object]:
|
||||||
|
|
||||||
bundle = {
|
bundle = {
|
||||||
"v": 1,
|
"v": 1,
|
||||||
"url": url,
|
"host": host,
|
||||||
"method": method,
|
|
||||||
"nonce": nonce,
|
"nonce": nonce,
|
||||||
"authData": _b64u_encode(auth_data),
|
"authData": _b64u_encode(auth_data),
|
||||||
"sig": _b64u_encode(sig),
|
"sig": _b64u_encode(sig),
|
||||||
|
|
@ -137,33 +136,40 @@ def _tamper(token: str, key: str, transform) -> str:
|
||||||
@unittest.skipUnless(HAS_CRYPTO, "cbor2 / cryptography not installed")
|
@unittest.skipUnless(HAS_CRYPTO, "cbor2 / cryptography not installed")
|
||||||
class TestVerifyAssertionToken(unittest.TestCase):
|
class TestVerifyAssertionToken(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.url = "https://127.0.0.1:8780/resource/counter"
|
self.host = "127.0.0.1"
|
||||||
self.method = "POST"
|
|
||||||
self.nonce = "deadbeef01234567"
|
self.nonce = "deadbeef01234567"
|
||||||
self.token, _ = _make_bundle(self.url, self.method, self.nonce)
|
self.token, _ = _make_bundle(self.host, self.nonce)
|
||||||
|
|
||||||
def _check(self, token=None, path="/resource/counter", method="POST") -> bool:
|
def _check(self, token=None, host=None) -> bool:
|
||||||
return k_server_app._verify_assertion_token(
|
return k_server_app._verify_assertion_token(
|
||||||
self.token if token is None else token, request_path=path, request_method=method
|
self.token if token is None else token,
|
||||||
|
host if host is not None else self.host,
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_valid_token_accepted(self):
|
def test_valid_token_accepted(self):
|
||||||
self.assertTrue(self._check())
|
self.assertTrue(self._check())
|
||||||
|
|
||||||
def test_wrong_path_rejected(self):
|
def test_token_valid_for_any_path_on_host(self):
|
||||||
self.assertFalse(self._check(path="/resource/other"))
|
# Domain-level binding: the same token covers all paths on the host.
|
||||||
|
self.assertTrue(self._check())
|
||||||
|
|
||||||
def test_wrong_method_rejected(self):
|
def test_cross_server_replay_rejected(self):
|
||||||
self.assertFalse(self._check(method="GET"))
|
# Token issued for self.host must not pass when a different server verifies it.
|
||||||
|
self.assertFalse(self._check(host="other-server.com"))
|
||||||
|
|
||||||
def test_method_comparison_case_insensitive(self):
|
def test_cross_server_replay_case_insensitive(self):
|
||||||
self.assertTrue(self._check(method="post"))
|
# Case variation of the expected host still rejects a token for a different host.
|
||||||
self.assertTrue(self._check(method="POST"))
|
token_b, _ = _make_bundle("BANK.com", self.nonce)
|
||||||
|
self.assertFalse(k_server_app._verify_assertion_token(token_b, "evil.com"))
|
||||||
|
|
||||||
def test_tampered_nonce_invalidates_challenge(self):
|
def test_tampered_nonce_invalidates_challenge(self):
|
||||||
tampered = _tamper(self.token, "nonce", lambda _: "tampered00000000")
|
tampered = _tamper(self.token, "nonce", lambda _: "tampered00000000")
|
||||||
self.assertFalse(self._check(tampered))
|
self.assertFalse(self._check(tampered))
|
||||||
|
|
||||||
|
def test_tampered_host_invalidates_challenge(self):
|
||||||
|
tampered = _tamper(self.token, "host", lambda _: "attacker.com")
|
||||||
|
self.assertFalse(self._check(tampered))
|
||||||
|
|
||||||
def test_tampered_signature_rejected(self):
|
def test_tampered_signature_rejected(self):
|
||||||
def flip_last_byte(b64: str) -> str:
|
def flip_last_byte(b64: str) -> str:
|
||||||
raw = bytearray(_b64u_decode(b64))
|
raw = bytearray(_b64u_decode(b64))
|
||||||
|
|
@ -188,10 +194,6 @@ class TestVerifyAssertionToken(unittest.TestCase):
|
||||||
tampered = _tamper(self.token, "cred", swap_key)
|
tampered = _tamper(self.token, "cred", swap_key)
|
||||||
self.assertFalse(self._check(tampered))
|
self.assertFalse(self._check(tampered))
|
||||||
|
|
||||||
def test_cross_resource_replay_rejected(self):
|
|
||||||
# Token bound to /resource/counter must not pass for /resource/admin.
|
|
||||||
self.assertFalse(self._check(path="/resource/admin"))
|
|
||||||
|
|
||||||
def test_malformed_token_returns_false(self):
|
def test_malformed_token_returns_false(self):
|
||||||
self.assertFalse(self._check(token="!!!not-base64!!!"))
|
self.assertFalse(self._check(token="!!!not-base64!!!"))
|
||||||
|
|
||||||
|
|
@ -229,8 +231,7 @@ class TestVerifyAssertionTokenRoundTrip(unittest.TestCase):
|
||||||
def _register_and_assert(
|
def _register_and_assert(
|
||||||
self,
|
self,
|
||||||
emulator: "CardEmulator",
|
emulator: "CardEmulator",
|
||||||
url: str,
|
host: str,
|
||||||
method: str,
|
|
||||||
nonce: str,
|
nonce: str,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Return a token string after one register + one assertion."""
|
"""Return a token string after one register + one assertion."""
|
||||||
|
|
@ -248,9 +249,9 @@ class TestVerifyAssertionTokenRoundTrip(unittest.TestCase):
|
||||||
cred_id_len = (cred_data[16] << 8) | cred_data[17]
|
cred_id_len = (cred_data[16] << 8) | cred_data[17]
|
||||||
cred_id = cred_data[18:18 + cred_id_len]
|
cred_id = cred_data[18:18 + cred_id_len]
|
||||||
|
|
||||||
# 2. Assert with bound challenge — mirrors _handleAuthGetToken in proxy_service.dart
|
# 2. Assert with domain-level challenge — mirrors _handleAuthGetToken in proxy_service.dart
|
||||||
challenge_b64u = _b64u_encode(
|
challenge_b64u = _b64u_encode(
|
||||||
hashlib.sha256(f"{url}|{method}|{nonce}".encode()).digest()
|
hashlib.sha256(f"{host}|{nonce}".encode()).digest()
|
||||||
)
|
)
|
||||||
cdj = json.dumps(
|
cdj = json.dumps(
|
||||||
{
|
{
|
||||||
|
|
@ -271,8 +272,7 @@ class TestVerifyAssertionTokenRoundTrip(unittest.TestCase):
|
||||||
# 3. Encode bundle — same as proxy_service.dart _handleAuthGetToken
|
# 3. Encode bundle — same as proxy_service.dart _handleAuthGetToken
|
||||||
bundle = {
|
bundle = {
|
||||||
"v": 1,
|
"v": 1,
|
||||||
"url": url,
|
"host": host,
|
||||||
"method": method,
|
|
||||||
"nonce": nonce,
|
"nonce": nonce,
|
||||||
"authData": _b64u_encode(bytes(assertion.auth_data)),
|
"authData": _b64u_encode(bytes(assertion.auth_data)),
|
||||||
"sig": _b64u_encode(bytes(assertion.signature)),
|
"sig": _b64u_encode(bytes(assertion.signature)),
|
||||||
|
|
@ -284,53 +284,66 @@ class TestVerifyAssertionTokenRoundTrip(unittest.TestCase):
|
||||||
|
|
||||||
def test_roundtrip_accepted(self):
|
def test_roundtrip_accepted(self):
|
||||||
emulator = CardEmulator()
|
emulator = CardEmulator()
|
||||||
token = self._register_and_assert(
|
token = self._register_and_assert(emulator, "example.com", "cafebabe12345678")
|
||||||
emulator, "https://example.com/api/data", "GET", "cafebabe12345678"
|
|
||||||
)
|
|
||||||
self.assertTrue(
|
self.assertTrue(
|
||||||
k_server_app._verify_assertion_token(token, "/api/data", "GET"),
|
k_server_app._verify_assertion_token(token, "example.com"),
|
||||||
"valid round-trip token must be accepted",
|
"valid round-trip token must be accepted",
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_roundtrip_wrong_path_rejected(self):
|
def test_roundtrip_valid_for_any_path(self):
|
||||||
|
"""Domain-level token: accepted regardless of which path is requested."""
|
||||||
emulator = CardEmulator()
|
emulator = CardEmulator()
|
||||||
token = self._register_and_assert(
|
token = self._register_and_assert(emulator, "example.com", "aabbccdd11223344")
|
||||||
emulator, "https://example.com/api/data", "GET", "aabbccdd11223344"
|
self.assertTrue(
|
||||||
)
|
k_server_app._verify_assertion_token(token, "example.com"),
|
||||||
self.assertFalse(
|
"domain-level token must be accepted for any path on the host",
|
||||||
k_server_app._verify_assertion_token(token, "/api/other", "GET"),
|
|
||||||
"token for /api/data must not pass for /api/other",
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_roundtrip_wrong_method_rejected(self):
|
def test_roundtrip_same_token_tampered_nonce_rejected(self):
|
||||||
emulator = CardEmulator()
|
|
||||||
token = self._register_and_assert(
|
|
||||||
emulator, "https://example.com/submit", "POST", "1122334455667788"
|
|
||||||
)
|
|
||||||
self.assertFalse(
|
|
||||||
k_server_app._verify_assertion_token(token, "/submit", "GET"),
|
|
||||||
"token for POST must not pass for GET",
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_roundtrip_same_token_twice_rejected_after_nonce_tamper(self):
|
|
||||||
"""Changing the nonce in a real assertion bundle breaks verification."""
|
"""Changing the nonce in a real assertion bundle breaks verification."""
|
||||||
emulator = CardEmulator()
|
emulator = CardEmulator()
|
||||||
token = self._register_and_assert(
|
token = self._register_and_assert(emulator, "example.com", "original00000000")
|
||||||
emulator, "https://example.com/resource/counter", "POST", "original00000000"
|
|
||||||
)
|
|
||||||
tampered = _tamper(token, "nonce", lambda _: "tampered11111111")
|
tampered = _tamper(token, "nonce", lambda _: "tampered11111111")
|
||||||
self.assertFalse(
|
self.assertFalse(
|
||||||
k_server_app._verify_assertion_token(tampered, "/resource/counter", "POST"),
|
k_server_app._verify_assertion_token(tampered, "example.com"),
|
||||||
"tampered nonce must break challenge verification",
|
"tampered nonce must break challenge verification",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_roundtrip_tampered_host_rejected(self):
|
||||||
|
"""Changing the host in the bundle breaks challenge verification."""
|
||||||
|
emulator = CardEmulator()
|
||||||
|
token = self._register_and_assert(emulator, "example.com", "deadbeef00112233")
|
||||||
|
tampered = _tamper(token, "host", lambda _: "attacker.com")
|
||||||
|
self.assertFalse(
|
||||||
|
k_server_app._verify_assertion_token(tampered, "example.com"),
|
||||||
|
"tampered host must break challenge verification",
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_roundtrip_cross_server_replay_rejected(self):
|
||||||
|
"""Token issued for server-a must not validate on server-b."""
|
||||||
|
emulator = CardEmulator()
|
||||||
|
token = self._register_and_assert(emulator, "server-a.com", "1122334455667788")
|
||||||
|
self.assertFalse(
|
||||||
|
k_server_app._verify_assertion_token(token, "server-b.com"),
|
||||||
|
"cross-server replay: token for server-a must be rejected by server-b",
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_roundtrip_cross_server_replay_accepted_on_correct_server(self):
|
||||||
|
"""Sanity: same token is accepted on the server it was issued for."""
|
||||||
|
emulator = CardEmulator()
|
||||||
|
token = self._register_and_assert(emulator, "server-a.com", "aabbccdd99887766")
|
||||||
|
self.assertTrue(
|
||||||
|
k_server_app._verify_assertion_token(token, "server-a.com"),
|
||||||
|
"token must still be valid on the correct server",
|
||||||
|
)
|
||||||
|
|
||||||
def test_roundtrip_replayed_for_different_user_rejected(self):
|
def test_roundtrip_replayed_for_different_user_rejected(self):
|
||||||
"""Two users register separate credentials; each token is only valid for its own key."""
|
"""Two users register separate credentials; each token is only valid for its own key."""
|
||||||
em_a = CardEmulator()
|
em_a = CardEmulator()
|
||||||
em_b = CardEmulator()
|
em_b = CardEmulator()
|
||||||
url, method, nonce = "https://example.com/protected", "GET", "00112233aabbccdd"
|
host, nonce = "example.com", "00112233aabbccdd"
|
||||||
token_a = self._register_and_assert(em_a, url, method, nonce)
|
token_a = self._register_and_assert(em_a, host, nonce)
|
||||||
token_b = self._register_and_assert(em_b, url, method, nonce)
|
token_b = self._register_and_assert(em_b, host, nonce)
|
||||||
|
|
||||||
# token_a's signature was made with em_a's key — must not verify with em_b's public key.
|
# token_a's signature was made with em_a's key — must not verify with em_b's public key.
|
||||||
# Tamper: swap cred (public key) from token_b into token_a's bundle.
|
# Tamper: swap cred (public key) from token_b into token_a's bundle.
|
||||||
|
|
@ -340,7 +353,7 @@ class TestVerifyAssertionTokenRoundTrip(unittest.TestCase):
|
||||||
cross = _b64u_encode(json.dumps(bundle_a, separators=(",", ":")).encode())
|
cross = _b64u_encode(json.dumps(bundle_a, separators=(",", ":")).encode())
|
||||||
|
|
||||||
self.assertFalse(
|
self.assertFalse(
|
||||||
k_server_app._verify_assertion_token(cross, "/protected", "GET"),
|
k_server_app._verify_assertion_token(cross, host),
|
||||||
"cross-user key swap must fail verification",
|
"cross-user key swap must fail verification",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue