Compare commits

..

No commits in common. "592af0c31417db3235de6eb19edb0ccabe302276" and "139698cab50fa777efb1bb404104554e02ee8c5b" have entirely different histories.

5 changed files with 101 additions and 112 deletions

View File

@ -406,7 +406,7 @@ class FilterProxy {
) async {
String token;
try {
token = await _getAuthToken(uri);
token = await _getAuthToken(uri, method);
} catch (_) {
_deny(client, sub, 407, 'Proxy Authentication Required');
return;
@ -450,13 +450,14 @@ class FilterProxy {
await _forwardHttpRequest(client, sub, upstream, out.toString(), remainder, contentLength);
}
// Calls POST /auth/get-token on Component 2 with domain-level binding and
// Calls POST /auth/get-token on Component 2 with per-request binding and
// returns the bearer token (a self-contained FIDO2 assertion bundle).
// Throws if card is unavailable or Component 2 is unreachable.
Future<String> _getAuthToken(Uri uri) async {
Future<String> _getAuthToken(Uri uri, String method) async {
final nonce = _secureHex(16);
final payload = utf8.encode(jsonEncode({
'host': uri.host,
'url': uri.toString(),
'method': method,
'nonce': nonce,
}));
final httpClient = HttpClient()

View File

@ -448,11 +448,11 @@ class _ProxyServer {
}
// -------------------------------------------------------------------------
// Auth token endpoint (v2 architecture domain-level token binding)
// Auth token endpoint (v2 architecture per-request token binding)
//
// Component 1 (filter_proxy) and Component 3 (Go binary) call this with
// {host, nonce} for each gated request. A fresh FIDO2 assertion is
// produced with challenge = SHA256(host|nonce). The self-contained
// {url, method, nonce} for each gated request. A fresh FIDO2 assertion is
// produced with challenge = SHA256(url|method|nonce). The self-contained
// assertion bundle is returned as a base64url Bearer token the server can
// verify without calling back to this service.
// -------------------------------------------------------------------------
@ -461,11 +461,12 @@ class _ProxyServer {
final body = await _readJson(req);
if (body == null) return;
final host = body['host'] as String? ?? '';
final url = body['url'] as String? ?? '';
final method = body['method'] as String? ?? '';
final nonce = body['nonce'] as String? ?? '';
if (host.isEmpty || nonce.isEmpty) {
await _send(req.response, 400, {'ok': false, 'error': 'host, nonce required'});
if (url.isEmpty || method.isEmpty || nonce.isEmpty) {
await _send(req.response, 400, {'ok': false, 'error': 'url, method, nonce required'});
return;
}
@ -486,9 +487,9 @@ class _ProxyServer {
return;
}
// Challenge = SHA256(host | "|" | nonce)
// Challenge = SHA256(url | "|" | method | "|" | nonce)
final challenge = Uint8List.fromList(
sha256.convert(utf8.encode('$host|$nonce')).bytes,
sha256.convert(utf8.encode('$url|$method|$nonce')).bytes,
);
GetAssertionResult assertionResult;
@ -502,7 +503,8 @@ class _ProxyServer {
// Self-contained bundle the server can verify without calling back to the phone.
final bundleJson = jsonEncode({
'v': 1,
'host': host,
'url': url,
'method': method,
'nonce': nonce,
'authData': base64Url.encode(assertionResult.authData).replaceAll('=', ''),
'sig': base64Url.encode(assertionResult.signature).replaceAll('=', ''),

View File

@ -250,7 +250,7 @@ void main() {
expect(req.uri.path, '/auth/get-token');
});
test('gated host: /auth/get-token body carries host, nonce', () async {
test('gated host: /auth/get-token body carries url, method, nonce', () async {
await _round(
proxy.port,
'GET http://127.0.0.1:${endpoint.port}/api?x=1 HTTP/1.1\r\n'
@ -259,14 +259,13 @@ void main() {
final (:req, :rawBody) = await comp2TokenReq.future.timeout(_kTimeout);
expect(req.uri.path, '/auth/get-token');
final body = jsonDecode(rawBody) as Map<String, dynamic>;
expect(body['host'], '127.0.0.1');
expect(body.containsKey('url'), isFalse);
expect(body.containsKey('method'), isFalse);
expect(body['url'], contains('/api?x=1'));
expect(body['method'], 'GET');
expect(body['nonce'], isA<String>());
expect((body['nonce'] as String).length, greaterThan(8));
});
test('gated POST: /auth/get-token body carries host, nonce (no method)', () async {
test('gated POST: /auth/get-token body carries method POST', () async {
const postBody = '{"key":"val"}';
await _round(
proxy.port,
@ -279,9 +278,8 @@ void main() {
final (:req, :rawBody) = await comp2TokenReq.future.timeout(_kTimeout);
expect(req.uri.path, '/auth/get-token');
final body = jsonDecode(rawBody) as Map<String, dynamic>;
expect(body['host'], '127.0.0.1');
expect(body.containsKey('method'), isFalse);
expect(body['nonce'], isA<String>());
expect(body['method'], 'POST');
expect(body['url'], contains('/submit'));
});
test('gated host: request goes directly to endpoint with Bearer token', () async {

View File

@ -26,21 +26,19 @@ def _b64u_decode(s: str) -> bytes:
return base64.urlsafe_b64decode(padded)
def _verify_assertion_token(token: str, expected_host: str) -> bool:
"""Verify a base64url-encoded FIDO2 domain-level assertion bundle.
def _verify_assertion_token(token: str, request_path: str, request_method: str) -> bool:
"""Verify a base64url-encoded FIDO2 per-request assertion bundle.
Bundle fields (JSON, then base64url-encoded):
v version (1)
host hostname used to derive the challenge
url full URL used to derive the challenge
method HTTP method used to derive the challenge
nonce random hex nonce used to derive the challenge
authData base64url authenticator data
sig base64url ECDSA signature
cdj base64url clientDataJson bytes
cred base64url AttestedCredentialData (aaguid+credIdLen+credId+coseKey)
user enrolled username (informational)
expected_host must match bundle["host"] exactly (case-insensitive) to prevent
cross-server replay: a token issued for server-a must not pass on server-b.
"""
try:
import cbor2
@ -55,14 +53,19 @@ def _verify_assertion_token(token: str, expected_host: str) -> bool:
bundle = json.loads(_b64u_decode(token).decode("utf-8"))
host = bundle["host"]
nonce = bundle["nonce"]
if host.lower() != expected_host.lower():
# Path and method must match the actual request.
bundle_path = urlparse(bundle["url"]).path
if bundle_path != request_path:
return False
if bundle["method"].upper() != request_method.upper():
return False
# Verify challenge claim: challenge == b64u(SHA256(host|nonce))
binding = f"{host}|{nonce}".encode()
url = bundle["url"]
method = bundle["method"]
nonce = bundle["nonce"]
# Verify challenge claim: challenge == b64u(SHA256(url|method|nonce))
binding = f"{url}|{method}|{nonce}".encode()
expected_challenge = base64.urlsafe_b64encode(hashlib.sha256(binding).digest()).rstrip(b"=").decode()
cdj_bytes = _b64u_decode(bundle["cdj"])
@ -100,9 +103,8 @@ def _verify_assertion_token(token: str, expected_host: str) -> bool:
class ServerState:
# All state is process-local; a restart resets the counter to zero.
def __init__(self, proxy_token: str, protected_host: str = "127.0.0.1"):
def __init__(self, proxy_token: str):
self.proxy_token = proxy_token
self.protected_host = protected_host
self.counter = 0
self.lock = threading.Lock()
@ -137,7 +139,11 @@ class Handler(BaseHTTPRequestHandler):
return True
auth = self.headers.get("Authorization", "")
if auth.startswith("Bearer "):
return _verify_assertion_token(auth[7:].strip(), self.state.protected_host)
return _verify_assertion_token(
auth[7:].strip(),
request_path=urlparse(self.path).path,
request_method=self.command,
)
return False
def do_GET(self) -> None: # noqa: N802
@ -187,11 +193,6 @@ def parse_args() -> argparse.Namespace:
default="dev-proxy-token",
help="Shared token expected in X-Proxy-Token from k_proxy",
)
parser.add_argument(
"--protected-host",
default="127.0.0.1",
help="Hostname this server protects; Bearer tokens must be issued for this host",
)
return parser.parse_args()
@ -200,7 +201,7 @@ def main() -> int:
if bool(args.tls_certfile) != bool(args.tls_keyfile):
raise SystemExit("Both --tls-certfile and --tls-keyfile are required to enable HTTPS")
state = ServerState(proxy_token=args.proxy_token, protected_host=args.protected_host)
state = ServerState(proxy_token=args.proxy_token)
Handler.state = state
server = ThreadingHTTPServer((args.host, args.port), Handler)
scheme = "http"

View File

@ -73,7 +73,7 @@ def _cose_es256(x: bytes, y: bytes) -> bytes:
)
def _make_bundle(host: str, nonce: str) -> tuple[str, object]:
def _make_bundle(url: str, method: str, nonce: str) -> tuple[str, object]:
"""Return (base64url-token, private_key) for a fresh P-256 assertion.
Mirrors exactly what proxy_service.dart's _handleAuthGetToken produces.
@ -90,8 +90,8 @@ def _make_bundle(host: str, nonce: str) -> tuple[str, object]:
cred_id = os.urandom(16)
cred_data = aaguid + struct.pack(">H", len(cred_id)) + cred_id + cose_key
# Challenge = SHA256(host|nonce) — same as proxy_service.dart
challenge_b64u = _b64u_encode(hashlib.sha256(f"{host}|{nonce}".encode()).digest())
# Challenge = SHA256(url|method|nonce) — same as proxy_service.dart
challenge_b64u = _b64u_encode(hashlib.sha256(f"{url}|{method}|{nonce}".encode()).digest())
cdj = json.dumps(
{
@ -112,7 +112,8 @@ def _make_bundle(host: str, nonce: str) -> tuple[str, object]:
bundle = {
"v": 1,
"host": host,
"url": url,
"method": method,
"nonce": nonce,
"authData": _b64u_encode(auth_data),
"sig": _b64u_encode(sig),
@ -136,40 +137,33 @@ def _tamper(token: str, key: str, transform) -> str:
@unittest.skipUnless(HAS_CRYPTO, "cbor2 / cryptography not installed")
class TestVerifyAssertionToken(unittest.TestCase):
def setUp(self):
self.host = "127.0.0.1"
self.url = "https://127.0.0.1:8780/resource/counter"
self.method = "POST"
self.nonce = "deadbeef01234567"
self.token, _ = _make_bundle(self.host, self.nonce)
self.token, _ = _make_bundle(self.url, self.method, self.nonce)
def _check(self, token=None, host=None) -> bool:
def _check(self, token=None, path="/resource/counter", method="POST") -> bool:
return k_server_app._verify_assertion_token(
self.token if token is None else token,
host if host is not None else self.host,
self.token if token is None else token, request_path=path, request_method=method
)
def test_valid_token_accepted(self):
self.assertTrue(self._check())
def test_token_valid_for_any_path_on_host(self):
# Domain-level binding: the same token covers all paths on the host.
self.assertTrue(self._check())
def test_wrong_path_rejected(self):
self.assertFalse(self._check(path="/resource/other"))
def test_cross_server_replay_rejected(self):
# Token issued for self.host must not pass when a different server verifies it.
self.assertFalse(self._check(host="other-server.com"))
def test_wrong_method_rejected(self):
self.assertFalse(self._check(method="GET"))
def test_cross_server_replay_case_insensitive(self):
# Case variation of the expected host still rejects a token for a different host.
token_b, _ = _make_bundle("BANK.com", self.nonce)
self.assertFalse(k_server_app._verify_assertion_token(token_b, "evil.com"))
def test_method_comparison_case_insensitive(self):
self.assertTrue(self._check(method="post"))
self.assertTrue(self._check(method="POST"))
def test_tampered_nonce_invalidates_challenge(self):
tampered = _tamper(self.token, "nonce", lambda _: "tampered00000000")
self.assertFalse(self._check(tampered))
def test_tampered_host_invalidates_challenge(self):
tampered = _tamper(self.token, "host", lambda _: "attacker.com")
self.assertFalse(self._check(tampered))
def test_tampered_signature_rejected(self):
def flip_last_byte(b64: str) -> str:
raw = bytearray(_b64u_decode(b64))
@ -194,6 +188,10 @@ class TestVerifyAssertionToken(unittest.TestCase):
tampered = _tamper(self.token, "cred", swap_key)
self.assertFalse(self._check(tampered))
def test_cross_resource_replay_rejected(self):
# Token bound to /resource/counter must not pass for /resource/admin.
self.assertFalse(self._check(path="/resource/admin"))
def test_malformed_token_returns_false(self):
self.assertFalse(self._check(token="!!!not-base64!!!"))
@ -231,7 +229,8 @@ class TestVerifyAssertionTokenRoundTrip(unittest.TestCase):
def _register_and_assert(
self,
emulator: "CardEmulator",
host: str,
url: str,
method: str,
nonce: str,
) -> str:
"""Return a token string after one register + one assertion."""
@ -249,9 +248,9 @@ class TestVerifyAssertionTokenRoundTrip(unittest.TestCase):
cred_id_len = (cred_data[16] << 8) | cred_data[17]
cred_id = cred_data[18:18 + cred_id_len]
# 2. Assert with domain-level challenge — mirrors _handleAuthGetToken in proxy_service.dart
# 2. Assert with bound challenge — mirrors _handleAuthGetToken in proxy_service.dart
challenge_b64u = _b64u_encode(
hashlib.sha256(f"{host}|{nonce}".encode()).digest()
hashlib.sha256(f"{url}|{method}|{nonce}".encode()).digest()
)
cdj = json.dumps(
{
@ -272,7 +271,8 @@ class TestVerifyAssertionTokenRoundTrip(unittest.TestCase):
# 3. Encode bundle — same as proxy_service.dart _handleAuthGetToken
bundle = {
"v": 1,
"host": host,
"url": url,
"method": method,
"nonce": nonce,
"authData": _b64u_encode(bytes(assertion.auth_data)),
"sig": _b64u_encode(bytes(assertion.signature)),
@ -284,66 +284,53 @@ class TestVerifyAssertionTokenRoundTrip(unittest.TestCase):
def test_roundtrip_accepted(self):
emulator = CardEmulator()
token = self._register_and_assert(emulator, "example.com", "cafebabe12345678")
token = self._register_and_assert(
emulator, "https://example.com/api/data", "GET", "cafebabe12345678"
)
self.assertTrue(
k_server_app._verify_assertion_token(token, "example.com"),
k_server_app._verify_assertion_token(token, "/api/data", "GET"),
"valid round-trip token must be accepted",
)
def test_roundtrip_valid_for_any_path(self):
"""Domain-level token: accepted regardless of which path is requested."""
def test_roundtrip_wrong_path_rejected(self):
emulator = CardEmulator()
token = self._register_and_assert(emulator, "example.com", "aabbccdd11223344")
self.assertTrue(
k_server_app._verify_assertion_token(token, "example.com"),
"domain-level token must be accepted for any path on the host",
token = self._register_and_assert(
emulator, "https://example.com/api/data", "GET", "aabbccdd11223344"
)
self.assertFalse(
k_server_app._verify_assertion_token(token, "/api/other", "GET"),
"token for /api/data must not pass for /api/other",
)
def test_roundtrip_same_token_tampered_nonce_rejected(self):
def test_roundtrip_wrong_method_rejected(self):
emulator = CardEmulator()
token = self._register_and_assert(
emulator, "https://example.com/submit", "POST", "1122334455667788"
)
self.assertFalse(
k_server_app._verify_assertion_token(token, "/submit", "GET"),
"token for POST must not pass for GET",
)
def test_roundtrip_same_token_twice_rejected_after_nonce_tamper(self):
"""Changing the nonce in a real assertion bundle breaks verification."""
emulator = CardEmulator()
token = self._register_and_assert(emulator, "example.com", "original00000000")
token = self._register_and_assert(
emulator, "https://example.com/resource/counter", "POST", "original00000000"
)
tampered = _tamper(token, "nonce", lambda _: "tampered11111111")
self.assertFalse(
k_server_app._verify_assertion_token(tampered, "example.com"),
k_server_app._verify_assertion_token(tampered, "/resource/counter", "POST"),
"tampered nonce must break challenge verification",
)
def test_roundtrip_tampered_host_rejected(self):
"""Changing the host in the bundle breaks challenge verification."""
emulator = CardEmulator()
token = self._register_and_assert(emulator, "example.com", "deadbeef00112233")
tampered = _tamper(token, "host", lambda _: "attacker.com")
self.assertFalse(
k_server_app._verify_assertion_token(tampered, "example.com"),
"tampered host must break challenge verification",
)
def test_roundtrip_cross_server_replay_rejected(self):
"""Token issued for server-a must not validate on server-b."""
emulator = CardEmulator()
token = self._register_and_assert(emulator, "server-a.com", "1122334455667788")
self.assertFalse(
k_server_app._verify_assertion_token(token, "server-b.com"),
"cross-server replay: token for server-a must be rejected by server-b",
)
def test_roundtrip_cross_server_replay_accepted_on_correct_server(self):
"""Sanity: same token is accepted on the server it was issued for."""
emulator = CardEmulator()
token = self._register_and_assert(emulator, "server-a.com", "aabbccdd99887766")
self.assertTrue(
k_server_app._verify_assertion_token(token, "server-a.com"),
"token must still be valid on the correct server",
)
def test_roundtrip_replayed_for_different_user_rejected(self):
"""Two users register separate credentials; each token is only valid for its own key."""
em_a = CardEmulator()
em_b = CardEmulator()
host, nonce = "example.com", "00112233aabbccdd"
token_a = self._register_and_assert(em_a, host, nonce)
token_b = self._register_and_assert(em_b, host, nonce)
url, method, nonce = "https://example.com/protected", "GET", "00112233aabbccdd"
token_a = self._register_and_assert(em_a, url, method, nonce)
token_b = self._register_and_assert(em_b, url, method, nonce)
# token_a's signature was made with em_a's key — must not verify with em_b's public key.
# Tamper: swap cred (public key) from token_b into token_a's bundle.
@ -353,7 +340,7 @@ class TestVerifyAssertionTokenRoundTrip(unittest.TestCase):
cross = _b64u_encode(json.dumps(bundle_a, separators=(",", ":")).encode())
self.assertFalse(
k_server_app._verify_assertion_token(cross, host),
k_server_app._verify_assertion_token(cross, "/protected", "GET"),
"cross-user key swap must fail verification",
)