Changes to be committed:
deleted: .create_tables.sql.swp new file: ca_core/__init__.py new file: ca_core/__pycache__/__init__.cpython-313.pyc new file: ca_core/__pycache__/entity.cpython-313.pyc new file: ca_core/__pycache__/group_member.cpython-313.pyc new file: ca_core/__pycache__/metadata.cpython-313.pyc new file: ca_core/__pycache__/property.cpython-313.pyc new file: ca_core/entity.py new file: ca_core/group_member.py new file: ca_core/metadata.py new file: ca_core/property.py modified: create_tables.sql deleted: tests/.create_testdata.sql.swp new file: tests/__pycache__/test_entity.cpython-313.pyc new file: tests/__pycache__/test_group.cpython-313.pyc new file: tests/__pycache__/test_metadata.cpython-313.pyc new file: tests/__pycache__/test_property.cpython-313.pyc new file: tests/test_entity.py new file: tests/test_group.py new file: tests/test_metadata.py new file: tests/test_property.py
This commit is contained in:
parent
b8bec4fe7c
commit
5c8153ed19
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -0,0 +1,170 @@
|
||||||
|
import random
|
||||||
|
import string
|
||||||
|
|
||||||
|
# ------------------------
|
||||||
|
# Helper for ownership checks (Option 2)
|
||||||
|
# ------------------------
|
||||||
|
def _verify_ownership(cursor, entity_id, requesting_creator_id):
|
||||||
|
cursor.execute("SELECT id, creator FROM entity WHERE id=%s", (entity_id,))
|
||||||
|
row = cursor.fetchone()
|
||||||
|
if not row:
|
||||||
|
raise ValueError("Entity not found")
|
||||||
|
owner_id = row["creator"]
|
||||||
|
entity_id_db = row["id"]
|
||||||
|
|
||||||
|
if owner_id is None:
|
||||||
|
# Entity is a creator → allow only self
|
||||||
|
if requesting_creator_id != entity_id_db:
|
||||||
|
raise ValueError("Creator ID does not match entity owner")
|
||||||
|
else:
|
||||||
|
# Entity is a person/group → allow original creator
|
||||||
|
if requesting_creator_id != owner_id:
|
||||||
|
raise ValueError("Creator ID does not match entity owner")
|
||||||
|
|
||||||
|
|
||||||
|
# ------------------------
|
||||||
|
# Insertions
|
||||||
|
# ------------------------
|
||||||
|
def insert_creator(cursor, name, public_key):
|
||||||
|
cursor.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO entity (name, group_p, public_key, creator)
|
||||||
|
VALUES (%s, FALSE, %s, NULL)
|
||||||
|
RETURNING id
|
||||||
|
""",
|
||||||
|
(name, public_key)
|
||||||
|
)
|
||||||
|
return cursor.fetchone()["id"]
|
||||||
|
|
||||||
|
|
||||||
|
def enroll_person(cursor, name, public_key, creator_id):
|
||||||
|
cursor.execute("SELECT creator FROM entity WHERE id=%s", (creator_id,))
|
||||||
|
row = cursor.fetchone()
|
||||||
|
if not row or row["creator"] is not None:
|
||||||
|
raise ValueError("Provided creator_id does not correspond to a creator")
|
||||||
|
cursor.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO entity (name, group_p, public_key, creator)
|
||||||
|
VALUES (%s, FALSE, %s, %s)
|
||||||
|
RETURNING id
|
||||||
|
""",
|
||||||
|
(name, public_key, creator_id)
|
||||||
|
)
|
||||||
|
return cursor.fetchone()["id"]
|
||||||
|
|
||||||
|
|
||||||
|
def create_group(cursor, name, public_key, creator_id):
|
||||||
|
cursor.execute("SELECT creator FROM entity WHERE id=%s", (creator_id,))
|
||||||
|
row = cursor.fetchone()
|
||||||
|
if not row or row["creator"] is not None:
|
||||||
|
raise ValueError("Provided creator_id does not correspond to a creator")
|
||||||
|
cursor.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO entity (name, group_p, public_key, creator)
|
||||||
|
VALUES (%s, TRUE, %s, %s)
|
||||||
|
RETURNING id
|
||||||
|
""",
|
||||||
|
(name, public_key, creator_id)
|
||||||
|
)
|
||||||
|
return cursor.fetchone()["id"]
|
||||||
|
|
||||||
|
|
||||||
|
def create_alias(cursor, person_id):
|
||||||
|
cursor.execute("SELECT id, group_p, public_key FROM entity WHERE id=%s", (person_id,))
|
||||||
|
row = cursor.fetchone()
|
||||||
|
if not row:
|
||||||
|
raise ValueError("Person not found")
|
||||||
|
if row["group_p"]:
|
||||||
|
raise ValueError("Groups cannot create aliases")
|
||||||
|
random_name = ''.join(random.choices(string.ascii_letters + string.digits, k=8))
|
||||||
|
cursor.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO entity (name, group_p, public_key, creator)
|
||||||
|
VALUES (%s, FALSE, %s, %s)
|
||||||
|
RETURNING id
|
||||||
|
""",
|
||||||
|
(random_name, row["public_key"], person_id)
|
||||||
|
)
|
||||||
|
return cursor.fetchone()["id"]
|
||||||
|
|
||||||
|
|
||||||
|
# ------------------------
|
||||||
|
# Deletion
|
||||||
|
# ------------------------
|
||||||
|
def delete_person(cursor, person_id, requesting_creator_id):
|
||||||
|
_verify_ownership(cursor, person_id, requesting_creator_id)
|
||||||
|
cursor.execute("DELETE FROM entity WHERE id=%s AND group_p=FALSE", (person_id,))
|
||||||
|
if cursor.rowcount == 0:
|
||||||
|
raise ValueError("Person not found or already deleted")
|
||||||
|
|
||||||
|
|
||||||
|
def delete_group(cursor, group_id, requesting_creator_id):
|
||||||
|
_verify_ownership(cursor, group_id, requesting_creator_id)
|
||||||
|
cursor.execute("DELETE FROM entity WHERE id=%s AND group_p=TRUE", (group_id,))
|
||||||
|
if cursor.rowcount == 0:
|
||||||
|
raise ValueError("Group not found or already deleted")
|
||||||
|
|
||||||
|
|
||||||
|
def delete_creator(cursor, creator_id, requesting_creator_id):
|
||||||
|
_verify_ownership(cursor, creator_id, requesting_creator_id)
|
||||||
|
# check if creator has created other entities
|
||||||
|
cursor.execute("SELECT COUNT(*) as cnt FROM entity WHERE creator=%s", (creator_id,))
|
||||||
|
if cursor.fetchone()["cnt"] > 0:
|
||||||
|
raise ValueError("Creator cannot be deleted because it has created other entities")
|
||||||
|
cursor.execute("DELETE FROM entity WHERE id=%s AND creator IS NULL", (creator_id,))
|
||||||
|
if cursor.rowcount == 0:
|
||||||
|
raise ValueError("Creator not found or already deleted")
|
||||||
|
|
||||||
|
|
||||||
|
# ------------------------
|
||||||
|
# Getters
|
||||||
|
# ------------------------
|
||||||
|
def get_entity(cursor, entity_id):
|
||||||
|
cursor.execute("SELECT * FROM entity WHERE id=%s", (entity_id,))
|
||||||
|
row = cursor.fetchone()
|
||||||
|
if not row:
|
||||||
|
raise ValueError("Entity not found")
|
||||||
|
return row
|
||||||
|
|
||||||
|
|
||||||
|
def get_entity_id(cursor, name):
|
||||||
|
cursor.execute("SELECT id FROM entity WHERE name=%s", (name,))
|
||||||
|
row = cursor.fetchone()
|
||||||
|
if not row:
|
||||||
|
raise ValueError("Entity not found")
|
||||||
|
return row["id"]
|
||||||
|
|
||||||
|
|
||||||
|
def get_entity_public_key(cursor, entity_id):
|
||||||
|
cursor.execute("SELECT public_key FROM entity WHERE id=%s", (entity_id,))
|
||||||
|
row = cursor.fetchone()
|
||||||
|
if not row:
|
||||||
|
raise ValueError("Entity not found")
|
||||||
|
return row["public_key"]
|
||||||
|
|
||||||
|
|
||||||
|
def get_entity_name(cursor, entity_id):
|
||||||
|
cursor.execute("SELECT name FROM entity WHERE id=%s", (entity_id,))
|
||||||
|
row = cursor.fetchone()
|
||||||
|
if not row:
|
||||||
|
raise ValueError("Entity not found")
|
||||||
|
return row["name"]
|
||||||
|
|
||||||
|
|
||||||
|
# ------------------------
|
||||||
|
# Setters
|
||||||
|
# ------------------------
|
||||||
|
def set_entity_name(cursor, entity_id, new_name, requesting_creator_id):
|
||||||
|
_verify_ownership(cursor, entity_id, requesting_creator_id)
|
||||||
|
cursor.execute("UPDATE entity SET name=%s WHERE id=%s", (new_name, entity_id))
|
||||||
|
|
||||||
|
|
||||||
|
def set_entity_public_key(cursor, entity_id, public_key, requesting_creator_id):
|
||||||
|
_verify_ownership(cursor, entity_id, requesting_creator_id)
|
||||||
|
cursor.execute("UPDATE entity SET public_key=%s WHERE id=%s", (public_key, entity_id))
|
||||||
|
|
||||||
|
|
||||||
|
def set_entity_keys(cursor, entity_id, public_key, requesting_creator_id):
|
||||||
|
# only public_key for current schema
|
||||||
|
set_entity_public_key(cursor, entity_id, public_key, requesting_creator_id)
|
||||||
|
|
||||||
|
|
@ -0,0 +1,28 @@
|
||||||
|
# ca_core/group_member.py
|
||||||
|
|
||||||
|
def add_group_member(cursor, group_id: int, person_id: int, role: str):
|
||||||
|
cursor.execute(
|
||||||
|
"INSERT INTO group_member (group_id, person_id, role) VALUES (%s, %s, %s)",
|
||||||
|
(group_id, person_id, role)
|
||||||
|
)
|
||||||
|
|
||||||
|
def remove_group_member(cursor, group_id: int, person_id: int):
|
||||||
|
cursor.execute(
|
||||||
|
"DELETE FROM group_member WHERE group_id = %s AND person_id = %s",
|
||||||
|
(group_id, person_id)
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_groups_for_person(cursor, person_id: int):
|
||||||
|
cursor.execute(
|
||||||
|
"SELECT group_id, role FROM group_member WHERE person_id = %s",
|
||||||
|
(person_id,)
|
||||||
|
)
|
||||||
|
return cursor.fetchall()
|
||||||
|
|
||||||
|
def get_members_of_group(cursor, group_id: int):
|
||||||
|
cursor.execute(
|
||||||
|
"SELECT person_id, role FROM group_member WHERE group_id = %s",
|
||||||
|
(group_id,)
|
||||||
|
)
|
||||||
|
return cursor.fetchall()
|
||||||
|
|
||||||
|
|
@ -0,0 +1,35 @@
|
||||||
|
def get_name(cursor):
|
||||||
|
cursor.execute("SELECT name FROM metadata LIMIT 1")
|
||||||
|
row = cursor.fetchone()
|
||||||
|
return row['name'] if row else None
|
||||||
|
|
||||||
|
def set_name(cursor, value):
|
||||||
|
cursor.execute("UPDATE metadata SET name = %s", (value,))
|
||||||
|
|
||||||
|
def get_comment(cursor):
|
||||||
|
cursor.execute("SELECT comment FROM metadata LIMIT 1")
|
||||||
|
row = cursor.fetchone()
|
||||||
|
return row['comment'] if row else None
|
||||||
|
|
||||||
|
def set_comment(cursor, value):
|
||||||
|
cursor.execute("UPDATE metadata SET comment = %s", (value,))
|
||||||
|
|
||||||
|
def get_public_key(cursor):
|
||||||
|
cursor.execute("SELECT public_key FROM metadata LIMIT 1")
|
||||||
|
row = cursor.fetchone()
|
||||||
|
return row['public_key'] if row else None
|
||||||
|
|
||||||
|
def get_private_key(cursor):
|
||||||
|
cursor.execute("SELECT private_key FROM metadata LIMIT 1")
|
||||||
|
row = cursor.fetchone()
|
||||||
|
return row['private_key'] if row else None
|
||||||
|
|
||||||
|
def set_keys(cursor, public_key, private_key):
|
||||||
|
"""
|
||||||
|
Sets both public and private keys together
|
||||||
|
"""
|
||||||
|
cursor.execute("""
|
||||||
|
UPDATE metadata
|
||||||
|
SET public_key = %s, private_key = %s
|
||||||
|
""", (public_key, private_key))
|
||||||
|
|
||||||
|
|
@ -0,0 +1,33 @@
|
||||||
|
def set_property(cursor, entity_id: int, property_name: str):
|
||||||
|
"""
|
||||||
|
Adds a property for the entity. If it already exists, does nothing.
|
||||||
|
"""
|
||||||
|
cursor.execute("""
|
||||||
|
INSERT INTO property (id, property_name)
|
||||||
|
VALUES (%s, %s)
|
||||||
|
ON CONFLICT (id, property_name) DO NOTHING
|
||||||
|
""", (entity_id, property_name))
|
||||||
|
|
||||||
|
def delete_property(cursor, entity_id, property_name):
|
||||||
|
"""
|
||||||
|
Remove a property from an entity.
|
||||||
|
Raises ValueError if the property does not exist.
|
||||||
|
"""
|
||||||
|
cursor.execute(
|
||||||
|
"DELETE FROM property WHERE id=%s AND property_name=%s",
|
||||||
|
(entity_id, property_name)
|
||||||
|
)
|
||||||
|
if cursor.rowcount == 0:
|
||||||
|
raise ValueError("Property not found")
|
||||||
|
|
||||||
|
def get_properties(cursor, entity_id: int):
|
||||||
|
"""
|
||||||
|
Returns a list of property names for the given entity.
|
||||||
|
"""
|
||||||
|
cursor.execute("""
|
||||||
|
SELECT property_name
|
||||||
|
FROM property
|
||||||
|
WHERE id = %s
|
||||||
|
""", (entity_id,))
|
||||||
|
return [row['property_name'] for row in cursor.fetchall()]
|
||||||
|
|
||||||
|
|
@ -1,8 +1,20 @@
|
||||||
drop table entity;
|
drop table metadata;
|
||||||
|
|
||||||
|
create table metadata(
|
||||||
|
name varchar(50),
|
||||||
|
comment varchar(200),
|
||||||
|
private_key varchar(500),
|
||||||
|
public_key varchar(500)
|
||||||
|
);
|
||||||
|
|
||||||
|
insert into metadata default vALUES;
|
||||||
|
|
||||||
|
drop table entity cascade;
|
||||||
|
|
||||||
create table entity(
|
create table entity(
|
||||||
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
||||||
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
|
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||||
|
creator INT REFERENCES entity(id),
|
||||||
name varchar(100) NOT NULL,
|
name varchar(100) NOT NULL,
|
||||||
group_p BOOLEAN NOT NULL,
|
group_p BOOLEAN NOT NULL,
|
||||||
geo_offset BIGINT,
|
geo_offset BIGINT,
|
||||||
|
|
@ -16,8 +28,8 @@ create table entity(
|
||||||
drop table group_member;
|
drop table group_member;
|
||||||
|
|
||||||
create table group_member(
|
create table group_member(
|
||||||
group_id INT,
|
group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
|
||||||
person_id INT,
|
person_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
|
||||||
role VARCHAR(10),
|
role VARCHAR(10),
|
||||||
PRIMARY KEY (group_id,person_id)
|
PRIMARY KEY (group_id,person_id)
|
||||||
);
|
);
|
||||||
|
|
@ -25,6 +37,7 @@ create table group_member(
|
||||||
drop table property;
|
drop table property;
|
||||||
|
|
||||||
create table property(
|
create table property(
|
||||||
id INT NOT NULL,
|
id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
|
||||||
property_name VARCHAR(100)
|
property_name VARCHAR(100),
|
||||||
|
PRIMARY KEY (id, property_name)
|
||||||
);
|
);
|
||||||
|
|
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -0,0 +1,97 @@
|
||||||
|
import unittest
|
||||||
|
import psycopg
|
||||||
|
from psycopg.rows import dict_row
|
||||||
|
from ca_core import entity
|
||||||
|
|
||||||
|
class TestEntity(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.conn = psycopg.connect("dbname=ca")
|
||||||
|
self.conn.autocommit = False
|
||||||
|
self.cur = self.conn.cursor(row_factory=dict_row)
|
||||||
|
|
||||||
|
# Clean table
|
||||||
|
self.cur.execute("TRUNCATE TABLE entity CASCADE;")
|
||||||
|
|
||||||
|
# Insert creator
|
||||||
|
self.creator_id = entity.insert_creator(self.cur, "Alice", "pubkeyA")
|
||||||
|
|
||||||
|
# Insert person
|
||||||
|
self.person_id = entity.enroll_person(self.cur, "Bob", "pubkeyB", self.creator_id)
|
||||||
|
|
||||||
|
# Insert group
|
||||||
|
self.group_id = entity.create_group(self.cur, "Admins", "gpub", self.creator_id)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.conn.rollback()
|
||||||
|
self.cur.close()
|
||||||
|
self.conn.close()
|
||||||
|
|
||||||
|
# -------------------------
|
||||||
|
# Lookup
|
||||||
|
# -------------------------
|
||||||
|
def test_get_entity_by_id(self):
|
||||||
|
ent = entity.get_entity(self.cur, self.person_id)
|
||||||
|
self.assertEqual(ent["id"], self.person_id)
|
||||||
|
self.assertFalse(ent["group_p"])
|
||||||
|
self.assertEqual(ent["creator"], self.creator_id)
|
||||||
|
self.assertEqual(ent["name"], "Bob")
|
||||||
|
|
||||||
|
def test_get_entity_id_by_name(self):
|
||||||
|
eid = entity.get_entity_id(self.cur, "Bob")
|
||||||
|
self.assertEqual(eid, self.person_id)
|
||||||
|
|
||||||
|
# -------------------------
|
||||||
|
# Setters / Getters
|
||||||
|
# -------------------------
|
||||||
|
def test_set_entity_name_with_creator(self):
|
||||||
|
entity.set_entity_name(self.cur, self.person_id, "Robert", self.creator_id)
|
||||||
|
self.assertEqual(entity.get_entity_name(self.cur, self.person_id), "Robert")
|
||||||
|
|
||||||
|
def test_set_entity_name_wrong_creator(self):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
entity.set_entity_name(self.cur, self.person_id, "Robert", 999999)
|
||||||
|
|
||||||
|
def test_set_entity_keys(self):
|
||||||
|
entity.set_entity_keys(self.cur, self.person_id, "new_pub", self.creator_id)
|
||||||
|
self.assertEqual(entity.get_entity_public_key(self.cur, self.person_id), "new_pub")
|
||||||
|
|
||||||
|
# -------------------------
|
||||||
|
# Creator self-update
|
||||||
|
# -------------------------
|
||||||
|
def test_set_entity_name_creator_self(self):
|
||||||
|
entity.set_entity_name(self.cur, self.creator_id, "Alicia", self.creator_id)
|
||||||
|
self.assertEqual(entity.get_entity_name(self.cur, self.creator_id), "Alicia")
|
||||||
|
|
||||||
|
# -------------------------
|
||||||
|
# Aliases
|
||||||
|
# -------------------------
|
||||||
|
def test_create_alias(self):
|
||||||
|
alias_id = entity.create_alias(self.cur, self.person_id)
|
||||||
|
alias = entity.get_entity(self.cur, alias_id)
|
||||||
|
self.assertFalse(alias["group_p"])
|
||||||
|
self.assertEqual(alias["creator"], self.person_id)
|
||||||
|
self.assertNotEqual(alias["name"], "Bob")
|
||||||
|
self.assertEqual(alias["public_key"], "pubkeyB")
|
||||||
|
|
||||||
|
# -------------------------
|
||||||
|
# Deletion
|
||||||
|
# -------------------------
|
||||||
|
def test_delete_person(self):
|
||||||
|
entity.delete_person(self.cur, self.person_id, self.creator_id)
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
entity.get_entity(self.cur, self.person_id)
|
||||||
|
|
||||||
|
def test_delete_group(self):
|
||||||
|
entity.delete_group(self.cur, self.group_id, self.creator_id)
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
entity.get_entity(self.cur, self.group_id)
|
||||||
|
|
||||||
|
def test_delete_creator_with_dependents(self):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
entity.delete_creator(self.cur, self.creator_id, self.creator_id)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
|
|
||||||
|
|
@ -0,0 +1,89 @@
|
||||||
|
import unittest
|
||||||
|
import psycopg
|
||||||
|
from psycopg.rows import dict_row
|
||||||
|
from ca_core import entity, group_member # <-- your code package
|
||||||
|
|
||||||
|
class TestGroupMember(unittest.TestCase):
|
||||||
|
"""Unit tests for group_member functionality including cascade deletions."""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.conn = psycopg.connect("dbname=ca")
|
||||||
|
self.conn.autocommit = False
|
||||||
|
self.cur = self.conn.cursor(row_factory=dict_row)
|
||||||
|
|
||||||
|
# Create a creator
|
||||||
|
self.creator_id = entity.insert_creator(self.cur, "Alice", "pubkeyA")
|
||||||
|
|
||||||
|
# Create two persons
|
||||||
|
self.person1_id = entity.enroll_person(self.cur, "Bob", "pubkeyB", self.creator_id)
|
||||||
|
self.person2_id = entity.enroll_person(self.cur, "Charlie", "pubkeyC", self.creator_id)
|
||||||
|
|
||||||
|
# Create a group
|
||||||
|
self.group_id = entity.create_group(self.cur, "Admins", "gpub", self.creator_id)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.conn.rollback()
|
||||||
|
self.cur.close()
|
||||||
|
self.conn.close()
|
||||||
|
|
||||||
|
# -------------------------
|
||||||
|
# Add / remove members
|
||||||
|
# -------------------------
|
||||||
|
|
||||||
|
def test_add_member(self):
|
||||||
|
group_member.add_group_member(self.cur, self.group_id, self.person1_id, "member")
|
||||||
|
members = group_member.get_members_of_group(self.cur, self.group_id)
|
||||||
|
|
||||||
|
self.assertEqual(len(members), 1)
|
||||||
|
self.assertEqual(members[0]["person_id"], self.person1_id)
|
||||||
|
self.assertEqual(members[0]["role"], "member")
|
||||||
|
|
||||||
|
def test_remove_member(self):
|
||||||
|
group_member.add_group_member(self.cur, self.group_id, self.person1_id, "member")
|
||||||
|
group_member.remove_group_member(self.cur, self.group_id, self.person1_id)
|
||||||
|
members = group_member.get_members_of_group(self.cur, self.group_id)
|
||||||
|
self.assertEqual(len(members), 0)
|
||||||
|
|
||||||
|
def test_get_groups_for_person(self):
|
||||||
|
group_member.add_group_member(self.cur, self.group_id, self.person1_id, "admin")
|
||||||
|
group_member.add_group_member(self.cur, self.group_id, self.person2_id, "member")
|
||||||
|
|
||||||
|
groups = group_member.get_groups_for_person(self.cur, self.person2_id)
|
||||||
|
self.assertEqual(len(groups), 1)
|
||||||
|
self.assertEqual(groups[0]["group_id"], self.group_id)
|
||||||
|
self.assertEqual(groups[0]["role"], "member")
|
||||||
|
|
||||||
|
# -------------------------
|
||||||
|
# Cascade deletion tests
|
||||||
|
# -------------------------
|
||||||
|
|
||||||
|
def test_cascade_delete_person(self):
|
||||||
|
group_member.add_group_member(self.cur, self.group_id, self.person1_id, "admin")
|
||||||
|
group_member.add_group_member(self.cur, self.group_id, self.person2_id, "member")
|
||||||
|
|
||||||
|
# Delete person1
|
||||||
|
entity.delete_person(self.cur, self.person1_id, self.creator_id)
|
||||||
|
|
||||||
|
# Membership should be gone for person1
|
||||||
|
members = group_member.get_members_of_group(self.cur, self.group_id)
|
||||||
|
self.assertEqual(len(members), 1)
|
||||||
|
self.assertEqual(members[0]["person_id"], self.person2_id)
|
||||||
|
|
||||||
|
def test_cascade_delete_group(self):
|
||||||
|
group_member.add_group_member(self.cur, self.group_id, self.person1_id, "admin")
|
||||||
|
group_member.add_group_member(self.cur, self.group_id, self.person2_id, "member")
|
||||||
|
|
||||||
|
# Delete the group
|
||||||
|
entity.delete_group(self.cur, self.group_id, self.creator_id)
|
||||||
|
|
||||||
|
# Persons should have no group membership
|
||||||
|
groups_person1 = group_member.get_groups_for_person(self.cur, self.person1_id)
|
||||||
|
groups_person2 = group_member.get_groups_for_person(self.cur, self.person2_id)
|
||||||
|
|
||||||
|
self.assertEqual(len(groups_person1), 0)
|
||||||
|
self.assertEqual(len(groups_person2), 0)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
|
|
||||||
|
|
@ -0,0 +1,76 @@
|
||||||
|
import unittest
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
import psycopg
|
||||||
|
|
||||||
|
# Add the code directory to Python path
|
||||||
|
code_path = Path(__file__).parent.parent / "ca_core"
|
||||||
|
sys.path.insert(0, str(code_path))
|
||||||
|
|
||||||
|
import metadata # your metadata.py module
|
||||||
|
|
||||||
|
DBNAME = "ca"
|
||||||
|
|
||||||
|
class TestMetadataFunctions(unittest.TestCase):
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
# Connect to the database
|
||||||
|
cls.conn = psycopg.connect(f"dbname={DBNAME}")
|
||||||
|
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
|
||||||
|
|
||||||
|
# Ensure table exists and has exactly one row
|
||||||
|
cls.cur.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS metadata (
|
||||||
|
name VARCHAR(50),
|
||||||
|
comment VARCHAR(200),
|
||||||
|
private_key VARCHAR(500),
|
||||||
|
public_key VARCHAR(500)
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
cls.cur.execute("SELECT COUNT(*) AS cnt FROM metadata")
|
||||||
|
row = cls.cur.fetchone()
|
||||||
|
if row['cnt'] == 0:
|
||||||
|
cls.cur.execute("INSERT INTO metadata DEFAULT VALUES")
|
||||||
|
cls.conn.commit()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def tearDownClass(cls):
|
||||||
|
cls.cur.close()
|
||||||
|
cls.conn.close()
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
# Begin transaction for each test
|
||||||
|
self.conn.rollback()
|
||||||
|
self.conn.autocommit = False
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
# Rollback after each test
|
||||||
|
self.conn.rollback()
|
||||||
|
|
||||||
|
# --- Test name field ---
|
||||||
|
def test_set_and_get_name(self):
|
||||||
|
metadata.set_name(self.cur, "AppName")
|
||||||
|
self.assertEqual(metadata.get_name(self.cur), "AppName")
|
||||||
|
|
||||||
|
# --- Test comment field ---
|
||||||
|
def test_set_and_get_comment(self):
|
||||||
|
metadata.set_comment(self.cur, "Test comment")
|
||||||
|
self.assertEqual(metadata.get_comment(self.cur), "Test comment")
|
||||||
|
|
||||||
|
# --- Test keys ---
|
||||||
|
def test_set_and_get_keys(self):
|
||||||
|
metadata.set_keys(self.cur, "pubkey123", "privkey456")
|
||||||
|
self.assertEqual(metadata.get_public_key(self.cur), "pubkey123")
|
||||||
|
self.assertEqual(metadata.get_private_key(self.cur), "privkey456")
|
||||||
|
|
||||||
|
# --- Test keys overwrite ---
|
||||||
|
def test_keys_overwrite(self):
|
||||||
|
metadata.set_keys(self.cur, "pub1", "priv1")
|
||||||
|
metadata.set_keys(self.cur, "pub2", "priv2")
|
||||||
|
self.assertEqual(metadata.get_public_key(self.cur), "pub2")
|
||||||
|
self.assertEqual(metadata.get_private_key(self.cur), "priv2")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
|
|
||||||
|
|
@ -0,0 +1,90 @@
|
||||||
|
import unittest
|
||||||
|
import psycopg
|
||||||
|
from psycopg.rows import dict_row
|
||||||
|
from ca_core import entity
|
||||||
|
from ca_core import property as prop # property.py
|
||||||
|
|
||||||
|
class TestProperty(unittest.TestCase):
|
||||||
|
"""Unit tests for property functions."""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.conn = psycopg.connect("dbname=ca")
|
||||||
|
self.conn.autocommit = False
|
||||||
|
self.cur = self.conn.cursor(row_factory=dict_row)
|
||||||
|
|
||||||
|
# Clean tables before running
|
||||||
|
self.cur.execute("TRUNCATE TABLE property CASCADE;")
|
||||||
|
self.cur.execute("TRUNCATE TABLE entity CASCADE;")
|
||||||
|
|
||||||
|
# Insert a creator
|
||||||
|
self.creator_id = entity.insert_creator(self.cur, "Alice", "pubkeyA")
|
||||||
|
|
||||||
|
# Insert a regular person
|
||||||
|
self.person_id = entity.enroll_person(
|
||||||
|
self.cur, "Bob", "pubkeyB", self.creator_id
|
||||||
|
)
|
||||||
|
|
||||||
|
# Insert a group
|
||||||
|
self.group_id = entity.create_group(
|
||||||
|
self.cur, "Admins", "gpub", self.creator_id
|
||||||
|
)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.conn.rollback()
|
||||||
|
self.cur.close()
|
||||||
|
self.conn.close()
|
||||||
|
|
||||||
|
# -------------------------
|
||||||
|
# Set property
|
||||||
|
# -------------------------
|
||||||
|
def test_set_property(self):
|
||||||
|
prop.set_property(self.cur, self.person_id, "admin")
|
||||||
|
props = prop.get_properties(self.cur, self.person_id)
|
||||||
|
self.assertIn("admin", props)
|
||||||
|
|
||||||
|
# Setting same property again should not duplicate
|
||||||
|
prop.set_property(self.cur, self.person_id, "admin")
|
||||||
|
props = prop.get_properties(self.cur, self.person_id)
|
||||||
|
self.assertEqual(props.count("admin"), 1)
|
||||||
|
|
||||||
|
# -------------------------
|
||||||
|
# Delete property
|
||||||
|
# -------------------------
|
||||||
|
def test_delete_property(self):
|
||||||
|
prop.set_property(self.cur, self.person_id, "vip")
|
||||||
|
prop.delete_property(self.cur, self.person_id, "vip")
|
||||||
|
props = prop.get_properties(self.cur, self.person_id)
|
||||||
|
self.assertNotIn("vip", props)
|
||||||
|
|
||||||
|
def test_delete_nonexistent_property(self):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
prop.delete_property(self.cur, self.person_id, "nonexistent")
|
||||||
|
|
||||||
|
# -------------------------
|
||||||
|
# Get properties
|
||||||
|
# -------------------------
|
||||||
|
def test_get_properties_multiple(self):
|
||||||
|
prop.set_property(self.cur, self.person_id, "prop1")
|
||||||
|
prop.set_property(self.cur, self.person_id, "prop2")
|
||||||
|
prop.set_property(self.cur, self.person_id, "prop3")
|
||||||
|
|
||||||
|
props = prop.get_properties(self.cur, self.person_id)
|
||||||
|
self.assertCountEqual(props, ["prop1", "prop2", "prop3"])
|
||||||
|
|
||||||
|
def test_properties_independent_per_entity(self):
|
||||||
|
prop.set_property(self.cur, self.person_id, "p1")
|
||||||
|
prop.set_property(self.cur, self.group_id, "p2")
|
||||||
|
|
||||||
|
person_props = prop.get_properties(self.cur, self.person_id)
|
||||||
|
group_props = prop.get_properties(self.cur, self.group_id)
|
||||||
|
|
||||||
|
self.assertIn("p1", person_props)
|
||||||
|
self.assertNotIn("p2", person_props)
|
||||||
|
|
||||||
|
self.assertIn("p2", group_props)
|
||||||
|
self.assertNotIn("p1", group_props)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
|
|
||||||
Loading…
Reference in New Issue