Compare commits

..

2 Commits

Author SHA1 Message Date
Morten V. Christiansen a1be210f58 felt rettelser 2026-02-26 17:28:43 +01:00
Morten V. Christiansen 7470cf7189 various 2026-02-26 16:24:26 +01:00
21 changed files with 662 additions and 418 deletions

167
PROJECT_CONTEXT.md Normal file
View File

@ -0,0 +1,167 @@
CA/PKI Backend Project Context
Stack
Python 3 + psycopg (dict_row cursors)
PostgreSQL database: ca
Unit tests: unittest (python3 -m unittest discover)
Database Schema (current assumptions)
entity
id INT identity PK
creation_ts TIMESTAMPTZ default now()
creator INT FK → entity(id) (the entity that created this one; nullable)
name VARCHAR(100) NOT NULL
type VARCHAR(...) NOT NULL (e.g. person, group, device, alias)
public_key VARCHAR(300) NOT NULL
symmetrical_key VARCHAR(100) NULL
status VARCHAR(...) NOT NULL default 'active' (values: 'active', 'revoked')
expiration DATE NULL
Index on entity(name) (and other indexes as needed)
group_member
group_id INT FK → entity(id) ON DELETE CASCADE
member_id INT FK → entity(id) ON DELETE CASCADE
role VARCHAR(10)
PK (group_id, member_id)
Index (member_id, group_id)
Groups can contain any entity type, including other groups and devices.
property
Columns: (id INT FK → entity(id), property_name VARCHAR(100))
PK (id, property_name)
Used for flags/roles such as "creator"
metadata
Intended “singleton row” table, enforced at application level
Columns: name, comment, private_key, public_key
log
id SERIAL PK
ts TIMESTAMPTZ default now()
entry TEXT NOT NULL
Every API mutation must log one row here.
Core Business Rules
Creators are not an entity type
creator is a property (property_name='creator') on a person entity.
insert_creator() creates a person entity and inserts the creator property.
Revoked entities are immutable
Any mutation on an entity requires ensure_entity_active(cursor, entity_id).
Revoked entities cannot:
Join groups or accept members
Add/delete properties
Change keys (public_key, symmetrical_key)
Change status again
Logging
All changes to entity, group_member, property, metadata must call log_change(cursor, "...")
Logging happens inside the same transaction (no extra commits).
Python Modules (current structure)
ca_core/entity.py
Must provide:
ensure_entity_active(cursor, entity_id)
insert_creator(cursor, name, public_key)
enroll_person(cursor, name, public_key, creator_id)
create_group(cursor, name, public_key, creator_id)
create_alias(cursor, target_entity_id)
get_entity(cursor, entity_id)
set_entity_status(cursor, entity_id, status, changed_by) (requires active entity)
set_entity_keys(cursor, entity_id, public_key, changed_by) (active-only)
set_symmetrical_key(cursor, entity_id, key, changed_by) (active-only)
get_symmetrical_key(cursor, entity_id)
ca_core/group_member.py
Uses member_id (not person_id)
Must prevent adding revoked groups/members (via ensure_entity_active)
Logs add/remove membership
ca_core/property.py
Table is property(id, property_name) (NOT entity_id/name)
Must reject mutations if entity revoked (immutability)
Logs set/delete property
ca_core/metadata.py
Updates metadata fields and logs changes
ca_core/db_logging.py
log_change(cursor, message: str) inserts into log(entry)
Tests
tests/test_entity.py, tests/test_group.py, tests/test_property.py, tests/test_metadata.py
Tests verify:
Core behaviors (create, enroll, group membership, revoke immutability)
Log entry is created for mutations (case-insensitive substring checks)
Run via:
python3 -m unittest discover
Known gotchas
Avoid naming a module logging.py (conflicts with stdlib). Use db_logging.py.
Schema and code must stay aligned (e.g., property.id/property_name, group_member.member_id).

Binary file not shown.

9
ca_core/db_logging.py Normal file
View File

@ -0,0 +1,9 @@
# db_logging.py
def log_change(cursor, message: str):
"""Insert a log entry into the log table."""
cursor.execute(
"INSERT INTO log (entry) VALUES (%s)",
(message,)
)

View File

