Compare commits

..

No commits in common. "a1be210f589a7ef1d12b987942968cf431c8dac2" and "2678737d5ebc891e62dbb419ac5d1c630a7f8c9b" have entirely different histories.

21 changed files with 440 additions and 684 deletions

View File

@ -1,167 +0,0 @@
CA/PKI Backend Project Context
Stack
Python 3 + psycopg (dict_row cursors)
PostgreSQL database: ca
Unit tests: unittest (python3 -m unittest discover)
Database Schema (current assumptions)
entity
id INT identity PK
creation_ts TIMESTAMPTZ default now()
creator INT FK → entity(id) (the entity that created this one; nullable)
name VARCHAR(100) NOT NULL
type VARCHAR(...) NOT NULL (e.g. person, group, device, alias)
public_key VARCHAR(300) NOT NULL
symmetrical_key VARCHAR(100) NULL
status VARCHAR(...) NOT NULL default 'active' (values: 'active', 'revoked')
expiration DATE NULL
Index on entity(name) (and other indexes as needed)
group_member
group_id INT FK → entity(id) ON DELETE CASCADE
member_id INT FK → entity(id) ON DELETE CASCADE
role VARCHAR(10)
PK (group_id, member_id)
Index (member_id, group_id)
Groups can contain any entity type, including other groups and devices.
property
Columns: (id INT FK → entity(id), property_name VARCHAR(100))
PK (id, property_name)
Used for flags/roles such as "creator"
metadata
Intended “singleton row” table, enforced at application level
Columns: name, comment, private_key, public_key
log
id SERIAL PK
ts TIMESTAMPTZ default now()
entry TEXT NOT NULL
Every API mutation must log one row here.
Core Business Rules
Creators are not an entity type
creator is a property (property_name='creator') on a person entity.
insert_creator() creates a person entity and inserts the creator property.
Revoked entities are immutable
Any mutation on an entity requires ensure_entity_active(cursor, entity_id).
Revoked entities cannot:
Join groups or accept members
Add/delete properties
Change keys (public_key, symmetrical_key)
Change status again
Logging
All changes to entity, group_member, property, metadata must call log_change(cursor, "...")
Logging happens inside the same transaction (no extra commits).
Python Modules (current structure)
ca_core/entity.py
Must provide:
ensure_entity_active(cursor, entity_id)
insert_creator(cursor, name, public_key)
enroll_person(cursor, name, public_key, creator_id)
create_group(cursor, name, public_key, creator_id)
create_alias(cursor, target_entity_id)
get_entity(cursor, entity_id)
set_entity_status(cursor, entity_id, status, changed_by) (requires active entity)
set_entity_keys(cursor, entity_id, public_key, changed_by) (active-only)
set_symmetrical_key(cursor, entity_id, key, changed_by) (active-only)
get_symmetrical_key(cursor, entity_id)
ca_core/group_member.py
Uses member_id (not person_id)
Must prevent adding revoked groups/members (via ensure_entity_active)
Logs add/remove membership
ca_core/property.py
Table is property(id, property_name) (NOT entity_id/name)
Must reject mutations if entity revoked (immutability)
Logs set/delete property
ca_core/metadata.py
Updates metadata fields and logs changes
ca_core/db_logging.py
log_change(cursor, message: str) inserts into log(entry)
Tests
tests/test_entity.py, tests/test_group.py, tests/test_property.py, tests/test_metadata.py
Tests verify:
Core behaviors (create, enroll, group membership, revoke immutability)
Log entry is created for mutations (case-insensitive substring checks)
Run via:
python3 -m unittest discover
Known gotchas
Avoid naming a module logging.py (conflicts with stdlib). Use db_logging.py.
Schema and code must stay aligned (e.g., property.id/property_name, group_member.member_id).

View File

@ -1,9 +0,0 @@
# db_logging.py
def log_change(cursor, message: str):
"""Insert a log entry into the log table."""
cursor.execute(
"INSERT INTO log (entry) VALUES (%s)",
(message,)
)

View File

