#!/usr/bin/env python3 """ card_emulator_bridge.py — CTAPHID TCP bridge for Android emulator testing. The Dart ctaphid_channel.dart speaks raw 64-byte CTAPHID packets over TCP. This bridge listens on :8772 (Android emulator reaches the Mac host at 10.0.2.2), translates CTAPHID frames into CardEmulator calls, and sends framed CTAPHID responses back. Run: uv run --python 3.12 --with fido2 --with cbor2 --with cryptography \\ tests/card_emulator_bridge.py """ from __future__ import annotations import argparse import asyncio import logging import os import struct import sys from typing import Any import cbor2 from fido2.ctap import CtapError sys.path.insert(0, os.path.dirname(__file__)) from card_emulator import CardEmulator LOG = logging.getLogger("bridge") # CTAPHID constants _BROADCAST_CID = 0xFFFFFFFF _CMD_INIT = 0x06 _CMD_CBOR = 0x10 _CMD_ERROR = 0x3F _CMD_KEEPALIVE = 0x3B _HID_SIZE = 64 _INIT_PAYLOAD = _HID_SIZE - 7 # 57 bytes usable in an init packet _CONT_PAYLOAD = _HID_SIZE - 5 # 59 bytes usable in a continuation packet # CTAP2 authenticator command codes (first byte of CTAPHID_CBOR payload) _CTAP2_MAKE_CREDENTIAL = 0x01 _CTAP2_GET_ASSERTION = 0x02 _CTAP2_GET_INFO = 0x04 # CTAP error codes _ERR_INVALID_CMD = 0x01 _ERR_INVALID_LEN = 0x03 # --------------------------------------------------------------------------- # Packet helpers # --------------------------------------------------------------------------- def _pack(cid: int, cmd: int, payload: bytes) -> list[bytes]: """Fragment payload into CTAPHID init + continuation packets.""" packets: list[bytes] = [] cid_b = struct.pack(">I", cid) init = bytearray(_HID_SIZE) init[:4] = cid_b init[4] = (cmd & 0x7F) | 0x80 init[5] = (len(payload) >> 8) & 0xFF init[6] = len(payload) & 0xFF first_chunk = payload[:_INIT_PAYLOAD] init[7: 7 + len(first_chunk)] = first_chunk packets.append(bytes(init)) offset, seq = len(first_chunk), 0 while offset < len(payload): cont = bytearray(_HID_SIZE) cont[:4] = cid_b cont[4] = seq & 0x7F chunk = payload[offset: offset + _CONT_PAYLOAD] cont[5: 5 + len(chunk)] = chunk packets.append(bytes(cont)) offset += len(chunk) seq += 1 return packets def _error_pkt(cid: int, code: int) -> bytes: return _pack(cid, _CMD_ERROR, bytes([code]))[0] def _keepalive_pkt(cid: int) -> bytes: # Status 0x02 = TUP_NEEDED (card is processing) return _pack(cid, _CMD_KEEPALIVE, bytes([0x02]))[0] # --------------------------------------------------------------------------- # Per-connection handler # --------------------------------------------------------------------------- class _Handler: """Handles one Android emulator TCP connection.""" def __init__( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, emulator: CardEmulator, ) -> None: self._r = reader self._w = writer self._emulator = emulator self._allocated_cid = 1 # fixed CID for single-connection bridge async def run(self) -> None: peer = self._w.get_extra_info("peername") LOG.info("connect from %s", peer) try: while True: cid, cmd, data = await self._recv_message() await self._dispatch(cid, cmd, data) except (asyncio.IncompleteReadError, ConnectionResetError, EOFError): LOG.info("disconnect from %s", peer) except Exception: LOG.exception("handler error") finally: self._w.close() try: await self._w.wait_closed() except Exception: pass # ---- I/O ---------------------------------------------------------------- async def _recv_pkt(self) -> bytes: return await self._r.readexactly(_HID_SIZE) async def _send(self, packets: list[bytes]) -> None: for pkt in packets: self._w.write(pkt) await self._w.drain() # ---- Message reassembly ------------------------------------------------- async def _recv_message(self) -> tuple[int, int, bytes]: """Read one full CTAPHID message (init + any continuations). After every non-final packet the Dart client is blocked waiting for a response. We send a keepalive so it resumes and sends the next packet. """ init_pkt = await self._recv_pkt() cid = struct.unpack(">I", init_pkt[:4])[0] cmd = init_pkt[4] & 0x7F bcnt = (init_pkt[5] << 8) | init_pkt[6] buf = bytearray(init_pkt[7: 7 + min(bcnt, _INIT_PAYLOAD)]) while len(buf) < bcnt: # Unblock Dart's _sendPacket which is awaiting a response. self._w.write(_keepalive_pkt(cid)) await self._w.drain() cont = await self._recv_pkt() remaining = bcnt - len(buf) buf.extend(cont[5: 5 + min(remaining, _CONT_PAYLOAD)]) return cid, cmd, bytes(buf) # ---- Dispatch ----------------------------------------------------------- async def _dispatch(self, cid: int, cmd: int, data: bytes) -> None: if cmd == _CMD_INIT: await self._handle_init(cid, data) elif cmd == _CMD_CBOR: await self._handle_cbor(cid, data) else: LOG.warning("unknown CTAPHID cmd=0x%02x cid=0x%08x", cmd, cid) await self._send([_error_pkt(cid, _ERR_INVALID_CMD)]) # ---- CTAPHID INIT ------------------------------------------------------- async def _handle_init(self, cid: int, data: bytes) -> None: if len(data) < 8: await self._send([_error_pkt(cid, _ERR_INVALID_LEN)]) return nonce = data[:8] new_cid = self._allocated_cid # Response payload: nonce(8) + new_cid(4) + CTAPHID_version(1) # + major(1) + minor(1) + build(1) + capabilities(1) payload = nonce + struct.pack(">I", new_cid) + bytes([2, 1, 0, 0, 0x04]) await self._send(_pack(_BROADCAST_CID, _CMD_INIT, payload)) LOG.info("INIT → CID=0x%08x", new_cid) # ---- CTAPHID CBOR (CTAP2) ----------------------------------------------- async def _handle_cbor(self, cid: int, data: bytes) -> None: if not data: await self._send([_error_pkt(cid, _ERR_INVALID_LEN)]) return ctap2_cmd = data[0] body = data[1:] if len(data) > 1 else b"" LOG.info("CTAP2 cmd=0x%02x body=%d bytes", ctap2_cmd, len(body)) try: if ctap2_cmd == _CTAP2_MAKE_CREDENTIAL: resp_cbor = self._make_credential(body) elif ctap2_cmd == _CTAP2_GET_ASSERTION: resp_cbor = self._get_assertion(body) elif ctap2_cmd == _CTAP2_GET_INFO: resp_cbor = self._get_info() else: LOG.warning("unsupported CTAP2 cmd=0x%02x", ctap2_cmd) await self._send([_error_pkt(cid, _ERR_INVALID_CMD)]) return except CtapError as exc: code = exc.code.value if hasattr(exc.code, "value") else int(exc.code) LOG.warning("CtapError 0x%02x: %s", code, exc) await self._send(_pack(cid, _CMD_CBOR, bytes([code]))) return except Exception: LOG.exception("CTAP2 processing error") await self._send(_pack(cid, _CMD_CBOR, bytes([0x01]))) return # Success: status 0x00 + CBOR-encoded response map await self._send(_pack(cid, _CMD_CBOR, bytes([0x00]) + resp_cbor)) # ---- CTAP2 operations --------------------------------------------------- def _make_credential(self, body: bytes) -> bytes: params: dict[Any, Any] = cbor2.loads(body) # CTAP2 spec integer keys: 1=clientDataHash, 2=rp, 3=user, # 4=pubKeyCredParams, 7=options client_data_hash: bytes = bytes(params[1]) rp: dict = dict(params[2]) user: dict = dict(params[3]) key_params: list = [dict(kp) for kp in params[4]] options: dict = dict(params.get(7, {})) LOG.info("makeCredential rp_id=%r user=%r", rp.get("id"), user.get("name")) resp = self._emulator.make_credential( client_data_hash=client_data_hash, rp=rp, user=user, key_params=key_params, options=options, ) auth_data_bytes = bytes(resp.auth_data) LOG.info("makeCredential OK auth_data=%d bytes", len(auth_data_bytes)) # CTAP2 makeCredential response map: 1=fmt, 2=authData, 3=attStmt return cbor2.dumps({ 1: resp.fmt, 2: auth_data_bytes, 3: resp.att_stmt or {}, }) def _get_assertion(self, body: bytes) -> bytes: params: dict[Any, Any] = cbor2.loads(body) # CTAP2 spec integer keys: 1=rpId, 2=clientDataHash, 3=allowList, 5=options rp_id: str = params[1] client_data_hash: bytes = bytes(params[2]) allow_list_raw: list = list(params.get(3, [])) options: dict = dict(params.get(5, {})) allow_list = [dict(item) for item in allow_list_raw] or None LOG.info("getAssertion rp_id=%r allow_list_len=%s", rp_id, len(allow_list) if allow_list else 0) resp = self._emulator.get_assertion( rp_id=rp_id, client_data_hash=client_data_hash, allow_list=allow_list, options=options, ) auth_data_bytes = bytes(resp.auth_data) signature = bytes(resp.signature) # Build credential descriptor for key 1 cred = resp.credential if hasattr(cred, "id"): cred_map: dict = {"type": "public-key", "id": bytes(cred.id)} else: cred_map = { k: (bytes(v) if isinstance(v, (bytes, bytearray, memoryview)) else v) for k, v in (cred or {}).items() } LOG.info("getAssertion OK auth_data=%d bytes sig=%d bytes", len(auth_data_bytes), len(signature)) # CTAP2 getAssertion response map: 1=credential, 2=authData, 3=signature resp_map: dict[int, Any] = {1: cred_map, 2: auth_data_bytes, 3: signature} if resp.user: u = resp.user resp_map[4] = dict(u) if hasattr(u, "items") else u return cbor2.dumps(resp_map) def _get_info(self) -> bytes: # Minimal getInfo response return cbor2.dumps({ 1: ["FIDO_2_0"], # versions 3: b"\x00" * 16, # aaguid }) # --------------------------------------------------------------------------- # Server bootstrap # --------------------------------------------------------------------------- async def _serve(host: str, port: int) -> None: emulator = CardEmulator() LOG.info("card_emulator_bridge listening on %s:%d — ctrl-C to stop", host, port) async def _on_connect( reader: asyncio.StreamReader, writer: asyncio.StreamWriter ) -> None: await _Handler(reader, writer, emulator).run() server = await asyncio.start_server(_on_connect, host, port) async with server: await server.serve_forever() def _main() -> None: ap = argparse.ArgumentParser( description="CTAPHID TCP bridge for Android emulator ↔ CardEmulator" ) ap.add_argument("--host", default="0.0.0.0", help="Listen host (default 0.0.0.0)") ap.add_argument("--port", type=int, default=8772, help="Listen port (default 8772)") args = ap.parse_args() logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", ) asyncio.run(_serve(args.host, args.port)) if __name__ == "__main__": _main()