pki_ca/ca_api/app.py

214 lines
6.1 KiB
Python

# ca_api/app.py
from __future__ import annotations
from typing import Any, Dict, Optional
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from ca_api.db import db_cursor
from ca_core import entity as entity_core
from ca_core import group_member as group_core
from ca_core import property as property_core
from ca_core import metadata as metadata_core
# FastAPI application object; the route decorators in this module register
# their handlers on it.
app = FastAPI(title="PKI CA API")
def bad_request(e: Exception) -> HTTPException:
    """Wrap an exception in an HTTP 400 error object.

    The error is only constructed here, not raised; callers raise the
    returned object themselves. ``str(e)`` becomes the response detail.
    """
    detail = str(e)
    return HTTPException(status_code=400, detail=detail)
class CreatorIn(BaseModel):
    """Request body accepted by ``POST /creators``."""

    # Required; 1 to 100 characters.
    name: str = Field(min_length=1, max_length=100)
    # Required; 1 to 300 characters.
    public_key: str = Field(min_length=1, max_length=300)
class EnrollPersonIn(BaseModel):
    """Request body accepted by ``POST /persons``."""

    # Required; 1 to 100 characters.
    name: str = Field(min_length=1, max_length=100)
    # Required; 1 to 300 characters.
    public_key: str = Field(min_length=1, max_length=300)
    # Required integer; forwarded unchanged to entity_core.enroll_person.
    creator_id: int
class CreateGroupIn(BaseModel):
    """Request body accepted by ``POST /groups``."""

    # Required; 1 to 100 characters.
    name: str = Field(min_length=1, max_length=100)
    # Required; 1 to 300 characters.
    public_key: str = Field(min_length=1, max_length=300)
    # Required integer; forwarded unchanged to entity_core.create_group.
    creator_id: int
    # Required; 1 to 100 characters.
    ca_reference: str = Field(min_length=1, max_length=100)
class AddMemberIn(BaseModel):
    """Request body accepted by ``POST /groups/members``."""

    # Required integers; forwarded unchanged to group_core.add_group_member.
    group_id: int
    member_id: int
    # Required; 1 to 10 characters. The set of accepted role values is not
    # constrained here.
    role: str = Field(min_length=1, max_length=10)
class SetPropertyIn(BaseModel):
    """Request body accepted by ``POST /properties``."""

    # Required integer.
    entity_id: int
    # Required; 1 to 100 characters.
    property_name: str = Field(min_length=1, max_length=100)
    # Optional; at most 19 characters; defaults to "default". The set_property
    # handler also substitutes "default" when this is None or empty.
    validation_policy: Optional[str] = Field(default="default", max_length=19)
    # Optional; at most 150 characters; defaults to None.
    source: Optional[str] = Field(default=None, max_length=150)
class DeletePropertyIn(BaseModel):
    """Request body accepted by ``DELETE /properties``."""

    # Required integer.
    entity_id: int
    # Required; 1 to 100 characters.
    property_name: str = Field(min_length=1, max_length=100)
class SetMetadataNameIn(BaseModel):
    """Request body accepted by ``POST /metadata/name``."""

    # Required; 1 to 100 characters.
    name: str = Field(min_length=1, max_length=100)
class SetMetadataDefensePIn(BaseModel):
    """Request body accepted by ``POST /metadata/defense_p``."""

    # Required boolean; forwarded unchanged to metadata_core.set_defense_p.
    defense_p: bool
@app.get("/health")
def health() -> Dict[str, Any]:
    """Return a constant ``{"ok": True}`` payload without touching the database."""
    response: Dict[str, Any] = {"ok": True}
    return response
# ---- Entities ----
@app.post("/creators", status_code=201)
def create_creator(payload: CreatorIn) -> Dict[str, Any]:
    """Create a creator entity and return its identifier.

    Passes ``payload.name`` and ``payload.public_key`` to
    ``entity_core.insert_creator`` using a cursor from ``db_cursor()``.

    Returns:
        ``{"id": <value returned by insert_creator>}``, sent with status 201.

    Raises:
        HTTPException: 400 when a ``ValueError`` or ``TypeError`` is raised
            while the insert runs; its message becomes the response detail.
    """
    try:
        with db_cursor() as cur:
            creator_id = entity_core.insert_creator(cur, payload.name, payload.public_key)
            return {"id": creator_id}
    except (ValueError, TypeError) as exc:
        # Chain explicitly so logs show the validation error as the direct cause.
        raise bad_request(exc) from exc
