Start Phase 5 proxy/server session reuse prototype

This commit is contained in:
Morten V. Christiansen 2026-04-24 10:30:40 +02:00
parent 3dcac21dd0
commit 37600548ac
5 changed files with 510 additions and 4 deletions

80
PHASE5_RUNBOOK.md Normal file
View File

@ -0,0 +1,80 @@
# Phase 5 Runbook (Session Reuse Prototype)
This runbook starts a minimal `k_server` + `k_proxy` prototype for session reuse testing.
## What This Prototype Covers
- `k_proxy` creates short-lived sessions.
- Session creation uses a card-presence check (`fido2_probe.py --json`) as the current auth gate.
- Valid sessions can repeatedly access a protected `k_server` counter endpoint without re-running card auth each request.
- Session status and logout/invalidation paths are implemented.
## Start Services
In `k_server` VM:
```bash
python3 /home/user/chromecard/k_server_app.py --host 127.0.0.1 --port 8780 --proxy-token dev-proxy-token
```
In `k_proxy` VM:
```bash
python3 /home/user/chromecard/k_proxy_app.py \
--host 127.0.0.1 \
--port 8770 \
--session-ttl 300 \
--server-base-url http://127.0.0.1:8780 \
--proxy-token dev-proxy-token
```
## Test Flow
Create a session (runs auth gate once):
```bash
curl -sS -X POST http://127.0.0.1:8770/session/login \
-H 'Content-Type: application/json' \
-d '{"username":"alice"}'
```
Copy `session_token` from response, then:
```bash
TOKEN='<paste-token>'
```
Check session:
```bash
curl -sS -X POST http://127.0.0.1:8770/session/status \
-H "Authorization: Bearer $TOKEN"
```
Call protected resource multiple times (should not require new login):
```bash
curl -sS -X POST http://127.0.0.1:8770/resource/counter \
-H "Authorization: Bearer $TOKEN"
curl -sS -X POST http://127.0.0.1:8770/resource/counter \
-H "Authorization: Bearer $TOKEN"
```
Logout/invalidate:
```bash
curl -sS -X POST http://127.0.0.1:8770/session/logout \
-H "Authorization: Bearer $TOKEN"
```
Re-check after logout (should fail with 401):
```bash
curl -i -X POST http://127.0.0.1:8770/resource/counter \
-H "Authorization: Bearer $TOKEN"
```
## Current Limitation
- This uses card-presence probing, not a full WebAuthn assertion verification path.
- Intended as a Phase 5 starter for session semantics and proxy/server behavior.

View File

@ -129,6 +129,10 @@ Thread-safety expectation:
- Local WebAuthn demo (`http://localhost:8765` in `k_proxy`) succeeded:
- register: `ok=true`, `username=alice`, `credential_count=1`
- login/auth: `ok=true`, `username=alice`, `authenticated=true`
- Phase 5 prototype services are now available:
- `/home/user/chromecard/k_proxy_app.py`
- `/home/user/chromecard/k_server_app.py`
- `/home/user/chromecard/PHASE5_RUNBOOK.md`
- `west` is not currently installed/in PATH: `west not found`.
- The checked-out `CR_SDK_CK-main` tree appears incomplete for documented sysbuild role layout:
- missing: `mvp`, `setup`, `components`, `samples`
@ -150,6 +154,7 @@ Session note (2026-04-24):
- After USB assignment to `k_proxy`, `/dev/hidraw0` and `/dev/hidraw1` appeared.
- CTAP probe re-run succeeded with detected ChromeCard device and valid CTAP2 `getInfo` response.
- Local WebAuthn demo completed successfully for user `alice` (register + login).
- Phase 5 starter implementation added with session TTL, logout/invalidation, and proxy->server protected counter forwarding.
## Known FIDO2 Transport Boundary
@ -204,13 +209,10 @@ SUBSYSTEM=="hidraw", ATTRS{idVendor}=="1209", ATTRS{idProduct}=="0005", MODE="06
## Open Gaps To Resolve
- Why no `/dev/hidraw*` device is visible despite USB connection.
- Whether udev rule is missing or device VID/PID differs from expected.
- Whether current firmware on card exposes the FIDO2 HID interface.
- Whether a full `CR_SDK_CK-main` checkout (with role directories) is available locally.
- Whether server-side code should be pulled now for broader CIP/WebAuthn integration testing.
- Exact Qubes firewall and service binding rules to enforce the `k_client -> k_proxy -> k_server` chain.
- Exact enrollment process interface running in `k_client` and how it reaches `k_proxy`.
- Concrete session format/lifetime so cached sessions reduce card prompts without weakening security.
- Upgrade Phase 5 auth gate from card-presence probe to full WebAuthn assertion verification for session creation.
- Precise ownership split of session/user state between `k_proxy` and `k_server`.
- Concrete concurrency limits and acceptance criteria (requests/sec, parallel clients, latency/error thresholds).

View File

