pki_ca/ca_core/property.py

105 lines
3.2 KiB
Python

from typing import Optional

from db_logging import log_change
from entity import ensure_entity_active
def _validate_validation_policy(validation_policy: Optional[str]) -> str:
    """
    Normalize and validate a validation_policy value.

    None is accepted and maps to the "default" policy. Any other value must
    be a string; surrounding whitespace is stripped before the checks run.

    Returns:
        The stripped policy string, or "default" when None was passed.

    Raises:
        TypeError: if the value is neither None nor a string.
        ValueError: if the stripped value is empty or longer than
            19 characters.
    """
    if validation_policy is None:
        return "default"
    if not isinstance(validation_policy, str):
        raise TypeError("validation_policy must be a string")
    vp = validation_policy.strip()
    if not vp:
        raise ValueError("validation_policy cannot be empty")
    # The limit is applied to the stripped value, so padding whitespace
    # never counts against the 19-character budget.
    if len(vp) > 19:
        raise ValueError("validation_policy must be at most 19 characters")
    return vp
def _validate_source(source):
    """
    Normalize an optional source value.

    None passes through as None. A string is stripped of surrounding
    whitespace; if nothing remains, None is returned so that blank input
    is treated the same as "no source".

    Raises:
        TypeError: if the value is neither None nor a string.
        ValueError: if the stripped value is longer than 150 characters.
    """
    if source is not None and not isinstance(source, str):
        raise TypeError("source must be a string or None")
    trimmed = "" if source is None else source.strip()
    if len(trimmed) > 150:
        raise ValueError("source must be at most 150 characters")
    # Empty string collapses to None (stored as NULL) for cleanliness.
    return trimmed or None
def set_property(cursor, entity_id, property_name, validation_policy="default", source=None):
    """
    Insert or update one property row for an entity and log the change.

    ensure_entity_active() is called first, before any argument validation;
    per this module's convention revoked entities are immutable, so
    properties cannot be added or updated for them.

    Schema: property(id, property_name, validation_policy, source)
    - validation_policy: CHAR(19) NOT NULL DEFAULT 'default'
    - source: VARCHAR(150) NULL

    An existing (id, property_name) row has its validation_policy and
    source overwritten by the new values.

    Raises:
        ValueError: if property_name is not a non-blank string or is longer
            than 100 characters, or if validation_policy / source fail
            their length or emptiness checks.
        TypeError: if validation_policy or source has the wrong type.
    """
    ensure_entity_active(cursor, entity_id)

    # Blankness is judged on the stripped name, but the name itself is
    # length-checked and stored exactly as the caller passed it.
    if not (isinstance(property_name, str) and property_name.strip()):
        raise ValueError("property_name must be a non-empty string")
    if len(property_name) > 100:
        raise ValueError("property_name must be at most 100 characters")

    policy = _validate_validation_policy(validation_policy)
    origin = _validate_source(source)

    upsert_sql = (
        "\n"
        "INSERT INTO property (id, property_name, validation_policy, source)\n"
        "VALUES (%s, %s, %s, %s)\n"
        "ON CONFLICT (id, property_name)\n"
        "DO UPDATE SET\n"
        "validation_policy = EXCLUDED.validation_policy,\n"
        "source = EXCLUDED.source\n"
    )
    cursor.execute(upsert_sql, (entity_id, property_name, policy, origin))

    message = (
        f"Set property '{property_name}' for entity {entity_id} "
        f"(validation_policy={policy}, source={origin})"
    )
    log_change(cursor, message)
def get_properties(cursor, entity_id):
    """
    List the property names stored for an entity.

    Runs a single SELECT on the property table filtered by id and returns
    the property_name of every fetched row, in the order the cursor yields
    them. Rows are read by key, so the cursor must produce mapping-style
    rows. Returns an empty list when no rows match.
    """
    query = "SELECT property_name FROM property WHERE id = %s"
    cursor.execute(query, (entity_id,))
    return [record["property_name"] for record in cursor.fetchall()]
def get_property(cursor, entity_id, property_name):
    """
    Fetch a single property of an entity.

    Selects property_name, validation_policy and source for the row
    matching both the entity id and the property name, and returns the
    result of cursor.fetchone() unchanged: the matching row (a dict_row
    with those three keys, given the cursor configuration this module
    expects) or None if not found.
    """
    query = (
        "\n"
        "SELECT property_name, validation_policy, source\n"
        "FROM property\n"
        "WHERE id = %s AND property_name = %s\n"
    )
    lookup_key = (entity_id, property_name)
    cursor.execute(query, lookup_key)
    row = cursor.fetchone()
    return row
def delete_property(cursor, entity_id, property_name):
    """
    Delete one property row of an entity and log the change.

    ensure_entity_active() is called first; per this module's convention
    revoked entities are immutable, so their properties cannot be deleted.

    The log entry is written after the DELETE runs and does not depend on
    whether any row actually matched.
    """
    ensure_entity_active(cursor, entity_id)

    statement = "DELETE FROM property WHERE id = %s AND property_name = %s"
    cursor.execute(statement, (entity_id, property_name))

    message = f"Deleted property '{property_name}' for entity {entity_id}"
    log_change(cursor, message)