@ -1,132 +1,169 @@
from db_logging import log_change
import random
import string
def ensure_entity_active(cursor, entity_id):
"""
Ensure an entity exists and is active.
Revoked entities are immutable.
"""
cursor.execute("SELECT status FROM entity WHERE id = %s", (entity_id,))
# ------------------------
# Helper for ownership checks
# ------------------------
def _verify_ownership(cursor, entity_id, requesting_creator_id):
cursor.execute(
"SELECT id, creator, type, status FROM entity WHERE id=%s", (entity_id,)
)
row = cursor.fetchone()
if row is None:
raise ValueError("Entity does not exist")
if row["status"] != "active":
raise ValueError("Entity is not active")
def _validate_ca_reference_for_group(ca_reference):
if ca_reference is None:
raise ValueError("ca_reference is required for groups")
if not isinstance(ca_reference, str):
raise ValueError("ca_reference must be a string")
if len(ca_reference) == 0:
raise ValueError("ca_reference cannot be empty")
if len(ca_reference) > 100:
raise ValueError("ca_reference must be at most 100 characters")
if not row or row["status"] != "active":
raise ValueError("Entity not found or inactive")
owner_id = row["creator"]
entity_type = row["type"]
entity_id_db = row["id"]
if entity_type == "creator":
if requesting_creator_id != entity_id_db:
raise ValueError("Creator ID does not match entity owner")
else:
if requesting_creator_id != owner_id:
raise ValueError("Creator ID does not match entity owner")
# ------------------------
# Insertions
# ------------------------
def insert_creator(cursor, name, public_key):
"""
Creators are persons with property 'creator' in the property table.
"""
cursor.execute(
"""
INSERT INTO entity (name, type, public_key, status, ca_reference)
VALUES (%s, 'person', %s, 'active', NULL)
INSERT INTO entity (name, type, public_key, creator, status)
VALUES (%s, 'creator', %s, NULL, 'active')
RETURNING id
""",
(name, public_key),
(name, public_key)
)
creator_id = cursor.fetchone()["id"]
# Mark as creator via property table (schema: property(id, property_name))
cursor.execute(
"""
INSERT INTO property (id, property_name)
VALUES (%s, %s)
ON CONFLICT (id, property_name) DO NOTHING
""",
(creator_id, "creator"),
)
log_change(cursor, f"Created creator entity {creator_id} with name {name}")
return creator_id
return cursor.fetchone()["id"]
def enroll_person(cursor, name, public_key, creator_id):
ensure_entity_active(cursor, creator_id)
cursor.execute(
"""
INSERT INTO entity (name, type, public_key, creator, status, ca_reference)
VALUES (%s, 'person', %s, %s, 'active', NULL)
RETURNING id
""",
(name, public_key, creator_id),
"SELECT type, status FROM entity WHERE id=%s", (creator_id,)
)
person_id = cursor.fetchone()["id"]
log_change(cursor, f"Enrolled person {person_id} under creator {creator_id}")
return person_id
def create_group(cursor, name, public_key, creator_id, ca_reference):
ensure_entity_active(cursor, creator_id)
_validate_ca_reference_for_group(ca_reference)
cursor.execute(
"""
INSERT INTO entity (name, type, public_key, creator, status, ca_reference)
VALUES (%s, 'group', %s, %s, 'active', %s)
RETURNING id
""",
(name, public_key, creator_id, ca_reference),
)
group_id = cursor.fetchone()["id"]
log_change(
cursor,
f"Created group {group_id} under creator {creator_id} with ca_reference {ca_reference}",
)
return group_id
def get_entity(cursor, entity_id):
cursor.execute("SELECT * FROM entity WHERE id = %s", (entity_id,))
return cursor.fetchone()
def set_entity_status(cursor, entity_id, status, changed_by):
"""
Only active entities can change status. Once revoked, immutable.
"""
ensure_entity_active(cursor, entity_id)
cursor.execute("UPDATE entity SET status = %s WHERE id = %s", (status, entity_id))
log_change(cursor, f"Set status of entity {entity_id} to {status} by {changed_by}")
def set_symmetrical_key(cursor, entity_id, key_value, changed_by):
ensure_entity_active(cursor, entity_id)
cursor.execute(
"UPDATE entity SET symmetrical_key = %s WHERE id = %s",
(key_value, entity_id),
)
log_change(cursor, f"Set symmetrical_key for entity {entity_id} by {changed_by}")
def get_symmetrical_key(cursor, entity_id):
cursor.execute("SELECT symmetrical_key FROM entity WHERE id = %s", (entity_id,))
row = cursor.fetchone()
return row["symmetrical_key"] if row else None
def set_entity_keys(cursor, entity_id, public_key, changed_by):
ensure_entity_active(cursor, entity_id)
if not row or row["type"] != "creator" or row["status"] != "active":
raise ValueError("Provided creator_id does not correspond to a valid active creator")
cursor.execute(
"UPDATE entity SET public_key = %s WHERE id = %s",
(public_key, entity_id),
"""
INSERT INTO entity (name, type, public_key, creator, status)
VALUES (%s, 'person', %s, %s, 'active')
RETURNING id
""",
(name, public_key, creator_id)
)
log_change(cursor, f"Updated public key for entity {entity_id} by {changed_by}")
return cursor.fetchone()["id"]
def create_group(cursor, name, public_key, creator_id):
cursor.execute(
"SELECT type, status FROM entity WHERE id=%s", (creator_id,)
)
row = cursor.fetchone()
if not row or row["type"] != "creator" or row["status"] != "active":
raise ValueError("Provided creator_id does not correspond to a valid active creator")
cursor.execute(
"""
INSERT INTO entity (name, type, public_key, creator, status)
VALUES (%s, 'group', %s, %s, 'active')
RETURNING id
""",
(name, public_key, creator_id)
)
return cursor.fetchone()["id"]
def create_alias(cursor, person_id):
cursor.execute(
"SELECT id, type, public_key, status FROM entity WHERE id=%s", (person_id,)
)
row = cursor.fetchone()
if not row or row["status"] != "active":
raise ValueError("Person not found or inactive")
if row["type"] != "person":
raise ValueError("Only persons can create aliases")
random_name = "".join(random.choices(string.ascii_letters + string.digits, k=8))
cursor.execute(
"""
INSERT INTO entity (name, type, public_key, creator, status)
VALUES (%s, 'person', %s, %s, 'active')
RETURNING id
""",
(random_name, row["public_key"], person_id)
)
return cursor.fetchone()["id"]
# ------------------------
# Soft-delete / revocation
# ------------------------
def revoke_entity(cursor, entity_id, requesting_creator_id):
_verify_ownership(cursor, entity_id, requesting_creator_id)
cursor.execute(
"UPDATE entity SET status=%s WHERE id=%s", ("revoked", entity_id)
)
# ------------------------
# Getters / Setters
# ------------------------
def get_entity(cursor, entity_id):
cursor.execute(
"SELECT * FROM entity WHERE id=%s AND status='active'", (entity_id,)
)
row = cursor.fetchone()
if not row:
raise ValueError("Entity not found or inactive")
return row
def get_entity_id(cursor, name):
cursor.execute(
"SELECT id FROM entity WHERE name=%s AND status='active'", (name,)
)
row = cursor.fetchone()
if not row:
raise ValueError("Entity not found or inactive")
return row["id"]
def get_entity_public_key(cursor, entity_id):
cursor.execute(
"SELECT public_key FROM entity WHERE id=%s AND status='active'", (entity_id,)
)
row = cursor.fetchone()
if not row:
raise ValueError("Entity not found or inactive")
return row["public_key"]
def get_entity_name(cursor, entity_id):
cursor.execute(
"SELECT name FROM entity WHERE id=%s AND status='active'", (entity_id,)
)
row = cursor.fetchone()
if not row:
raise ValueError("Entity not found or inactive")
return row["name"]
def set_entity_name(cursor, entity_id, new_name, requesting_creator_id):
_verify_ownership(cursor, entity_id, requesting_creator_id)
cursor.execute("UPDATE entity SET name=%s WHERE id=%s", (new_name, entity_id))
def set_entity_public_key(cursor, entity_id, public_key, requesting_creator_id):
_verify_ownership(cursor, entity_id, requesting_creator_id)
cursor.execute(
"UPDATE entity SET public_key=%s WHERE id=%s", (public_key, entity_id)
)
def set_entity_keys(cursor, entity_id, public_key, requesting_creator_id):
set_entity_public_key(cursor, entity_id, public_key, requesting_creator_id)