@app.post("/persons", status_code=201)
def enroll_person(payload: EnrollPersonIn) -> Dict[str, Any]:
    """Enroll a person entity and return its identifier.

    Passes the name, public key and creator id from ``payload`` to
    ``entity_core.enroll_person`` using a cursor from ``db_cursor()``.

    Returns:
        ``{"id": <value returned by enroll_person>}``, sent with status 201.

    Raises:
        HTTPException: 400 when a ``ValueError`` or ``TypeError`` is raised
            while the enrollment runs; its message becomes the response detail.
    """
    try:
        with db_cursor() as cur:
            person_id = entity_core.enroll_person(
                cur,
                payload.name,
                payload.public_key,
                payload.creator_id,
            )
            return {"id": person_id}
    except (ValueError, TypeError) as exc:
        # Chain explicitly so logs show the validation error as the direct cause.
        raise bad_request(exc) from exc
@app.post("/groups", status_code=201)
def create_group(payload: CreateGroupIn) -> Dict[str, Any]:
    """Create a group entity and return its identifier.

    Passes the name, public key, creator id and CA reference from ``payload``
    to ``entity_core.create_group`` using a cursor from ``db_cursor()``.

    Returns:
        ``{"id": <value returned by create_group>}``, sent with status 201.

    Raises:
        HTTPException: 400 when a ``ValueError`` or ``TypeError`` is raised
            while the group is created; its message becomes the response detail.
    """
    try:
        with db_cursor() as cur:
            group_id = entity_core.create_group(
                cur,
                payload.name,
                payload.public_key,
                payload.creator_id,
                payload.ca_reference,
            )
            return {"id": group_id}
    except (ValueError, TypeError) as exc:
        # Chain explicitly so logs show the validation error as the direct cause.
        raise bad_request(exc) from exc
@app.get("/entities/{entity_id}")
def get_entity(entity_id: int) -> Dict[str, Any]:
    """Fetch one entity by id and return it converted to a plain dict.

    Raises:
        HTTPException: 404 when ``entity_core.get_entity`` returns ``None``.
    """
    with db_cursor() as cur:
        record = entity_core.get_entity(cur, entity_id)
        if record is not None:
            return dict(record)
        raise HTTPException(status_code=404, detail="Entity not found")
@app.post("/entities/{entity_id}/status")
def set_entity_status(entity_id: int, status: str, changed_by: int) -> Dict[str, Any]:
    """Change the status of an entity.

    ``entity_id`` comes from the URL path. ``status`` and ``changed_by`` are
    plain scalar parameters with no body model, so FastAPI reads them from
    the query string. All three are forwarded to
    ``entity_core.set_entity_status``.

    Returns:
        ``{"ok": True}`` once the call completes.

    Raises:
        HTTPException: 400 when a ``ValueError`` or ``TypeError`` is raised
            while the update runs; its message becomes the response detail.
    """
    try:
        with db_cursor() as cur:
            entity_core.set_entity_status(cur, entity_id, status, changed_by)
            return {"ok": True}
    except (ValueError, TypeError) as exc:
        # Chain explicitly so logs show the validation error as the direct cause.
        raise bad_request(exc) from exc
# ---- Group members ----
@app.post("/groups/members", status_code=201)
def add_member(payload: AddMemberIn) -> Dict[str, Any]:
    """Add a member to a group with the given role.

    Passes the group id, member id and role from ``payload`` to
    ``group_core.add_group_member`` using a cursor from ``db_cursor()``.

    Returns:
        ``{"ok": True}``, sent with status 201.

    Raises:
        HTTPException: 400 when a ``ValueError`` or ``TypeError`` is raised
            while the member is added; its message becomes the response detail.
    """
    try:
        with db_cursor() as cur:
            group_core.add_group_member(
                cur,
                payload.group_id,
                payload.member_id,
                payload.role,
            )
            return {"ok": True}
    except (ValueError, TypeError) as exc:
        # Chain explicitly so logs show the validation error as the direct cause.
        raise bad_request(exc) from exc
@app.get("/groups/{group_id}/members")
def list_members(group_id: int):
    """Return the members of a group as a list of plain dicts.

    Each row yielded by ``group_core.get_members_of_group`` is converted with
    ``dict()``.
    """
    with db_cursor() as cur:
        member_rows = group_core.get_members_of_group(cur, group_id)
        return list(map(dict, member_rows))
