96 lines
3.9 KiB
Python
96 lines
3.9 KiB
Python
import unittest
|
|
from pathlib import Path
|
|
import sys
|
|
import psycopg
|
|
|
|
code_path = Path(__file__).parent.parent / "ca_core"
|
|
sys.path.insert(0, str(code_path))
|
|
|
|
import entity
|
|
import group_member
|
|
|
|
DBNAME = "ca"
|
|
|
|
class TestGroupFunctions(unittest.TestCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
cls.conn = psycopg.connect(f"dbname={DBNAME}")
|
|
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
|
|
|
|
# Ensure tables exist
|
|
cls.cur.execute("""
|
|
CREATE TABLE IF NOT EXISTS entity (
|
|
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
|
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
creator INT REFERENCES entity(id),
|
|
name VARCHAR(100) NOT NULL,
|
|
type VARCHAR(10) NOT NULL DEFAULT 'person',
|
|
geo_offset BIGINT,
|
|
public_key VARCHAR(300) NOT NULL,
|
|
expiration DATE,
|
|
status VARCHAR(10) NOT NULL DEFAULT 'active'
|
|
)
|
|
""")
|
|
cls.cur.execute("""
|
|
CREATE TABLE IF NOT EXISTS group_member (
|
|
group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
|
|
member_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
|
|
role VARCHAR(10),
|
|
PRIMARY KEY (group_id, member_id)
|
|
)
|
|
""")
|
|
cls.conn.commit()
|
|
|
|
def setUp(self):
|
|
self.conn.rollback()
|
|
self.conn.autocommit = False
|
|
|
|
def tearDown(self):
|
|
self.conn.rollback()
|
|
|
|
# --- Group membership tests ---
|
|
def test_add_and_get_members(self):
|
|
creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1")
|
|
group_id = entity.create_group(self.cur, "GroupA", "pubkey_group", creator_id)
|
|
person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id)
|
|
device_id = entity.insert_creator(self.cur, "Device1", "pubkey_device")
|
|
# Add members
|
|
group_member.add_group_member(self.cur, group_id, person_id, "member")
|
|
group_member.add_group_member(self.cur, group_id, device_id, "device")
|
|
members = group_member.get_members_of_group(self.cur, group_id)
|
|
member_ids = [m["member_id"] for m in members]
|
|
self.assertIn(person_id, member_ids)
|
|
self.assertIn(device_id, member_ids)
|
|
|
|
def test_nested_groups(self):
|
|
creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2")
|
|
parent_group = entity.create_group(self.cur, "ParentGroup", "pubkey_pg", creator_id)
|
|
child_group = entity.create_group(self.cur, "ChildGroup", "pubkey_cg", creator_id)
|
|
# Add child group as member of parent
|
|
group_member.add_group_member(self.cur, parent_group, child_group, "subgroup")
|
|
members = group_member.get_members_of_group(self.cur, parent_group)
|
|
self.assertEqual(members[0]["member_id"], child_group)
|
|
self.assertEqual(members[0]["role"], "subgroup")
|
|
|
|
def test_revoked_member_cannot_be_added(self):
|
|
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
|
|
group_id = entity.create_group(self.cur, "GroupB", "pubkey_groupB", creator_id)
|
|
person_id = entity.enroll_person(self.cur, "Person2", "pubkey_person2", creator_id)
|
|
entity.revoke_entity(self.cur, person_id, creator_id)
|
|
with self.assertRaises(ValueError):
|
|
group_member.add_group_member(self.cur, group_id, person_id, "member")
|
|
|
|
def test_revoked_group_cannot_accept_members(self):
|
|
creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4")
|
|
group_id = entity.create_group(self.cur, "GroupC", "pubkey_groupC", creator_id)
|
|
entity.revoke_entity(self.cur, group_id, creator_id)
|
|
person_id = entity.enroll_person(self.cur, "Person3", "pubkey_person3", creator_id)
|
|
with self.assertRaises(ValueError):
|
|
group_member.add_group_member(self.cur, group_id, person_id, "member")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|
|
|