View File

@ -1,32 +1,42 @@
from db_logging import log_change
from entity import ensure_entity_active
# ca_core/group_member.py
def add_group_member(cursor, group_id: int, member_id: int, role: str):
# Verify group exists and is active
cursor.execute("SELECT type, status FROM entity WHERE id=%s", (group_id,))
row = cursor.fetchone()
if not row or row["status"] != "active" or row["type"] != "group":
raise ValueError("Invalid or inactive group")
def add_group_member(cursor, group_id, member_id, role):
ensure_entity_active(cursor, group_id)
ensure_entity_active(cursor, member_id)
# Verify member exists and is active
cursor.execute("SELECT status FROM entity WHERE id=%s", (member_id,))
row = cursor.fetchone()
if not row or row["status"] != "active":
raise ValueError("Invalid or inactive member")
cursor.execute(
"""
INSERT INTO group_member (group_id, member_id, role)
VALUES (%s, %s, %s)
""",
"INSERT INTO group_member (group_id, member_id, role) VALUES (%s, %s, %s)",
(group_id, member_id, role)
)
log_change(
cursor,
f"Added member {member_id} to group {group_id} as {role}"
def remove_group_member(cursor, group_id: int, member_id: int):
cursor.execute(
"DELETE FROM group_member WHERE group_id=%s AND member_id=%s",
(group_id, member_id)
)
def get_members_of_group(cursor, group_id):
def get_groups_for_member(cursor, member_id: int):
cursor.execute(
"""
SELECT member_id, role
FROM group_member
WHERE group_id = %s
""",
"SELECT group_id, role FROM group_member WHERE member_id=%s",
(member_id,)
)
return cursor.fetchall()
def get_members_of_group(cursor, group_id: int):
cursor.execute(
"SELECT member_id, role FROM group_member WHERE group_id=%s",
(group_id,)
)
return cursor.fetchall()

View File

@ -1,77 +1,40 @@
from db_logging import log_change
def set_name(cursor, name):
cursor.execute("DELETE FROM metadata")
cursor.execute(
"INSERT INTO metadata (name) VALUES (%s)",
(name,)
)
log_change(cursor, f"Updated metadata name to {name}")
# ca_core/metadata.py
def get_name(cursor):
cursor.execute("SELECT name FROM metadata LIMIT 1")
row = cursor.fetchone()
return row["name"] if row else None
return row['name'] if row else None
def set_comment(cursor, comment):
cursor.execute("DELETE FROM metadata")
cursor.execute(
"INSERT INTO metadata (comment) VALUES (%s)",
(comment,)
)
log_change(cursor, f"Updated metadata comment to {comment}")
def set_name(cursor, value):
cursor.execute("UPDATE metadata SET name=%s", (value,))
def get_comment(cursor):
cursor.execute("SELECT comment FROM metadata LIMIT 1")
row = cursor.fetchone()
return row["comment"] if row else None
return row['comment'] if row else None
def set_keys(cursor, public_key, private_key):
cursor.execute("DELETE FROM metadata")
cursor.execute(
"""
INSERT INTO metadata (public_key, private_key)
VALUES (%s, %s)
""",
(public_key, private_key)
)
log_change(cursor, "Updated metadata keys")
def set_comment(cursor, value):
cursor.execute("UPDATE metadata SET comment=%s", (value,))
def get_public_key(cursor):
cursor.execute("SELECT public_key FROM metadata LIMIT 1")
row = cursor.fetchone()
return row["public_key"] if row else None
return row['public_key'] if row else None
def get_private_key(cursor):
cursor.execute("SELECT private_key FROM metadata LIMIT 1")
row = cursor.fetchone()
return row["private_key"] if row else None
return row['private_key'] if row else None
def set_defense_p(cursor, defense_p: bool):
"""Set the metadata defense_p flag.
This table is treated as a singleton row at the application level.
Current convention in this codebase is to wipe and re-insert.
"""
cursor.execute("DELETE FROM metadata")
def set_keys(cursor, public_key, private_key):
cursor.execute(
"INSERT INTO metadata (defense_p) VALUES (%s)",
(defense_p,)
"UPDATE metadata SET public_key=%s, private_key=%s",
(public_key, private_key)
)
log_change(cursor, f"Updated metadata defense_p to {defense_p}")
def get_defense_p(cursor) -> bool:
cursor.execute("SELECT defense_p FROM metadata LIMIT 1")
row = cursor.fetchone()
if not row or row["defense_p"] is None:
return False
return bool(row["defense_p"])

View File

@ -1,104 +1,29 @@
from db_logging import log_change
from entity import ensure_entity_active
def _validate_validation_policy(validation_policy: str) -> str:
if validation_policy is None:
return "default"
if not isinstance(validation_policy, str):
raise TypeError("validation_policy must be a string")
vp = validation_policy.strip()
if not vp:
raise ValueError("validation_policy cannot be empty")
if len(vp) > 19:
raise ValueError("validation_policy must be at most 19 characters")
return vp
def _validate_source(source):
if source is None:
return None
if not isinstance(source, str):
raise TypeError("source must be a string or None")
s = source.strip()
if len(s) > 150:
raise ValueError("source must be at most 150 characters")
# Allow empty string -> treat as NULL for cleanliness
return s if s else None
def set_property(cursor, entity_id, property_name, validation_policy="default", source=None):
"""
Revoked entities are immutable: cannot add/update properties.
Schema: property(id, property_name, validation_policy, source)
- validation_policy: CHAR(19) NOT NULL DEFAULT 'default'
- source: VARCHAR(150) NULL
"""
ensure_entity_active(cursor, entity_id)
if not isinstance(property_name, str) or not property_name.strip():
raise ValueError("property_name must be a non-empty string")
if len(property_name) > 100:
raise ValueError("property_name must be at most 100 characters")
vp = _validate_validation_policy(validation_policy)
src = _validate_source(source)
# ca_core/property.py
def set_property(cursor, entity_id: int, property_name: str):
cursor.execute(
"""
INSERT INTO property (id, property_name, validation_policy, source)
VALUES (%s, %s, %s, %s)
ON CONFLICT (id, property_name)
DO UPDATE SET
validation_policy = EXCLUDED.validation_policy,
source = EXCLUDED.source
INSERT INTO property (id, property_name)
VALUES (%s, %s)
ON CONFLICT (id, property_name) DO NOTHING
""",
(entity_id, property_name, vp, src),
)
log_change(
cursor,
f"Set property '{property_name}' for entity {entity_id} "
f"(validation_policy={vp}, source={src})",
(entity_id, property_name)
)
def get_properties(cursor, entity_id):
"""
Returns a list of property_name values for the entity.
"""
cursor.execute(
"SELECT property_name FROM property WHERE id = %s",
(entity_id,),
)
rows = cursor.fetchall()
return [r["property_name"] for r in rows]
def get_property(cursor, entity_id, property_name):
"""
Returns a dict_row with keys: property_name, validation_policy, source
or None if not found.
"""
cursor.execute(
"""
SELECT property_name, validation_policy, source
FROM property
WHERE id = %s AND property_name = %s
""",
(entity_id, property_name),
)
return cursor.fetchone()
def delete_property(cursor, entity_id, property_name):
"""
Revoked entities are immutable: cannot delete properties.
"""
ensure_entity_active(cursor, entity_id)
def delete_property(cursor, entity_id: int, property_name: str):
cursor.execute(
"DELETE FROM property WHERE id=%s AND property_name=%s",
(entity_id, property_name),
(entity_id, property_name)
)
log_change(cursor, f"Deleted property '{property_name}' for entity {entity_id}")
if cursor.rowcount == 0:
raise ValueError("Property not found")
def get_properties(cursor, entity_id: int):
cursor.execute(
"SELECT property_name FROM property WHERE id=%s",
(entity_id,)
)
return [row['property_name'] for row in cursor.fetchall()]

View File

@ -1,5 +1,5 @@
-- ------------------------
-- Metadata (singleton row; enforced at application level)
-- Metadata table (singleton)
-- ------------------------
DROP TABLE IF EXISTS metadata;
@ -7,12 +7,13 @@ CREATE TABLE metadata(
name VARCHAR(50),
comment VARCHAR(200),
private_key VARCHAR(500),
public_key VARCHAR(500),
defense_p BOOLEAN NOT NULL DEFAULT false
public_key VARCHAR(500)
);
INSERT INTO metadata DEFAULT VALUES;
-- ------------------------
-- Entity
-- Entity table
-- ------------------------
DROP TABLE IF EXISTS entity CASCADE;
@ -21,23 +22,21 @@ CREATE TABLE entity(
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
creator INT REFERENCES entity(id),
name VARCHAR(100) NOT NULL,
type VARCHAR(20) NOT NULL, -- person, group, device
symmetrical_key VARCHAR(100),
type VARCHAR(10) NOT NULL DEFAULT 'person', -- 'creator', 'person', 'group', 'device'
geo_offset BIGINT,
public_key VARCHAR(300) NOT NULL,
ca_reference VARCHAR(100),
status VARCHAR(20) NOT NULL DEFAULT 'active',
expiration DATE,
CONSTRAINT entity_ca_reference_check CHECK (
(type = 'group' AND ca_reference IS NOT NULL)
OR
(type <> 'group' AND ca_reference IS NULL)
)
status VARCHAR(10) NOT NULL DEFAULT 'active'
);
-- Indexes
CREATE INDEX idx_entity_name ON entity(name);
CREATE INDEX idx_entity_expiration ON entity(expiration);
ALTER TABLE entity ADD CONSTRAINT entity_name_unique UNIQUE (name);
-- ------------------------
-- Group Member
-- Group Member table
-- ------------------------
DROP TABLE IF EXISTS group_member;
@ -48,30 +47,16 @@ CREATE TABLE group_member(
PRIMARY KEY (group_id, member_id)
);
CREATE INDEX idx_group_member ON group_member(member_id, group_id);
CREATE INDEX idx_group_member_member_group ON group_member(member_id, group_id);
-- ------------------------
-- Property
-- Property table
-- ------------------------
DROP TABLE IF EXISTS property;
CREATE TABLE property (
id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
property_name VARCHAR(100) NOT NULL,
validation_policy CHAR(19) NOT NULL DEFAULT 'default',
source VARCHAR(150),
property_name VARCHAR(100),
PRIMARY KEY (id, property_name)
);
-- ------------------------
-- Log Table
-- ------------------------
DROP TABLE IF EXISTS log;
CREATE TABLE log(
id SERIAL PRIMARY KEY,
ts TIMESTAMPTZ NOT NULL DEFAULT now(),
entry TEXT NOT NULL
);
CREATE INDEX idx_log_ts ON log(ts);

48
create_tables.sql.old Normal file
View File

@ -0,0 +1,48 @@
drop table metadata;
create table metadata(
name varchar(50),
comment varchar(200),
private_key varchar(500),
public_key varchar(500)
);
insert into metadata default vALUES;
drop table entity cascade;
create table entity(
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
creator INT REFERENCES entity(id),
name varchar(100) NOT NULL,
group_p BOOLEAN NOT NULL,
geo_offset BIGINT,
--private_key VARCHAR(300) NOT NULL,
public_key VARCHAR(300) NOT NULL,
expiration DATE,
status VARCHAR(10) NOT NULL DEFAULT 'active'
);
CREATE INDEX idx_entity_name ON entity(name);
drop table group_member;
create table group_member(
group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
person_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
role VARCHAR(10),
PRIMARY KEY (group_id,person_id)
);
CREATE INDEX idx_group_member_person_group ON group_member(person_id, group_id);
drop table property;
create table property(
id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
property_name VARCHAR(100),
PRIMARY KEY (id, property_name)
);

View File

@ -1,20 +1,15 @@
import unittest
import sys
from pathlib import Path
import sys
import psycopg
# Add code folder to path
code_path = Path(__file__).parent.parent / "ca_core"
sys.path.insert(0, str(code_path))
import entity
import entity # the rewritten entity.py module
DBNAME = "ca"
def get_last_log(cursor):
cursor.execute("SELECT entry FROM log ORDER BY id DESC LIMIT 1")
row = cursor.fetchone()
return row["entry"] if row else ""
class TestEntityFunctions(unittest.TestCase):
@ -23,62 +18,56 @@ class TestEntityFunctions(unittest.TestCase):
cls.conn = psycopg.connect(f"dbname={DBNAME}")
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
@classmethod
def tearDownClass(cls):
cls.cur.close()
cls.conn.close()
# Ensure table exists
cls.cur.execute("""
CREATE TABLE IF NOT EXISTS entity (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
creator INT REFERENCES entity(id),
name VARCHAR(100) NOT NULL,
type VARCHAR(10) NOT NULL DEFAULT 'person',
geo_offset BIGINT,
public_key VARCHAR(300) NOT NULL,
expiration DATE,
status VARCHAR(10) NOT NULL DEFAULT 'active'
)
""")
cls.conn.commit()
def setUp(self):
self.conn.rollback()
self.conn.autocommit = False
def tearDown(self):
self.conn.rollback()
# --- Insert and read ---
def test_insert_creator_and_get(self):
creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1")
row = entity.get_entity(self.cur, creator_id)
self.assertEqual(row["name"], "Creator1")
self.assertIsNone(row["ca_reference"])
log_entry = get_last_log(self.cur).lower()
self.assertIn("creator entity", log_entry)
self.assertIn(str(creator_id), log_entry)
self.assertEqual(row["type"], "creator")
def test_enroll_person(self):
creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2")
person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id)
self.assertEqual(entity.get_entity_name(self.cur, person_id), "Person1")
self.assertEqual(entity.get_entity(self.cur, person_id)["type"], "person")
row = entity.get_entity(self.cur, person_id)
self.assertIsNone(row["ca_reference"])
log_entry = get_last_log(self.cur).lower()
self.assertIn("enrolled person", log_entry)
self.assertIn(str(person_id), log_entry)
def test_create_group_requires_ca_reference(self):
def test_create_group(self):
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
group_id = entity.create_group(self.cur, "Group1", "pubkey_group", creator_id)
self.assertEqual(entity.get_entity_name(self.cur, group_id), "Group1")
self.assertEqual(entity.get_entity(self.cur, group_id)["type"], "group")
with self.assertRaises(ValueError):
entity.create_group(self.cur, "GroupMissingRef", "pubkey_group", creator_id, None)
def test_create_group_sets_ca_reference(self):
# --- Revocation ---
def test_revoke_entity(self):
creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4")
group_id = entity.create_group(self.cur, "Group1", "pubkey_group", creator_id, "CA-REF-1")
entity.revoke_entity(self.cur, creator_id, creator_id)
with self.assertRaises(ValueError):
entity.get_entity(self.cur, creator_id)
row = entity.get_entity(self.cur, group_id)
self.assertEqual(row["ca_reference"], "CA-REF-1")
log_entry = get_last_log(self.cur).lower()
self.assertIn("created group", log_entry)
self.assertIn(str(group_id), log_entry)
self.assertIn("ca-ref-1".lower(), log_entry)
if __name__ == "__main__":
unittest.main()
def test_set_and_get_symmetrical_key(self):
creator_id = entity.insert_creator(self.cur, "CreatorSym", "pubkey_sym")
entity.set_symmetrical_key(self.cur, creator_id, "symkey123", creator_id)
row = entity.get_entity(self.cur, creator_id)
self.assertEqual(row["symmetrical_key"], "symkey123")
log_entry = get_last_log(self.cur).lower()
self.assertIn("symmetrical_key", log_entry)