# ---- Properties ----
@app.post("/properties", status_code=201)
def set_property(payload: SetPropertyIn) -> Dict[str, Any]:
    """Set a property on an entity.

    Forwards the entity id, property name, validation policy and source from
    ``payload`` to ``property_core.set_property``.

    Returns:
        ``{"ok": True}``, sent with status 201.

    Raises:
        HTTPException: 400 when a ``ValueError`` or ``TypeError`` is raised
            while the property is set; its message becomes the response detail.
    """
    # A null or empty policy in the request falls back to "default".
    policy = payload.validation_policy or "default"
    try:
        with db_cursor() as cur:
            property_core.set_property(
                cur,
                payload.entity_id,
                payload.property_name,
                validation_policy=policy,
                source=payload.source,
            )
            return {"ok": True}
    except (ValueError, TypeError) as exc:
        # Chain explicitly so logs show the validation error as the direct cause.
        raise bad_request(exc) from exc
@app.get("/entities/{entity_id}/properties")
def get_properties(entity_id: int) -> Dict[str, Any]:
    """Return the properties of an entity.

    The response echoes ``entity_id`` and carries whatever
    ``property_core.get_properties`` returns under the ``"properties"`` key.
    """
    with db_cursor() as cur:
        found = property_core.get_properties(cur, entity_id)
        response: Dict[str, Any] = {"entity_id": entity_id, "properties": found}
        return response
@app.delete("/properties")
def delete_property(payload: DeletePropertyIn) -> Dict[str, Any]:
    """Delete a property from an entity.

    The target is identified by the request body (entity id and property
    name), which is forwarded to ``property_core.delete_property``.

    Returns:
        ``{"ok": True}`` once the call completes.

    Raises:
        HTTPException: 400 when a ``ValueError`` or ``TypeError`` is raised
            while the delete runs; its message becomes the response detail.
    """
    # NOTE(review): this route expects a body on DELETE; confirm that every
    # client and proxy in front of the API forwards DELETE bodies.
    try:
        with db_cursor() as cur:
            property_core.delete_property(cur, payload.entity_id, payload.property_name)
            return {"ok": True}
    except (ValueError, TypeError) as exc:
        # Chain explicitly so logs show the validation error as the direct cause.
        raise bad_request(exc) from exc
# ---- Metadata ----
@app.get("/metadata")
def get_metadata() -> Dict[str, Any]:
    """Return the CA metadata: name, comment, public key and ``defense_p``.

    All four values are read through ``metadata_core`` with a single cursor.
    """
    with db_cursor() as cur:
        # Read in the same order as the keys appear in the response.
        name = metadata_core.get_name(cur)
        comment = metadata_core.get_comment(cur)
        public_key = metadata_core.get_public_key(cur)
        defense_p = metadata_core.get_defense_p(cur)
        return {
            "name": name,
            "comment": comment,
            "public_key": public_key,
            "defense_p": defense_p,
        }
@app.post("/metadata/name")
def set_metadata_name(payload: SetMetadataNameIn) -> Dict[str, Any]:
    """Update the metadata name.

    Forwards ``payload.name`` to ``metadata_core.set_name``.

    Returns:
        ``{"ok": True}`` once the call completes.

    Raises:
        HTTPException: 400 when a ``ValueError`` or ``TypeError`` is raised
            while the name is set; its message becomes the response detail.
    """
    try:
        with db_cursor() as cur:
            metadata_core.set_name(cur, payload.name)
            return {"ok": True}
    except (ValueError, TypeError) as exc:
        # Chain explicitly so logs show the validation error as the direct cause.
        raise bad_request(exc) from exc
@app.post("/metadata/defense_p")
def set_metadata_defense_p(payload: SetMetadataDefensePIn) -> Dict[str, Any]:
    """Update the metadata ``defense_p`` flag.

    Forwards ``payload.defense_p`` to ``metadata_core.set_defense_p``.

    Returns:
        ``{"ok": True}`` once the call completes.

    Raises:
        HTTPException: 400 when a ``ValueError`` or ``TypeError`` is raised
            while the flag is set; its message becomes the response detail.
    """
    try:
        with db_cursor() as cur:
            metadata_core.set_defense_p(cur, payload.defense_p)
            return {"ok": True}
    except (ValueError, TypeError) as exc:
        # Chain explicitly so logs show the validation error as the direct cause.
        raise bad_request(exc) from exc