pki_ca/tests/test_api_integration.py

188 lines
6.5 KiB
Python

import os
import unittest
from pathlib import Path
import psycopg
from fastapi.testclient import TestClient
# Run these tests only when a real DB is provided.
DBURL = os.getenv("DATABASE_URL")
ROOT = Path(__file__).resolve().parents[1]
CREATE_TABLES = ROOT / "create_tables.sql"
@unittest.skipUnless(DBURL, "DATABASE_URL not set; skipping API integration tests")
class TestApiIntegration(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Ensure the API uses the same DB as the tests.
os.environ["DATABASE_URL"] = DBURL
# Import after setting env so ca_api.db picks it up.
from ca_api.app import app # noqa: WPS433
cls.client = TestClient(app)
# Recreate tables from scratch for a clean slate.
schema_sql = CREATE_TABLES.read_text(encoding="utf-8")
with psycopg.connect(DBURL) as conn:
with conn.cursor() as cur:
cur.execute(schema_sql)
def _log_count(self) -> int:
with psycopg.connect(DBURL) as conn:
with conn.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM log")
return int(cur.fetchone()[0])
def test_01_health(self):
r = self.client.get("/health")
self.assertEqual(r.status_code, 200)
self.assertEqual(r.json(), {"ok": True})
def test_02_creator_person_roundtrip_and_log(self):
before = self._log_count()
r = self.client.post("/creators", json={"name": "Alice", "public_key": "pk-alice"})
self.assertEqual(r.status_code, 201)
creator_id = r.json()["id"]
# Creating a creator should log at least one entry (your core logs mutations).
after = self._log_count()
self.assertGreaterEqual(after, before + 1)
r = self.client.post(
"/persons",
json={"name": "Bob", "public_key": "pk-bob", "creator_id": creator_id},
)
self.assertEqual(r.status_code, 201)
person_id = r.json()["id"]
r = self.client.get(f"/entities/{person_id}")
self.assertEqual(r.status_code, 200)
body = r.json()
self.assertEqual(body["id"], person_id)
self.assertEqual(body["name"], "Bob")
self.assertEqual(body["type"], "person")
def test_03_group_create_add_member_list_members(self):
r = self.client.post("/creators", json={"name": "Admin", "public_key": "pk-admin"})
self.assertEqual(r.status_code, 201)
creator_id = r.json()["id"]
r = self.client.post(
"/groups",
json={
"name": "Engineering",
"public_key": "pk-eng",
"creator_id": creator_id,
"ca_reference": "ca-ref-001",
},
)
self.assertEqual(r.status_code, 201)
group_id = r.json()["id"]
r = self.client.post(
"/persons",
json={"name": "Carol", "public_key": "pk-carol", "creator_id": creator_id},
)
self.assertEqual(r.status_code, 201)
member_id = r.json()["id"]
r = self.client.post(
"/groups/members",
json={"group_id": group_id, "member_id": member_id, "role": "member"},
)
self.assertEqual(r.status_code, 201)
self.assertEqual(r.json(), {"ok": True})
r = self.client.get(f"/groups/{group_id}/members")
self.assertEqual(r.status_code, 200)
members = r.json()
self.assertTrue(any(m["member_id"] == member_id for m in members))
def test_04_property_set_get_delete(self):
r = self.client.post("/creators", json={"name": "PropAdmin", "public_key": "pk-pa"})
self.assertEqual(r.status_code, 201)
creator_id = r.json()["id"]
r = self.client.post(
"/persons",
json={"name": "Dave", "public_key": "pk-dave", "creator_id": creator_id},
)
self.assertEqual(r.status_code, 201)
entity_id = r.json()["id"]
r = self.client.post(
"/properties",
json={
"entity_id": entity_id,
"property_name": "email",
"validation_policy": "default",
"source": "hr-system",
},
)
self.assertEqual(r.status_code, 201)
self.assertEqual(r.json(), {"ok": True})
r = self.client.get(f"/entities/{entity_id}/properties")
self.assertEqual(r.status_code, 200)
props = r.json()["properties"]
self.assertIn("email", props)
r = self.client.request(
"DELETE",
"/properties",
json={"entity_id": entity_id, "property_name": "email"},
)
self.assertEqual(r.status_code, 200)
self.assertEqual(r.json(), {"ok": True})
r = self.client.get(f"/entities/{entity_id}/properties")
self.assertEqual(r.status_code, 200)
props = r.json()["properties"]
self.assertNotIn("email", props)
def test_05_revoked_entity_is_immutable(self):
r = self.client.post("/creators", json={"name": "Revoker", "public_key": "pk-r"})
self.assertEqual(r.status_code, 201)
creator_id = r.json()["id"]
r = self.client.post(
"/persons",
json={"name": "Eve", "public_key": "pk-eve", "creator_id": creator_id},
)
self.assertEqual(r.status_code, 201)
eve_id = r.json()["id"]
# Revoke Eve
r = self.client.post(f"/entities/{eve_id}/status?status=revoked&changed_by={creator_id}")
self.assertEqual(r.status_code, 200)
self.assertEqual(r.json(), {"ok": True})
# Attempt to mutate a revoked entity (should be rejected by core rule)
r = self.client.post(
"/properties",
json={"entity_id": eve_id, "property_name": "email", "validation_policy": "default"},
)
self.assertEqual(r.status_code, 400)
# detail message depends on your core exception text; just confirm it's a 400.
self.assertIn("detail", r.json())
def test_06_metadata_set_and_get(self):
r = self.client.post("/metadata/name", json={"name": "My CA"})
self.assertEqual(r.status_code, 200)
self.assertEqual(r.json(), {"ok": True})
r = self.client.post("/metadata/defense_p", json={"defense_p": True})
self.assertEqual(r.status_code, 200)
self.assertEqual(r.json(), {"ok": True})
r = self.client.get("/metadata")
self.assertEqual(r.status_code, 200)
body = r.json()
self.assertEqual(body["name"], "My CA")
self.assertEqual(body["defense_p"], True)