#!/usr/bin/env python3 """ Minimal k_proxy service for Phase 5 bring-up. Behavior: - Creates short-lived sessions after a card-presence check. - Reuses valid sessions to access k_server protected counter endpoint. - Supports session status and logout. Notes: - Session login uses `fido2_probe.py --json` command success as auth gate for now. - This is a Phase 5 starter and not a final production auth design. """ from __future__ import annotations import argparse import json import secrets import ssl import subprocess import threading import time from dataclasses import dataclass from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from typing import Any from urllib.error import HTTPError, URLError from urllib.parse import urlparse from urllib.request import Request, urlopen @dataclass class Session: username: str expires_at: float class ProxyState: def __init__( self, session_ttl_s: int, auth_command: str, server_base_url: str, server_ca_file: str | None, proxy_token: str, ): self.session_ttl_s = session_ttl_s self.auth_command = auth_command self.server_base_url = server_base_url.rstrip("/") self.server_ca_file = server_ca_file self.proxy_token = proxy_token self.lock = threading.Lock() self.sessions: dict[str, Session] = {} def _now(self) -> float: return time.time() def _gc_locked(self) -> None: now = self._now() dead = [token for token, sess in self.sessions.items() if sess.expires_at <= now] for token in dead: del self.sessions[token] def create_session(self, username: str) -> tuple[str, float]: token = secrets.token_urlsafe(32) now = self._now() expires_at = now + self.session_ttl_s with self.lock: self._gc_locked() self.sessions[token] = Session(username=username, expires_at=expires_at) return token, expires_at def get_session(self, token: str) -> Session | None: with self.lock: self._gc_locked() return self.sessions.get(token) def invalidate_session(self, token: str) -> bool: with self.lock: return self.sessions.pop(token, None) is not None def active_session_count(self) -> int: with self.lock: self._gc_locked() return len(self.sessions) def authenticate_with_card(self) -> tuple[bool, str]: try: proc = subprocess.run( self.auth_command, shell=True, capture_output=True, text=True, timeout=10, check=False, ) except Exception as exc: return False, f"auth command failed: {exc}" if proc.returncode != 0: stderr = proc.stderr.strip() stdout = proc.stdout.strip() details = stderr if stderr else stdout return False, details or f"auth command exit code {proc.returncode}" return True, "card presence check succeeded" def fetch_counter(self) -> tuple[int, dict[str, Any]]: url = f"{self.server_base_url}/resource/counter" req = Request(url, method="POST") req.add_header("X-Proxy-Token", self.proxy_token) req.add_header("Content-Type", "application/json") body = b"{}" ssl_context = None if self.server_base_url.startswith("https://"): ssl_context = ssl.create_default_context(cafile=self.server_ca_file) try: with urlopen(req, data=body, timeout=5, context=ssl_context) as resp: data = json.loads(resp.read().decode("utf-8")) return resp.status, data except HTTPError as exc: try: data = json.loads(exc.read().decode("utf-8")) except Exception: data = {"ok": False, "error": f"server http error {exc.code}"} return exc.code, data except URLError as exc: return 502, {"ok": False, "error": f"server unavailable: {exc.reason}"} except Exception as exc: return 502, {"ok": False, "error": f"server call failed: {exc}"} class Handler(BaseHTTPRequestHandler): state: ProxyState def _json(self, status: int, payload: dict[str, Any]) -> None: body = json.dumps(payload).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def _read_json(self) -> dict[str, Any]: length = int(self.headers.get("Content-Length", "0")) raw = self.rfile.read(length) if not raw: return {} return json.loads(raw.decode("utf-8")) def _bearer_token(self) -> str | None: value = self.headers.get("Authorization", "") if not value.startswith("Bearer "): return None token = value[7:].strip() return token or None def _require_session(self) -> tuple[str, Session] | None: token = self._bearer_token() if not token: self._json(401, {"ok": False, "error": "missing bearer token"}) return None session = self.state.get_session(token) if not session: self._json(401, {"ok": False, "error": "invalid or expired session"}) return None return token, session def do_GET(self) -> None: # noqa: N802 path = urlparse(self.path).path if path == "/health": self._json( 200, { "ok": True, "service": "k_proxy", "active_sessions": self.state.active_session_count(), "time": int(time.time()), }, ) return self.send_error(404) def do_POST(self) -> None: # noqa: N802 path = urlparse(self.path).path if path == "/session/login": self._session_login() return if path == "/session/status": self._session_status() return if path == "/session/logout": self._session_logout() return if path == "/resource/counter": self._resource_counter() return self.send_error(404) def _session_login(self) -> None: try: data = self._read_json() except Exception: self._json(400, {"ok": False, "error": "invalid json"}) return username = str(data.get("username", "")).strip() if not username: self._json(400, {"ok": False, "error": "username required"}) return ok, message = self.state.authenticate_with_card() if not ok: self._json(401, {"ok": False, "error": "card auth failed", "details": message}) return token, expires_at = self.state.create_session(username) self._json( 200, { "ok": True, "username": username, "session_token": token, "expires_at": int(expires_at), "ttl_seconds": self.state.session_ttl_s, "auth_mode": "card_presence_probe", }, ) def _session_status(self) -> None: got = self._require_session() if not got: return _, session = got self._json( 200, { "ok": True, "username": session.username, "expires_at": int(session.expires_at), "seconds_remaining": max(0, int(session.expires_at - time.time())), }, ) def _session_logout(self) -> None: token = self._bearer_token() if not token: self._json(401, {"ok": False, "error": "missing bearer token"}) return removed = self.state.invalidate_session(token) self._json(200, {"ok": True, "invalidated": removed}) def _resource_counter(self) -> None: got = self._require_session() if not got: return _, session = got status, upstream = self.state.fetch_counter() if status != 200: self._json(status, {"ok": False, "error": "upstream failed", "upstream": upstream}) return self._json( 200, { "ok": True, "username": session.username, "session_reused": True, "upstream": upstream, }, ) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Run k_proxy session gateway") parser.add_argument("--host", default="127.0.0.1") parser.add_argument("--port", type=int, default=8770) parser.add_argument("--tls-certfile", help="PEM certificate chain for HTTPS listener") parser.add_argument("--tls-keyfile", help="PEM private key for HTTPS listener") parser.add_argument("--session-ttl", type=int, default=300, help="Session TTL in seconds") parser.add_argument( "--auth-command", default="python3 /home/user/chromecard/fido2_probe.py --json", help="Command used for session creation auth gate", ) parser.add_argument( "--server-base-url", default="http://127.0.0.1:8780", help="Base URL for k_server", ) parser.add_argument( "--server-ca-file", help="CA certificate used to verify HTTPS certificate presented by k_server", ) parser.add_argument( "--proxy-token", default="dev-proxy-token", help="Shared token to authorize requests to k_server", ) return parser.parse_args() def main() -> int: args = parse_args() if bool(args.tls_certfile) != bool(args.tls_keyfile): raise SystemExit("Both --tls-certfile and --tls-keyfile are required to enable HTTPS") if args.server_base_url.startswith("https://") and not args.server_ca_file: raise SystemExit("--server-ca-file is required when --server-base-url uses https") state = ProxyState( session_ttl_s=args.session_ttl, auth_command=args.auth_command, server_base_url=args.server_base_url, server_ca_file=args.server_ca_file, proxy_token=args.proxy_token, ) Handler.state = state server = ThreadingHTTPServer((args.host, args.port), Handler) scheme = "http" if args.tls_certfile: context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) context.load_cert_chain(certfile=args.tls_certfile, keyfile=args.tls_keyfile) server.socket = context.wrap_socket(server.socket, server_side=True) scheme = "https" print(f"k_proxy listening on {scheme}://{args.host}:{args.port}") server.serve_forever() return 0 if __name__ == "__main__": raise SystemExit(main())