View File

@ -1,6 +1,6 @@
import unittest
import sys
from pathlib import Path
import sys
import psycopg
code_path = Path(__file__).parent.parent / "ca_core"
@ -11,12 +11,6 @@ import group_member
DBNAME = "ca"
def get_last_log(cursor):
cursor.execute("SELECT entry FROM log ORDER BY id DESC LIMIT 1")
row = cursor.fetchone()
return row["entry"] if row else ""
class TestGroupFunctions(unittest.TestCase):
@classmethod
@ -24,65 +18,78 @@ class TestGroupFunctions(unittest.TestCase):
cls.conn = psycopg.connect(f"dbname={DBNAME}")
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
@classmethod
def tearDownClass(cls):
cls.cur.close()
cls.conn.close()
# Ensure tables exist
cls.cur.execute("""
CREATE TABLE IF NOT EXISTS entity (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
creator INT REFERENCES entity(id),
name VARCHAR(100) NOT NULL,
type VARCHAR(10) NOT NULL DEFAULT 'person',
geo_offset BIGINT,
public_key VARCHAR(300) NOT NULL,
expiration DATE,
status VARCHAR(10) NOT NULL DEFAULT 'active'
)
""")
cls.cur.execute("""
CREATE TABLE IF NOT EXISTS group_member (
group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
member_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
role VARCHAR(10),
PRIMARY KEY (group_id, member_id)
)
""")
cls.conn.commit()
def setUp(self):
self.conn.rollback()
self.conn.autocommit = False
def tearDown(self):
self.conn.rollback()
# --- Group membership tests ---
def test_add_and_get_members(self):
creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1")
group_id = entity.create_group(self.cur, "GroupA", "pubkey_group", creator_id)
person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id)
group_id = entity.create_group(self.cur, "Group1", "pubkey_group", creator_id, "CA-GROUP-1")
device_id = entity.insert_creator(self.cur, "Device1", "pubkey_device")
# Add members
group_member.add_group_member(self.cur, group_id, person_id, "member")
group_member.add_group_member(self.cur, group_id, device_id, "device")
members = group_member.get_members_of_group(self.cur, group_id)
self.assertTrue(
any(m["member_id"] == person_id and m["role"] == "member"
for m in members)
)
log_entry = get_last_log(self.cur).lower()
self.assertIn("added member", log_entry)
self.assertIn(str(group_id), log_entry)
member_ids = [m["member_id"] for m in members]
self.assertIn(person_id, member_ids)
self.assertIn(device_id, member_ids)
def test_nested_groups(self):
creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2")
parent_group = entity.create_group(self.cur, "ParentGroup", "pubkey_parent", creator_id, "CA-PARENT")
child_group = entity.create_group(self.cur, "ChildGroup", "pubkey_child", creator_id, "CA-CHILD")
parent_group = entity.create_group(self.cur, "ParentGroup", "pubkey_pg", creator_id)
child_group = entity.create_group(self.cur, "ChildGroup", "pubkey_cg", creator_id)
# Add child group as member of parent
group_member.add_group_member(self.cur, parent_group, child_group, "subgroup")
members = group_member.get_members_of_group(self.cur, parent_group)
self.assertTrue(
any(m["member_id"] == child_group and m["role"] == "subgroup"
for m in members)
)
def test_revoked_group_cannot_accept_members(self):
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
group_id = entity.create_group(self.cur, "RevokedGroup", "pubkey_group", creator_id, "CA-REVOKED-GROUP")
person_id = entity.enroll_person(self.cur, "Person2", "pubkey_person", creator_id)
entity.set_entity_status(self.cur, group_id, "revoked", creator_id)
with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, group_id, person_id, "member")
self.assertEqual(members[0]["member_id"], child_group)
self.assertEqual(members[0]["role"], "subgroup")
def test_revoked_member_cannot_be_added(self):
creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4")
group_id = entity.create_group(self.cur, "ActiveGroup", "pubkey_group", creator_id, "CA-ACTIVE-GROUP")
person_id = entity.enroll_person(self.cur, "RevokedPerson", "pubkey_person", creator_id)
entity.set_entity_status(self.cur, person_id, "revoked", creator_id)
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
group_id = entity.create_group(self.cur, "GroupB", "pubkey_groupB", creator_id)
person_id = entity.enroll_person(self.cur, "Person2", "pubkey_person2", creator_id)
entity.revoke_entity(self.cur, person_id, creator_id)
with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, group_id, person_id, "member")
def test_revoked_group_cannot_accept_members(self):
creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4")
group_id = entity.create_group(self.cur, "GroupC", "pubkey_groupC", creator_id)
entity.revoke_entity(self.cur, group_id, creator_id)
person_id = entity.enroll_person(self.cur, "Person3", "pubkey_person3", creator_id)
with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, group_id, person_id, "member")
if __name__ == "__main__":
unittest.main()