@ -153,6 +153,18 @@ Exit criteria:
- Repeated authorized requests do not require card interaction until session expiry.
- Expired/invalid sessions are correctly rejected.
Status (2026-04-24):
- Started with a runnable prototype:
- `/home/user/chromecard/k_proxy_app.py`
- `/home/user/chromecard/k_server_app.py`
- `/home/user/chromecard/PHASE5_RUNBOOK.md`
- Implemented in prototype:
- session create/status/logout endpoints in `k_proxy`
- TTL-based server-side session store with expiry garbage collection
- protected monotonic counter endpoint in `k_server` with thread-safe increments
- proxy forwarding from `k_proxy` to `k_server` using a shared upstream token
- Current auth gate for session creation is card-presence probe (`fido2_probe.py --json`), pending upgrade to full assertion verification path.
## Phase 5.5: Implement Dummy Resource + Access Policy on `k_server`
1. Protected dummy resource.

306
k_proxy_app.py Normal file
View File

@ -0,0 +1,306 @@
#!/usr/bin/env python3
"""
Minimal k_proxy service for Phase 5 bring-up.
Behavior:
- Creates short-lived sessions after a card-presence check.
- Reuses valid sessions to access k_server protected counter endpoint.
- Supports session status and logout.
Notes:
- Session login uses `fido2_probe.py --json` command success as auth gate for now.
- This is a Phase 5 starter and not a final production auth design.
"""
from __future__ import annotations
import argparse
import json
import secrets
import subprocess
import threading
import time
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import urlparse
from urllib.request import Request, urlopen
@dataclass
class Session:
username: str
expires_at: float
class ProxyState:
def __init__(
self,
session_ttl_s: int,
auth_command: str,
server_base_url: str,
proxy_token: str,
):
self.session_ttl_s = session_ttl_s
self.auth_command = auth_command
self.server_base_url = server_base_url.rstrip("/")
self.proxy_token = proxy_token
self.lock = threading.Lock()
self.sessions: dict[str, Session] = {}
def _now(self) -> float:
return time.time()
def _gc_locked(self) -> None:
now = self._now()
dead = [token for token, sess in self.sessions.items() if sess.expires_at <= now]
for token in dead:
del self.sessions[token]
def create_session(self, username: str) -> tuple[str, float]:
token = secrets.token_urlsafe(32)
now = self._now()
expires_at = now + self.session_ttl_s
with self.lock:
self._gc_locked()
self.sessions[token] = Session(username=username, expires_at=expires_at)
return token, expires_at
def get_session(self, token: str) -> Session | None:
with self.lock:
self._gc_locked()
return self.sessions.get(token)
def invalidate_session(self, token: str) -> bool:
with self.lock:
return self.sessions.pop(token, None) is not None
def active_session_count(self) -> int:
with self.lock:
self._gc_locked()
return len(self.sessions)
def authenticate_with_card(self) -> tuple[bool, str]:
try:
proc = subprocess.run(
self.auth_command,
shell=True,
capture_output=True,
text=True,
timeout=10,
check=False,
)
except Exception as exc:
return False, f"auth command failed: {exc}"
if proc.returncode != 0:
stderr = proc.stderr.strip()
stdout = proc.stdout.strip()
details = stderr if stderr else stdout
return False, details or f"auth command exit code {proc.returncode}"
return True, "card presence check succeeded"
def fetch_counter(self) -> tuple[int, dict[str, Any]]:
url = f"{self.server_base_url}/resource/counter"
req = Request(url, method="POST")
req.add_header("X-Proxy-Token", self.proxy_token)
req.add_header("Content-Type", "application/json")
body = b"{}"
try:
with urlopen(req, data=body, timeout=5) as resp:
data = json.loads(resp.read().decode("utf-8"))
return resp.status, data
except HTTPError as exc:
try:
data = json.loads(exc.read().decode("utf-8"))
except Exception:
data = {"ok": False, "error": f"server http error {exc.code}"}
return exc.code, data
except URLError as exc:
return 502, {"ok": False, "error": f"server unavailable: {exc.reason}"}
except Exception as exc:
return 502, {"ok": False, "error": f"server call failed: {exc}"}
class Handler(BaseHTTPRequestHandler):
state: ProxyState
def _json(self, status: int, payload: dict[str, Any]) -> None:
body = json.dumps(payload).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _read_json(self) -> dict[str, Any]:
length = int(self.headers.get("Content-Length", "0"))
raw = self.rfile.read(length)
if not raw:
return {}
return json.loads(raw.decode("utf-8"))
def _bearer_token(self) -> str | None:
value = self.headers.get("Authorization", "")
if not value.startswith("Bearer "):
return None
token = value[7:].strip()
return token or None
def _require_session(self) -> tuple[str, Session] | None:
token = self._bearer_token()
if not token:
self._json(401, {"ok": False, "error": "missing bearer token"})
return None
session = self.state.get_session(token)
if not session:
self._json(401, {"ok": False, "error": "invalid or expired session"})
return None
return token, session
def do_GET(self) -> None: # noqa: N802
path = urlparse(self.path).path
if path == "/health":
self._json(
200,
{
"ok": True,
"service": "k_proxy",
"active_sessions": self.state.active_session_count(),
"time": int(time.time()),
},
)
return
self.send_error(404)
def do_POST(self) -> None: # noqa: N802
path = urlparse(self.path).path
if path == "/session/login":
self._session_login()
return
if path == "/session/status":
self._session_status()
return
if path == "/session/logout":
self._session_logout()
return
if path == "/resource/counter":
self._resource_counter()
return
self.send_error(404)
def _session_login(self) -> None:
try:
data = self._read_json()
except Exception:
self._json(400, {"ok": False, "error": "invalid json"})
return
username = str(data.get("username", "")).strip()
if not username:
self._json(400, {"ok": False, "error": "username required"})
return
ok, message = self.state.authenticate_with_card()
if not ok:
self._json(401, {"ok": False, "error": "card auth failed", "details": message})
return
token, expires_at = self.state.create_session(username)
self._json(
200,
{
"ok": True,
"username": username,
"session_token": token,
"expires_at": int(expires_at),
"ttl_seconds": self.state.session_ttl_s,
"auth_mode": "card_presence_probe",
},
)
def _session_status(self) -> None:
got = self._require_session()
if not got:
return
_, session = got
self._json(
200,
{
"ok": True,
"username": session.username,
"expires_at": int(session.expires_at),
"seconds_remaining": max(0, int(session.expires_at - time.time())),
},
)
def _session_logout(self) -> None:
token = self._bearer_token()
if not token:
self._json(401, {"ok": False, "error": "missing bearer token"})
return
removed = self.state.invalidate_session(token)
self._json(200, {"ok": True, "invalidated": removed})
def _resource_counter(self) -> None:
got = self._require_session()
if not got:
return
_, session = got
status, upstream = self.state.fetch_counter()
if status != 200:
self._json(status, {"ok": False, "error": "upstream failed", "upstream": upstream})
return
self._json(
200,
{
"ok": True,
"username": session.username,
"session_reused": True,
"upstream": upstream,
},
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Run k_proxy session gateway")
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=8770)
parser.add_argument("--session-ttl", type=int, default=300, help="Session TTL in seconds")
parser.add_argument(
"--auth-command",
default="python3 /home/user/chromecard/fido2_probe.py --json",
help="Command used for session creation auth gate",
)
parser.add_argument(
"--server-base-url",
default="http://127.0.0.1:8780",
help="Base URL for k_server",
)
parser.add_argument(
"--proxy-token",
default="dev-proxy-token",
help="Shared token to authorize requests to k_server",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
state = ProxyState(
session_ttl_s=args.session_ttl,
auth_command=args.auth_command,
server_base_url=args.server_base_url,
proxy_token=args.proxy_token,
)
Handler.state = state
server = ThreadingHTTPServer((args.host, args.port), Handler)
print(f"k_proxy listening on http://{args.host}:{args.port}")
server.serve_forever()
return 0
if __name__ == "__main__":
raise SystemExit(main())

106
k_server_app.py Normal file
View File

@ -0,0 +1,106 @@
#!/usr/bin/env python3
"""
Minimal k_server service for Phase 5/5.5 bring-up.
Behavior:
- Exposes a protected monotonic counter endpoint.
- Accepts only requests from k_proxy via a shared proxy token header.
- Uses thread-safe counter increments.
"""
from __future__ import annotations
import argparse
import json
import threading
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any
from urllib.parse import urlparse
class ServerState:
def __init__(self, proxy_token: str):
self.proxy_token = proxy_token
self.counter = 0
self.lock = threading.Lock()
def next_counter(self) -> int:
with self.lock:
self.counter += 1
return self.counter
class Handler(BaseHTTPRequestHandler):
state: ServerState
def _json(self, status: int, payload: dict[str, Any]) -> None:
body = json.dumps(payload).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _is_proxy_authorized(self) -> bool:
return self.headers.get("X-Proxy-Token") == self.state.proxy_token
def do_GET(self) -> None: # noqa: N802
path = urlparse(self.path).path
if path == "/health":
self._json(
200,
{
"ok": True,
"service": "k_server",
"time": int(time.time()),
},
)
return
self.send_error(404)
def do_POST(self) -> None: # noqa: N802
path = urlparse(self.path).path
if path != "/resource/counter":
self.send_error(404)
return
if not self._is_proxy_authorized():
self._json(401, {"ok": False, "error": "unauthorized proxy"})
return
value = self.state.next_counter()
self._json(
200,
{
"ok": True,
"resource": "counter",
"value": value,
"time": int(time.time()),
},
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Run k_server counter service")
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=8780)
parser.add_argument(
"--proxy-token",
default="dev-proxy-token",
help="Shared token expected in X-Proxy-Token from k_proxy",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
state = ServerState(proxy_token=args.proxy_token)
Handler.state = state
server = ThreadingHTTPServer((args.host, args.port), Handler)
print(f"k_server listening on http://{args.host}:{args.port}")
server.serve_forever()
return 0
if __name__ == "__main__":
raise SystemExit(main())