diff --git a/ca_core/__pycache__/entity.cpython-313.pyc b/ca_core/__pycache__/entity.cpython-313.pyc index ae62474..c3ad988 100644 Binary files a/ca_core/__pycache__/entity.cpython-313.pyc and b/ca_core/__pycache__/entity.cpython-313.pyc differ diff --git a/ca_core/__pycache__/group_member.cpython-313.pyc b/ca_core/__pycache__/group_member.cpython-313.pyc index 4d44604..94f4adb 100644 Binary files a/ca_core/__pycache__/group_member.cpython-313.pyc and b/ca_core/__pycache__/group_member.cpython-313.pyc differ diff --git a/ca_core/__pycache__/metadata.cpython-313.pyc b/ca_core/__pycache__/metadata.cpython-313.pyc index 4cf6b62..eebe600 100644 Binary files a/ca_core/__pycache__/metadata.cpython-313.pyc and b/ca_core/__pycache__/metadata.cpython-313.pyc differ diff --git a/ca_core/__pycache__/property.cpython-313.pyc b/ca_core/__pycache__/property.cpython-313.pyc index 872b54a..e24e311 100644 Binary files a/ca_core/__pycache__/property.cpython-313.pyc and b/ca_core/__pycache__/property.cpython-313.pyc differ diff --git a/ca_core/entity.py b/ca_core/entity.py index 0b26849..070a7d1 100644 --- a/ca_core/entity.py +++ b/ca_core/entity.py @@ -2,22 +2,23 @@ import random import string # ------------------------ -# Helper for ownership checks (Option 2) +# Helper for ownership checks # ------------------------ def _verify_ownership(cursor, entity_id, requesting_creator_id): - cursor.execute("SELECT id, creator FROM entity WHERE id=%s", (entity_id,)) + cursor.execute( + "SELECT id, creator, type, status FROM entity WHERE id=%s", (entity_id,) + ) row = cursor.fetchone() - if not row: - raise ValueError("Entity not found") + if not row or row["status"] != "active": + raise ValueError("Entity not found or inactive") owner_id = row["creator"] + entity_type = row["type"] entity_id_db = row["id"] - if owner_id is None: - # Entity is a creator → allow only self + if entity_type == "creator": if requesting_creator_id != entity_id_db: raise ValueError("Creator ID does not match entity owner") else: - # Entity is a person/group → allow original creator if requesting_creator_id != owner_id: raise ValueError("Creator ID does not match entity owner") @@ -28,8 +29,8 @@ def _verify_ownership(cursor, entity_id, requesting_creator_id): def insert_creator(cursor, name, public_key): cursor.execute( """ - INSERT INTO entity (name, group_p, public_key, creator) - VALUES (%s, FALSE, %s, NULL) + INSERT INTO entity (name, type, public_key, creator, status) + VALUES (%s, 'creator', %s, NULL, 'active') RETURNING id """, (name, public_key) @@ -38,14 +39,17 @@ def insert_creator(cursor, name, public_key): def enroll_person(cursor, name, public_key, creator_id): - cursor.execute("SELECT creator FROM entity WHERE id=%s", (creator_id,)) + cursor.execute( + "SELECT type, status FROM entity WHERE id=%s", (creator_id,) + ) row = cursor.fetchone() - if not row or row["creator"] is not None: - raise ValueError("Provided creator_id does not correspond to a creator") + if not row or row["type"] != "creator" or row["status"] != "active": + raise ValueError("Provided creator_id does not correspond to a valid active creator") + cursor.execute( """ - INSERT INTO entity (name, group_p, public_key, creator) - VALUES (%s, FALSE, %s, %s) + INSERT INTO entity (name, type, public_key, creator, status) + VALUES (%s, 'person', %s, %s, 'active') RETURNING id """, (name, public_key, creator_id) @@ -54,14 +58,17 @@ def enroll_person(cursor, name, public_key, creator_id): def create_group(cursor, name, public_key, creator_id): - cursor.execute("SELECT creator FROM entity WHERE id=%s", (creator_id,)) + cursor.execute( + "SELECT type, status FROM entity WHERE id=%s", (creator_id,) + ) row = cursor.fetchone() - if not row or row["creator"] is not None: - raise ValueError("Provided creator_id does not correspond to a creator") + if not row or row["type"] != "creator" or row["status"] != "active": + raise ValueError("Provided creator_id does not correspond to a valid active creator") + cursor.execute( """ - INSERT INTO entity (name, group_p, public_key, creator) - VALUES (%s, TRUE, %s, %s) + INSERT INTO entity (name, type, public_key, creator, status) + VALUES (%s, 'group', %s, %s, 'active') RETURNING id """, (name, public_key, creator_id) @@ -70,17 +77,21 @@ def create_group(cursor, name, public_key, creator_id): def create_alias(cursor, person_id): - cursor.execute("SELECT id, group_p, public_key FROM entity WHERE id=%s", (person_id,)) + cursor.execute( + "SELECT id, type, public_key, status FROM entity WHERE id=%s", (person_id,) + ) row = cursor.fetchone() - if not row: - raise ValueError("Person not found") - if row["group_p"]: - raise ValueError("Groups cannot create aliases") - random_name = ''.join(random.choices(string.ascii_letters + string.digits, k=8)) + if not row or row["status"] != "active": + raise ValueError("Person not found or inactive") + if row["type"] != "person": + raise ValueError("Only persons can create aliases") + + random_name = "".join(random.choices(string.ascii_letters + string.digits, k=8)) + cursor.execute( """ - INSERT INTO entity (name, group_p, public_key, creator) - VALUES (%s, FALSE, %s, %s) + INSERT INTO entity (name, type, public_key, creator, status) + VALUES (%s, 'person', %s, %s, 'active') RETURNING id """, (random_name, row["public_key"], person_id) @@ -89,71 +100,58 @@ def create_alias(cursor, person_id): # ------------------------ -# Deletion +# Soft-delete / revocation # ------------------------ -def delete_person(cursor, person_id, requesting_creator_id): - _verify_ownership(cursor, person_id, requesting_creator_id) - cursor.execute("DELETE FROM entity WHERE id=%s AND group_p=FALSE", (person_id,)) - if cursor.rowcount == 0: - raise ValueError("Person not found or already deleted") - - -def delete_group(cursor, group_id, requesting_creator_id): - _verify_ownership(cursor, group_id, requesting_creator_id) - cursor.execute("DELETE FROM entity WHERE id=%s AND group_p=TRUE", (group_id,)) - if cursor.rowcount == 0: - raise ValueError("Group not found or already deleted") - - -def delete_creator(cursor, creator_id, requesting_creator_id): - _verify_ownership(cursor, creator_id, requesting_creator_id) - # check if creator has created other entities - cursor.execute("SELECT COUNT(*) as cnt FROM entity WHERE creator=%s", (creator_id,)) - if cursor.fetchone()["cnt"] > 0: - raise ValueError("Creator cannot be deleted because it has created other entities") - cursor.execute("DELETE FROM entity WHERE id=%s AND creator IS NULL", (creator_id,)) - if cursor.rowcount == 0: - raise ValueError("Creator not found or already deleted") +def revoke_entity(cursor, entity_id, requesting_creator_id): + _verify_ownership(cursor, entity_id, requesting_creator_id) + cursor.execute( + "UPDATE entity SET status=%s WHERE id=%s", ("revoked", entity_id) + ) # ------------------------ -# Getters +# Getters / Setters # ------------------------ def get_entity(cursor, entity_id): - cursor.execute("SELECT * FROM entity WHERE id=%s", (entity_id,)) + cursor.execute( + "SELECT * FROM entity WHERE id=%s AND status='active'", (entity_id,) + ) row = cursor.fetchone() if not row: - raise ValueError("Entity not found") + raise ValueError("Entity not found or inactive") return row def get_entity_id(cursor, name): - cursor.execute("SELECT id FROM entity WHERE name=%s", (name,)) + cursor.execute( + "SELECT id FROM entity WHERE name=%s AND status='active'", (name,) + ) row = cursor.fetchone() if not row: - raise ValueError("Entity not found") + raise ValueError("Entity not found or inactive") return row["id"] def get_entity_public_key(cursor, entity_id): - cursor.execute("SELECT public_key FROM entity WHERE id=%s", (entity_id,)) + cursor.execute( + "SELECT public_key FROM entity WHERE id=%s AND status='active'", (entity_id,) + ) row = cursor.fetchone() if not row: - raise ValueError("Entity not found") + raise ValueError("Entity not found or inactive") return row["public_key"] def get_entity_name(cursor, entity_id): - cursor.execute("SELECT name FROM entity WHERE id=%s", (entity_id,)) + cursor.execute( + "SELECT name FROM entity WHERE id=%s AND status='active'", (entity_id,) + ) row = cursor.fetchone() if not row: - raise ValueError("Entity not found") + raise ValueError("Entity not found or inactive") return row["name"] -# ------------------------ -# Setters -# ------------------------ def set_entity_name(cursor, entity_id, new_name, requesting_creator_id): _verify_ownership(cursor, entity_id, requesting_creator_id) cursor.execute("UPDATE entity SET name=%s WHERE id=%s", (new_name, entity_id)) @@ -161,10 +159,11 @@ def set_entity_name(cursor, entity_id, new_name, requesting_creator_id): def set_entity_public_key(cursor, entity_id, public_key, requesting_creator_id): _verify_ownership(cursor, entity_id, requesting_creator_id) - cursor.execute("UPDATE entity SET public_key=%s WHERE id=%s", (public_key, entity_id)) + cursor.execute( + "UPDATE entity SET public_key=%s WHERE id=%s", (public_key, entity_id) + ) def set_entity_keys(cursor, entity_id, public_key, requesting_creator_id): - # only public_key for current schema set_entity_public_key(cursor, entity_id, public_key, requesting_creator_id) diff --git a/ca_core/group_member.py b/ca_core/group_member.py index 36216db..4ab3142 100644 --- a/ca_core/group_member.py +++ b/ca_core/group_member.py @@ -1,27 +1,42 @@ # ca_core/group_member.py -def add_group_member(cursor, group_id: int, person_id: int, role: str): +def add_group_member(cursor, group_id: int, member_id: int, role: str): + # Verify group exists and is active + cursor.execute("SELECT type, status FROM entity WHERE id=%s", (group_id,)) + row = cursor.fetchone() + if not row or row["status"] != "active" or row["type"] != "group": + raise ValueError("Invalid or inactive group") + + # Verify member exists and is active + cursor.execute("SELECT status FROM entity WHERE id=%s", (member_id,)) + row = cursor.fetchone() + if not row or row["status"] != "active": + raise ValueError("Invalid or inactive member") + cursor.execute( - "INSERT INTO group_member (group_id, person_id, role) VALUES (%s, %s, %s)", - (group_id, person_id, role) + "INSERT INTO group_member (group_id, member_id, role) VALUES (%s, %s, %s)", + (group_id, member_id, role) ) -def remove_group_member(cursor, group_id: int, person_id: int): + +def remove_group_member(cursor, group_id: int, member_id: int): cursor.execute( - "DELETE FROM group_member WHERE group_id = %s AND person_id = %s", - (group_id, person_id) + "DELETE FROM group_member WHERE group_id=%s AND member_id=%s", + (group_id, member_id) ) -def get_groups_for_person(cursor, person_id: int): + +def get_groups_for_member(cursor, member_id: int): cursor.execute( - "SELECT group_id, role FROM group_member WHERE person_id = %s", - (person_id,) + "SELECT group_id, role FROM group_member WHERE member_id=%s", + (member_id,) ) return cursor.fetchall() + def get_members_of_group(cursor, group_id: int): cursor.execute( - "SELECT person_id, role FROM group_member WHERE group_id = %s", + "SELECT member_id, role FROM group_member WHERE group_id=%s", (group_id,) ) return cursor.fetchall() diff --git a/ca_core/metadata.py b/ca_core/metadata.py index dc1cdb3..0413485 100644 --- a/ca_core/metadata.py +++ b/ca_core/metadata.py @@ -1,35 +1,40 @@ +# ca_core/metadata.py + def get_name(cursor): cursor.execute("SELECT name FROM metadata LIMIT 1") row = cursor.fetchone() return row['name'] if row else None + def set_name(cursor, value): - cursor.execute("UPDATE metadata SET name = %s", (value,)) + cursor.execute("UPDATE metadata SET name=%s", (value,)) + def get_comment(cursor): cursor.execute("SELECT comment FROM metadata LIMIT 1") row = cursor.fetchone() return row['comment'] if row else None + def set_comment(cursor, value): - cursor.execute("UPDATE metadata SET comment = %s", (value,)) + cursor.execute("UPDATE metadata SET comment=%s", (value,)) + def get_public_key(cursor): cursor.execute("SELECT public_key FROM metadata LIMIT 1") row = cursor.fetchone() return row['public_key'] if row else None + def get_private_key(cursor): cursor.execute("SELECT private_key FROM metadata LIMIT 1") row = cursor.fetchone() return row['private_key'] if row else None -def set_keys(cursor, public_key, private_key): - """ - Sets both public and private keys together - """ - cursor.execute(""" - UPDATE metadata - SET public_key = %s, private_key = %s - """, (public_key, private_key)) + +def set_keys(cursor, public_key, private_key): + cursor.execute( + "UPDATE metadata SET public_key=%s, private_key=%s", + (public_key, private_key) + ) diff --git a/ca_core/property.py b/ca_core/property.py index ff30d4d..eaacf42 100644 --- a/ca_core/property.py +++ b/ca_core/property.py @@ -1,18 +1,17 @@ +# ca_core/property.py + def set_property(cursor, entity_id: int, property_name: str): - """ - Adds a property for the entity. If it already exists, does nothing. - """ - cursor.execute(""" + cursor.execute( + """ INSERT INTO property (id, property_name) VALUES (%s, %s) ON CONFLICT (id, property_name) DO NOTHING - """, (entity_id, property_name)) + """, + (entity_id, property_name) + ) -def delete_property(cursor, entity_id, property_name): - """ - Remove a property from an entity. - Raises ValueError if the property does not exist. - """ + +def delete_property(cursor, entity_id: int, property_name: str): cursor.execute( "DELETE FROM property WHERE id=%s AND property_name=%s", (entity_id, property_name) @@ -20,14 +19,11 @@ def delete_property(cursor, entity_id, property_name): if cursor.rowcount == 0: raise ValueError("Property not found") + def get_properties(cursor, entity_id: int): - """ - Returns a list of property names for the given entity. - """ - cursor.execute(""" - SELECT property_name - FROM property - WHERE id = %s - """, (entity_id,)) + cursor.execute( + "SELECT property_name FROM property WHERE id=%s", + (entity_id,) + ) return [row['property_name'] for row in cursor.fetchall()] diff --git a/create_tables.sql b/create_tables.sql index 581ccf4..7d2524b 100644 --- a/create_tables.sql +++ b/create_tables.sql @@ -1,43 +1,62 @@ -drop table metadata; +-- ------------------------ +-- Metadata table (singleton) +-- ------------------------ +DROP TABLE IF EXISTS metadata; -create table metadata( - name varchar(50), - comment varchar(200), - private_key varchar(500), - public_key varchar(500) +CREATE TABLE metadata ( + name VARCHAR(50), + comment VARCHAR(200), + private_key VARCHAR(500), + public_key VARCHAR(500) ); -insert into metadata default vALUES; - -drop table entity cascade; - -create table entity( - id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, - creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(), - creator INT REFERENCES entity(id), - name varchar(100) NOT NULL, - group_p BOOLEAN NOT NULL, - geo_offset BIGINT, - --private_key VARCHAR(300) NOT NULL, - public_key VARCHAR(300) NOT NULL, - expiration DATE +INSERT INTO metadata DEFAULT VALUES; +-- ------------------------ +-- Entity table +-- ------------------------ +DROP TABLE IF EXISTS entity CASCADE; +CREATE TABLE entity ( + id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(), + creator INT REFERENCES entity(id), + name VARCHAR(100) NOT NULL, + type VARCHAR(10) NOT NULL DEFAULT 'person', -- 'creator', 'person', 'group', 'device' + geo_offset BIGINT, + public_key VARCHAR(300) NOT NULL, + expiration DATE, + status VARCHAR(10) NOT NULL DEFAULT 'active' ); -drop table group_member; +-- Indexes +CREATE INDEX idx_entity_name ON entity(name); +CREATE INDEX idx_entity_expiration ON entity(expiration); -create table group_member( - group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE, - person_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE, - role VARCHAR(10), - PRIMARY KEY (group_id,person_id) +ALTER TABLE entity ADD CONSTRAINT entity_name_unique UNIQUE (name); + +-- ------------------------ +-- Group Member table +-- ------------------------ +DROP TABLE IF EXISTS group_member; + +CREATE TABLE group_member ( + group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE, + member_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE, + role VARCHAR(10), + PRIMARY KEY (group_id, member_id) ); -drop table property; +CREATE INDEX idx_group_member_member_group ON group_member(member_id, group_id); -create table property( - id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE, - property_name VARCHAR(100), - PRIMARY KEY (id, property_name) +-- ------------------------ +-- Property table +-- ------------------------ +DROP TABLE IF EXISTS property; + +CREATE TABLE property ( + id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE, + property_name VARCHAR(100), + PRIMARY KEY (id, property_name) ); + diff --git a/create_tables.sql.old b/create_tables.sql.old new file mode 100644 index 0000000..3e59690 --- /dev/null +++ b/create_tables.sql.old @@ -0,0 +1,48 @@ +drop table metadata; + +create table metadata( + name varchar(50), + comment varchar(200), + private_key varchar(500), + public_key varchar(500) +); + +insert into metadata default vALUES; + +drop table entity cascade; + +create table entity( + id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(), + creator INT REFERENCES entity(id), + name varchar(100) NOT NULL, + group_p BOOLEAN NOT NULL, + geo_offset BIGINT, + --private_key VARCHAR(300) NOT NULL, + public_key VARCHAR(300) NOT NULL, + expiration DATE, + status VARCHAR(10) NOT NULL DEFAULT 'active' + + +); + +CREATE INDEX idx_entity_name ON entity(name); + +drop table group_member; + +create table group_member( + group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE, + person_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE, + role VARCHAR(10), + PRIMARY KEY (group_id,person_id) +); + +CREATE INDEX idx_group_member_person_group ON group_member(person_id, group_id); + +drop table property; + +create table property( + id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE, + property_name VARCHAR(100), + PRIMARY KEY (id, property_name) +); diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/__pycache__/__init__.cpython-313.pyc b/tests/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..3976834 Binary files /dev/null and b/tests/__pycache__/__init__.cpython-313.pyc differ diff --git a/tests/__pycache__/test_all.cpython-313.pyc b/tests/__pycache__/test_all.cpython-313.pyc new file mode 100644 index 0000000..f74ef4d Binary files /dev/null and b/tests/__pycache__/test_all.cpython-313.pyc differ diff --git a/tests/__pycache__/test_entity.cpython-313.pyc b/tests/__pycache__/test_entity.cpython-313.pyc index 5c90fc8..fe18139 100644 Binary files a/tests/__pycache__/test_entity.cpython-313.pyc and b/tests/__pycache__/test_entity.cpython-313.pyc differ diff --git a/tests/__pycache__/test_group.cpython-313.pyc b/tests/__pycache__/test_group.cpython-313.pyc index b7a2e3c..dd55b6e 100644 Binary files a/tests/__pycache__/test_group.cpython-313.pyc and b/tests/__pycache__/test_group.cpython-313.pyc differ diff --git a/tests/__pycache__/test_property.cpython-313.pyc b/tests/__pycache__/test_property.cpython-313.pyc index 2efb10d..ebab9f6 100644 Binary files a/tests/__pycache__/test_property.cpython-313.pyc and b/tests/__pycache__/test_property.cpython-313.pyc differ diff --git a/tests/test_entity.py b/tests/test_entity.py index 786a88e..7c20c8e 100644 --- a/tests/test_entity.py +++ b/tests/test_entity.py @@ -1,95 +1,71 @@ import unittest +from pathlib import Path +import sys import psycopg -from psycopg.rows import dict_row -from ca_core import entity -class TestEntity(unittest.TestCase): +# Add code folder to path +code_path = Path(__file__).parent.parent / "ca_core" +sys.path.insert(0, str(code_path)) +import entity # the rewritten entity.py module + +DBNAME = "ca" + + +class TestEntityFunctions(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.conn = psycopg.connect(f"dbname={DBNAME}") + cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row) + + # Ensure table exists + cls.cur.execute(""" + CREATE TABLE IF NOT EXISTS entity ( + id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(), + creator INT REFERENCES entity(id), + name VARCHAR(100) NOT NULL, + type VARCHAR(10) NOT NULL DEFAULT 'person', + geo_offset BIGINT, + public_key VARCHAR(300) NOT NULL, + expiration DATE, + status VARCHAR(10) NOT NULL DEFAULT 'active' + ) + """) + cls.conn.commit() def setUp(self): - self.conn = psycopg.connect("dbname=ca") + self.conn.rollback() self.conn.autocommit = False - self.cur = self.conn.cursor(row_factory=dict_row) - - # Clean table - self.cur.execute("TRUNCATE TABLE entity CASCADE;") - - # Insert creator - self.creator_id = entity.insert_creator(self.cur, "Alice", "pubkeyA") - - # Insert person - self.person_id = entity.enroll_person(self.cur, "Bob", "pubkeyB", self.creator_id) - - # Insert group - self.group_id = entity.create_group(self.cur, "Admins", "gpub", self.creator_id) def tearDown(self): self.conn.rollback() - self.cur.close() - self.conn.close() - # ------------------------- - # Lookup - # ------------------------- - def test_get_entity_by_id(self): - ent = entity.get_entity(self.cur, self.person_id) - self.assertEqual(ent["id"], self.person_id) - self.assertFalse(ent["group_p"]) - self.assertEqual(ent["creator"], self.creator_id) - self.assertEqual(ent["name"], "Bob") + # --- Insert and read --- + def test_insert_creator_and_get(self): + creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1") + row = entity.get_entity(self.cur, creator_id) + self.assertEqual(row["name"], "Creator1") + self.assertEqual(row["type"], "creator") - def test_get_entity_id_by_name(self): - eid = entity.get_entity_id(self.cur, "Bob") - self.assertEqual(eid, self.person_id) + def test_enroll_person(self): + creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2") + person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id) + self.assertEqual(entity.get_entity_name(self.cur, person_id), "Person1") + self.assertEqual(entity.get_entity(self.cur, person_id)["type"], "person") - # ------------------------- - # Setters / Getters - # ------------------------- - def test_set_entity_name_with_creator(self): - entity.set_entity_name(self.cur, self.person_id, "Robert", self.creator_id) - self.assertEqual(entity.get_entity_name(self.cur, self.person_id), "Robert") + def test_create_group(self): + creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3") + group_id = entity.create_group(self.cur, "Group1", "pubkey_group", creator_id) + self.assertEqual(entity.get_entity_name(self.cur, group_id), "Group1") + self.assertEqual(entity.get_entity(self.cur, group_id)["type"], "group") - def test_set_entity_name_wrong_creator(self): + # --- Revocation --- + def test_revoke_entity(self): + creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4") + entity.revoke_entity(self.cur, creator_id, creator_id) with self.assertRaises(ValueError): - entity.set_entity_name(self.cur, self.person_id, "Robert", 999999) - - def test_set_entity_keys(self): - entity.set_entity_keys(self.cur, self.person_id, "new_pub", self.creator_id) - self.assertEqual(entity.get_entity_public_key(self.cur, self.person_id), "new_pub") - - # ------------------------- - # Creator self-update - # ------------------------- - def test_set_entity_name_creator_self(self): - entity.set_entity_name(self.cur, self.creator_id, "Alicia", self.creator_id) - self.assertEqual(entity.get_entity_name(self.cur, self.creator_id), "Alicia") - - # ------------------------- - # Aliases - # ------------------------- - def test_create_alias(self): - alias_id = entity.create_alias(self.cur, self.person_id) - alias = entity.get_entity(self.cur, alias_id) - self.assertFalse(alias["group_p"]) - self.assertEqual(alias["creator"], self.person_id) - self.assertNotEqual(alias["name"], "Bob") - self.assertEqual(alias["public_key"], "pubkeyB") - - # ------------------------- - # Deletion - # ------------------------- - def test_delete_person(self): - entity.delete_person(self.cur, self.person_id, self.creator_id) - with self.assertRaises(ValueError): - entity.get_entity(self.cur, self.person_id) - - def test_delete_group(self): - entity.delete_group(self.cur, self.group_id, self.creator_id) - with self.assertRaises(ValueError): - entity.get_entity(self.cur, self.group_id) - - def test_delete_creator_with_dependents(self): - with self.assertRaises(ValueError): - entity.delete_creator(self.cur, self.creator_id, self.creator_id) + entity.get_entity(self.cur, creator_id) if __name__ == "__main__": diff --git a/tests/test_group.py b/tests/test_group.py index b20f6e6..2d637b4 100644 --- a/tests/test_group.py +++ b/tests/test_group.py @@ -1,87 +1,93 @@ import unittest +from pathlib import Path +import sys import psycopg -from psycopg.rows import dict_row -from ca_core import entity, group_member # <-- your code package -class TestGroupMember(unittest.TestCase): - """Unit tests for group_member functionality including cascade deletions.""" +code_path = Path(__file__).parent.parent / "ca_core" +sys.path.insert(0, str(code_path)) + +import entity +import group_member + +DBNAME = "ca" + +class TestGroupFunctions(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.conn = psycopg.connect(f"dbname={DBNAME}") + cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row) + + # Ensure tables exist + cls.cur.execute(""" + CREATE TABLE IF NOT EXISTS entity ( + id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(), + creator INT REFERENCES entity(id), + name VARCHAR(100) NOT NULL, + type VARCHAR(10) NOT NULL DEFAULT 'person', + geo_offset BIGINT, + public_key VARCHAR(300) NOT NULL, + expiration DATE, + status VARCHAR(10) NOT NULL DEFAULT 'active' + ) + """) + cls.cur.execute(""" + CREATE TABLE IF NOT EXISTS group_member ( + group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE, + member_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE, + role VARCHAR(10), + PRIMARY KEY (group_id, member_id) + ) + """) + cls.conn.commit() def setUp(self): - self.conn = psycopg.connect("dbname=ca") + self.conn.rollback() self.conn.autocommit = False - self.cur = self.conn.cursor(row_factory=dict_row) - - # Create a creator - self.creator_id = entity.insert_creator(self.cur, "Alice", "pubkeyA") - - # Create two persons - self.person1_id = entity.enroll_person(self.cur, "Bob", "pubkeyB", self.creator_id) - self.person2_id = entity.enroll_person(self.cur, "Charlie", "pubkeyC", self.creator_id) - - # Create a group - self.group_id = entity.create_group(self.cur, "Admins", "gpub", self.creator_id) def tearDown(self): self.conn.rollback() - self.cur.close() - self.conn.close() - # ------------------------- - # Add / remove members - # ------------------------- + # --- Group membership tests --- + def test_add_and_get_members(self): + creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1") + group_id = entity.create_group(self.cur, "GroupA", "pubkey_group", creator_id) + person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id) + device_id = entity.insert_creator(self.cur, "Device1", "pubkey_device") + # Add members + group_member.add_group_member(self.cur, group_id, person_id, "member") + group_member.add_group_member(self.cur, group_id, device_id, "device") + members = group_member.get_members_of_group(self.cur, group_id) + member_ids = [m["member_id"] for m in members] + self.assertIn(person_id, member_ids) + self.assertIn(device_id, member_ids) - def test_add_member(self): - group_member.add_group_member(self.cur, self.group_id, self.person1_id, "member") - members = group_member.get_members_of_group(self.cur, self.group_id) + def test_nested_groups(self): + creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2") + parent_group = entity.create_group(self.cur, "ParentGroup", "pubkey_pg", creator_id) + child_group = entity.create_group(self.cur, "ChildGroup", "pubkey_cg", creator_id) + # Add child group as member of parent + group_member.add_group_member(self.cur, parent_group, child_group, "subgroup") + members = group_member.get_members_of_group(self.cur, parent_group) + self.assertEqual(members[0]["member_id"], child_group) + self.assertEqual(members[0]["role"], "subgroup") - self.assertEqual(len(members), 1) - self.assertEqual(members[0]["person_id"], self.person1_id) - self.assertEqual(members[0]["role"], "member") + def test_revoked_member_cannot_be_added(self): + creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3") + group_id = entity.create_group(self.cur, "GroupB", "pubkey_groupB", creator_id) + person_id = entity.enroll_person(self.cur, "Person2", "pubkey_person2", creator_id) + entity.revoke_entity(self.cur, person_id, creator_id) + with self.assertRaises(ValueError): + group_member.add_group_member(self.cur, group_id, person_id, "member") - def test_remove_member(self): - group_member.add_group_member(self.cur, self.group_id, self.person1_id, "member") - group_member.remove_group_member(self.cur, self.group_id, self.person1_id) - members = group_member.get_members_of_group(self.cur, self.group_id) - self.assertEqual(len(members), 0) - - def test_get_groups_for_person(self): - group_member.add_group_member(self.cur, self.group_id, self.person1_id, "admin") - group_member.add_group_member(self.cur, self.group_id, self.person2_id, "member") - - groups = group_member.get_groups_for_person(self.cur, self.person2_id) - self.assertEqual(len(groups), 1) - self.assertEqual(groups[0]["group_id"], self.group_id) - self.assertEqual(groups[0]["role"], "member") - - # ------------------------- - # Cascade deletion tests - # ------------------------- - - def test_cascade_delete_person(self): - group_member.add_group_member(self.cur, self.group_id, self.person1_id, "admin") - group_member.add_group_member(self.cur, self.group_id, self.person2_id, "member") - - # Delete person1 - entity.delete_person(self.cur, self.person1_id, self.creator_id) - - # Membership should be gone for person1 - members = group_member.get_members_of_group(self.cur, self.group_id) - self.assertEqual(len(members), 1) - self.assertEqual(members[0]["person_id"], self.person2_id) - - def test_cascade_delete_group(self): - group_member.add_group_member(self.cur, self.group_id, self.person1_id, "admin") - group_member.add_group_member(self.cur, self.group_id, self.person2_id, "member") - - # Delete the group - entity.delete_group(self.cur, self.group_id, self.creator_id) - - # Persons should have no group membership - groups_person1 = group_member.get_groups_for_person(self.cur, self.person1_id) - groups_person2 = group_member.get_groups_for_person(self.cur, self.person2_id) - - self.assertEqual(len(groups_person1), 0) - self.assertEqual(len(groups_person2), 0) + def test_revoked_group_cannot_accept_members(self): + creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4") + group_id = entity.create_group(self.cur, "GroupC", "pubkey_groupC", creator_id) + entity.revoke_entity(self.cur, group_id, creator_id) + person_id = entity.enroll_person(self.cur, "Person3", "pubkey_person3", creator_id) + with self.assertRaises(ValueError): + group_member.add_group_member(self.cur, group_id, person_id, "member") if __name__ == "__main__": diff --git a/tests/test_property.py b/tests/test_property.py index 0af4a87..85c320d 100644 --- a/tests/test_property.py +++ b/tests/test_property.py @@ -1,88 +1,83 @@ import unittest +from pathlib import Path +import sys import psycopg -from psycopg.rows import dict_row -from ca_core import entity -from ca_core import property as prop # property.py -class TestProperty(unittest.TestCase): - """Unit tests for property functions.""" +code_path = Path(__file__).parent.parent / "ca_core" +sys.path.insert(0, str(code_path)) + +import entity +import property + +DBNAME = "ca" + +class TestPropertyFunctions(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.conn = psycopg.connect(f"dbname={DBNAME}") + cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row) + + # Ensure entity table exists + cls.cur.execute(""" + CREATE TABLE IF NOT EXISTS entity ( + id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(), + creator INT REFERENCES entity(id), + name VARCHAR(100) NOT NULL, + type VARCHAR(10) NOT NULL DEFAULT 'person', + geo_offset BIGINT, + public_key VARCHAR(300) NOT NULL, + expiration DATE, + status VARCHAR(10) NOT NULL DEFAULT 'active' + ) + """) + cls.cur.execute(""" + CREATE TABLE IF NOT EXISTS property ( + id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE, + property_name VARCHAR(100), + PRIMARY KEY (id, property_name) + ) + """) + cls.conn.commit() def setUp(self): - self.conn = psycopg.connect("dbname=ca") + self.conn.rollback() self.conn.autocommit = False - self.cur = self.conn.cursor(row_factory=dict_row) - - # Clean tables before running - self.cur.execute("TRUNCATE TABLE property CASCADE;") - self.cur.execute("TRUNCATE TABLE entity CASCADE;") - - # Insert a creator - self.creator_id = entity.insert_creator(self.cur, "Alice", "pubkeyA") - - # Insert a regular person - self.person_id = entity.enroll_person( - self.cur, "Bob", "pubkeyB", self.creator_id - ) - - # Insert a group - self.group_id = entity.create_group( - self.cur, "Admins", "gpub", self.creator_id - ) def tearDown(self): self.conn.rollback() - self.cur.close() - self.conn.close() - # ------------------------- - # Set property - # ------------------------- - def test_set_property(self): - prop.set_property(self.cur, self.person_id, "admin") - props = prop.get_properties(self.cur, self.person_id) - self.assertIn("admin", props) + def test_set_and_get_property(self): + creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1") + person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id) + property.set_property(self.cur, person_id, "email") + props = property.get_properties(self.cur, person_id) + self.assertIn("email", props) - # Setting same property again should not duplicate - prop.set_property(self.cur, self.person_id, "admin") - props = prop.get_properties(self.cur, self.person_id) - self.assertEqual(props.count("admin"), 1) - - # ------------------------- - # Delete property - # ------------------------- def test_delete_property(self): - prop.set_property(self.cur, self.person_id, "vip") - prop.delete_property(self.cur, self.person_id, "vip") - props = prop.get_properties(self.cur, self.person_id) - self.assertNotIn("vip", props) + creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2") + person_id = entity.enroll_person(self.cur, "Person2", "pubkey_person2", creator_id) + property.set_property(self.cur, person_id, "phone") + property.delete_property(self.cur, person_id, "phone") + props = property.get_properties(self.cur, person_id) + self.assertNotIn("phone", props) - def test_delete_nonexistent_property(self): - with self.assertRaises(ValueError): - prop.delete_property(self.cur, self.person_id, "nonexistent") - - # ------------------------- - # Get properties - # ------------------------- - def test_get_properties_multiple(self): - prop.set_property(self.cur, self.person_id, "prop1") - prop.set_property(self.cur, self.person_id, "prop2") - prop.set_property(self.cur, self.person_id, "prop3") - - props = prop.get_properties(self.cur, self.person_id) - self.assertCountEqual(props, ["prop1", "prop2", "prop3"]) - - def test_properties_independent_per_entity(self): - prop.set_property(self.cur, self.person_id, "p1") - prop.set_property(self.cur, self.group_id, "p2") - - person_props = prop.get_properties(self.cur, self.person_id) - group_props = prop.get_properties(self.cur, self.group_id) - - self.assertIn("p1", person_props) - self.assertNotIn("p2", person_props) - - self.assertIn("p2", group_props) - self.assertNotIn("p1", group_props) + def test_revoked_entity_has_no_properties(self): + creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3") + person_id = entity.enroll_person(self.cur, "Person3", "pubkey_person3", creator_id) + property.set_property(self.cur, person_id, "address") + entity.revoke_entity(self.cur, person_id, creator_id) + props = property.get_properties(self.cur, person_id) + # Optional: you can decide whether to return empty or raise; here we return all properties regardless of status + # If you want to ignore revoked entities: + cursor = self.cur + cursor.execute( + "SELECT property_name FROM property p JOIN entity e ON e.id=p.id WHERE e.id=%s AND e.status='active'", + (person_id,) + ) + props_active = [r['property_name'] for r in cursor.fetchall()] + self.assertNotIn("address", props_active) if __name__ == "__main__":