k_card/k_client_portal.py

347 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Minimal browser-facing client portal for Phase 6 bring-up.
This runs in k_client, keeps a local preferred username, and talks to k_proxy
over the localhost-forwarded TLS endpoint.
"""
from __future__ import annotations
import argparse
import json
import ssl
import threading
import time
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import urlparse
from urllib.request import Request, urlopen
# Static landing page returned for GET "/".
# It no longer drives the enroll/login flow itself: it tells the user that the
# browser should target k_proxy directly, writes that notice into the on-page
# <pre id="log"> element, and redirects to the proxy after 1200 ms.
# The proxy address https://127.0.0.1:9771/ is hard-coded in the markup and in
# the script, so it does not follow the --proxy-base-url command-line option.
# JavaScript newlines are spelled "\\n" so that Python emits a literal "\n"
# escape into the script instead of breaking the JS string across lines.
HTML = """<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>ChromeCard Client Bridge</title>
<style>
:root {
--bg: #f3efe8;
--panel: #fffdf8;
--ink: #181614;
--muted: #655f56;
--line: #d9cfbf;
--accent: #0c6a60;
}
* { box-sizing: border-box; }
body {
margin: 0;
font-family: "Iowan Old Style", "Palatino Linotype", serif;
background:
radial-gradient(circle at top left, rgba(12,106,96,0.12), transparent 34%),
linear-gradient(180deg, #f9f3e8 0%, var(--bg) 100%);
color: var(--ink);
}
main {
max-width: 760px;
margin: 0 auto;
padding: 32px 20px 56px;
}
.panel {
padding: 22px 24px;
border: 1px solid var(--line);
background: linear-gradient(135deg, rgba(255,253,248,0.98), rgba(242,237,228,0.94));
box-shadow: 0 18px 40px rgba(55, 41, 19, 0.08);
}
h1 {
margin: 0 0 8px;
font-size: clamp(2rem, 4vw, 3.4rem);
line-height: 0.95;
letter-spacing: -0.04em;
}
.subtitle {
margin: 0;
color: var(--muted);
max-width: 62ch;
font-size: 1rem;
}
.actions {
display: flex;
flex-wrap: wrap;
gap: 10px;
margin-top: 18px;
}
a.button, button {
text-decoration: none;
border: 0;
padding: 10px 14px;
font: inherit;
color: #fff;
background: var(--accent);
cursor: pointer;
}
pre {
margin: 18px 0 0;
padding: 16px;
overflow: auto;
border: 1px solid var(--line);
background: #16130f;
color: #efe7da;
font-family: "SFMono-Regular", Consolas, monospace;
font-size: 0.9rem;
line-height: 1.45;
}
</style>
</head>
<body>
<main>
<section class="panel">
<h1>ChromeCard Client Bridge</h1>
<p class="subtitle">
Browser traffic should now target `k_proxy` directly at `https://127.0.0.1:9771/`.
This local service remains only as a temporary bridge and compatibility shim.
</p>
<div class="actions">
<a class="button" href="https://127.0.0.1:9771/">Open Proxy Portal</a>
</div>
<pre id="log"></pre>
</section>
</main>
<script>
const logNode = document.getElementById("log");
function log(message, payload) {
const stamp = new Date().toLocaleTimeString();
let line = `[${stamp}] ${message}`;
if (payload !== undefined) {
line += "\\n" + JSON.stringify(payload, null, 2);
}
logNode.textContent = line + "\\n\\n" + logNode.textContent;
}
log("Primary browser target moved", {open: "https://127.0.0.1:9771/"});
setTimeout(() => { window.location.href = "https://127.0.0.1:9771/"; }, 1200);
</script>
</body>
</html>
"""
@dataclass
class EnrollmentRecord:
    """The enrollment this client remembers locally: a single username."""

    # Name sent to the proxy when enrolling and when logging in.
    username: str
class ClientState:
def __init__(self, proxy_base_url: str, proxy_ca_file: str | None, enroll_db: Path):
self.proxy_base_url = proxy_base_url.rstrip("/")
self.proxy_ca_file = proxy_ca_file
self.enroll_db = enroll_db
self.lock = threading.Lock()
self.preferred_enrollment: EnrollmentRecord | None = None
self.session_token: str | None = None
self.session_expires_at: int | None = None
self._load_preferred_enrollment()
def _ssl_context(self):
if self.proxy_base_url.startswith("https://"):
return ssl.create_default_context(cafile=self.proxy_ca_file)
return None
def _proxy_json(self, method: str, path: str, payload: dict[str, Any] | None = None) -> tuple[int, dict[str, Any]]:
req = Request(f"{self.proxy_base_url}{path}", method=method)
req.add_header("Content-Type", "application/json")
token = self.get_session_token()
if token:
req.add_header("Authorization", f"Bearer {token}")
body = json.dumps(payload or {}).encode("utf-8")
try:
with urlopen(req, data=body, timeout=10, context=self._ssl_context()) as resp:
return resp.status, json.loads(resp.read().decode("utf-8"))
except HTTPError as exc:
try:
return exc.code, json.loads(exc.read().decode("utf-8"))
except Exception:
return exc.code, {"ok": False, "error": f"proxy http error {exc.code}"}
except URLError as exc:
return 502, {"ok": False, "error": f"proxy unavailable: {exc.reason}"}
except Exception as exc:
return 502, {"ok": False, "error": f"proxy call failed: {exc}"}
def _load_preferred_enrollment(self) -> None:
if not self.enroll_db.exists():
return
try:
data = json.loads(self.enroll_db.read_text())
username = str(data.get("username", "")).strip()
if username:
self.preferred_enrollment = EnrollmentRecord(username=username)
except Exception:
self.preferred_enrollment = None
def _save_preferred_enrollment_locked(self) -> None:
self.enroll_db.parent.mkdir(parents=True, exist_ok=True)
payload = {"username": self.preferred_enrollment.username if self.preferred_enrollment else None}
self.enroll_db.write_text(json.dumps(payload, indent=2) + "\n")
def enroll(self, username: str) -> dict[str, Any]:
username = username.strip()
if not username:
return {"ok": False, "error": "username required"}
status, data = self._proxy_json("POST", "/enroll/register", {"username": username})
if status != 200:
return data
with self.lock:
self.preferred_enrollment = EnrollmentRecord(username=username)
self._save_preferred_enrollment_locked()
self.session_token = None
self.session_expires_at = None
return {
"ok": True,
"enrolled_username": username,
"proxy_enrollment": data,
}
def snapshot(self) -> dict[str, Any]:
with self.lock:
return {
"ok": True,
"enrolled_username": self.preferred_enrollment.username if self.preferred_enrollment else None,
"session_active": bool(self.session_token),
"session_expires_at": self.session_expires_at,
"proxy_base_url": self.proxy_base_url,
}
def get_session_token(self) -> str | None:
with self.lock:
return self.session_token
def login(self) -> tuple[int, dict[str, Any]]:
with self.lock:
if not self.preferred_enrollment:
return 400, {"ok": False, "error": "no enrolled user"}
username = self.preferred_enrollment.username
status, data = self._proxy_json("POST", "/session/login", {"username": username})
if status == 200 and data.get("session_token"):
with self.lock:
self.session_token = data["session_token"]
self.session_expires_at = int(data.get("expires_at", 0)) or None
return status, data
def status(self) -> tuple[int, dict[str, Any]]:
return self._proxy_json("POST", "/session/status")
def counter(self) -> tuple[int, dict[str, Any]]:
return self._proxy_json("POST", "/resource/counter")
def logout(self) -> tuple[int, dict[str, Any]]:
status, data = self._proxy_json("POST", "/session/logout")
if status == 200:
with self.lock:
self.session_token = None
self.session_expires_at = None
return status, data
class Handler(BaseHTTPRequestHandler):
    """HTTP request handler for the local client portal.

    GET serves the landing page, a health probe and a snapshot of the local
    state. POST exposes enroll, login, status, counter and logout, each of
    which delegates to the shared ``state`` object.
    """

    # Shared ClientState, assigned on the class before the server starts.
    # Quoted so the declaration is never evaluated when the class is created.
    state: "ClientState"

    def _json(self, status: int, payload: dict[str, Any]) -> None:
        """Send *payload* as a JSON response with the given status code."""
        body = json.dumps(payload).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _html(self, body: str) -> None:
        """Send *body* as a UTF-8 HTML response with status 200."""
        data = body.encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def _read_json(self) -> dict[str, Any]:
        """Read the request body and decode it as a JSON object.

        Returns:
            The decoded object, or ``{}`` when the body is empty.

        Raises:
            ValueError: If ``Content-Length`` is not a non-negative integer,
                the body is not valid UTF-8 JSON, or the decoded value is not
                a JSON object.
        """
        length = int(self.headers.get("Content-Length", "0"))
        if length < 0:
            # read(-1) would block until the peer closes the connection.
            raise ValueError("negative Content-Length")
        raw = self.rfile.read(length)
        if not raw:
            return {}
        data = json.loads(raw.decode("utf-8"))
        if not isinstance(data, dict):
            raise ValueError("request body must be a JSON object")
        return data

    def _handle_enroll(self) -> None:
        """Handle POST /api/enroll: validate the body, then enroll."""
        try:
            data = self._read_json()
        except (ValueError, OSError):
            self._json(400, {"ok": False, "error": "invalid json"})
            return
        raw_username = data.get("username", "")
        # A JSON null must count as "no username", not as the name "None".
        username = "" if raw_username is None else str(raw_username)
        result = self.state.enroll(username)
        self._json(200 if result.get("ok") else 400, result)

    def do_GET(self) -> None:  # noqa: N802
        """Serve the landing page, the health probe and the state snapshot."""
        path = urlparse(self.path).path
        if path == "/":
            self._html(HTML)
            return
        if path == "/health":
            self._json(200, {"ok": True, "service": "k_client_portal", "time": int(time.time())})
            return
        if path == "/api/client/state":
            self._json(200, self.state.snapshot())
            return
        self.send_error(404)

    def do_POST(self) -> None:  # noqa: N802
        """Route POST requests to enrollment or to a proxy-forwarding call."""
        path = urlparse(self.path).path
        if path == "/api/enroll":
            self._handle_enroll()
            return
        # Every remaining endpoint takes no input and relays (status, body).
        forwarders = {
            "/api/login": self.state.login,
            "/api/status": self.state.status,
            "/api/resource/counter": self.state.counter,
            "/api/logout": self.state.logout,
        }
        forward = forwarders.get(path)
        if forward is None:
            self.send_error(404)
            return
        status, data = forward()
        self._json(status, data)
def parse_args() -> argparse.Namespace:
    """Parse the portal's command-line options from ``sys.argv``.

    Returns:
        Namespace with ``host``, ``port``, ``proxy_base_url``,
        ``proxy_ca_file`` and ``enroll_db``.
    """
    # (flag, add_argument keyword arguments), registered in this order.
    option_table = (
        ("--host", {"default": "127.0.0.1"}),
        ("--port", {"type": int, "default": 8766}),
        ("--proxy-base-url", {"default": "https://127.0.0.1:9771"}),
        ("--proxy-ca-file", {"help": "CA certificate used to verify k_proxy HTTPS certificate"}),
        ("--enroll-db", {"default": "/home/user/chromecard/k_client_enrollment.json"}),
    )
    cli = argparse.ArgumentParser(description="Run browser-facing client portal in k_client")
    for flag, options in option_table:
        cli.add_argument(flag, **options)
    return cli.parse_args()
def main() -> int:
    """Start the client portal and serve requests until interrupted.

    Returns:
        Process exit status: 0 after the server has stopped.

    Raises:
        SystemExit: If the proxy base URL uses https and no CA file was given.
    """
    args = parse_args()
    if args.proxy_base_url.startswith("https://") and not args.proxy_ca_file:
        raise SystemExit("--proxy-ca-file is required when --proxy-base-url uses https")
    # The handler class reaches the shared state through this class attribute.
    Handler.state = ClientState(
        proxy_base_url=args.proxy_base_url,
        proxy_ca_file=args.proxy_ca_file,
        enroll_db=Path(args.enroll_db),
    )
    # The context manager closes the listening socket on every exit path.
    with ThreadingHTTPServer((args.host, args.port), Handler) as server:
        print(f"k_client_portal listening on http://{args.host}:{args.port}")
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop the portal; exit quietly
            # instead of ending with a traceback.
            pass
    return 0
if __name__ == "__main__":
raise SystemExit(main())