k_card/tests/card_emulator_bridge.py

350 lines
12 KiB
Python

#!/usr/bin/env python3
"""
card_emulator_bridge.py — CTAPHID TCP bridge for Android emulator testing.
The Dart ctaphid_channel.dart speaks raw 64-byte CTAPHID packets over TCP.
This bridge listens on :8772 (Android emulator reaches the Mac host at
10.0.2.2), translates CTAPHID frames into CardEmulator calls, and sends
framed CTAPHID responses back.
Run:
uv run --python 3.12 --with fido2 --with cbor2 --with cryptography \\
tests/card_emulator_bridge.py
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import os
import struct
import sys
from typing import Any
import cbor2
from fido2.ctap import CtapError
sys.path.insert(0, os.path.dirname(__file__))
from card_emulator import CardEmulator
# Logger shared by every function and handler in this module.
LOG = logging.getLogger("bridge")
# CTAPHID constants
# Broadcast channel ID referenced by the INIT handler.
_BROADCAST_CID = 0xFFFFFFFF
# Command codes with bit 7 cleared: _pack() sets that bit when it writes an
# init packet and _recv_message() masks it off again when it reads one.
_CMD_INIT = 0x06
_CMD_CBOR = 0x10
_CMD_ERROR = 0x3F
_CMD_KEEPALIVE = 0x3B
# Every packet on the wire is exactly this many bytes.
_HID_SIZE = 64
# Init packet header: CID(4) + CMD(1) + BCNT(2) = 7 bytes.
_INIT_PAYLOAD = _HID_SIZE - 7 # 57 bytes usable in an init packet
# Continuation packet header: CID(4) + SEQ(1) = 5 bytes.
_CONT_PAYLOAD = _HID_SIZE - 5 # 59 bytes usable in a continuation packet
# CTAP2 authenticator command codes (first byte of CTAPHID_CBOR payload)
_CTAP2_MAKE_CREDENTIAL = 0x01
_CTAP2_GET_ASSERTION = 0x02
_CTAP2_GET_INFO = 0x04
# CTAP error codes (sent as the single payload byte of a CTAPHID ERROR packet)
_ERR_INVALID_CMD = 0x01
_ERR_INVALID_LEN = 0x03
# ---------------------------------------------------------------------------
# Packet helpers
# ---------------------------------------------------------------------------
def _pack(cid: int, cmd: int, payload: bytes) -> list[bytes]:
    """Fragment *payload* into CTAPHID init + continuation packets.

    The first packet carries the channel ID, the command byte with bit 7
    set, a 16-bit big-endian byte count and up to ``_INIT_PAYLOAD`` payload
    bytes. Each following packet carries the channel ID, a 7-bit sequence
    number starting at 0 and up to ``_CONT_PAYLOAD`` payload bytes. Every
    packet is zero-padded to ``_HID_SIZE`` bytes.

    Args:
        cid: 32-bit channel ID, written big-endian.
        cmd: CTAPHID command code; only the low 7 bits are used.
        payload: message body to fragment (may be empty).

    Returns:
        The packets in transmission order; always at least one.

    Raises:
        ValueError: if *payload* does not fit in one init packet plus 128
            continuation packets.
    """
    # Sequence numbers are 7 bits wide, so at most 0x80 continuation packets
    # can follow the init packet. Anything longer used to wrap the sequence
    # counter and emit an undecodable stream, so reject it up front.
    max_payload = _INIT_PAYLOAD + 0x80 * _CONT_PAYLOAD
    total = len(payload)
    if total > max_payload:
        raise ValueError(
            f"CTAPHID payload too large: {total} bytes (max {max_payload})"
        )

    cid_b = struct.pack(">I", cid)

    init = bytearray(_HID_SIZE)
    init[:4] = cid_b
    init[4] = (cmd & 0x7F) | 0x80  # bit 7 marks an init packet
    init[5] = (total >> 8) & 0xFF
    init[6] = total & 0xFF
    first_chunk = payload[:_INIT_PAYLOAD]
    init[7: 7 + len(first_chunk)] = first_chunk
    packets: list[bytes] = [bytes(init)]

    offset, seq = len(first_chunk), 0
    while offset < total:
        cont = bytearray(_HID_SIZE)
        cont[:4] = cid_b
        cont[4] = seq & 0x7F
        chunk = payload[offset: offset + _CONT_PAYLOAD]
        cont[5: 5 + len(chunk)] = chunk
        packets.append(bytes(cont))
        offset += len(chunk)
        seq += 1
    return packets
def _error_pkt(cid: int, code: int) -> bytes:
    """Return the CTAPHID ERROR packet for *code* on channel *cid*.

    The payload is the single error byte, so only the first packet produced
    by ``_pack`` is returned.
    """
    error_payload = bytes((code,))
    framed = _pack(cid, _CMD_ERROR, error_payload)
    return framed[0]
def _keepalive_pkt(cid: int) -> bytes:
    """Return a CTAPHID KEEPALIVE packet with status byte 0x02 on *cid*."""
    # NOTE(review): the previous comment described 0x02 as both "TUP_NEEDED"
    # and "card is processing". In the CTAP specification 0x01 is
    # STATUS_PROCESSING and 0x02 is STATUS_UPNEEDED — confirm which value the
    # Dart client expects before changing it.
    status_payload = bytes((0x02,))
    framed = _pack(cid, _CMD_KEEPALIVE, status_payload)
    return framed[0]
# ---------------------------------------------------------------------------
# Per-connection handler
# ---------------------------------------------------------------------------
class _Handler:
    """Handle one Android emulator TCP connection.

    Reads fixed-size CTAPHID packets from the stream, reassembles them into
    messages, dispatches INIT and CBOR commands, and writes framed responses
    back. CTAP2 requests are forwarded to the ``CardEmulator`` passed in.
    """

    def __init__(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        emulator: CardEmulator,
    ) -> None:
        """Store the stream pair and the emulator that serves CTAP2 calls."""
        self._r = reader
        self._w = writer
        self._emulator = emulator
        self._allocated_cid = 1  # fixed CID for single-connection bridge

    async def run(self) -> None:
        """Serve messages until the peer disconnects, then close the writer."""
        peer = self._w.get_extra_info("peername")
        LOG.info("connect from %s", peer)
        try:
            while True:
                cid, cmd, data = await self._recv_message()
                await self._dispatch(cid, cmd, data)
        except (asyncio.IncompleteReadError, ConnectionError, EOFError):
            # ConnectionError also covers BrokenPipeError and
            # ConnectionAbortedError, which are ordinary disconnects too and
            # should not be reported with a traceback.
            LOG.info("disconnect from %s", peer)
        except Exception:
            LOG.exception("handler error")
        finally:
            self._w.close()
            try:
                await self._w.wait_closed()
            except Exception:
                # Best effort: the connection is going away regardless, but
                # keep a trace of what went wrong for debugging.
                LOG.debug("error while closing connection from %s", peer,
                          exc_info=True)

    # ---- I/O ----------------------------------------------------------------

    async def _recv_pkt(self) -> bytes:
        """Read exactly one ``_HID_SIZE``-byte packet.

        Raises ``asyncio.IncompleteReadError`` if the stream ends first.
        """
        return await self._r.readexactly(_HID_SIZE)

    async def _send(self, packets: list[bytes]) -> None:
        """Write *packets* in order, then wait for the buffer to drain."""
        for pkt in packets:
            self._w.write(pkt)
        await self._w.drain()

    # ---- Message reassembly -------------------------------------------------

    async def _recv_message(self) -> tuple[int, int, bytes]:
        """Read one full CTAPHID message (init + any continuations).

        After every non-final packet the Dart client is blocked waiting for a
        response. We send a keepalive so it resumes and sends the next packet.

        Returns:
            ``(cid, cmd, payload)`` where *cmd* has bit 7 masked off and
            *payload* is exactly the byte count announced in the init packet.
        """
        init_pkt = await self._recv_pkt()
        cid = struct.unpack(">I", init_pkt[:4])[0]
        cmd = init_pkt[4] & 0x7F
        bcnt = (init_pkt[5] << 8) | init_pkt[6]
        buf = bytearray(init_pkt[7: 7 + min(bcnt, _INIT_PAYLOAD)])
        # NOTE(review): the channel ID and sequence number of continuation
        # packets are not validated here; packets are assumed to arrive in
        # order on a single channel.
        while len(buf) < bcnt:
            # Unblock Dart's _sendPacket which is awaiting a response.
            self._w.write(_keepalive_pkt(cid))
            await self._w.drain()
            cont = await self._recv_pkt()
            remaining = bcnt - len(buf)
            buf.extend(cont[5: 5 + min(remaining, _CONT_PAYLOAD)])
        return cid, cmd, bytes(buf)

    # ---- Dispatch -----------------------------------------------------------

    async def _dispatch(self, cid: int, cmd: int, data: bytes) -> None:
        """Route a reassembled message to its handler.

        Any command other than INIT or CBOR is answered with an
        ``_ERR_INVALID_CMD`` error packet.
        """
        if cmd == _CMD_INIT:
            await self._handle_init(cid, data)
        elif cmd == _CMD_CBOR:
            await self._handle_cbor(cid, data)
        else:
            LOG.warning("unknown CTAPHID cmd=0x%02x cid=0x%08x", cmd, cid)
            await self._send([_error_pkt(cid, _ERR_INVALID_CMD)])

    # ---- CTAPHID INIT -------------------------------------------------------

    async def _handle_init(self, cid: int, data: bytes) -> None:
        """Answer CTAPHID_INIT on the channel the request arrived on.

        An INIT sent on the broadcast channel is answered with the bridge's
        fixed channel ID. An INIT sent on any other channel is treated as a
        resynchronisation and echoes that channel's own ID. A request with
        fewer than 8 nonce bytes gets an ``_ERR_INVALID_LEN`` error.
        """
        if len(data) < 8:
            await self._send([_error_pkt(cid, _ERR_INVALID_LEN)])
            return
        nonce = data[:8]
        # Previously the reply always went out on the broadcast CID with
        # CID 1 in the payload, even for a resync on an allocated channel.
        new_cid = self._allocated_cid if cid == _BROADCAST_CID else cid
        # Response payload: nonce(8) + new_cid(4) + CTAPHID_version(1)
        # + major(1) + minor(1) + build(1) + capabilities(1)
        payload = nonce + struct.pack(">I", new_cid) + bytes([2, 1, 0, 0, 0x04])
        await self._send(_pack(cid, _CMD_INIT, payload))
        LOG.info("INIT → CID=0x%08x", new_cid)

    # ---- CTAPHID CBOR (CTAP2) -----------------------------------------------

    async def _handle_cbor(self, cid: int, data: bytes) -> None:
        """Run one CTAP2 command and send its CBOR response.

        The first payload byte selects the command and the rest is its
        CBOR-encoded parameter map. On success the reply is status ``0x00``
        followed by the encoded response map. A ``CtapError`` is reported as
        its one-byte code and any other exception as ``0x01``. An empty
        payload or an unsupported command is answered with a CTAPHID error
        packet instead.
        """
        if not data:
            await self._send([_error_pkt(cid, _ERR_INVALID_LEN)])
            return
        ctap2_cmd = data[0]
        body = data[1:]  # slicing past the end already yields b""
        LOG.info("CTAP2 cmd=0x%02x body=%d bytes", ctap2_cmd, len(body))
        try:
            if ctap2_cmd == _CTAP2_MAKE_CREDENTIAL:
                resp_cbor = self._make_credential(body)
            elif ctap2_cmd == _CTAP2_GET_ASSERTION:
                resp_cbor = self._get_assertion(body)
            elif ctap2_cmd == _CTAP2_GET_INFO:
                resp_cbor = self._get_info()
            else:
                LOG.warning("unsupported CTAP2 cmd=0x%02x", ctap2_cmd)
                await self._send([_error_pkt(cid, _ERR_INVALID_CMD)])
                return
        except CtapError as exc:
            # exc.code may be an enum member or a plain integer.
            code = exc.code.value if hasattr(exc.code, "value") else int(exc.code)
            LOG.warning("CtapError 0x%02x: %s", code, exc)
            await self._send(_pack(cid, _CMD_CBOR, bytes([code])))
            return
        except Exception:
            LOG.exception("CTAP2 processing error")
            # NOTE(review): CTAP defines 0x7F (CTAP1_ERR_OTHER) for
            # unspecified failures; 0x01 is kept because the client may
            # depend on it — confirm before changing.
            await self._send(_pack(cid, _CMD_CBOR, bytes([0x01])))
            return
        # Success: status 0x00 + CBOR-encoded response map
        await self._send(_pack(cid, _CMD_CBOR, bytes([0x00]) + resp_cbor))

    # ---- CTAP2 operations ---------------------------------------------------

    def _make_credential(self, body: bytes) -> bytes:
        """Decode a makeCredential request, run it, and encode the response.

        Raises whatever ``cbor2.loads`` or the emulator raise, plus
        ``KeyError`` when a required integer key (1-4) is missing.
        """
        params: dict[Any, Any] = cbor2.loads(body)
        # CTAP2 spec integer keys: 1=clientDataHash, 2=rp, 3=user,
        # 4=pubKeyCredParams, 7=options
        client_data_hash: bytes = bytes(params[1])
        rp: dict = dict(params[2])
        user: dict = dict(params[3])
        key_params: list = [dict(kp) for kp in params[4]]
        options: dict = dict(params.get(7, {}))
        LOG.info("makeCredential rp_id=%r user=%r", rp.get("id"), user.get("name"))
        resp = self._emulator.make_credential(
            client_data_hash=client_data_hash,
            rp=rp,
            user=user,
            key_params=key_params,
            options=options,
        )
        auth_data_bytes = bytes(resp.auth_data)
        LOG.info("makeCredential OK auth_data=%d bytes", len(auth_data_bytes))
        # CTAP2 makeCredential response map: 1=fmt, 2=authData, 3=attStmt
        return cbor2.dumps({
            1: resp.fmt,
            2: auth_data_bytes,
            3: resp.att_stmt or {},
        })

    def _get_assertion(self, body: bytes) -> bytes:
        """Decode a getAssertion request, run it, and encode the response.

        An empty or absent allow list is passed to the emulator as ``None``.
        The user entry (key 4) is included only when the emulator returns a
        truthy ``user``.
        """
        params: dict[Any, Any] = cbor2.loads(body)
        # CTAP2 spec integer keys: 1=rpId, 2=clientDataHash, 3=allowList, 5=options
        rp_id: str = params[1]
        client_data_hash: bytes = bytes(params[2])
        allow_list_raw: list = list(params.get(3, []))
        options: dict = dict(params.get(5, {}))
        allow_list = [dict(item) for item in allow_list_raw] or None
        LOG.info("getAssertion rp_id=%r allow_list_len=%s",
                 rp_id, len(allow_list) if allow_list else 0)
        resp = self._emulator.get_assertion(
            rp_id=rp_id,
            client_data_hash=client_data_hash,
            allow_list=allow_list,
            options=options,
        )
        auth_data_bytes = bytes(resp.auth_data)
        signature = bytes(resp.signature)
        # Build credential descriptor for key 1
        cred = resp.credential
        if hasattr(cred, "id"):
            cred_map: dict = {"type": "public-key", "id": bytes(cred.id)}
        else:
            # Mapping-style credential: copy it, normalising any bytes-like
            # values to bytes so cbor2 encodes them as byte strings.
            cred_map = {
                k: (bytes(v) if isinstance(v, (bytes, bytearray, memoryview)) else v)
                for k, v in (cred or {}).items()
            }
        LOG.info("getAssertion OK auth_data=%d bytes sig=%d bytes",
                 len(auth_data_bytes), len(signature))
        # CTAP2 getAssertion response map: 1=credential, 2=authData, 3=signature
        resp_map: dict[int, Any] = {1: cred_map, 2: auth_data_bytes, 3: signature}
        if resp.user:
            u = resp.user
            resp_map[4] = dict(u) if hasattr(u, "items") else u
        return cbor2.dumps(resp_map)

    def _get_info(self) -> bytes:
        """Return a minimal CBOR-encoded getInfo response."""
        # Minimal getInfo response
        return cbor2.dumps({
            1: ["FIDO_2_0"],  # versions
            3: b"\x00" * 16,  # aaguid
        })
# ---------------------------------------------------------------------------
# Server bootstrap
# ---------------------------------------------------------------------------
async def _serve(host: str, port: int) -> None:
    """Listen on *host*:*port* and serve connections until cancelled.

    One ``CardEmulator`` is created up front and shared by every connection;
    each accepted connection gets its own ``_Handler``.
    """
    shared_emulator = CardEmulator()
    LOG.info("card_emulator_bridge listening on %s:%d — ctrl-C to stop", host, port)

    async def handle_client(
        reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        # Runs for the lifetime of a single client connection.
        connection = _Handler(reader, writer, shared_emulator)
        await connection.run()

    tcp_server = await asyncio.start_server(handle_client, host, port)
    async with tcp_server:
        await tcp_server.serve_forever()
def _main() -> None:
    """Parse ``--host`` / ``--port``, configure logging, and run the bridge."""
    parser = argparse.ArgumentParser(
        description="CTAPHID TCP bridge for Android emulator ↔ CardEmulator"
    )
    parser.add_argument(
        "--host",
        default="0.0.0.0",
        help="Listen host (default 0.0.0.0)",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=8772,
        help="Listen port (default 8772)",
    )
    cli = parser.parse_args()

    log_format = "%(asctime)s %(levelname)s %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)

    # Blocks until the server is stopped.
    asyncio.run(_serve(cli.host, cli.port))
# Start the bridge only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    _main()