This commit is contained in:
Morten V. Christiansen 2026-02-26 16:24:26 +01:00
parent 2678737d5e
commit 7470cf7189
19 changed files with 347 additions and 360 deletions

Binary file not shown.

9
ca_core/db_logging.py Normal file
View File

@ -0,0 +1,9 @@
# db_logging.py
def log_change(cursor, message: str):
"""Insert a log entry into the log table."""
cursor.execute(
"INSERT INTO log (entry) VALUES (%s)",
(message,)
)

View File

@ -1,50 +1,49 @@
import random from db_logging import log_change
import string
# ------------------------
# Helper for ownership checks def ensure_entity_active(cursor, entity_id):
# ------------------------ """
def _verify_ownership(cursor, entity_id, requesting_creator_id): Ensure an entity exists and is active.
cursor.execute( Revoked entities are immutable.
"SELECT id, creator, type, status FROM entity WHERE id=%s", (entity_id,) """
) cursor.execute("SELECT status FROM entity WHERE id = %s", (entity_id,))
row = cursor.fetchone() row = cursor.fetchone()
if not row or row["status"] != "active": if row is None:
raise ValueError("Entity not found or inactive") raise ValueError("Entity does not exist")
owner_id = row["creator"] if row["status"] != "active":
entity_type = row["type"] raise ValueError("Entity is not active")
entity_id_db = row["id"]
if entity_type == "creator":
if requesting_creator_id != entity_id_db:
raise ValueError("Creator ID does not match entity owner")
else:
if requesting_creator_id != owner_id:
raise ValueError("Creator ID does not match entity owner")
# ------------------------
# Insertions
# ------------------------
def insert_creator(cursor, name, public_key): def insert_creator(cursor, name, public_key):
"""
Creators are persons with property 'creator' in the property table.
"""
cursor.execute( cursor.execute(
""" """
INSERT INTO entity (name, type, public_key, creator, status) INSERT INTO entity (name, type, public_key, status)
VALUES (%s, 'creator', %s, NULL, 'active') VALUES (%s, 'person', %s, 'active')
RETURNING id RETURNING id
""", """,
(name, public_key) (name, public_key),
) )
return cursor.fetchone()["id"] creator_id = cursor.fetchone()["id"]
# Mark as creator via property table (schema: property(id, property_name))
cursor.execute(
"""
INSERT INTO property (id, property_name)
VALUES (%s, %s)
ON CONFLICT (id, property_name) DO NOTHING
""",
(creator_id, "creator"),
)
log_change(cursor, f"Created creator entity {creator_id} with name {name}")
return creator_id
def enroll_person(cursor, name, public_key, creator_id): def enroll_person(cursor, name, public_key, creator_id):
cursor.execute( ensure_entity_active(cursor, creator_id)
"SELECT type, status FROM entity WHERE id=%s", (creator_id,)
)
row = cursor.fetchone()
if not row or row["type"] != "creator" or row["status"] != "active":
raise ValueError("Provided creator_id does not correspond to a valid active creator")
cursor.execute( cursor.execute(
""" """
@ -52,18 +51,16 @@ def enroll_person(cursor, name, public_key, creator_id):
VALUES (%s, 'person', %s, %s, 'active') VALUES (%s, 'person', %s, %s, 'active')
RETURNING id RETURNING id
""", """,
(name, public_key, creator_id) (name, public_key, creator_id),
) )
return cursor.fetchone()["id"] person_id = cursor.fetchone()["id"]
log_change(cursor, f"Enrolled person {person_id} under creator {creator_id}")
return person_id
def create_group(cursor, name, public_key, creator_id): def create_group(cursor, name, public_key, creator_id):
cursor.execute( ensure_entity_active(cursor, creator_id)
"SELECT type, status FROM entity WHERE id=%s", (creator_id,)
)
row = cursor.fetchone()
if not row or row["type"] != "creator" or row["status"] != "active":
raise ValueError("Provided creator_id does not correspond to a valid active creator")
cursor.execute( cursor.execute(
""" """
@ -71,99 +68,67 @@ def create_group(cursor, name, public_key, creator_id):
VALUES (%s, 'group', %s, %s, 'active') VALUES (%s, 'group', %s, %s, 'active')
RETURNING id RETURNING id
""", """,
(name, public_key, creator_id) (name, public_key, creator_id),
) )
return cursor.fetchone()["id"] group_id = cursor.fetchone()["id"]
log_change(cursor, f"Created group {group_id} under creator {creator_id}")
return group_id
def create_alias(cursor, person_id): def create_alias(cursor, target_entity_id):
cursor.execute( ensure_entity_active(cursor, target_entity_id)
"SELECT id, type, public_key, status FROM entity WHERE id=%s", (person_id,)
)
row = cursor.fetchone()
if not row or row["status"] != "active":
raise ValueError("Person not found or inactive")
if row["type"] != "person":
raise ValueError("Only persons can create aliases")
random_name = "".join(random.choices(string.ascii_letters + string.digits, k=8))
cursor.execute( cursor.execute(
""" """
INSERT INTO entity (name, type, public_key, creator, status) INSERT INTO entity (name, type, creator, status)
VALUES (%s, 'person', %s, %s, 'active') VALUES (%s, 'alias', %s, 'active')
RETURNING id RETURNING id
""", """,
(random_name, row["public_key"], person_id) (f"alias_for_{target_entity_id}", target_entity_id),
) )
return cursor.fetchone()["id"] alias_id = cursor.fetchone()["id"]
log_change(cursor, f"Created alias {alias_id} for entity {target_entity_id}")
return alias_id
# ------------------------
# Soft-delete / revocation
# ------------------------
def revoke_entity(cursor, entity_id, requesting_creator_id):
_verify_ownership(cursor, entity_id, requesting_creator_id)
cursor.execute(
"UPDATE entity SET status=%s WHERE id=%s", ("revoked", entity_id)
)
# ------------------------
# Getters / Setters
# ------------------------
def get_entity(cursor, entity_id): def get_entity(cursor, entity_id):
cursor.execute("SELECT * FROM entity WHERE id = %s", (entity_id,))
return cursor.fetchone()
def set_entity_status(cursor, entity_id, status, changed_by):
"""
Only active entities can change status. Once revoked, immutable.
"""
ensure_entity_active(cursor, entity_id)
cursor.execute("UPDATE entity SET status = %s WHERE id = %s", (status, entity_id))
log_change(cursor, f"Set status of entity {entity_id} to {status} by {changed_by}")
def set_symmetrical_key(cursor, entity_id, key_value, changed_by):
ensure_entity_active(cursor, entity_id)
cursor.execute( cursor.execute(
"SELECT * FROM entity WHERE id=%s AND status='active'", (entity_id,) "UPDATE entity SET symmetrical_key = %s WHERE id = %s",
(key_value, entity_id),
) )
log_change(cursor, f"Set symmetrical_key for entity {entity_id} by {changed_by}")
def get_symmetrical_key(cursor, entity_id):
cursor.execute("SELECT symmetrical_key FROM entity WHERE id = %s", (entity_id,))
row = cursor.fetchone() row = cursor.fetchone()
if not row: return row["symmetrical_key"] if row else None
raise ValueError("Entity not found or inactive")
return row
def get_entity_id(cursor, name): def set_entity_keys(cursor, entity_id, public_key, changed_by):
ensure_entity_active(cursor, entity_id)
cursor.execute( cursor.execute(
"SELECT id FROM entity WHERE name=%s AND status='active'", (name,) "UPDATE entity SET public_key = %s WHERE id = %s",
(public_key, entity_id),
) )
row = cursor.fetchone() log_change(cursor, f"Updated public key for entity {entity_id} by {changed_by}")
if not row:
raise ValueError("Entity not found or inactive")
return row["id"]
def get_entity_public_key(cursor, entity_id):
cursor.execute(
"SELECT public_key FROM entity WHERE id=%s AND status='active'", (entity_id,)
)
row = cursor.fetchone()
if not row:
raise ValueError("Entity not found or inactive")
return row["public_key"]
def get_entity_name(cursor, entity_id):
cursor.execute(
"SELECT name FROM entity WHERE id=%s AND status='active'", (entity_id,)
)
row = cursor.fetchone()
if not row:
raise ValueError("Entity not found or inactive")
return row["name"]
def set_entity_name(cursor, entity_id, new_name, requesting_creator_id):
_verify_ownership(cursor, entity_id, requesting_creator_id)
cursor.execute("UPDATE entity SET name=%s WHERE id=%s", (new_name, entity_id))
def set_entity_public_key(cursor, entity_id, public_key, requesting_creator_id):
_verify_ownership(cursor, entity_id, requesting_creator_id)
cursor.execute(
"UPDATE entity SET public_key=%s WHERE id=%s", (public_key, entity_id)
)
def set_entity_keys(cursor, entity_id, public_key, requesting_creator_id):
set_entity_public_key(cursor, entity_id, public_key, requesting_creator_id)

View File

@ -1,42 +1,32 @@
# ca_core/group_member.py from db_logging import log_change
from entity import ensure_entity_active
def add_group_member(cursor, group_id: int, member_id: int, role: str):
# Verify group exists and is active def add_group_member(cursor, group_id, member_id, role):
cursor.execute("SELECT type, status FROM entity WHERE id=%s", (group_id,)) ensure_entity_active(cursor, group_id)
row = cursor.fetchone() ensure_entity_active(cursor, member_id)
if not row or row["status"] != "active" or row["type"] != "group":
raise ValueError("Invalid or inactive group")
# Verify member exists and is active
cursor.execute("SELECT status FROM entity WHERE id=%s", (member_id,))
row = cursor.fetchone()
if not row or row["status"] != "active":
raise ValueError("Invalid or inactive member")
cursor.execute( cursor.execute(
"INSERT INTO group_member (group_id, member_id, role) VALUES (%s, %s, %s)", """
INSERT INTO group_member (group_id, member_id, role)
VALUES (%s, %s, %s)
""",
(group_id, member_id, role) (group_id, member_id, role)
) )
log_change(
def remove_group_member(cursor, group_id: int, member_id: int): cursor,
cursor.execute( f"Added member {member_id} to group {group_id} as {role}"
"DELETE FROM group_member WHERE group_id=%s AND member_id=%s",
(group_id, member_id)
) )
def get_groups_for_member(cursor, member_id: int): def get_members_of_group(cursor, group_id):
cursor.execute( cursor.execute(
"SELECT group_id, role FROM group_member WHERE member_id=%s", """
(member_id,) SELECT member_id, role
) FROM group_member
return cursor.fetchall() WHERE group_id = %s
""",
def get_members_of_group(cursor, group_id: int):
cursor.execute(
"SELECT member_id, role FROM group_member WHERE group_id=%s",
(group_id,) (group_id,)
) )
return cursor.fetchall() return cursor.fetchall()

View File

@ -1,40 +1,56 @@
# ca_core/metadata.py from db_logging import log_change
def set_name(cursor, name):
cursor.execute("DELETE FROM metadata")
cursor.execute(
"INSERT INTO metadata (name) VALUES (%s)",
(name,)
)
log_change(cursor, f"Updated metadata name to {name}")
def get_name(cursor): def get_name(cursor):
cursor.execute("SELECT name FROM metadata LIMIT 1") cursor.execute("SELECT name FROM metadata LIMIT 1")
row = cursor.fetchone() row = cursor.fetchone()
return row['name'] if row else None return row["name"] if row else None
def set_name(cursor, value): def set_comment(cursor, comment):
cursor.execute("UPDATE metadata SET name=%s", (value,)) cursor.execute("DELETE FROM metadata")
cursor.execute(
"INSERT INTO metadata (comment) VALUES (%s)",
(comment,)
)
log_change(cursor, f"Updated metadata comment to {comment}")
def get_comment(cursor): def get_comment(cursor):
cursor.execute("SELECT comment FROM metadata LIMIT 1") cursor.execute("SELECT comment FROM metadata LIMIT 1")
row = cursor.fetchone() row = cursor.fetchone()
return row['comment'] if row else None return row["comment"] if row else None
def set_comment(cursor, value): def set_keys(cursor, public_key, private_key):
cursor.execute("UPDATE metadata SET comment=%s", (value,)) cursor.execute("DELETE FROM metadata")
cursor.execute(
"""
INSERT INTO metadata (public_key, private_key)
VALUES (%s, %s)
""",
(public_key, private_key)
)
log_change(cursor, "Updated metadata keys")
def get_public_key(cursor): def get_public_key(cursor):
cursor.execute("SELECT public_key FROM metadata LIMIT 1") cursor.execute("SELECT public_key FROM metadata LIMIT 1")
row = cursor.fetchone() row = cursor.fetchone()
return row['public_key'] if row else None return row["public_key"] if row else None
def get_private_key(cursor): def get_private_key(cursor):
cursor.execute("SELECT private_key FROM metadata LIMIT 1") cursor.execute("SELECT private_key FROM metadata LIMIT 1")
row = cursor.fetchone() row = cursor.fetchone()
return row['private_key'] if row else None return row["private_key"] if row else None
def set_keys(cursor, public_key, private_key):
cursor.execute(
"UPDATE metadata SET public_key=%s, private_key=%s",
(public_key, private_key)
)

View File

@ -1,29 +1,42 @@
# ca_core/property.py from db_logging import log_change
from entity import ensure_entity_active
def set_property(cursor, entity_id, property_name):
"""
Revoked entities are immutable: cannot add properties.
Schema: property(id, property_name)
"""
ensure_entity_active(cursor, entity_id)
def set_property(cursor, entity_id: int, property_name: str):
cursor.execute( cursor.execute(
""" """
INSERT INTO property (id, property_name) INSERT INTO property (id, property_name)
VALUES (%s, %s) VALUES (%s, %s)
ON CONFLICT (id, property_name) DO NOTHING ON CONFLICT (id, property_name) DO NOTHING
""", """,
(entity_id, property_name) (entity_id, property_name),
) )
log_change(cursor, f"Set property '{property_name}' for entity {entity_id}")
def delete_property(cursor, entity_id: int, property_name: str): def get_properties(cursor, entity_id):
cursor.execute( cursor.execute(
"DELETE FROM property WHERE id=%s AND property_name=%s", "SELECT property_name FROM property WHERE id = %s",
(entity_id, property_name) (entity_id,),
) )
if cursor.rowcount == 0: rows = cursor.fetchall()
raise ValueError("Property not found") return [r["property_name"] for r in rows]
def get_properties(cursor, entity_id: int): def delete_property(cursor, entity_id, property_name):
"""
Revoked entities are immutable: cannot delete properties.
"""
ensure_entity_active(cursor, entity_id)
cursor.execute( cursor.execute(
"SELECT property_name FROM property WHERE id=%s", "DELETE FROM property WHERE id = %s AND property_name = %s",
(entity_id,) (entity_id, property_name),
) )
return [row['property_name'] for row in cursor.fetchall()] log_change(cursor, f"Deleted property '{property_name}' for entity {entity_id}")

View File

@ -1,62 +1,69 @@
-- ------------------------ -- ------------------------
-- Metadata table (singleton) -- Metadata
-- ------------------------ -- ------------------------
DROP TABLE IF EXISTS metadata; DROP TABLE IF EXISTS metadata;
CREATE TABLE metadata ( CREATE TABLE metadata(
name VARCHAR(50), name VARCHAR(50),
comment VARCHAR(200), comment VARCHAR(200),
private_key VARCHAR(500), private_key VARCHAR(500),
public_key VARCHAR(500) public_key VARCHAR(500)
); );
INSERT INTO metadata DEFAULT VALUES;
-- ------------------------ -- ------------------------
-- Entity table -- Entity
-- ------------------------ -- ------------------------
DROP TABLE IF EXISTS entity CASCADE; DROP TABLE IF EXISTS entity CASCADE;
CREATE TABLE entity ( CREATE TABLE entity(
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(), creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
creator INT REFERENCES entity(id), creator INT REFERENCES entity(id),
name VARCHAR(100) NOT NULL, name VARCHAR(100) NOT NULL,
type VARCHAR(10) NOT NULL DEFAULT 'person', -- 'creator', 'person', 'group', 'device' type VARCHAR(20) NOT NULL, -- person, group, device
geo_offset BIGINT, symmetrical_key VARCHAR(100),
public_key VARCHAR(300) NOT NULL, public_key VARCHAR(300) NOT NULL,
expiration DATE, status VARCHAR(20) NOT NULL DEFAULT 'active',
status VARCHAR(10) NOT NULL DEFAULT 'active' expiration DATE
); );
-- Indexes
CREATE INDEX idx_entity_name ON entity(name); CREATE INDEX idx_entity_name ON entity(name);
CREATE INDEX idx_entity_expiration ON entity(expiration);
ALTER TABLE entity ADD CONSTRAINT entity_name_unique UNIQUE (name);
-- ------------------------ -- ------------------------
-- Group Member table -- Group Member
-- ------------------------ -- ------------------------
DROP TABLE IF EXISTS group_member; DROP TABLE IF EXISTS group_member;
CREATE TABLE group_member ( CREATE TABLE group_member(
group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE, group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
member_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE, member_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
role VARCHAR(10), role VARCHAR(10),
PRIMARY KEY (group_id, member_id) PRIMARY KEY (group_id, member_id)
); );
CREATE INDEX idx_group_member_member_group ON group_member(member_id, group_id); CREATE INDEX idx_group_member ON group_member(member_id, group_id);
-- ------------------------ -- ------------------------
-- Property table -- Property
-- ------------------------ -- ------------------------
DROP TABLE IF EXISTS property; DROP TABLE IF EXISTS property;
CREATE TABLE property ( CREATE TABLE property(
id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE, id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
property_name VARCHAR(100), property_name VARCHAR(100),
PRIMARY KEY (id, property_name) PRIMARY KEY (id, property_name)
); );
-- ------------------------
-- Log Table
-- ------------------------
DROP TABLE IF EXISTS log;
CREATE TABLE log(
id SERIAL PRIMARY KEY,
ts TIMESTAMPTZ NOT NULL DEFAULT now(),
entry TEXT NOT NULL
);
CREATE INDEX idx_log_ts ON log(ts);

View File

@ -1,15 +1,20 @@
import unittest import unittest
from pathlib import Path
import sys import sys
from pathlib import Path
import psycopg import psycopg
# Add code folder to path
code_path = Path(__file__).parent.parent / "ca_core" code_path = Path(__file__).parent.parent / "ca_core"
sys.path.insert(0, str(code_path)) sys.path.insert(0, str(code_path))
import entity # the rewritten entity.py module
import entity
DBNAME = "ca" DBNAME = "ca"
def get_last_log(cursor):
cursor.execute("SELECT entry FROM log ORDER BY id DESC LIMIT 1")
row = cursor.fetchone()
return row["entry"] if row else ""
class TestEntityFunctions(unittest.TestCase): class TestEntityFunctions(unittest.TestCase):
@ -18,56 +23,49 @@ class TestEntityFunctions(unittest.TestCase):
cls.conn = psycopg.connect(f"dbname={DBNAME}") cls.conn = psycopg.connect(f"dbname={DBNAME}")
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row) cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
# Ensure table exists @classmethod
cls.cur.execute(""" def tearDownClass(cls):
CREATE TABLE IF NOT EXISTS entity ( cls.cur.close()
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, cls.conn.close()
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
creator INT REFERENCES entity(id),
name VARCHAR(100) NOT NULL,
type VARCHAR(10) NOT NULL DEFAULT 'person',
geo_offset BIGINT,
public_key VARCHAR(300) NOT NULL,
expiration DATE,
status VARCHAR(10) NOT NULL DEFAULT 'active'
)
""")
cls.conn.commit()
def setUp(self): def setUp(self):
self.conn.rollback() self.conn.rollback()
self.conn.autocommit = False
def tearDown(self): def tearDown(self):
self.conn.rollback() self.conn.rollback()
# --- Insert and read ---
def test_insert_creator_and_get(self): def test_insert_creator_and_get(self):
creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1") creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1")
row = entity.get_entity(self.cur, creator_id) row = entity.get_entity(self.cur, creator_id)
self.assertEqual(row["name"], "Creator1") self.assertEqual(row["name"], "Creator1")
self.assertEqual(row["type"], "creator")
log_entry = get_last_log(self.cur).lower()
self.assertIn("creator entity", log_entry)
self.assertIn(str(creator_id), log_entry)
def test_enroll_person(self): def test_enroll_person(self):
creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2") creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2")
person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id) person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id)
self.assertEqual(entity.get_entity_name(self.cur, person_id), "Person1")
self.assertEqual(entity.get_entity(self.cur, person_id)["type"], "person") log_entry = get_last_log(self.cur).lower()
self.assertIn("enrolled person", log_entry)
self.assertIn(str(person_id), log_entry)
def test_create_group(self): def test_create_group(self):
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3") creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
group_id = entity.create_group(self.cur, "Group1", "pubkey_group", creator_id) group_id = entity.create_group(self.cur, "Group1", "pubkey_group", creator_id)
self.assertEqual(entity.get_entity_name(self.cur, group_id), "Group1")
self.assertEqual(entity.get_entity(self.cur, group_id)["type"], "group")
# --- Revocation --- log_entry = get_last_log(self.cur).lower()
def test_revoke_entity(self): self.assertIn("created group", log_entry)
creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4") self.assertIn(str(group_id), log_entry)
entity.revoke_entity(self.cur, creator_id, creator_id)
with self.assertRaises(ValueError):
entity.get_entity(self.cur, creator_id)
def test_set_and_get_symmetrical_key(self):
creator_id = entity.insert_creator(self.cur, "CreatorSym", "pubkey_sym")
entity.set_symmetrical_key(self.cur, creator_id, "symkey123", creator_id)
if __name__ == "__main__": row = entity.get_entity(self.cur, creator_id)
unittest.main() self.assertEqual(row["symmetrical_key"], "symkey123")
log_entry = get_last_log(self.cur).lower()
self.assertIn("symmetrical_key", log_entry)

View File

@ -1,6 +1,6 @@
import unittest import unittest
from pathlib import Path
import sys import sys
from pathlib import Path
import psycopg import psycopg
code_path = Path(__file__).parent.parent / "ca_core" code_path = Path(__file__).parent.parent / "ca_core"
@ -11,6 +11,12 @@ import group_member
DBNAME = "ca" DBNAME = "ca"
def get_last_log(cursor):
cursor.execute("SELECT entry FROM log ORDER BY id DESC LIMIT 1")
row = cursor.fetchone()
return row["entry"] if row else ""
class TestGroupFunctions(unittest.TestCase): class TestGroupFunctions(unittest.TestCase):
@classmethod @classmethod
@ -18,78 +24,66 @@ class TestGroupFunctions(unittest.TestCase):
cls.conn = psycopg.connect(f"dbname={DBNAME}") cls.conn = psycopg.connect(f"dbname={DBNAME}")
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row) cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
# Ensure tables exist @classmethod
cls.cur.execute(""" def tearDownClass(cls):
CREATE TABLE IF NOT EXISTS entity ( cls.cur.close()
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, cls.conn.close()
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
creator INT REFERENCES entity(id),
name VARCHAR(100) NOT NULL,
type VARCHAR(10) NOT NULL DEFAULT 'person',
geo_offset BIGINT,
public_key VARCHAR(300) NOT NULL,
expiration DATE,
status VARCHAR(10) NOT NULL DEFAULT 'active'
)
""")
cls.cur.execute("""
CREATE TABLE IF NOT EXISTS group_member (
group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
member_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
role VARCHAR(10),
PRIMARY KEY (group_id, member_id)
)
""")
cls.conn.commit()
def setUp(self): def setUp(self):
self.conn.rollback() self.conn.rollback()
self.conn.autocommit = False
def tearDown(self): def tearDown(self):
self.conn.rollback() self.conn.rollback()
# --- Group membership tests ---
def test_add_and_get_members(self): def test_add_and_get_members(self):
creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1") creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1")
group_id = entity.create_group(self.cur, "GroupA", "pubkey_group", creator_id)
person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id) person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id)
device_id = entity.insert_creator(self.cur, "Device1", "pubkey_device") group_id = entity.create_group(self.cur, "Group1", "pubkey_group", creator_id)
# Add members
group_member.add_group_member(self.cur, group_id, person_id, "member") group_member.add_group_member(self.cur, group_id, person_id, "member")
group_member.add_group_member(self.cur, group_id, device_id, "device")
members = group_member.get_members_of_group(self.cur, group_id) members = group_member.get_members_of_group(self.cur, group_id)
member_ids = [m["member_id"] for m in members]
self.assertIn(person_id, member_ids) self.assertTrue(
self.assertIn(device_id, member_ids) any(m["member_id"] == person_id and m["role"] == "member"
for m in members)
)
log_entry = get_last_log(self.cur).lower()
self.assertIn("added member", log_entry)
self.assertIn(str(group_id), log_entry)
def test_nested_groups(self): def test_nested_groups(self):
creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2") creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2")
parent_group = entity.create_group(self.cur, "ParentGroup", "pubkey_pg", creator_id) parent_group = entity.create_group(self.cur, "ParentGroup", "pubkey_parent", creator_id)
child_group = entity.create_group(self.cur, "ChildGroup", "pubkey_cg", creator_id) child_group = entity.create_group(self.cur, "ChildGroup", "pubkey_child", creator_id)
# Add child group as member of parent
group_member.add_group_member(self.cur, parent_group, child_group, "subgroup")
members = group_member.get_members_of_group(self.cur, parent_group)
self.assertEqual(members[0]["member_id"], child_group)
self.assertEqual(members[0]["role"], "subgroup")
def test_revoked_member_cannot_be_added(self): group_member.add_group_member(self.cur, parent_group, child_group, "subgroup")
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
group_id = entity.create_group(self.cur, "GroupB", "pubkey_groupB", creator_id) members = group_member.get_members_of_group(self.cur, parent_group)
person_id = entity.enroll_person(self.cur, "Person2", "pubkey_person2", creator_id)
entity.revoke_entity(self.cur, person_id, creator_id) self.assertTrue(
with self.assertRaises(ValueError): any(m["member_id"] == child_group and m["role"] == "subgroup"
group_member.add_group_member(self.cur, group_id, person_id, "member") for m in members)
)
def test_revoked_group_cannot_accept_members(self): def test_revoked_group_cannot_accept_members(self):
creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4") creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
group_id = entity.create_group(self.cur, "GroupC", "pubkey_groupC", creator_id) group_id = entity.create_group(self.cur, "RevokedGroup", "pubkey_group", creator_id)
entity.revoke_entity(self.cur, group_id, creator_id) person_id = entity.enroll_person(self.cur, "Person2", "pubkey_person", creator_id)
person_id = entity.enroll_person(self.cur, "Person3", "pubkey_person3", creator_id)
entity.set_entity_status(self.cur, group_id, "revoked", creator_id)
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, group_id, person_id, "member") group_member.add_group_member(self.cur, group_id, person_id, "member")
def test_revoked_member_cannot_be_added(self):
creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4")
group_id = entity.create_group(self.cur, "ActiveGroup", "pubkey_group", creator_id)
person_id = entity.enroll_person(self.cur, "RevokedPerson", "pubkey_person", creator_id)
if __name__ == "__main__": entity.set_entity_status(self.cur, person_id, "revoked", creator_id)
unittest.main()
with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, group_id, person_id, "member")

50
tests/test_metadata.py Executable file → Normal file
View File

@ -3,74 +3,58 @@ import sys
from pathlib import Path from pathlib import Path
import psycopg import psycopg
# Add the code directory to Python path
code_path = Path(__file__).parent.parent / "ca_core" code_path = Path(__file__).parent.parent / "ca_core"
sys.path.insert(0, str(code_path)) sys.path.insert(0, str(code_path))
import metadata # your metadata.py module import metadata
DBNAME = "ca" DBNAME = "ca"
def get_last_log(cursor):
cursor.execute("SELECT entry FROM log ORDER BY id DESC LIMIT 1")
row = cursor.fetchone()
return row["entry"] if row else ""
class TestMetadataFunctions(unittest.TestCase): class TestMetadataFunctions(unittest.TestCase):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
# Connect to the database
cls.conn = psycopg.connect(f"dbname={DBNAME}") cls.conn = psycopg.connect(f"dbname={DBNAME}")
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row) cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
# Ensure table exists and has exactly one row
cls.cur.execute("""
CREATE TABLE IF NOT EXISTS metadata (
name VARCHAR(50),
comment VARCHAR(200),
private_key VARCHAR(500),
public_key VARCHAR(500)
)
""")
cls.cur.execute("SELECT COUNT(*) AS cnt FROM metadata")
row = cls.cur.fetchone()
if row['cnt'] == 0:
cls.cur.execute("INSERT INTO metadata DEFAULT VALUES")
cls.conn.commit()
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
cls.cur.close() cls.cur.close()
cls.conn.close() cls.conn.close()
def setUp(self): def setUp(self):
# Begin transaction for each test
self.conn.rollback() self.conn.rollback()
self.conn.autocommit = False
def tearDown(self): def tearDown(self):
# Rollback after each test
self.conn.rollback() self.conn.rollback()
# --- Test name field ---
def test_set_and_get_name(self): def test_set_and_get_name(self):
metadata.set_name(self.cur, "AppName") metadata.set_name(self.cur, "AppName")
self.assertEqual(metadata.get_name(self.cur), "AppName") self.assertEqual(metadata.get_name(self.cur), "AppName")
# --- Test comment field --- log_entry = get_last_log(self.cur).lower()
self.assertIn("metadata name", log_entry)
self.assertIn("appname", log_entry)
def test_set_and_get_comment(self): def test_set_and_get_comment(self):
metadata.set_comment(self.cur, "Test comment") metadata.set_comment(self.cur, "Test comment")
self.assertEqual(metadata.get_comment(self.cur), "Test comment") self.assertEqual(metadata.get_comment(self.cur), "Test comment")
# --- Test keys --- log_entry = get_last_log(self.cur).lower()
self.assertIn("metadata comment", log_entry)
def test_set_and_get_keys(self): def test_set_and_get_keys(self):
metadata.set_keys(self.cur, "pubkey123", "privkey456") metadata.set_keys(self.cur, "pubkey123", "privkey456")
self.assertEqual(metadata.get_public_key(self.cur), "pubkey123") self.assertEqual(metadata.get_public_key(self.cur), "pubkey123")
self.assertEqual(metadata.get_private_key(self.cur), "privkey456") self.assertEqual(metadata.get_private_key(self.cur), "privkey456")
# --- Test keys overwrite --- log_entry = get_last_log(self.cur).lower()
def test_keys_overwrite(self): self.assertIn("metadata keys", log_entry)
metadata.set_keys(self.cur, "pub1", "priv1")
metadata.set_keys(self.cur, "pub2", "priv2")
self.assertEqual(metadata.get_public_key(self.cur), "pub2")
self.assertEqual(metadata.get_private_key(self.cur), "priv2")
if __name__ == "__main__":
unittest.main()

View File

@ -1,8 +1,9 @@
import unittest import unittest
from pathlib import Path
import sys import sys
from pathlib import Path
import psycopg import psycopg
# Add core directory to path
code_path = Path(__file__).parent.parent / "ca_core" code_path = Path(__file__).parent.parent / "ca_core"
sys.path.insert(0, str(code_path)) sys.path.insert(0, str(code_path))
@ -11,6 +12,13 @@ import property
DBNAME = "ca" DBNAME = "ca"
def get_last_log(cursor):
cursor.execute("SELECT entry FROM log ORDER BY id DESC LIMIT 1")
row = cursor.fetchone()
return row["entry"] if row else ""
class TestPropertyFunctions(unittest.TestCase): class TestPropertyFunctions(unittest.TestCase):
@classmethod @classmethod
@ -18,68 +26,71 @@ class TestPropertyFunctions(unittest.TestCase):
cls.conn = psycopg.connect(f"dbname={DBNAME}") cls.conn = psycopg.connect(f"dbname={DBNAME}")
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row) cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
# Ensure entity table exists @classmethod
cls.cur.execute(""" def tearDownClass(cls):
CREATE TABLE IF NOT EXISTS entity ( cls.cur.close()
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, cls.conn.close()
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
creator INT REFERENCES entity(id),
name VARCHAR(100) NOT NULL,
type VARCHAR(10) NOT NULL DEFAULT 'person',
geo_offset BIGINT,
public_key VARCHAR(300) NOT NULL,
expiration DATE,
status VARCHAR(10) NOT NULL DEFAULT 'active'
)
""")
cls.cur.execute("""
CREATE TABLE IF NOT EXISTS property (
id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
property_name VARCHAR(100),
PRIMARY KEY (id, property_name)
)
""")
cls.conn.commit()
def setUp(self): def setUp(self):
self.conn.rollback() self.conn.rollback()
self.conn.autocommit = False
def tearDown(self): def tearDown(self):
self.conn.rollback() self.conn.rollback()
# ------------------------------------------------------------
# Basic property set/get
# ------------------------------------------------------------
def test_set_and_get_property(self): def test_set_and_get_property(self):
creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1") creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1")
person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id) person_id = entity.enroll_person(
property.set_property(self.cur, person_id, "email") self.cur, "Person1", "pubkey_person", creator_id
)
property.set_property(self.cur, person_id, "prop1")
props = property.get_properties(self.cur, person_id) props = property.get_properties(self.cur, person_id)
self.assertIn("email", props) self.assertIn("prop1", props)
log_entry = get_last_log(self.cur).lower()
self.assertIn("set property", log_entry)
self.assertIn("prop1", log_entry)
# ------------------------------------------------------------
# Delete property
# ------------------------------------------------------------
def test_delete_property(self): def test_delete_property(self):
creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2") creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2")
person_id = entity.enroll_person(self.cur, "Person2", "pubkey_person2", creator_id) person_id = entity.enroll_person(
property.set_property(self.cur, person_id, "phone") self.cur, "Person2", "pubkey_person", creator_id
property.delete_property(self.cur, person_id, "phone") )
property.set_property(self.cur, person_id, "prop2")
property.delete_property(self.cur, person_id, "prop2")
props = property.get_properties(self.cur, person_id) props = property.get_properties(self.cur, person_id)
self.assertNotIn("phone", props) self.assertNotIn("prop2", props)
log_entry = get_last_log(self.cur).lower()
self.assertIn("deleted property", log_entry)
self.assertIn("prop2", log_entry)
# ------------------------------------------------------------
# Immutability: revoked entity cannot mutate properties
# ------------------------------------------------------------
def test_revoked_entity_has_no_properties(self): def test_revoked_entity_has_no_properties(self):
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3") creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
person_id = entity.enroll_person(self.cur, "Person3", "pubkey_person3", creator_id) person_id = entity.enroll_person(
property.set_property(self.cur, person_id, "address") self.cur, "Person3", "pubkey_person", creator_id
entity.revoke_entity(self.cur, person_id, creator_id)
props = property.get_properties(self.cur, person_id)
# Optional: you can decide whether to return empty or raise; here we return all properties regardless of status
# If you want to ignore revoked entities:
cursor = self.cur
cursor.execute(
"SELECT property_name FROM property p JOIN entity e ON e.id=p.id WHERE e.id=%s AND e.status='active'",
(person_id,)
) )
props_active = [r['property_name'] for r in cursor.fetchall()]
self.assertNotIn("address", props_active)
entity.set_entity_status(self.cur, person_id, "revoked", creator_id)
if __name__ == "__main__": with self.assertRaises(ValueError):
unittest.main() property.set_property(self.cur, person_id, "prop3")
with self.assertRaises(ValueError):
property.delete_property(self.cur, person_id, "prop3")