# ca_api/app.py
"""FastAPI application exposing the ``ca_core`` operations over HTTP."""
from __future__ import annotations

from typing import Any, Dict, Optional

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field

from ca_api.db import db_cursor
from ca_core import entity as entity_core
from ca_core import group_member as group_core
from ca_core import property as property_core
from ca_core import metadata as metadata_core

app = FastAPI(title="PKI CA API")


def bad_request(e: Exception) -> HTTPException:
    """Build (but do not raise) a 400 error whose detail is ``str(e)``.

    Args:
        e: The exception whose message is reported to the client.

    Returns:
        An ``HTTPException`` with status code 400; the caller raises it.
    """
    return HTTPException(status_code=400, detail=str(e))


class CreatorIn(BaseModel):
    """Request body for ``POST /creators``."""

    name: str = Field(min_length=1, max_length=100)
    public_key: str = Field(min_length=1, max_length=300)


class EnrollPersonIn(BaseModel):
    """Request body for ``POST /persons``."""

    name: str = Field(min_length=1, max_length=100)
    public_key: str = Field(min_length=1, max_length=300)
    creator_id: int


class CreateGroupIn(BaseModel):
    """Request body for ``POST /groups``."""

    name: str = Field(min_length=1, max_length=100)
    public_key: str = Field(min_length=1, max_length=300)
    creator_id: int
    ca_reference: str = Field(min_length=1, max_length=100)


class AddMemberIn(BaseModel):
    """Request body for ``POST /groups/members``."""

    group_id: int
    member_id: int
    role: str = Field(min_length=1, max_length=10)


class SetPropertyIn(BaseModel):
    """Request body for ``POST /properties``."""

    entity_id: int
    property_name: str = Field(min_length=1, max_length=100)
    # An explicit null is accepted here; the route handler substitutes
    # "default" for any falsy value before calling into ca_core.
    validation_policy: Optional[str] = Field(default="default", max_length=19)
    source: Optional[str] = Field(default=None, max_length=150)


class DeletePropertyIn(BaseModel):
    """Request body for ``DELETE /properties``."""

    entity_id: int
    property_name: str = Field(min_length=1, max_length=100)


class SetMetadataNameIn(BaseModel):
    """Request body for ``POST /metadata/name``."""

    name: str = Field(min_length=1, max_length=100)


class SetMetadataDefensePIn(BaseModel):
    """Request body for ``POST /metadata/defense_p``."""

    defense_p: bool


@app.get("/health")
def health() -> Dict[str, Any]:
    """Return a constant ``{"ok": True}`` payload; no database access."""
    return {"ok": True}


# ---- Entities ----


@app.post("/creators", status_code=201)
def create_creator(payload: CreatorIn) -> Dict[str, Any]:
    """Insert a creator and return its id as ``{"id": ...}``.

    Raises:
        HTTPException: 400 when ``entity_core.insert_creator`` raises
            ``ValueError`` or ``TypeError``.
    """
    try:
        with db_cursor() as cur:
            creator_id = entity_core.insert_creator(cur, payload.name, payload.public_key)
            return {"id": creator_id}
    except (ValueError, TypeError) as e:
        # Chain explicitly so the original error stays attached as the cause.
        raise bad_request(e) from e
@app.post("/persons", status_code=201)
def enroll_person(payload: EnrollPersonIn) -> Dict[str, Any]:
    """Enroll a person and return its id as ``{"id": ...}``.

    Raises:
        HTTPException: 400 when ``entity_core.enroll_person`` raises
            ``ValueError`` or ``TypeError``.
    """
    try:
        with db_cursor() as cur:
            person_id = entity_core.enroll_person(
                cur, payload.name, payload.public_key, payload.creator_id
            )
            return {"id": person_id}
    except (ValueError, TypeError) as e:
        # Chain explicitly so the original error stays attached as the cause.
        raise bad_request(e) from e


@app.post("/groups", status_code=201)
def create_group(payload: CreateGroupIn) -> Dict[str, Any]:
    """Create a group and return its id as ``{"id": ...}``.

    Raises:
        HTTPException: 400 when ``entity_core.create_group`` raises
            ``ValueError`` or ``TypeError``.
    """
    try:
        with db_cursor() as cur:
            group_id = entity_core.create_group(
                cur,
                payload.name,
                payload.public_key,
                payload.creator_id,
                payload.ca_reference,
            )
            return {"id": group_id}
    except (ValueError, TypeError) as e:
        raise bad_request(e) from e


@app.get("/entities/{entity_id}")
def get_entity(entity_id: int) -> Dict[str, Any]:
    """Return the entity row for ``entity_id`` converted to a dict.

    Raises:
        HTTPException: 404 when ``entity_core.get_entity`` returns ``None``.
    """
    with db_cursor() as cur:
        row = entity_core.get_entity(cur, entity_id)
        if row is None:
            raise HTTPException(status_code=404, detail="Entity not found")
    return dict(row)