@ -1,169 +1,132 @@
import random
import string
from db_logging import log_change
# ------------------------
# Helper for ownership checks
# ------------------------
def _verify_ownership(cursor, entity_id, requesting_creator_id):
cursor.execute(
"SELECT id, creator, type, status FROM entity WHERE id=%s", (entity_id,)
)
def ensure_entity_active(cursor, entity_id):
"""
Ensure an entity exists and is active.
Revoked entities are immutable.
"""
cursor.execute("SELECT status FROM entity WHERE id = %s", (entity_id,))
row = cursor.fetchone()
if not row or row["status"] != "active":
raise ValueError("Entity not found or inactive")
owner_id = row["creator"]
entity_type = row["type"]
entity_id_db = row["id"]
if entity_type == "creator":
if requesting_creator_id != entity_id_db:
raise ValueError("Creator ID does not match entity owner")
else:
if requesting_creator_id != owner_id:
raise ValueError("Creator ID does not match entity owner")
if row is None:
raise ValueError("Entity does not exist")
if row["status"] != "active":
raise ValueError("Entity is not active")
def _validate_ca_reference_for_group(ca_reference):
if ca_reference is None:
raise ValueError("ca_reference is required for groups")
if not isinstance(ca_reference, str):
raise ValueError("ca_reference must be a string")
if len(ca_reference) == 0:
raise ValueError("ca_reference cannot be empty")
if len(ca_reference) > 100:
raise ValueError("ca_reference must be at most 100 characters")
# ------------------------
# Insertions
# ------------------------
def insert_creator(cursor, name, public_key):
"""
Creators are persons with property 'creator' in the property table.
"""
cursor.execute(
"""
INSERT INTO entity (name, type, public_key, creator, status)
VALUES (%s, 'creator', %s, NULL, 'active')
INSERT INTO entity (name, type, public_key, status, ca_reference)
VALUES (%s, 'person', %s, 'active', NULL)
RETURNING id
""",
(name, public_key)
(name, public_key),
)
return cursor.fetchone()["id"]
creator_id = cursor.fetchone()["id"]
# Mark as creator via property table (schema: property(id, property_name))
cursor.execute(
"""
INSERT INTO property (id, property_name)
VALUES (%s, %s)
ON CONFLICT (id, property_name) DO NOTHING
""",
(creator_id, "creator"),
)
log_change(cursor, f"Created creator entity {creator_id} with name {name}")
return creator_id
def enroll_person(cursor, name, public_key, creator_id):
cursor.execute(
"SELECT type, status FROM entity WHERE id=%s", (creator_id,)
)
row = cursor.fetchone()
if not row or row["type"] != "creator" or row["status"] != "active":
raise ValueError("Provided creator_id does not correspond to a valid active creator")
ensure_entity_active(cursor, creator_id)
cursor.execute(
"""
INSERT INTO entity (name, type, public_key, creator, status)
VALUES (%s, 'person', %s, %s, 'active')
INSERT INTO entity (name, type, public_key, creator, status, ca_reference)
VALUES (%s, 'person', %s, %s, 'active', NULL)
RETURNING id
""",
(name, public_key, creator_id)
(name, public_key, creator_id),
)
return cursor.fetchone()["id"]
person_id = cursor.fetchone()["id"]
log_change(cursor, f"Enrolled person {person_id} under creator {creator_id}")
return person_id
def create_group(cursor, name, public_key, creator_id):
cursor.execute(
"SELECT type, status FROM entity WHERE id=%s", (creator_id,)
)
row = cursor.fetchone()
if not row or row["type"] != "creator" or row["status"] != "active":
raise ValueError("Provided creator_id does not correspond to a valid active creator")
def create_group(cursor, name, public_key, creator_id, ca_reference):
ensure_entity_active(cursor, creator_id)
_validate_ca_reference_for_group(ca_reference)
cursor.execute(
"""
INSERT INTO entity (name, type, public_key, creator, status)
VALUES (%s, 'group', %s, %s, 'active')
INSERT INTO entity (name, type, public_key, creator, status, ca_reference)
VALUES (%s, 'group', %s, %s, 'active', %s)
RETURNING id
""",
(name, public_key, creator_id)
(name, public_key, creator_id, ca_reference),
)
return cursor.fetchone()["id"]
group_id = cursor.fetchone()["id"]
def create_alias(cursor, person_id):
cursor.execute(
"SELECT id, type, public_key, status FROM entity WHERE id=%s", (person_id,)
log_change(
cursor,
f"Created group {group_id} under creator {creator_id} with ca_reference {ca_reference}",
)
row = cursor.fetchone()
if not row or row["status"] != "active":
raise ValueError("Person not found or inactive")
if row["type"] != "person":
raise ValueError("Only persons can create aliases")
random_name = "".join(random.choices(string.ascii_letters + string.digits, k=8))
cursor.execute(
"""
INSERT INTO entity (name, type, public_key, creator, status)
VALUES (%s, 'person', %s, %s, 'active')
RETURNING id
""",
(random_name, row["public_key"], person_id)
)
return cursor.fetchone()["id"]
return group_id
# ------------------------
# Soft-delete / revocation
# ------------------------
def revoke_entity(cursor, entity_id, requesting_creator_id):
_verify_ownership(cursor, entity_id, requesting_creator_id)
cursor.execute(
"UPDATE entity SET status=%s WHERE id=%s", ("revoked", entity_id)
)
# ------------------------
# Getters / Setters
# ------------------------
def get_entity(cursor, entity_id):
cursor.execute("SELECT * FROM entity WHERE id = %s", (entity_id,))
return cursor.fetchone()
def set_entity_status(cursor, entity_id, status, changed_by):
"""
Only active entities can change status. Once revoked, immutable.
"""
ensure_entity_active(cursor, entity_id)
cursor.execute("UPDATE entity SET status = %s WHERE id = %s", (status, entity_id))
log_change(cursor, f"Set status of entity {entity_id} to {status} by {changed_by}")
def set_symmetrical_key(cursor, entity_id, key_value, changed_by):
ensure_entity_active(cursor, entity_id)
cursor.execute(
"SELECT * FROM entity WHERE id=%s AND status='active'", (entity_id,)
"UPDATE entity SET symmetrical_key = %s WHERE id = %s",
(key_value, entity_id),
)
log_change(cursor, f"Set symmetrical_key for entity {entity_id} by {changed_by}")
def get_symmetrical_key(cursor, entity_id):
cursor.execute("SELECT symmetrical_key FROM entity WHERE id = %s", (entity_id,))
row = cursor.fetchone()
if not row:
raise ValueError("Entity not found or inactive")
return row
return row["symmetrical_key"] if row else None
def get_entity_id(cursor, name):
def set_entity_keys(cursor, entity_id, public_key, changed_by):
ensure_entity_active(cursor, entity_id)
cursor.execute(
"SELECT id FROM entity WHERE name=%s AND status='active'", (name,)
"UPDATE entity SET public_key = %s WHERE id = %s",
(public_key, entity_id),
)
row = cursor.fetchone()
if not row:
raise ValueError("Entity not found or inactive")
return row["id"]
def get_entity_public_key(cursor, entity_id):
cursor.execute(
"SELECT public_key FROM entity WHERE id=%s AND status='active'", (entity_id,)
)
row = cursor.fetchone()
if not row:
raise ValueError("Entity not found or inactive")
return row["public_key"]
def get_entity_name(cursor, entity_id):
cursor.execute(
"SELECT name FROM entity WHERE id=%s AND status='active'", (entity_id,)
)
row = cursor.fetchone()
if not row:
raise ValueError("Entity not found or inactive")
return row["name"]
def set_entity_name(cursor, entity_id, new_name, requesting_creator_id):
_verify_ownership(cursor, entity_id, requesting_creator_id)
cursor.execute("UPDATE entity SET name=%s WHERE id=%s", (new_name, entity_id))
def set_entity_public_key(cursor, entity_id, public_key, requesting_creator_id):
_verify_ownership(cursor, entity_id, requesting_creator_id)
cursor.execute(
"UPDATE entity SET public_key=%s WHERE id=%s", (public_key, entity_id)
)
def set_entity_keys(cursor, entity_id, public_key, requesting_creator_id):
set_entity_public_key(cursor, entity_id, public_key, requesting_creator_id)
log_change(cursor, f"Updated public key for entity {entity_id} by {changed_by}")

View File

@ -1,42 +1,32 @@
# ca_core/group_member.py
from db_logging import log_change
from entity import ensure_entity_active
def add_group_member(cursor, group_id: int, member_id: int, role: str):
# Verify group exists and is active
cursor.execute("SELECT type, status FROM entity WHERE id=%s", (group_id,))
row = cursor.fetchone()
if not row or row["status"] != "active" or row["type"] != "group":
raise ValueError("Invalid or inactive group")
# Verify member exists and is active
cursor.execute("SELECT status FROM entity WHERE id=%s", (member_id,))
row = cursor.fetchone()
if not row or row["status"] != "active":
raise ValueError("Invalid or inactive member")
def add_group_member(cursor, group_id, member_id, role):
ensure_entity_active(cursor, group_id)
ensure_entity_active(cursor, member_id)
cursor.execute(
"INSERT INTO group_member (group_id, member_id, role) VALUES (%s, %s, %s)",
"""
INSERT INTO group_member (group_id, member_id, role)
VALUES (%s, %s, %s)
""",
(group_id, member_id, role)
)
def remove_group_member(cursor, group_id: int, member_id: int):
cursor.execute(
"DELETE FROM group_member WHERE group_id=%s AND member_id=%s",
(group_id, member_id)
log_change(
cursor,
f"Added member {member_id} to group {group_id} as {role}"
)
def get_groups_for_member(cursor, member_id: int):
def get_members_of_group(cursor, group_id):
cursor.execute(
"SELECT group_id, role FROM group_member WHERE member_id=%s",
(member_id,)
)
return cursor.fetchall()
def get_members_of_group(cursor, group_id: int):
cursor.execute(
"SELECT member_id, role FROM group_member WHERE group_id=%s",
"""
SELECT member_id, role
FROM group_member
WHERE group_id = %s
""",
(group_id,)
)
return cursor.fetchall()

View File

@ -1,40 +1,77 @@
# ca_core/metadata.py
from db_logging import log_change
def set_name(cursor, name):
cursor.execute("DELETE FROM metadata")
cursor.execute(
"INSERT INTO metadata (name) VALUES (%s)",
(name,)
)
log_change(cursor, f"Updated metadata name to {name}")
def get_name(cursor):
cursor.execute("SELECT name FROM metadata LIMIT 1")
row = cursor.fetchone()
return row['name'] if row else None
return row["name"] if row else None
def set_name(cursor, value):
cursor.execute("UPDATE metadata SET name=%s", (value,))
def set_comment(cursor, comment):
cursor.execute("DELETE FROM metadata")
cursor.execute(
"INSERT INTO metadata (comment) VALUES (%s)",
(comment,)
)
log_change(cursor, f"Updated metadata comment to {comment}")
def get_comment(cursor):
cursor.execute("SELECT comment FROM metadata LIMIT 1")
row = cursor.fetchone()
return row['comment'] if row else None
return row["comment"] if row else None
def set_comment(cursor, value):
cursor.execute("UPDATE metadata SET comment=%s", (value,))
def set_keys(cursor, public_key, private_key):
cursor.execute("DELETE FROM metadata")
cursor.execute(
"""
INSERT INTO metadata (public_key, private_key)
VALUES (%s, %s)
""",
(public_key, private_key)
)
log_change(cursor, "Updated metadata keys")
def get_public_key(cursor):
cursor.execute("SELECT public_key FROM metadata LIMIT 1")
row = cursor.fetchone()
return row['public_key'] if row else None
return row["public_key"] if row else None
def get_private_key(cursor):
cursor.execute("SELECT private_key FROM metadata LIMIT 1")
row = cursor.fetchone()
return row['private_key'] if row else None
return row["private_key"] if row else None
def set_keys(cursor, public_key, private_key):
def set_defense_p(cursor, defense_p: bool):
"""Set the metadata defense_p flag.
This table is treated as a singleton row at the application level.
Current convention in this codebase is to wipe and re-insert.
"""
cursor.execute("DELETE FROM metadata")
cursor.execute(
"UPDATE metadata SET public_key=%s, private_key=%s",
(public_key, private_key)
"INSERT INTO metadata (defense_p) VALUES (%s)",
(defense_p,)
)
log_change(cursor, f"Updated metadata defense_p to {defense_p}")
def get_defense_p(cursor) -> bool:
cursor.execute("SELECT defense_p FROM metadata LIMIT 1")
row = cursor.fetchone()
if not row or row["defense_p"] is None:
return False
return bool(row["defense_p"])

View File

@ -1,29 +1,104 @@
# ca_core/property.py
from db_logging import log_change
from entity import ensure_entity_active
def _validate_validation_policy(validation_policy: str) -> str:
if validation_policy is None:
return "default"
if not isinstance(validation_policy, str):
raise TypeError("validation_policy must be a string")
vp = validation_policy.strip()
if not vp:
raise ValueError("validation_policy cannot be empty")
if len(vp) > 19:
raise ValueError("validation_policy must be at most 19 characters")
return vp
def _validate_source(source):
if source is None:
return None
if not isinstance(source, str):
raise TypeError("source must be a string or None")
s = source.strip()
if len(s) > 150:
raise ValueError("source must be at most 150 characters")
# Allow empty string -> treat as NULL for cleanliness
return s if s else None
def set_property(cursor, entity_id, property_name, validation_policy="default", source=None):
"""
Revoked entities are immutable: cannot add/update properties.
Schema: property(id, property_name, validation_policy, source)
- validation_policy: CHAR(19) NOT NULL DEFAULT 'default'
- source: VARCHAR(150) NULL
"""
ensure_entity_active(cursor, entity_id)
if not isinstance(property_name, str) or not property_name.strip():
raise ValueError("property_name must be a non-empty string")
if len(property_name) > 100:
raise ValueError("property_name must be at most 100 characters")
vp = _validate_validation_policy(validation_policy)
src = _validate_source(source)
def set_property(cursor, entity_id: int, property_name: str):
cursor.execute(
"""
INSERT INTO property (id, property_name)
VALUES (%s, %s)
ON CONFLICT (id, property_name) DO NOTHING
INSERT INTO property (id, property_name, validation_policy, source)
VALUES (%s, %s, %s, %s)
ON CONFLICT (id, property_name)
DO UPDATE SET
validation_policy = EXCLUDED.validation_policy,
source = EXCLUDED.source
""",
(entity_id, property_name)
(entity_id, property_name, vp, src),
)
log_change(
cursor,
f"Set property '{property_name}' for entity {entity_id} "
f"(validation_policy={vp}, source={src})",
)
def delete_property(cursor, entity_id: int, property_name: str):
cursor.execute(
"DELETE FROM property WHERE id=%s AND property_name=%s",
(entity_id, property_name)
)
if cursor.rowcount == 0:
raise ValueError("Property not found")
def get_properties(cursor, entity_id: int):
def get_properties(cursor, entity_id):
"""
Returns a list of property_name values for the entity.
"""
cursor.execute(
"SELECT property_name FROM property WHERE id = %s",
(entity_id,)
(entity_id,),
)
return [row['property_name'] for row in cursor.fetchall()]
rows = cursor.fetchall()
return [r["property_name"] for r in rows]
def get_property(cursor, entity_id, property_name):
"""
Returns a dict_row with keys: property_name, validation_policy, source
or None if not found.
"""
cursor.execute(
"""
SELECT property_name, validation_policy, source
FROM property
WHERE id = %s AND property_name = %s
""",
(entity_id, property_name),
)
return cursor.fetchone()
def delete_property(cursor, entity_id, property_name):
"""
Revoked entities are immutable: cannot delete properties.
"""
ensure_entity_active(cursor, entity_id)
cursor.execute(
"DELETE FROM property WHERE id = %s AND property_name = %s",
(entity_id, property_name),
)
log_change(cursor, f"Deleted property '{property_name}' for entity {entity_id}")

View File

@ -1,5 +1,5 @@
-- ------------------------
-- Metadata table (singleton)
-- Metadata (singleton row; enforced at application level)
-- ------------------------
DROP TABLE IF EXISTS metadata;
@ -7,13 +7,12 @@ CREATE TABLE metadata (
name VARCHAR(50),
comment VARCHAR(200),
private_key VARCHAR(500),
public_key VARCHAR(500)
public_key VARCHAR(500),
defense_p BOOLEAN NOT NULL DEFAULT false
);
INSERT INTO metadata DEFAULT VALUES;
-- ------------------------
-- Entity table
-- Entity
-- ------------------------
DROP TABLE IF EXISTS entity CASCADE;
@ -22,21 +21,23 @@ CREATE TABLE entity (
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
creator INT REFERENCES entity(id),
name VARCHAR(100) NOT NULL,
type VARCHAR(10) NOT NULL DEFAULT 'person', -- 'creator', 'person', 'group', 'device'
geo_offset BIGINT,
type VARCHAR(20) NOT NULL, -- person, group, device
symmetrical_key VARCHAR(100),
public_key VARCHAR(300) NOT NULL,
ca_reference VARCHAR(100),
status VARCHAR(20) NOT NULL DEFAULT 'active',
expiration DATE,
status VARCHAR(10) NOT NULL DEFAULT 'active'
CONSTRAINT entity_ca_reference_check CHECK (
(type = 'group' AND ca_reference IS NOT NULL)
OR
(type <> 'group' AND ca_reference IS NULL)
)
);
-- Indexes
CREATE INDEX idx_entity_name ON entity(name);
CREATE INDEX idx_entity_expiration ON entity(expiration);
ALTER TABLE entity ADD CONSTRAINT entity_name_unique UNIQUE (name);
-- ------------------------
-- Group Member table
-- Group Member
-- ------------------------
DROP TABLE IF EXISTS group_member;
@ -47,16 +48,30 @@ CREATE TABLE group_member (
PRIMARY KEY (group_id, member_id)
);
CREATE INDEX idx_group_member_member_group ON group_member(member_id, group_id);
CREATE INDEX idx_group_member ON group_member(member_id, group_id);
-- ------------------------
-- Property table
-- Property
-- ------------------------
DROP TABLE IF EXISTS property;
CREATE TABLE property(
id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
property_name VARCHAR(100),
property_name VARCHAR(100) NOT NULL,
validation_policy CHAR(19) NOT NULL DEFAULT 'default',
source VARCHAR(150),
PRIMARY KEY (id, property_name)
);
-- ------------------------
-- Log Table
-- ------------------------
DROP TABLE IF EXISTS log;
CREATE TABLE log(
id SERIAL PRIMARY KEY,
ts TIMESTAMPTZ NOT NULL DEFAULT now(),
entry TEXT NOT NULL
);
CREATE INDEX idx_log_ts ON log(ts);

View File

@ -1,48 +0,0 @@
drop table metadata;
create table metadata(
name varchar(50),
comment varchar(200),
private_key varchar(500),
public_key varchar(500)
);
insert into metadata default vALUES;
drop table entity cascade;
create table entity(
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
creator INT REFERENCES entity(id),
name varchar(100) NOT NULL,
group_p BOOLEAN NOT NULL,
geo_offset BIGINT,
--private_key VARCHAR(300) NOT NULL,
public_key VARCHAR(300) NOT NULL,
expiration DATE,
status VARCHAR(10) NOT NULL DEFAULT 'active'
);
CREATE INDEX idx_entity_name ON entity(name);
drop table group_member;
create table group_member(
group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
person_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
role VARCHAR(10),
PRIMARY KEY (group_id,person_id)
);
CREATE INDEX idx_group_member_person_group ON group_member(person_id, group_id);
drop table property;
create table property(
id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
property_name VARCHAR(100),
PRIMARY KEY (id, property_name)
);

View File

@ -1,15 +1,20 @@
import unittest
from pathlib import Path
import sys
from pathlib import Path
import psycopg
# Add code folder to path
code_path = Path(__file__).parent.parent / "ca_core"
sys.path.insert(0, str(code_path))
import entity # the rewritten entity.py module
import entity
DBNAME = "ca"
def get_last_log(cursor):
cursor.execute("SELECT entry FROM log ORDER BY id DESC LIMIT 1")
row = cursor.fetchone()
return row["entry"] if row else ""
class TestEntityFunctions(unittest.TestCase):
@ -18,56 +23,62 @@ class TestEntityFunctions(unittest.TestCase):
cls.conn = psycopg.connect(f"dbname={DBNAME}")
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
# Ensure table exists
cls.cur.execute("""
CREATE TABLE IF NOT EXISTS entity (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
creator INT REFERENCES entity(id),
name VARCHAR(100) NOT NULL,
type VARCHAR(10) NOT NULL DEFAULT 'person',
geo_offset BIGINT,
public_key VARCHAR(300) NOT NULL,
expiration DATE,
status VARCHAR(10) NOT NULL DEFAULT 'active'
)
""")
cls.conn.commit()
@classmethod
def tearDownClass(cls):
cls.cur.close()
cls.conn.close()
def setUp(self):
self.conn.rollback()
self.conn.autocommit = False
def tearDown(self):
self.conn.rollback()
# --- Insert and read ---
def test_insert_creator_and_get(self):
creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1")
row = entity.get_entity(self.cur, creator_id)
self.assertEqual(row["name"], "Creator1")
self.assertEqual(row["type"], "creator")
self.assertIsNone(row["ca_reference"])
log_entry = get_last_log(self.cur).lower()
self.assertIn("creator entity", log_entry)
self.assertIn(str(creator_id), log_entry)
def test_enroll_person(self):
creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2")
person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id)
self.assertEqual(entity.get_entity_name(self.cur, person_id), "Person1")
self.assertEqual(entity.get_entity(self.cur, person_id)["type"], "person")
def test_create_group(self):
row = entity.get_entity(self.cur, person_id)
self.assertIsNone(row["ca_reference"])
log_entry = get_last_log(self.cur).lower()
self.assertIn("enrolled person", log_entry)
self.assertIn(str(person_id), log_entry)
def test_create_group_requires_ca_reference(self):
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
group_id = entity.create_group(self.cur, "Group1", "pubkey_group", creator_id)
self.assertEqual(entity.get_entity_name(self.cur, group_id), "Group1")
self.assertEqual(entity.get_entity(self.cur, group_id)["type"], "group")
# --- Revocation ---
def test_revoke_entity(self):
creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4")
entity.revoke_entity(self.cur, creator_id, creator_id)
with self.assertRaises(ValueError):
entity.get_entity(self.cur, creator_id)
entity.create_group(self.cur, "GroupMissingRef", "pubkey_group", creator_id, None)
def test_create_group_sets_ca_reference(self):
creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4")
group_id = entity.create_group(self.cur, "Group1", "pubkey_group", creator_id, "CA-REF-1")
if __name__ == "__main__":
unittest.main()
row = entity.get_entity(self.cur, group_id)
self.assertEqual(row["ca_reference"], "CA-REF-1")
log_entry = get_last_log(self.cur).lower()
self.assertIn("created group", log_entry)
self.assertIn(str(group_id), log_entry)
self.assertIn("ca-ref-1".lower(), log_entry)
def test_set_and_get_symmetrical_key(self):
creator_id = entity.insert_creator(self.cur, "CreatorSym", "pubkey_sym")
entity.set_symmetrical_key(self.cur, creator_id, "symkey123", creator_id)
row = entity.get_entity(self.cur, creator_id)
self.assertEqual(row["symmetrical_key"], "symkey123")
log_entry = get_last_log(self.cur).lower()
self.assertIn("symmetrical_key", log_entry)

View File

@ -1,6 +1,6 @@
import unittest
from pathlib import Path
import sys
from pathlib import Path
import psycopg
code_path = Path(__file__).parent.parent / "ca_core"
@ -11,6 +11,12 @@ import group_member
DBNAME = "ca"
def get_last_log(cursor):
cursor.execute("SELECT entry FROM log ORDER BY id DESC LIMIT 1")
row = cursor.fetchone()
return row["entry"] if row else ""
class TestGroupFunctions(unittest.TestCase):
@classmethod
@ -18,78 +24,65 @@ class TestGroupFunctions(unittest.TestCase):
cls.conn = psycopg.connect(f"dbname={DBNAME}")
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
# Ensure tables exist
cls.cur.execute("""
CREATE TABLE IF NOT EXISTS entity (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
creator INT REFERENCES entity(id),
name VARCHAR(100) NOT NULL,
type VARCHAR(10) NOT NULL DEFAULT 'person',
geo_offset BIGINT,
public_key VARCHAR(300) NOT NULL,
expiration DATE,
status VARCHAR(10) NOT NULL DEFAULT 'active'
)
""")
cls.cur.execute("""
CREATE TABLE IF NOT EXISTS group_member (
group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
member_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
role VARCHAR(10),
PRIMARY KEY (group_id, member_id)
)
""")
cls.conn.commit()
@classmethod
def tearDownClass(cls):
cls.cur.close()
cls.conn.close()
def setUp(self):
self.conn.rollback()
self.conn.autocommit = False
def tearDown(self):
self.conn.rollback()
# --- Group membership tests ---
def test_add_and_get_members(self):
creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1")
group_id = entity.create_group(self.cur, "GroupA", "pubkey_group", creator_id)
person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id)
device_id = entity.insert_creator(self.cur, "Device1", "pubkey_device")
# Add members
group_id = entity.create_group(self.cur, "Group1", "pubkey_group", creator_id, "CA-GROUP-1")
group_member.add_group_member(self.cur, group_id, person_id, "member")
group_member.add_group_member(self.cur, group_id, device_id, "device")
members = group_member.get_members_of_group(self.cur, group_id)
member_ids = [m["member_id"] for m in members]
self.assertIn(person_id, member_ids)
self.assertIn(device_id, member_ids)
self.assertTrue(
any(m["member_id"] == person_id and m["role"] == "member"
for m in members)
)
log_entry = get_last_log(self.cur).lower()
self.assertIn("added member", log_entry)
self.assertIn(str(group_id), log_entry)
def test_nested_groups(self):
creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2")
parent_group = entity.create_group(self.cur, "ParentGroup", "pubkey_pg", creator_id)
child_group = entity.create_group(self.cur, "ChildGroup", "pubkey_cg", creator_id)
# Add child group as member of parent
group_member.add_group_member(self.cur, parent_group, child_group, "subgroup")
members = group_member.get_members_of_group(self.cur, parent_group)
self.assertEqual(members[0]["member_id"], child_group)
self.assertEqual(members[0]["role"], "subgroup")
parent_group = entity.create_group(self.cur, "ParentGroup", "pubkey_parent", creator_id, "CA-PARENT")
child_group = entity.create_group(self.cur, "ChildGroup", "pubkey_child", creator_id, "CA-CHILD")
def test_revoked_member_cannot_be_added(self):
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
group_id = entity.create_group(self.cur, "GroupB", "pubkey_groupB", creator_id)
person_id = entity.enroll_person(self.cur, "Person2", "pubkey_person2", creator_id)
entity.revoke_entity(self.cur, person_id, creator_id)
with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, group_id, person_id, "member")
group_member.add_group_member(self.cur, parent_group, child_group, "subgroup")
members = group_member.get_members_of_group(self.cur, parent_group)
self.assertTrue(
any(m["member_id"] == child_group and m["role"] == "subgroup"
for m in members)
)
def test_revoked_group_cannot_accept_members(self):
creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4")
group_id = entity.create_group(self.cur, "GroupC", "pubkey_groupC", creator_id)
entity.revoke_entity(self.cur, group_id, creator_id)
person_id = entity.enroll_person(self.cur, "Person3", "pubkey_person3", creator_id)
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
group_id = entity.create_group(self.cur, "RevokedGroup", "pubkey_group", creator_id, "CA-REVOKED-GROUP")
person_id = entity.enroll_person(self.cur, "Person2", "pubkey_person", creator_id)
entity.set_entity_status(self.cur, group_id, "revoked", creator_id)
with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, group_id, person_id, "member")
def test_revoked_member_cannot_be_added(self):
creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4")
group_id = entity.create_group(self.cur, "ActiveGroup", "pubkey_group", creator_id, "CA-ACTIVE-GROUP")
person_id = entity.enroll_person(self.cur, "RevokedPerson", "pubkey_person", creator_id)
if __name__ == "__main__":
unittest.main()
entity.set_entity_status(self.cur, person_id, "revoked", creator_id)
with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, group_id, person_id, "member")