54
tests/test_metadata.py Normal file → Executable file
View File

@ -3,64 +3,74 @@ import sys
from pathlib import Path
import psycopg
# Add the code directory to Python path
code_path = Path(__file__).parent.parent / "ca_core"
sys.path.insert(0, str(code_path))
import metadata
import metadata # your metadata.py module
DBNAME = "ca"
def get_last_log(cursor):
cursor.execute("SELECT entry FROM log ORDER BY id DESC LIMIT 1")
row = cursor.fetchone()
return row["entry"] if row else ""
class TestMetadataFunctions(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Connect to the database
cls.conn = psycopg.connect(f"dbname={DBNAME}")
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
# Ensure table exists and has exactly one row
cls.cur.execute("""
CREATE TABLE IF NOT EXISTS metadata (
name VARCHAR(50),
comment VARCHAR(200),
private_key VARCHAR(500),
public_key VARCHAR(500)
)
""")
cls.cur.execute("SELECT COUNT(*) AS cnt FROM metadata")
row = cls.cur.fetchone()
if row['cnt'] == 0:
cls.cur.execute("INSERT INTO metadata DEFAULT VALUES")
cls.conn.commit()
@classmethod
def tearDownClass(cls):
cls.cur.close()
cls.conn.close()
def setUp(self):
# Begin transaction for each test
self.conn.rollback()
self.conn.autocommit = False
def tearDown(self):
# Rollback after each test
self.conn.rollback()
# --- Test name field ---
def test_set_and_get_name(self):
metadata.set_name(self.cur, "AppName")
self.assertEqual(metadata.get_name(self.cur), "AppName")
log_entry = get_last_log(self.cur).lower()
self.assertIn("metadata name", log_entry)
self.assertIn("appname", log_entry)
# --- Test comment field ---
def test_set_and_get_comment(self):
metadata.set_comment(self.cur, "Test comment")
self.assertEqual(metadata.get_comment(self.cur), "Test comment")
log_entry = get_last_log(self.cur).lower()
self.assertIn("metadata comment", log_entry)
# --- Test keys ---
def test_set_and_get_keys(self):
metadata.set_keys(self.cur, "pubkey123", "privkey456")
self.assertEqual(metadata.get_public_key(self.cur), "pubkey123")
self.assertEqual(metadata.get_private_key(self.cur), "privkey456")
log_entry = get_last_log(self.cur).lower()
self.assertIn("metadata keys", log_entry)
# --- Test keys overwrite ---
def test_keys_overwrite(self):
metadata.set_keys(self.cur, "pub1", "priv1")
metadata.set_keys(self.cur, "pub2", "priv2")
self.assertEqual(metadata.get_public_key(self.cur), "pub2")
self.assertEqual(metadata.get_private_key(self.cur), "priv2")
def test_set_and_get_defense_p(self):
metadata.set_defense_p(self.cur, True)
self.assertEqual(metadata.get_defense_p(self.cur), True)
if __name__ == "__main__":
unittest.main()
log_entry = get_last_log(self.cur).lower()
self.assertIn("defense_p", log_entry)

View File

@ -1,9 +1,8 @@
import unittest
import sys
from pathlib import Path
import sys
import psycopg
# Add core directory to path
code_path = Path(__file__).parent.parent / "ca_core"
sys.path.insert(0, str(code_path))
@ -12,13 +11,6 @@ import property
DBNAME = "ca"
def get_last_log(cursor):
cursor.execute("SELECT entry FROM log ORDER BY id DESC LIMIT 1")
row = cursor.fetchone()
return row["entry"] if row else ""
class TestPropertyFunctions(unittest.TestCase):
@classmethod
@ -26,102 +18,68 @@ class TestPropertyFunctions(unittest.TestCase):
cls.conn = psycopg.connect(f"dbname={DBNAME}")
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
@classmethod
def tearDownClass(cls):
cls.cur.close()
cls.conn.close()
# Ensure entity table exists
cls.cur.execute("""
CREATE TABLE IF NOT EXISTS entity (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
creator INT REFERENCES entity(id),
name VARCHAR(100) NOT NULL,
type VARCHAR(10) NOT NULL DEFAULT 'person',
geo_offset BIGINT,
public_key VARCHAR(300) NOT NULL,
expiration DATE,
status VARCHAR(10) NOT NULL DEFAULT 'active'
)
""")
cls.cur.execute("""
CREATE TABLE IF NOT EXISTS property (
id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
property_name VARCHAR(100),
PRIMARY KEY (id, property_name)
)
""")
cls.conn.commit()
def setUp(self):
self.conn.rollback()
self.conn.autocommit = False
def tearDown(self):
self.conn.rollback()
# ------------------------------------------------------------
# Basic property set/get
# ------------------------------------------------------------
def test_set_and_get_property(self):
creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1")
person_id = entity.enroll_person(
self.cur, "Person1", "pubkey_person", creator_id
)
property.set_property(self.cur, person_id, "prop1")
person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id)
property.set_property(self.cur, person_id, "email")
props = property.get_properties(self.cur, person_id)
self.assertIn("prop1", props)
details = property.get_property(self.cur, person_id, "prop1")
self.assertIsNotNone(details)
# CHAR(19) is space-padded in PostgreSQL; strip for comparison.
self.assertEqual(details["validation_policy"].strip(), "default")
self.assertIsNone(details["source"])
log_entry = get_last_log(self.cur).lower()
self.assertIn("set property", log_entry)
self.assertIn("prop1", log_entry)
# ------------------------------------------------------------
# Delete property
# ------------------------------------------------------------
self.assertIn("email", props)
def test_delete_property(self):
creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2")
person_id = entity.enroll_person(
self.cur, "Person2", "pubkey_person", creator_id
)
property.set_property(self.cur, person_id, "prop2")
property.delete_property(self.cur, person_id, "prop2")
person_id = entity.enroll_person(self.cur, "Person2", "pubkey_person2", creator_id)
property.set_property(self.cur, person_id, "phone")
property.delete_property(self.cur, person_id, "phone")
props = property.get_properties(self.cur, person_id)
self.assertNotIn("prop2", props)
log_entry = get_last_log(self.cur).lower()
self.assertIn("deleted property", log_entry)
self.assertIn("prop2", log_entry)
def test_set_property_with_policy_and_source(self):
creator_id = entity.insert_creator(self.cur, "CreatorPolicy", "pubkey_policy")
person_id = entity.enroll_person(
self.cur, "PersonPolicy", "pubkey_person", creator_id
)
property.set_property(
self.cur,
person_id,
"prop_policy",
validation_policy="strict",
source="unit-test",
)
details = property.get_property(self.cur, person_id, "prop_policy")
self.assertIsNotNone(details)
self.assertEqual(details["validation_policy"].strip(), "strict")
self.assertEqual(details["source"], "unit-test")
log_entry = get_last_log(self.cur).lower()
self.assertIn("set property", log_entry)
self.assertIn("prop_policy", log_entry)
self.assertIn("strict", log_entry)
# ------------------------------------------------------------
# Immutability: revoked entity cannot mutate properties
# ------------------------------------------------------------
self.assertNotIn("phone", props)
def test_revoked_entity_has_no_properties(self):
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
person_id = entity.enroll_person(
self.cur, "Person3", "pubkey_person", creator_id
person_id = entity.enroll_person(self.cur, "Person3", "pubkey_person3", creator_id)
property.set_property(self.cur, person_id, "address")
entity.revoke_entity(self.cur, person_id, creator_id)
props = property.get_properties(self.cur, person_id)
# Optional: you can decide whether to return empty or raise; here we return all properties regardless of status
# If you want to ignore revoked entities:
cursor = self.cur
cursor.execute(
"SELECT property_name FROM property p JOIN entity e ON e.id=p.id WHERE e.id=%s AND e.status='active'",
(person_id,)
)
props_active = [r['property_name'] for r in cursor.fetchall()]
self.assertNotIn("address", props_active)
entity.set_entity_status(self.cur, person_id, "revoked", creator_id)
with self.assertRaises(ValueError):
property.set_property(self.cur, person_id, "prop3")
if __name__ == "__main__":
unittest.main()
with self.assertRaises(ValueError):
property.delete_property(self.cur, person_id, "prop3")