@app.post("/entities/{entity_id}/status")
def set_entity_status(entity_id: int, status: str, changed_by: int) -> Dict[str, Any]:
    """Set the status of an entity and return ``{"ok": True}``.

    ``status`` and ``changed_by`` are plain scalar parameters that do not
    appear in the path, so FastAPI reads them from the query string rather
    than from a JSON body.

    Raises:
        HTTPException: 400 when ``entity_core.set_entity_status`` raises
            ``ValueError`` or ``TypeError``.
    """
    try:
        with db_cursor() as cur:
            entity_core.set_entity_status(cur, entity_id, status, changed_by)
            return {"ok": True}
    except (ValueError, TypeError) as e:
        raise bad_request(e) from e


# ---- Group members ----


@app.post("/groups/members", status_code=201)
def add_member(payload: AddMemberIn) -> Dict[str, Any]:
    """Add a member to a group with the given role; return ``{"ok": True}``.

    Raises:
        HTTPException: 400 when ``group_core.add_group_member`` raises
            ``ValueError`` or ``TypeError``.
    """
    try:
        with db_cursor() as cur:
            group_core.add_group_member(
                cur, payload.group_id, payload.member_id, payload.role
            )
            return {"ok": True}
    except (ValueError, TypeError) as e:
        raise bad_request(e) from e


@app.get("/groups/{group_id}/members")
def list_members(group_id: int):
    """Return the members of ``group_id`` as a list of dicts, one per row."""
    with db_cursor() as cur:
        rows = group_core.get_members_of_group(cur, group_id)
    return [dict(r) for r in rows]


# ---- Properties ----


@app.post("/properties", status_code=201)
def set_property(payload: SetPropertyIn) -> Dict[str, Any]:
    """Set a property on an entity and return ``{"ok": True}``.

    Raises:
        HTTPException: 400 when ``property_core.set_property`` raises
            ``ValueError`` or ``TypeError``.
    """
    try:
        with db_cursor() as cur:
            property_core.set_property(
                cur,
                payload.entity_id,
                payload.property_name,
                # The model allows null/empty here; fall back to "default".
                validation_policy=payload.validation_policy or "default",
                source=payload.source,
            )
            return {"ok": True}
    except (ValueError, TypeError) as e:
        raise bad_request(e) from e


@app.get("/entities/{entity_id}/properties")
def get_properties(entity_id: int) -> Dict[str, Any]:
    """Return ``{"entity_id": ..., "properties": ...}`` for an entity."""
    with db_cursor() as cur:
        props = property_core.get_properties(cur, entity_id)
    return {"entity_id": entity_id, "properties": props}


@app.delete("/properties")
def delete_property(payload: DeletePropertyIn) -> Dict[str, Any]:
    """Delete a named property from an entity and return ``{"ok": True}``.

    Raises:
        HTTPException: 400 when ``property_core.delete_property`` raises
            ``ValueError`` or ``TypeError``.
    """
    try:
        with db_cursor() as cur:
            property_core.delete_property(cur, payload.entity_id, payload.property_name)
            return {"ok": True}
    except (ValueError, TypeError) as e:
        raise bad_request(e) from e


# ---- Metadata ----


@app.get("/metadata")
def get_metadata() -> Dict[str, Any]:
    """Return the name, comment, public key and defense_p metadata values."""
    with db_cursor() as cur:
        return {
            "name": metadata_core.get_name(cur),
            "comment": metadata_core.get_comment(cur),
            "public_key": metadata_core.get_public_key(cur),
            "defense_p": metadata_core.get_defense_p(cur),
        }


@app.post("/metadata/name")
def set_metadata_name(payload: SetMetadataNameIn) -> Dict[str, Any]:
    """Set the metadata name and return ``{"ok": True}``.

    Raises:
        HTTPException: 400 when ``metadata_core.set_name`` raises
            ``ValueError`` or ``TypeError``.
    """
    try:
        with db_cursor() as cur:
            metadata_core.set_name(cur, payload.name)
            return {"ok": True}
    except (ValueError, TypeError) as e:
        raise bad_request(e) from e


@app.post("/metadata/defense_p")
def set_metadata_defense_p(payload: SetMetadataDefensePIn) -> Dict[str, Any]:
    """Set the metadata ``defense_p`` flag and return ``{"ok": True}``.

    Raises:
        HTTPException: 400 when ``metadata_core.set_defense_p`` raises
            ``ValueError`` or ``TypeError``.
    """
    try:
        with db_cursor() as cur:
            metadata_core.set_defense_p(cur, payload.defense_p)
            return {"ok": True}
    except (ValueError, TypeError) as e:
        raise bad_request(e) from e