#!/usr/bin/env python3
"""
card_emulator_bridge.py — CTAPHID TCP bridge for Android emulator testing.

The Dart ctaphid_channel.dart speaks raw 64-byte CTAPHID packets over TCP.
This bridge listens on :8772 (the Android emulator reaches the Mac host at
10.0.2.2), translates CTAPHID frames into CardEmulator calls, and sends
framed CTAPHID responses back.

Run:
    uv run --python 3.12 --with fido2 --with cbor2 --with cryptography \\
        tests/card_emulator_bridge.py
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import struct
|
|
import sys
|
|
from typing import Any
|
|
|
|
import cbor2
|
|
from fido2.ctap import CtapError
|
|
|
|
sys.path.insert(0, os.path.dirname(__file__))
|
|
from card_emulator import CardEmulator
|
|
|
|
LOG = logging.getLogger("bridge")
|
|
|
|
# ---- CTAPHID transport layer ----------------------------------------------

# Channel ID that INIT requests use before a channel has been allocated.
_BROADCAST_CID = 0xFFFFFFFF

# CTAPHID command codes (7-bit values; bit 7 of the command byte is added
# separately when an init packet is built).
_CMD_INIT = 0x06
_CMD_CBOR = 0x10
_CMD_KEEPALIVE = 0x3B
_CMD_ERROR = 0x3F

# Every packet on the wire is exactly one fixed-size HID report.
_HID_SIZE = 64
_INIT_PAYLOAD = _HID_SIZE - (4 + 1 + 2)  # CID + CMD + BCNT header -> 57 bytes
_CONT_PAYLOAD = _HID_SIZE - (4 + 1)  # CID + SEQ header -> 59 bytes

# CTAPHID error codes (payload byte of a CTAPHID_ERROR packet)
_ERR_INVALID_CMD = 0x01
_ERR_INVALID_LEN = 0x03

# ---- CTAP2 authenticator layer --------------------------------------------

# Command codes (first byte of a CTAPHID_CBOR payload)
_CTAP2_MAKE_CREDENTIAL = 0x01
_CTAP2_GET_ASSERTION = 0x02
_CTAP2_GET_INFO = 0x04
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Packet helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _pack(cid: int, cmd: int, payload: bytes) -> list[bytes]:
    """Fragment *payload* into CTAPHID init + continuation packets.

    Args:
        cid: 32-bit channel ID, written big-endian at the start of every
            packet.
        cmd: CTAPHID command code. Only the low 7 bits are used; bit 7 is
            set to mark the first packet as an init packet.
        payload: Message body to split across packets.

    Returns:
        One init packet followed by as many continuation packets as the
        payload needs. Every packet is exactly ``_HID_SIZE`` bytes, padded
        with zero bytes.

    Raises:
        ValueError: If *payload* does not fit in one init packet plus 128
            continuation packets. Beyond that the 7-bit sequence number
            would wrap and the receiver could not reassemble the message.
    """
    total = len(payload)
    max_len = _INIT_PAYLOAD + 128 * _CONT_PAYLOAD
    if total > max_len:
        raise ValueError(
            f"CTAPHID payload too large: {total} bytes (maximum {max_len})"
        )

    # Init packet: CID(4) | 0x80+CMD(1) | BCNT(2, big-endian) | data.
    header = struct.pack(">IBH", cid, (cmd & 0x7F) | 0x80, total)
    packets = [(header + payload[:_INIT_PAYLOAD]).ljust(_HID_SIZE, b"\x00")]

    # Continuation packets: CID(4) | SEQ(1) | data, SEQ counting up from 0.
    offsets = range(_INIT_PAYLOAD, total, _CONT_PAYLOAD)
    for seq, offset in enumerate(offsets):
        header = struct.pack(">IB", cid, seq)
        chunk = payload[offset:offset + _CONT_PAYLOAD]
        packets.append((header + chunk).ljust(_HID_SIZE, b"\x00"))

    return packets
|
|
|
|
|
|
def _error_pkt(cid: int, code: int) -> bytes:
    """Return the single CTAPHID_ERROR packet carrying *code* on channel *cid*."""
    error_packets = _pack(cid, _CMD_ERROR, bytes((code,)))
    # A one-byte payload always fits in the init packet, so only that
    # first packet is returned.
    return error_packets[0]
|
|
|
|
|
|
def _keepalive_pkt(cid: int) -> bytes:
    """Return a CTAPHID_KEEPALIVE packet for channel *cid*.

    The status byte is 0x02, which the CTAPHID specification names
    STATUS_UPNEEDED (0x01 would be STATUS_PROCESSING).
    """
    status = bytes((0x02,))
    # The one-byte status fits in the init packet; take that packet only.
    keepalive, *_ = _pack(cid, _CMD_KEEPALIVE, status)
    return keepalive
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Per-connection handler
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class _Handler:
    """Handles one Android emulator TCP connection.

    Reads fixed-size CTAPHID packets from the stream, reassembles them into
    messages, runs the requested operation against the CardEmulator passed
    to the constructor, and writes the framed response back.
    """

    # CTAPHID error code sent for framing violations (ERR_INVALID_SEQ).
    _ERR_INVALID_SEQ = 0x04

    # CTAP2 status bytes for requests that fail inside this bridge.
    _STATUS_INVALID_CBOR = 0x12  # CTAP2_ERR_INVALID_CBOR
    _STATUS_OTHER = 0x7F  # CTAP1_ERR_OTHER

    def __init__(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        emulator: CardEmulator,
    ) -> None:
        self._r = reader
        self._w = writer
        self._emulator = emulator
        self._allocated_cid = 1  # fixed CID for single-connection bridge

    async def run(self) -> None:
        """Serve messages until the peer disconnects, then close the stream."""
        peer = self._w.get_extra_info("peername")
        LOG.info("connect from %s", peer)
        try:
            while True:
                cid, cmd, data = await self._recv_message()
                await self._dispatch(cid, cmd, data)
        except (asyncio.IncompleteReadError, ConnectionError, EOFError):
            # ConnectionError also covers a broken pipe while writing, which
            # is an ordinary disconnect rather than a handler failure.
            LOG.info("disconnect from %s", peer)
        except Exception:
            LOG.exception("handler error")
        finally:
            self._w.close()
            try:
                await self._w.wait_closed()
            except Exception:
                # Best effort: the peer may already be gone. Keep a trace
                # for debugging instead of discarding the error.
                LOG.debug("error while closing connection to %s", peer,
                          exc_info=True)

    # ---- I/O ----------------------------------------------------------------

    async def _recv_pkt(self) -> bytes:
        """Read exactly one ``_HID_SIZE``-byte packet from the stream."""
        return await self._r.readexactly(_HID_SIZE)

    async def _send(self, packets: list[bytes]) -> None:
        """Write all *packets* in order, then wait for the buffer to drain."""
        self._w.writelines(packets)
        await self._w.drain()

    # ---- Message reassembly -------------------------------------------------

    async def _recv_message(self) -> tuple[int, int, bytes]:
        """Read one full CTAPHID message (init + any continuations).

        After every non-final packet the Dart client is blocked waiting for a
        response.  We send a keepalive so it resumes and sends the next packet.

        Malformed framing is answered with a CTAPHID error packet and the
        partial message is dropped; reading then resumes with the next
        packet.  This covers a continuation packet with no message in
        progress, a declared length above the protocol maximum, and a
        continuation packet with the wrong channel or sequence number.

        Returns:
            ``(cid, cmd, payload)`` where *cmd* has the init-packet marker
            bit removed and *payload* is exactly the declared length.
        """
        max_len = _INIT_PAYLOAD + 128 * _CONT_PAYLOAD
        pkt = await self._recv_pkt()

        while True:
            cid = struct.unpack_from(">I", pkt)[0]

            if not pkt[4] & 0x80:
                # Bit 7 clear means a continuation packet, but no message is
                # in progress.  Reply so the client is not left waiting.
                LOG.warning("unexpected continuation packet cid=0x%08x", cid)
                await self._send([_error_pkt(cid, self._ERR_INVALID_SEQ)])
                pkt = await self._recv_pkt()
                continue

            cmd = pkt[4] & 0x7F
            bcnt = (pkt[5] << 8) | pkt[6]
            if bcnt > max_len:
                # More than 128 continuation packets cannot be numbered with
                # a 7-bit sequence, so the message can never complete.
                LOG.warning("oversize message bcnt=%d cid=0x%08x", bcnt, cid)
                await self._send([_error_pkt(cid, _ERR_INVALID_LEN)])
                pkt = await self._recv_pkt()
                continue

            buf = bytearray(pkt[7: 7 + min(bcnt, _INIT_PAYLOAD)])
            seq = 0
            while len(buf) < bcnt:
                # Unblock Dart's _sendPacket which is awaiting a response.
                await self._send([_keepalive_pkt(cid)])
                pkt = await self._recv_pkt()

                if pkt[4] & 0x80:
                    # A new init packet replaces the unfinished message; the
                    # outer loop parses it as the start of a new one.
                    LOG.warning("message on cid=0x%08x abandoned by a new "
                                "init packet", cid)
                    break

                pkt_cid = struct.unpack_from(">I", pkt)[0]
                if pkt_cid != cid or pkt[4] != seq:
                    LOG.warning("bad continuation cid=0x%08x seq=%d "
                                "(expected cid=0x%08x seq=%d)",
                                pkt_cid, pkt[4], cid, seq)
                    await self._send(
                        [_error_pkt(pkt_cid, self._ERR_INVALID_SEQ)]
                    )
                    pkt = await self._recv_pkt()
                    break

                remaining = bcnt - len(buf)
                buf.extend(pkt[5: 5 + min(remaining, _CONT_PAYLOAD)])
                seq += 1
            else:
                # Loop ended without a break: the message is complete.
                return cid, cmd, bytes(buf)

    # ---- Dispatch -----------------------------------------------------------

    async def _dispatch(self, cid: int, cmd: int, data: bytes) -> None:
        """Route one reassembled message to the handler for its command."""
        if cmd == _CMD_INIT:
            await self._handle_init(cid, data)
        elif cmd == _CMD_CBOR:
            await self._handle_cbor(cid, data)
        else:
            LOG.warning("unknown CTAPHID cmd=0x%02x cid=0x%08x", cmd, cid)
            await self._send([_error_pkt(cid, _ERR_INVALID_CMD)])

    # ---- CTAPHID INIT -------------------------------------------------------

    async def _handle_init(self, cid: int, data: bytes) -> None:
        """Answer CTAPHID_INIT on the channel the request arrived on.

        An INIT on the broadcast channel is given the bridge's fixed CID.
        An INIT on any other channel is a resync and gets that same channel
        echoed back.  Requests shorter than the 8-byte nonce are rejected
        with ERR_INVALID_LEN.
        """
        if len(data) < 8:
            await self._send([_error_pkt(cid, _ERR_INVALID_LEN)])
            return

        nonce = data[:8]
        channel = self._allocated_cid if cid == _BROADCAST_CID else cid
        # Response payload: nonce(8) + channel(4) + CTAPHID_version(1)
        #                   + major(1) + minor(1) + build(1) + capabilities(1)
        payload = nonce + struct.pack(">I", channel) + bytes([2, 1, 0, 0, 0x04])
        # The reply goes out on the requesting channel; a client matching
        # responses by channel would miss one sent elsewhere.
        await self._send(_pack(cid, _CMD_INIT, payload))
        LOG.info("INIT → CID=0x%08x", channel)

    # ---- CTAPHID CBOR (CTAP2) -----------------------------------------------

    async def _handle_cbor(self, cid: int, data: bytes) -> None:
        """Run one CTAP2 command and send its CTAPHID_CBOR response.

        The first byte of *data* is the CTAP2 command and the rest is its
        CBOR body.  An empty message or an unsupported command is answered
        with a CTAPHID error packet.  Failures while running a supported
        command are answered with a one-byte CTAP2 status: the CtapError
        code raised by the emulator, 0x12 for an undecodable request, or
        0x7F for anything else.
        """
        if not data:
            await self._send([_error_pkt(cid, _ERR_INVALID_LEN)])
            return

        ctap2_cmd, body = data[0], data[1:]
        LOG.info("CTAP2 cmd=0x%02x body=%d bytes", ctap2_cmd, len(body))

        supported = (
            _CTAP2_MAKE_CREDENTIAL,
            _CTAP2_GET_ASSERTION,
            _CTAP2_GET_INFO,
        )
        if ctap2_cmd not in supported:
            # Handled before the try block so a failed send is not treated
            # as a processing error and answered a second time.
            LOG.warning("unsupported CTAP2 cmd=0x%02x", ctap2_cmd)
            await self._send([_error_pkt(cid, _ERR_INVALID_CMD)])
            return

        try:
            if ctap2_cmd == _CTAP2_MAKE_CREDENTIAL:
                resp_cbor = self._make_credential(body)
            elif ctap2_cmd == _CTAP2_GET_ASSERTION:
                resp_cbor = self._get_assertion(body)
            else:
                resp_cbor = self._get_info()
        except CtapError as exc:
            code = exc.code.value if hasattr(exc.code, "value") else int(exc.code)
            LOG.warning("CtapError 0x%02x: %s", code, exc)
            await self._send(_pack(cid, _CMD_CBOR, bytes([code])))
            return
        except cbor2.CBORDecodeError:
            LOG.exception("malformed CBOR in CTAP2 request")
            status = bytes([self._STATUS_INVALID_CBOR])
            await self._send(_pack(cid, _CMD_CBOR, status))
            return
        except Exception:
            # 0x01 would tell the client the command does not exist; an
            # internal failure is reported as "other" instead.
            LOG.exception("CTAP2 processing error")
            status = bytes([self._STATUS_OTHER])
            await self._send(_pack(cid, _CMD_CBOR, status))
            return

        # Success: status 0x00 + CBOR-encoded response map
        await self._send(_pack(cid, _CMD_CBOR, bytes([0x00]) + resp_cbor))

    # ---- CTAP2 operations ---------------------------------------------------

    def _make_credential(self, body: bytes) -> bytes:
        """Decode a makeCredential request, run it, and encode the response.

        Args:
            body: CBOR-encoded request map with integer keys.

        Returns:
            CBOR-encoded response map (1=fmt, 2=authData, 3=attStmt).

        Raises:
            KeyError: If a required request key (1-4) is missing.
        """
        params: dict[Any, Any] = cbor2.loads(body)

        # CTAP2 spec integer keys: 1=clientDataHash, 2=rp, 3=user,
        #                          4=pubKeyCredParams, 7=options
        client_data_hash: bytes = bytes(params[1])
        rp: dict = dict(params[2])
        user: dict = dict(params[3])
        key_params: list = [dict(kp) for kp in params[4]]
        options: dict = dict(params.get(7, {}))

        LOG.info("makeCredential rp_id=%r user=%r", rp.get("id"), user.get("name"))

        resp = self._emulator.make_credential(
            client_data_hash=client_data_hash,
            rp=rp,
            user=user,
            key_params=key_params,
            options=options,
        )

        auth_data_bytes = bytes(resp.auth_data)
        LOG.info("makeCredential OK auth_data=%d bytes", len(auth_data_bytes))

        # CTAP2 makeCredential response map: 1=fmt, 2=authData, 3=attStmt
        return cbor2.dumps({
            1: resp.fmt,
            2: auth_data_bytes,
            3: resp.att_stmt or {},
        })

    def _get_assertion(self, body: bytes) -> bytes:
        """Decode a getAssertion request, run it, and encode the response.

        Args:
            body: CBOR-encoded request map with integer keys.

        Returns:
            CBOR-encoded response map (1=credential, 2=authData,
            3=signature, plus 4=user when the emulator returns one).

        Raises:
            KeyError: If a required request key (1 or 2) is missing.
        """
        params: dict[Any, Any] = cbor2.loads(body)

        # CTAP2 spec integer keys: 1=rpId, 2=clientDataHash, 3=allowList, 5=options
        rp_id: str = params[1]
        client_data_hash: bytes = bytes(params[2])
        allow_list_raw: list = list(params.get(3, []))
        options: dict = dict(params.get(5, {}))

        # An empty allow list is passed to the emulator as None.
        allow_list = [dict(item) for item in allow_list_raw] or None

        LOG.info("getAssertion rp_id=%r allow_list_len=%s",
                 rp_id, len(allow_list) if allow_list else 0)

        resp = self._emulator.get_assertion(
            rp_id=rp_id,
            client_data_hash=client_data_hash,
            allow_list=allow_list,
            options=options,
        )

        auth_data_bytes = bytes(resp.auth_data)
        signature = bytes(resp.signature)

        # Build credential descriptor for key 1.  The credential may be an
        # object exposing ``id`` or a mapping; byte-like mapping values are
        # converted to plain bytes before encoding.
        cred = resp.credential
        if hasattr(cred, "id"):
            cred_map: dict = {"type": "public-key", "id": bytes(cred.id)}
        else:
            cred_map = {
                k: (bytes(v) if isinstance(v, (bytes, bytearray, memoryview)) else v)
                for k, v in (cred or {}).items()
            }

        LOG.info("getAssertion OK auth_data=%d bytes sig=%d bytes",
                 len(auth_data_bytes), len(signature))

        # CTAP2 getAssertion response map: 1=credential, 2=authData, 3=signature
        resp_map: dict[int, Any] = {1: cred_map, 2: auth_data_bytes, 3: signature}
        if resp.user:
            u = resp.user
            resp_map[4] = dict(u) if hasattr(u, "items") else u

        return cbor2.dumps(resp_map)

    def _get_info(self) -> bytes:
        """Return a minimal CBOR-encoded getInfo response."""
        return cbor2.dumps({
            1: ["FIDO_2_0"],  # versions
            3: b"\x00" * 16,  # aaguid
        })
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Server bootstrap
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _serve(host: str, port: int) -> None:
    """Listen on *host*:*port* and serve connections until cancelled.

    One CardEmulator instance is created here and handed to the handler of
    every connection.

    Args:
        host: Interface address to bind.
        port: TCP port to bind.
    """
    emulator = CardEmulator()

    async def _on_connect(
        reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        await _Handler(reader, writer, emulator).run()

    server = await asyncio.start_server(_on_connect, host, port)
    # Logged only once the socket is bound, so a bind failure (for example
    # a port already in use) is not preceded by a false "listening" line.
    LOG.info("card_emulator_bridge listening on %s:%d — ctrl-C to stop", host, port)

    async with server:
        await server.serve_forever()
|
|
|
|
|
|
def _main() -> None:
    """Parse the command line, configure logging and run the bridge.

    Blocks until the server stops.  Ctrl-C ends the process with a log line
    instead of a KeyboardInterrupt traceback.
    """
    ap = argparse.ArgumentParser(
        description="CTAPHID TCP bridge for Android emulator ↔ CardEmulator"
    )
    ap.add_argument("--host", default="0.0.0.0", help="Listen host (default 0.0.0.0)")
    ap.add_argument("--port", type=int, default=8772, help="Listen port (default 8772)")
    args = ap.parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
    )
    try:
        asyncio.run(_serve(args.host, args.port))
    except KeyboardInterrupt:
        # The startup log line advertises ctrl-C as the way to stop.
        LOG.info("interrupted, shutting down")


if __name__ == "__main__":
    _main()
|