54
tests/test_metadata.py Executable file → Normal file
View File

@ -3,74 +3,64 @@ import sys
from pathlib import Path
import psycopg
# Add the code directory to Python path
code_path = Path(__file__).parent.parent / "ca_core"
sys.path.insert(0, str(code_path))
import metadata # your metadata.py module
import metadata
DBNAME = "ca"
def get_last_log(cursor):
cursor.execute("SELECT entry FROM log ORDER BY id DESC LIMIT 1")
row = cursor.fetchone()
return row["entry"] if row else ""
class TestMetadataFunctions(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Connect to the database
cls.conn = psycopg.connect(f"dbname={DBNAME}")
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
# Ensure table exists and has exactly one row
cls.cur.execute("""
CREATE TABLE IF NOT EXISTS metadata (
name VARCHAR(50),
comment VARCHAR(200),
private_key VARCHAR(500),
public_key VARCHAR(500)
)
""")
cls.cur.execute("SELECT COUNT(*) AS cnt FROM metadata")
row = cls.cur.fetchone()
if row['cnt'] == 0:
cls.cur.execute("INSERT INTO metadata DEFAULT VALUES")
cls.conn.commit()
@classmethod
def tearDownClass(cls):
cls.cur.close()
cls.conn.close()
def setUp(self):
# Begin transaction for each test
self.conn.rollback()
self.conn.autocommit = False
def tearDown(self):
# Rollback after each test
self.conn.rollback()
# --- Test name field ---
def test_set_and_get_name(self):
metadata.set_name(self.cur, "AppName")
self.assertEqual(metadata.get_name(self.cur), "AppName")
# --- Test comment field ---
log_entry = get_last_log(self.cur).lower()
self.assertIn("metadata name", log_entry)
self.assertIn("appname", log_entry)
def test_set_and_get_comment(self):
metadata.set_comment(self.cur, "Test comment")
self.assertEqual(metadata.get_comment(self.cur), "Test comment")
# --- Test keys ---
log_entry = get_last_log(self.cur).lower()
self.assertIn("metadata comment", log_entry)
def test_set_and_get_keys(self):
metadata.set_keys(self.cur, "pubkey123", "privkey456")
self.assertEqual(metadata.get_public_key(self.cur), "pubkey123")
self.assertEqual(metadata.get_private_key(self.cur), "privkey456")
# --- Test keys overwrite ---
def test_keys_overwrite(self):
metadata.set_keys(self.cur, "pub1", "priv1")
metadata.set_keys(self.cur, "pub2", "priv2")
self.assertEqual(metadata.get_public_key(self.cur), "pub2")
self.assertEqual(metadata.get_private_key(self.cur), "priv2")
log_entry = get_last_log(self.cur).lower()
self.assertIn("metadata keys", log_entry)
if __name__ == "__main__":
unittest.main()
def test_set_and_get_defense_p(self):
metadata.set_defense_p(self.cur, True)
self.assertEqual(metadata.get_defense_p(self.cur), True)
log_entry = get_last_log(self.cur).lower()
self.assertIn("defense_p", log_entry)

View File

@ -1,8 +1,9 @@
import unittest
from pathlib import Path
import sys
from pathlib import Path
import psycopg
# Add core directory to path
code_path = Path(__file__).parent.parent / "ca_core"
sys.path.insert(0, str(code_path))
@ -11,6 +12,13 @@ import property
DBNAME = "ca"
def get_last_log(cursor):
cursor.execute("SELECT entry FROM log ORDER BY id DESC LIMIT 1")
row = cursor.fetchone()
return row["entry"] if row else ""
class TestPropertyFunctions(unittest.TestCase):
@classmethod
@ -18,68 +26,102 @@ class TestPropertyFunctions(unittest.TestCase):
cls.conn = psycopg.connect(f"dbname={DBNAME}")
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
# Ensure entity table exists
cls.cur.execute("""
CREATE TABLE IF NOT EXISTS entity (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
creator INT REFERENCES entity(id),
name VARCHAR(100) NOT NULL,
type VARCHAR(10) NOT NULL DEFAULT 'person',
geo_offset BIGINT,
public_key VARCHAR(300) NOT NULL,
expiration DATE,
status VARCHAR(10) NOT NULL DEFAULT 'active'
)
""")
cls.cur.execute("""
CREATE TABLE IF NOT EXISTS property (
id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
property_name VARCHAR(100),
PRIMARY KEY (id, property_name)
)
""")
cls.conn.commit()
@classmethod
def tearDownClass(cls):
cls.cur.close()
cls.conn.close()
def setUp(self):
self.conn.rollback()
self.conn.autocommit = False
def tearDown(self):
self.conn.rollback()
# ------------------------------------------------------------
# Basic property set/get
# ------------------------------------------------------------
def test_set_and_get_property(self):
creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1")
person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id)
property.set_property(self.cur, person_id, "email")
person_id = entity.enroll_person(
self.cur, "Person1", "pubkey_person", creator_id
)
property.set_property(self.cur, person_id, "prop1")
props = property.get_properties(self.cur, person_id)
self.assertIn("email", props)
self.assertIn("prop1", props)
details = property.get_property(self.cur, person_id, "prop1")
self.assertIsNotNone(details)
# CHAR(19) is space-padded in PostgreSQL; strip for comparison.
self.assertEqual(details["validation_policy"].strip(), "default")
self.assertIsNone(details["source"])
log_entry = get_last_log(self.cur).lower()
self.assertIn("set property", log_entry)
self.assertIn("prop1", log_entry)
# ------------------------------------------------------------
# Delete property
# ------------------------------------------------------------
def test_delete_property(self):
creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2")
person_id = entity.enroll_person(self.cur, "Person2", "pubkey_person2", creator_id)
property.set_property(self.cur, person_id, "phone")
property.delete_property(self.cur, person_id, "phone")
person_id = entity.enroll_person(
self.cur, "Person2", "pubkey_person", creator_id
)
property.set_property(self.cur, person_id, "prop2")
property.delete_property(self.cur, person_id, "prop2")
props = property.get_properties(self.cur, person_id)
self.assertNotIn("phone", props)
self.assertNotIn("prop2", props)
log_entry = get_last_log(self.cur).lower()
self.assertIn("deleted property", log_entry)
self.assertIn("prop2", log_entry)
def test_set_property_with_policy_and_source(self):
creator_id = entity.insert_creator(self.cur, "CreatorPolicy", "pubkey_policy")
person_id = entity.enroll_person(
self.cur, "PersonPolicy", "pubkey_person", creator_id
)
property.set_property(
self.cur,
person_id,
"prop_policy",
validation_policy="strict",
source="unit-test",
)
details = property.get_property(self.cur, person_id, "prop_policy")
self.assertIsNotNone(details)
self.assertEqual(details["validation_policy"].strip(), "strict")
self.assertEqual(details["source"], "unit-test")
log_entry = get_last_log(self.cur).lower()
self.assertIn("set property", log_entry)
self.assertIn("prop_policy", log_entry)
self.assertIn("strict", log_entry)
# ------------------------------------------------------------
# Immutability: revoked entity cannot mutate properties
# ------------------------------------------------------------
def test_revoked_entity_has_no_properties(self):
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
person_id = entity.enroll_person(self.cur, "Person3", "pubkey_person3", creator_id)
property.set_property(self.cur, person_id, "address")
entity.revoke_entity(self.cur, person_id, creator_id)
props = property.get_properties(self.cur, person_id)
# Optional: you can decide whether to return empty or raise; here we return all properties regardless of status
# If you want to ignore revoked entities:
cursor = self.cur
cursor.execute(
"SELECT property_name FROM property p JOIN entity e ON e.id=p.id WHERE e.id=%s AND e.status='active'",
(person_id,)
person_id = entity.enroll_person(
self.cur, "Person3", "pubkey_person", creator_id
)
props_active = [r['property_name'] for r in cursor.fetchall()]
self.assertNotIn("address", props_active)
entity.set_entity_status(self.cur, person_id, "revoked", creator_id)
if __name__ == "__main__":
unittest.main()
with self.assertRaises(ValueError):
property.set_property(self.cur, person_id, "prop3")
with self.assertRaises(ValueError):
property.delete_property(self.cur, person_id, "prop3")