pki_ca/ca_core/metadata.py

86 lines
2.7 KiB
Python

from .db_logging import log_change
def _ensure_singleton_row(cursor):
"""Ensure exactly one metadata row exists.
The metadata table is treated as a singleton at the application level.
Setters must ONLY update the relevant column(s) and must not wipe others.
If the table is empty, a single default row is inserted.
If the table contains more than one row, we raise to avoid ambiguous reads.
"""
cursor.execute("SELECT COUNT(*) AS cnt FROM metadata")
row = cursor.fetchone()
cnt = int(row["cnt"]) if row and row["cnt"] is not None else 0
if cnt == 0:
# Rely on column defaults (e.g., defense_p default false) and nullable columns.
cursor.execute("INSERT INTO metadata DEFAULT VALUES")
elif cnt > 1:
raise ValueError("metadata table must contain exactly one row")
def set_name(cursor, name):
_ensure_singleton_row(cursor)
cursor.execute("UPDATE metadata SET name = %s", (name,))
log_change(cursor, f"Updated metadata name to {name}")
def get_name(cursor):
cursor.execute("SELECT name FROM metadata LIMIT 1")
row = cursor.fetchone()
return row["name"] if row else None
def set_comment(cursor, comment):
_ensure_singleton_row(cursor)
cursor.execute("UPDATE metadata SET comment = %s", (comment,))
log_change(cursor, f"Updated metadata comment to {comment}")
def get_comment(cursor):
cursor.execute("SELECT comment FROM metadata LIMIT 1")
row = cursor.fetchone()
return row["comment"] if row else None
def set_keys(cursor, public_key, private_key):
_ensure_singleton_row(cursor)
cursor.execute(
"UPDATE metadata SET public_key = %s, private_key = %s",
(public_key, private_key),
)
log_change(cursor, "Updated metadata keys")
def get_public_key(cursor):
cursor.execute("SELECT public_key FROM metadata LIMIT 1")
row = cursor.fetchone()
return row["public_key"] if row else None
def get_private_key(cursor):
cursor.execute("SELECT private_key FROM metadata LIMIT 1")
row = cursor.fetchone()
return row["private_key"] if row else None
def set_defense_p(cursor, defense_p: bool):
"""Set the metadata defense_p flag.
This table is treated as a singleton row at the application level.
This setter updates ONLY defense_p and preserves all other columns.
"""
_ensure_singleton_row(cursor)
cursor.execute("UPDATE metadata SET defense_p = %s", (defense_p,))
log_change(cursor, f"Updated metadata defense_p to {defense_p}")
def get_defense_p(cursor) -> bool:
cursor.execute("SELECT defense_p FROM metadata LIMIT 1")
row = cursor.fetchone()
if not row or row["defense_p"] is None:
return False
return bool(row["defense_p"])