Compare commits

..

No commits in common. "a1be210f589a7ef1d12b987942968cf431c8dac2" and "2678737d5ebc891e62dbb419ac5d1c630a7f8c9b" have entirely different histories.

21 changed files with 440 additions and 684 deletions

View File

@ -1,167 +0,0 @@
CA/PKI Backend Project Context
Stack
Python 3 + psycopg (dict_row cursors)
PostgreSQL database: ca
Unit tests: unittest (python3 -m unittest discover)
Database Schema (current assumptions)
entity
id INT identity PK
creation_ts TIMESTAMPTZ default now()
creator INT FK → entity(id) (the entity that created this one; nullable)
name VARCHAR(100) NOT NULL
type VARCHAR(...) NOT NULL (e.g. person, group, device, alias)
public_key VARCHAR(300) NOT NULL
symmetrical_key VARCHAR(100) NULL
status VARCHAR(...) NOT NULL default 'active' (values: 'active', 'revoked')
expiration DATE NULL
Index on entity(name) (and other indexes as needed)
group_member
group_id INT FK → entity(id) ON DELETE CASCADE
member_id INT FK → entity(id) ON DELETE CASCADE
role VARCHAR(10)
PK (group_id, member_id)
Index (member_id, group_id)
Groups can contain any entity type, including other groups and devices.
property
Columns: (id INT FK → entity(id), property_name VARCHAR(100))
PK (id, property_name)
Used for flags/roles such as "creator"
metadata
Intended “singleton row” table, enforced at application level
Columns: name, comment, private_key, public_key
log
id SERIAL PK
ts TIMESTAMPTZ default now()
entry TEXT NOT NULL
Every API mutation must log one row here.
Core Business Rules
Creators are not an entity type
creator is a property (property_name='creator') on a person entity.
insert_creator() creates a person entity and inserts the creator property.
Revoked entities are immutable
Any mutation on an entity requires ensure_entity_active(cursor, entity_id).
Revoked entities cannot:
Join groups or accept members
Add/delete properties
Change keys (public_key, symmetrical_key)
Change status again
Logging
All changes to entity, group_member, property, metadata must call log_change(cursor, "...")
Logging happens inside the same transaction (no extra commits).
Python Modules (current structure)
ca_core/entity.py
Must provide:
ensure_entity_active(cursor, entity_id)
insert_creator(cursor, name, public_key)
enroll_person(cursor, name, public_key, creator_id)
create_group(cursor, name, public_key, creator_id)
create_alias(cursor, target_entity_id)
get_entity(cursor, entity_id)
set_entity_status(cursor, entity_id, status, changed_by) (requires active entity)
set_entity_keys(cursor, entity_id, public_key, changed_by) (active-only)
set_symmetrical_key(cursor, entity_id, key, changed_by) (active-only)
get_symmetrical_key(cursor, entity_id)
ca_core/group_member.py
Uses member_id (not person_id)
Must prevent adding revoked groups/members (via ensure_entity_active)
Logs add/remove membership
ca_core/property.py
Table is property(id, property_name) (NOT entity_id/name)
Must reject mutations if entity revoked (immutability)
Logs set/delete property
ca_core/metadata.py
Updates metadata fields and logs changes
ca_core/db_logging.py
log_change(cursor, message: str) inserts into log(entry)
Tests
tests/test_entity.py, tests/test_group.py, tests/test_property.py, tests/test_metadata.py
Tests verify:
Core behaviors (create, enroll, group membership, revoke immutability)
Log entry is created for mutations (case-insensitive substring checks)
Run via:
python3 -m unittest discover
Known gotchas
Avoid naming a module logging.py (conflicts with stdlib). Use db_logging.py.
Schema and code must stay aligned (e.g., property.id/property_name, group_member.member_id).

View File

@ -1,9 +0,0 @@
# db_logging.py
def log_change(cursor, message: str):
"""Insert a log entry into the log table."""
cursor.execute(
"INSERT INTO log (entry) VALUES (%s)",
(message,)
)

View File

@ -1,132 +1,169 @@
from db_logging import log_change import random
import string
# ------------------------
def ensure_entity_active(cursor, entity_id): # Helper for ownership checks
""" # ------------------------
Ensure an entity exists and is active. def _verify_ownership(cursor, entity_id, requesting_creator_id):
Revoked entities are immutable. cursor.execute(
""" "SELECT id, creator, type, status FROM entity WHERE id=%s", (entity_id,)
cursor.execute("SELECT status FROM entity WHERE id = %s", (entity_id,)) )
row = cursor.fetchone() row = cursor.fetchone()
if row is None: if not row or row["status"] != "active":
raise ValueError("Entity does not exist") raise ValueError("Entity not found or inactive")
if row["status"] != "active": owner_id = row["creator"]
raise ValueError("Entity is not active") entity_type = row["type"]
entity_id_db = row["id"]
def _validate_ca_reference_for_group(ca_reference): if entity_type == "creator":
if ca_reference is None: if requesting_creator_id != entity_id_db:
raise ValueError("ca_reference is required for groups") raise ValueError("Creator ID does not match entity owner")
if not isinstance(ca_reference, str): else:
raise ValueError("ca_reference must be a string") if requesting_creator_id != owner_id:
if len(ca_reference) == 0: raise ValueError("Creator ID does not match entity owner")
raise ValueError("ca_reference cannot be empty")
if len(ca_reference) > 100:
raise ValueError("ca_reference must be at most 100 characters")
# ------------------------
# Insertions
# ------------------------
def insert_creator(cursor, name, public_key): def insert_creator(cursor, name, public_key):
"""
Creators are persons with property 'creator' in the property table.
"""
cursor.execute( cursor.execute(
""" """
INSERT INTO entity (name, type, public_key, status, ca_reference) INSERT INTO entity (name, type, public_key, creator, status)
VALUES (%s, 'person', %s, 'active', NULL) VALUES (%s, 'creator', %s, NULL, 'active')
RETURNING id RETURNING id
""", """,
(name, public_key), (name, public_key)
) )
creator_id = cursor.fetchone()["id"] return cursor.fetchone()["id"]
# Mark as creator via property table (schema: property(id, property_name))
cursor.execute(
"""
INSERT INTO property (id, property_name)
VALUES (%s, %s)
ON CONFLICT (id, property_name) DO NOTHING
""",
(creator_id, "creator"),
)
log_change(cursor, f"Created creator entity {creator_id} with name {name}")
return creator_id
def enroll_person(cursor, name, public_key, creator_id): def enroll_person(cursor, name, public_key, creator_id):
ensure_entity_active(cursor, creator_id)
cursor.execute( cursor.execute(
""" "SELECT type, status FROM entity WHERE id=%s", (creator_id,)
INSERT INTO entity (name, type, public_key, creator, status, ca_reference)
VALUES (%s, 'person', %s, %s, 'active', NULL)
RETURNING id
""",
(name, public_key, creator_id),
) )
person_id = cursor.fetchone()["id"]
log_change(cursor, f"Enrolled person {person_id} under creator {creator_id}")
return person_id
def create_group(cursor, name, public_key, creator_id, ca_reference):
ensure_entity_active(cursor, creator_id)
_validate_ca_reference_for_group(ca_reference)
cursor.execute(
"""
INSERT INTO entity (name, type, public_key, creator, status, ca_reference)
VALUES (%s, 'group', %s, %s, 'active', %s)
RETURNING id
""",
(name, public_key, creator_id, ca_reference),
)
group_id = cursor.fetchone()["id"]
log_change(
cursor,
f"Created group {group_id} under creator {creator_id} with ca_reference {ca_reference}",
)
return group_id
def get_entity(cursor, entity_id):
cursor.execute("SELECT * FROM entity WHERE id = %s", (entity_id,))
return cursor.fetchone()
def set_entity_status(cursor, entity_id, status, changed_by):
"""
Only active entities can change status. Once revoked, immutable.
"""
ensure_entity_active(cursor, entity_id)
cursor.execute("UPDATE entity SET status = %s WHERE id = %s", (status, entity_id))
log_change(cursor, f"Set status of entity {entity_id} to {status} by {changed_by}")
def set_symmetrical_key(cursor, entity_id, key_value, changed_by):
ensure_entity_active(cursor, entity_id)
cursor.execute(
"UPDATE entity SET symmetrical_key = %s WHERE id = %s",
(key_value, entity_id),
)
log_change(cursor, f"Set symmetrical_key for entity {entity_id} by {changed_by}")
def get_symmetrical_key(cursor, entity_id):
cursor.execute("SELECT symmetrical_key FROM entity WHERE id = %s", (entity_id,))
row = cursor.fetchone() row = cursor.fetchone()
return row["symmetrical_key"] if row else None if not row or row["type"] != "creator" or row["status"] != "active":
raise ValueError("Provided creator_id does not correspond to a valid active creator")
def set_entity_keys(cursor, entity_id, public_key, changed_by):
ensure_entity_active(cursor, entity_id)
cursor.execute( cursor.execute(
"UPDATE entity SET public_key = %s WHERE id = %s", """
(public_key, entity_id), INSERT INTO entity (name, type, public_key, creator, status)
VALUES (%s, 'person', %s, %s, 'active')
RETURNING id
""",
(name, public_key, creator_id)
) )
log_change(cursor, f"Updated public key for entity {entity_id} by {changed_by}") return cursor.fetchone()["id"]
def create_group(cursor, name, public_key, creator_id):
cursor.execute(
"SELECT type, status FROM entity WHERE id=%s", (creator_id,)
)
row = cursor.fetchone()
if not row or row["type"] != "creator" or row["status"] != "active":
raise ValueError("Provided creator_id does not correspond to a valid active creator")
cursor.execute(
"""
INSERT INTO entity (name, type, public_key, creator, status)
VALUES (%s, 'group', %s, %s, 'active')
RETURNING id
""",
(name, public_key, creator_id)
)
return cursor.fetchone()["id"]
def create_alias(cursor, person_id):
cursor.execute(
"SELECT id, type, public_key, status FROM entity WHERE id=%s", (person_id,)
)
row = cursor.fetchone()
if not row or row["status"] != "active":
raise ValueError("Person not found or inactive")
if row["type"] != "person":
raise ValueError("Only persons can create aliases")
random_name = "".join(random.choices(string.ascii_letters + string.digits, k=8))
cursor.execute(
"""
INSERT INTO entity (name, type, public_key, creator, status)
VALUES (%s, 'person', %s, %s, 'active')
RETURNING id
""",
(random_name, row["public_key"], person_id)
)
return cursor.fetchone()["id"]
# ------------------------
# Soft-delete / revocation
# ------------------------
def revoke_entity(cursor, entity_id, requesting_creator_id):
_verify_ownership(cursor, entity_id, requesting_creator_id)
cursor.execute(
"UPDATE entity SET status=%s WHERE id=%s", ("revoked", entity_id)
)
# ------------------------
# Getters / Setters
# ------------------------
def get_entity(cursor, entity_id):
cursor.execute(
"SELECT * FROM entity WHERE id=%s AND status='active'", (entity_id,)
)
row = cursor.fetchone()
if not row:
raise ValueError("Entity not found or inactive")
return row
def get_entity_id(cursor, name):
cursor.execute(
"SELECT id FROM entity WHERE name=%s AND status='active'", (name,)
)
row = cursor.fetchone()
if not row:
raise ValueError("Entity not found or inactive")
return row["id"]
def get_entity_public_key(cursor, entity_id):
cursor.execute(
"SELECT public_key FROM entity WHERE id=%s AND status='active'", (entity_id,)
)
row = cursor.fetchone()
if not row:
raise ValueError("Entity not found or inactive")
return row["public_key"]
def get_entity_name(cursor, entity_id):
cursor.execute(
"SELECT name FROM entity WHERE id=%s AND status='active'", (entity_id,)
)
row = cursor.fetchone()
if not row:
raise ValueError("Entity not found or inactive")
return row["name"]
def set_entity_name(cursor, entity_id, new_name, requesting_creator_id):
_verify_ownership(cursor, entity_id, requesting_creator_id)
cursor.execute("UPDATE entity SET name=%s WHERE id=%s", (new_name, entity_id))
def set_entity_public_key(cursor, entity_id, public_key, requesting_creator_id):
_verify_ownership(cursor, entity_id, requesting_creator_id)
cursor.execute(
"UPDATE entity SET public_key=%s WHERE id=%s", (public_key, entity_id)
)
def set_entity_keys(cursor, entity_id, public_key, requesting_creator_id):
set_entity_public_key(cursor, entity_id, public_key, requesting_creator_id)

View File

@ -1,32 +1,42 @@
from db_logging import log_change # ca_core/group_member.py
from entity import ensure_entity_active
def add_group_member(cursor, group_id: int, member_id: int, role: str):
# Verify group exists and is active
cursor.execute("SELECT type, status FROM entity WHERE id=%s", (group_id,))
row = cursor.fetchone()
if not row or row["status"] != "active" or row["type"] != "group":
raise ValueError("Invalid or inactive group")
def add_group_member(cursor, group_id, member_id, role): # Verify member exists and is active
ensure_entity_active(cursor, group_id) cursor.execute("SELECT status FROM entity WHERE id=%s", (member_id,))
ensure_entity_active(cursor, member_id) row = cursor.fetchone()
if not row or row["status"] != "active":
raise ValueError("Invalid or inactive member")
cursor.execute( cursor.execute(
""" "INSERT INTO group_member (group_id, member_id, role) VALUES (%s, %s, %s)",
INSERT INTO group_member (group_id, member_id, role)
VALUES (%s, %s, %s)
""",
(group_id, member_id, role) (group_id, member_id, role)
) )
log_change(
cursor, def remove_group_member(cursor, group_id: int, member_id: int):
f"Added member {member_id} to group {group_id} as {role}" cursor.execute(
"DELETE FROM group_member WHERE group_id=%s AND member_id=%s",
(group_id, member_id)
) )
def get_members_of_group(cursor, group_id): def get_groups_for_member(cursor, member_id: int):
cursor.execute( cursor.execute(
""" "SELECT group_id, role FROM group_member WHERE member_id=%s",
SELECT member_id, role (member_id,)
FROM group_member )
WHERE group_id = %s return cursor.fetchall()
""",
def get_members_of_group(cursor, group_id: int):
cursor.execute(
"SELECT member_id, role FROM group_member WHERE group_id=%s",
(group_id,) (group_id,)
) )
return cursor.fetchall() return cursor.fetchall()

View File

@ -1,77 +1,40 @@
from db_logging import log_change # ca_core/metadata.py
def set_name(cursor, name):
cursor.execute("DELETE FROM metadata")
cursor.execute(
"INSERT INTO metadata (name) VALUES (%s)",
(name,)
)
log_change(cursor, f"Updated metadata name to {name}")
def get_name(cursor): def get_name(cursor):
cursor.execute("SELECT name FROM metadata LIMIT 1") cursor.execute("SELECT name FROM metadata LIMIT 1")
row = cursor.fetchone() row = cursor.fetchone()
return row["name"] if row else None return row['name'] if row else None
def set_comment(cursor, comment): def set_name(cursor, value):
cursor.execute("DELETE FROM metadata") cursor.execute("UPDATE metadata SET name=%s", (value,))
cursor.execute(
"INSERT INTO metadata (comment) VALUES (%s)",
(comment,)
)
log_change(cursor, f"Updated metadata comment to {comment}")
def get_comment(cursor): def get_comment(cursor):
cursor.execute("SELECT comment FROM metadata LIMIT 1") cursor.execute("SELECT comment FROM metadata LIMIT 1")
row = cursor.fetchone() row = cursor.fetchone()
return row["comment"] if row else None return row['comment'] if row else None
def set_keys(cursor, public_key, private_key): def set_comment(cursor, value):
cursor.execute("DELETE FROM metadata") cursor.execute("UPDATE metadata SET comment=%s", (value,))
cursor.execute(
"""
INSERT INTO metadata (public_key, private_key)
VALUES (%s, %s)
""",
(public_key, private_key)
)
log_change(cursor, "Updated metadata keys")
def get_public_key(cursor): def get_public_key(cursor):
cursor.execute("SELECT public_key FROM metadata LIMIT 1") cursor.execute("SELECT public_key FROM metadata LIMIT 1")
row = cursor.fetchone() row = cursor.fetchone()
return row["public_key"] if row else None return row['public_key'] if row else None
def get_private_key(cursor): def get_private_key(cursor):
cursor.execute("SELECT private_key FROM metadata LIMIT 1") cursor.execute("SELECT private_key FROM metadata LIMIT 1")
row = cursor.fetchone() row = cursor.fetchone()
return row["private_key"] if row else None return row['private_key'] if row else None
def set_defense_p(cursor, defense_p: bool): def set_keys(cursor, public_key, private_key):
"""Set the metadata defense_p flag.
This table is treated as a singleton row at the application level.
Current convention in this codebase is to wipe and re-insert.
"""
cursor.execute("DELETE FROM metadata")
cursor.execute( cursor.execute(
"INSERT INTO metadata (defense_p) VALUES (%s)", "UPDATE metadata SET public_key=%s, private_key=%s",
(defense_p,) (public_key, private_key)
) )
log_change(cursor, f"Updated metadata defense_p to {defense_p}")
def get_defense_p(cursor) -> bool:
cursor.execute("SELECT defense_p FROM metadata LIMIT 1")
row = cursor.fetchone()
if not row or row["defense_p"] is None:
return False
return bool(row["defense_p"])

View File

@ -1,104 +1,29 @@
from db_logging import log_change # ca_core/property.py
from entity import ensure_entity_active
def _validate_validation_policy(validation_policy: str) -> str:
if validation_policy is None:
return "default"
if not isinstance(validation_policy, str):
raise TypeError("validation_policy must be a string")
vp = validation_policy.strip()
if not vp:
raise ValueError("validation_policy cannot be empty")
if len(vp) > 19:
raise ValueError("validation_policy must be at most 19 characters")
return vp
def _validate_source(source):
if source is None:
return None
if not isinstance(source, str):
raise TypeError("source must be a string or None")
s = source.strip()
if len(s) > 150:
raise ValueError("source must be at most 150 characters")
# Allow empty string -> treat as NULL for cleanliness
return s if s else None
def set_property(cursor, entity_id, property_name, validation_policy="default", source=None):
"""
Revoked entities are immutable: cannot add/update properties.
Schema: property(id, property_name, validation_policy, source)
- validation_policy: CHAR(19) NOT NULL DEFAULT 'default'
- source: VARCHAR(150) NULL
"""
ensure_entity_active(cursor, entity_id)
if not isinstance(property_name, str) or not property_name.strip():
raise ValueError("property_name must be a non-empty string")
if len(property_name) > 100:
raise ValueError("property_name must be at most 100 characters")
vp = _validate_validation_policy(validation_policy)
src = _validate_source(source)
def set_property(cursor, entity_id: int, property_name: str):
cursor.execute( cursor.execute(
""" """
INSERT INTO property (id, property_name, validation_policy, source) INSERT INTO property (id, property_name)
VALUES (%s, %s, %s, %s) VALUES (%s, %s)
ON CONFLICT (id, property_name) ON CONFLICT (id, property_name) DO NOTHING
DO UPDATE SET
validation_policy = EXCLUDED.validation_policy,
source = EXCLUDED.source
""", """,
(entity_id, property_name, vp, src), (entity_id, property_name)
)
log_change(
cursor,
f"Set property '{property_name}' for entity {entity_id} "
f"(validation_policy={vp}, source={src})",
) )
def get_properties(cursor, entity_id): def delete_property(cursor, entity_id: int, property_name: str):
"""
Returns a list of property_name values for the entity.
"""
cursor.execute( cursor.execute(
"SELECT property_name FROM property WHERE id = %s", "DELETE FROM property WHERE id=%s AND property_name=%s",
(entity_id,), (entity_id, property_name)
) )
rows = cursor.fetchall() if cursor.rowcount == 0:
return [r["property_name"] for r in rows] raise ValueError("Property not found")
def get_property(cursor, entity_id, property_name): def get_properties(cursor, entity_id: int):
"""
Returns a dict_row with keys: property_name, validation_policy, source
or None if not found.
"""
cursor.execute( cursor.execute(
""" "SELECT property_name FROM property WHERE id=%s",
SELECT property_name, validation_policy, source (entity_id,)
FROM property
WHERE id = %s AND property_name = %s
""",
(entity_id, property_name),
) )
return cursor.fetchone() return [row['property_name'] for row in cursor.fetchall()]
def delete_property(cursor, entity_id, property_name):
"""
Revoked entities are immutable: cannot delete properties.
"""
ensure_entity_active(cursor, entity_id)
cursor.execute(
"DELETE FROM property WHERE id = %s AND property_name = %s",
(entity_id, property_name),
)
log_change(cursor, f"Deleted property '{property_name}' for entity {entity_id}")

View File

@ -1,77 +1,62 @@
-- ------------------------ -- ------------------------
-- Metadata (singleton row; enforced at application level) -- Metadata table (singleton)
-- ------------------------ -- ------------------------
DROP TABLE IF EXISTS metadata; DROP TABLE IF EXISTS metadata;
CREATE TABLE metadata( CREATE TABLE metadata (
name VARCHAR(50), name VARCHAR(50),
comment VARCHAR(200), comment VARCHAR(200),
private_key VARCHAR(500), private_key VARCHAR(500),
public_key VARCHAR(500), public_key VARCHAR(500)
defense_p BOOLEAN NOT NULL DEFAULT false
); );
INSERT INTO metadata DEFAULT VALUES;
-- ------------------------ -- ------------------------
-- Entity -- Entity table
-- ------------------------ -- ------------------------
DROP TABLE IF EXISTS entity CASCADE; DROP TABLE IF EXISTS entity CASCADE;
CREATE TABLE entity( CREATE TABLE entity (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(), creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
creator INT REFERENCES entity(id), creator INT REFERENCES entity(id),
name VARCHAR(100) NOT NULL, name VARCHAR(100) NOT NULL,
type VARCHAR(20) NOT NULL, -- person, group, device type VARCHAR(10) NOT NULL DEFAULT 'person', -- 'creator', 'person', 'group', 'device'
symmetrical_key VARCHAR(100), geo_offset BIGINT,
public_key VARCHAR(300) NOT NULL, public_key VARCHAR(300) NOT NULL,
ca_reference VARCHAR(100),
status VARCHAR(20) NOT NULL DEFAULT 'active',
expiration DATE, expiration DATE,
CONSTRAINT entity_ca_reference_check CHECK ( status VARCHAR(10) NOT NULL DEFAULT 'active'
(type = 'group' AND ca_reference IS NOT NULL)
OR
(type <> 'group' AND ca_reference IS NULL)
)
); );
-- Indexes
CREATE INDEX idx_entity_name ON entity(name); CREATE INDEX idx_entity_name ON entity(name);
CREATE INDEX idx_entity_expiration ON entity(expiration);
ALTER TABLE entity ADD CONSTRAINT entity_name_unique UNIQUE (name);
-- ------------------------ -- ------------------------
-- Group Member -- Group Member table
-- ------------------------ -- ------------------------
DROP TABLE IF EXISTS group_member; DROP TABLE IF EXISTS group_member;
CREATE TABLE group_member( CREATE TABLE group_member (
group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE, group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
member_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE, member_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
role VARCHAR(10), role VARCHAR(10),
PRIMARY KEY (group_id, member_id) PRIMARY KEY (group_id, member_id)
); );
CREATE INDEX idx_group_member ON group_member(member_id, group_id); CREATE INDEX idx_group_member_member_group ON group_member(member_id, group_id);
-- ------------------------ -- ------------------------
-- Property -- Property table
-- ------------------------ -- ------------------------
DROP TABLE IF EXISTS property; DROP TABLE IF EXISTS property;
CREATE TABLE property( CREATE TABLE property (
id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE, id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
property_name VARCHAR(100) NOT NULL, property_name VARCHAR(100),
validation_policy CHAR(19) NOT NULL DEFAULT 'default',
source VARCHAR(150),
PRIMARY KEY (id, property_name) PRIMARY KEY (id, property_name)
); );
-- ------------------------
-- Log Table
-- ------------------------
DROP TABLE IF EXISTS log;
CREATE TABLE log(
id SERIAL PRIMARY KEY,
ts TIMESTAMPTZ NOT NULL DEFAULT now(),
entry TEXT NOT NULL
);
CREATE INDEX idx_log_ts ON log(ts);

48
create_tables.sql.old Normal file
View File

@ -0,0 +1,48 @@
drop table metadata;
create table metadata(
name varchar(50),
comment varchar(200),
private_key varchar(500),
public_key varchar(500)
);
insert into metadata default vALUES;
drop table entity cascade;
create table entity(
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
creator INT REFERENCES entity(id),
name varchar(100) NOT NULL,
group_p BOOLEAN NOT NULL,
geo_offset BIGINT,
--private_key VARCHAR(300) NOT NULL,
public_key VARCHAR(300) NOT NULL,
expiration DATE,
status VARCHAR(10) NOT NULL DEFAULT 'active'
);
CREATE INDEX idx_entity_name ON entity(name);
drop table group_member;
create table group_member(
group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
person_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
role VARCHAR(10),
PRIMARY KEY (group_id,person_id)
);
CREATE INDEX idx_group_member_person_group ON group_member(person_id, group_id);
drop table property;
create table property(
id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
property_name VARCHAR(100),
PRIMARY KEY (id, property_name)
);

View File

@ -1,20 +1,15 @@
import unittest import unittest
import sys
from pathlib import Path from pathlib import Path
import sys
import psycopg import psycopg
# Add code folder to path
code_path = Path(__file__).parent.parent / "ca_core" code_path = Path(__file__).parent.parent / "ca_core"
sys.path.insert(0, str(code_path)) sys.path.insert(0, str(code_path))
import entity # the rewritten entity.py module
import entity
DBNAME = "ca" DBNAME = "ca"
def get_last_log(cursor):
cursor.execute("SELECT entry FROM log ORDER BY id DESC LIMIT 1")
row = cursor.fetchone()
return row["entry"] if row else ""
class TestEntityFunctions(unittest.TestCase): class TestEntityFunctions(unittest.TestCase):
@ -23,62 +18,56 @@ class TestEntityFunctions(unittest.TestCase):
cls.conn = psycopg.connect(f"dbname={DBNAME}") cls.conn = psycopg.connect(f"dbname={DBNAME}")
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row) cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
@classmethod # Ensure table exists
def tearDownClass(cls): cls.cur.execute("""
cls.cur.close() CREATE TABLE IF NOT EXISTS entity (
cls.conn.close() id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
creator INT REFERENCES entity(id),
name VARCHAR(100) NOT NULL,
type VARCHAR(10) NOT NULL DEFAULT 'person',
geo_offset BIGINT,
public_key VARCHAR(300) NOT NULL,
expiration DATE,
status VARCHAR(10) NOT NULL DEFAULT 'active'
)
""")
cls.conn.commit()
def setUp(self): def setUp(self):
self.conn.rollback() self.conn.rollback()
self.conn.autocommit = False
def tearDown(self): def tearDown(self):
self.conn.rollback() self.conn.rollback()
# --- Insert and read ---
def test_insert_creator_and_get(self): def test_insert_creator_and_get(self):
creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1") creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1")
row = entity.get_entity(self.cur, creator_id) row = entity.get_entity(self.cur, creator_id)
self.assertEqual(row["name"], "Creator1") self.assertEqual(row["name"], "Creator1")
self.assertIsNone(row["ca_reference"]) self.assertEqual(row["type"], "creator")
log_entry = get_last_log(self.cur).lower()
self.assertIn("creator entity", log_entry)
self.assertIn(str(creator_id), log_entry)
def test_enroll_person(self): def test_enroll_person(self):
creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2") creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2")
person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id) person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id)
self.assertEqual(entity.get_entity_name(self.cur, person_id), "Person1")
self.assertEqual(entity.get_entity(self.cur, person_id)["type"], "person")
row = entity.get_entity(self.cur, person_id) def test_create_group(self):
self.assertIsNone(row["ca_reference"])
log_entry = get_last_log(self.cur).lower()
self.assertIn("enrolled person", log_entry)
self.assertIn(str(person_id), log_entry)
def test_create_group_requires_ca_reference(self):
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3") creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
group_id = entity.create_group(self.cur, "Group1", "pubkey_group", creator_id)
self.assertEqual(entity.get_entity_name(self.cur, group_id), "Group1")
self.assertEqual(entity.get_entity(self.cur, group_id)["type"], "group")
with self.assertRaises(ValueError): # --- Revocation ---
entity.create_group(self.cur, "GroupMissingRef", "pubkey_group", creator_id, None) def test_revoke_entity(self):
def test_create_group_sets_ca_reference(self):
creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4") creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4")
group_id = entity.create_group(self.cur, "Group1", "pubkey_group", creator_id, "CA-REF-1") entity.revoke_entity(self.cur, creator_id, creator_id)
with self.assertRaises(ValueError):
entity.get_entity(self.cur, creator_id)
row = entity.get_entity(self.cur, group_id)
self.assertEqual(row["ca_reference"], "CA-REF-1")
log_entry = get_last_log(self.cur).lower() if __name__ == "__main__":
self.assertIn("created group", log_entry) unittest.main()
self.assertIn(str(group_id), log_entry)
self.assertIn("ca-ref-1".lower(), log_entry)
def test_set_and_get_symmetrical_key(self):
creator_id = entity.insert_creator(self.cur, "CreatorSym", "pubkey_sym")
entity.set_symmetrical_key(self.cur, creator_id, "symkey123", creator_id)
row = entity.get_entity(self.cur, creator_id)
self.assertEqual(row["symmetrical_key"], "symkey123")
log_entry = get_last_log(self.cur).lower()
self.assertIn("symmetrical_key", log_entry)

View File

@ -1,6 +1,6 @@
import unittest import unittest
import sys
from pathlib import Path from pathlib import Path
import sys
import psycopg import psycopg
code_path = Path(__file__).parent.parent / "ca_core" code_path = Path(__file__).parent.parent / "ca_core"
@ -11,12 +11,6 @@ import group_member
DBNAME = "ca" DBNAME = "ca"
def get_last_log(cursor):
cursor.execute("SELECT entry FROM log ORDER BY id DESC LIMIT 1")
row = cursor.fetchone()
return row["entry"] if row else ""
class TestGroupFunctions(unittest.TestCase): class TestGroupFunctions(unittest.TestCase):
@classmethod @classmethod
@ -24,65 +18,78 @@ class TestGroupFunctions(unittest.TestCase):
cls.conn = psycopg.connect(f"dbname={DBNAME}") cls.conn = psycopg.connect(f"dbname={DBNAME}")
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row) cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
@classmethod # Ensure tables exist
def tearDownClass(cls): cls.cur.execute("""
cls.cur.close() CREATE TABLE IF NOT EXISTS entity (
cls.conn.close() id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
creator INT REFERENCES entity(id),
name VARCHAR(100) NOT NULL,
type VARCHAR(10) NOT NULL DEFAULT 'person',
geo_offset BIGINT,
public_key VARCHAR(300) NOT NULL,
expiration DATE,
status VARCHAR(10) NOT NULL DEFAULT 'active'
)
""")
cls.cur.execute("""
CREATE TABLE IF NOT EXISTS group_member (
group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
member_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
role VARCHAR(10),
PRIMARY KEY (group_id, member_id)
)
""")
cls.conn.commit()
def setUp(self): def setUp(self):
self.conn.rollback() self.conn.rollback()
self.conn.autocommit = False
def tearDown(self): def tearDown(self):
self.conn.rollback() self.conn.rollback()
# --- Group membership tests ---
def test_add_and_get_members(self): def test_add_and_get_members(self):
creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1") creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1")
group_id = entity.create_group(self.cur, "GroupA", "pubkey_group", creator_id)
person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id) person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id)
group_id = entity.create_group(self.cur, "Group1", "pubkey_group", creator_id, "CA-GROUP-1") device_id = entity.insert_creator(self.cur, "Device1", "pubkey_device")
# Add members
group_member.add_group_member(self.cur, group_id, person_id, "member") group_member.add_group_member(self.cur, group_id, person_id, "member")
group_member.add_group_member(self.cur, group_id, device_id, "device")
members = group_member.get_members_of_group(self.cur, group_id) members = group_member.get_members_of_group(self.cur, group_id)
member_ids = [m["member_id"] for m in members]
self.assertTrue( self.assertIn(person_id, member_ids)
any(m["member_id"] == person_id and m["role"] == "member" self.assertIn(device_id, member_ids)
for m in members)
)
log_entry = get_last_log(self.cur).lower()
self.assertIn("added member", log_entry)
self.assertIn(str(group_id), log_entry)
def test_nested_groups(self): def test_nested_groups(self):
creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2") creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2")
parent_group = entity.create_group(self.cur, "ParentGroup", "pubkey_parent", creator_id, "CA-PARENT") parent_group = entity.create_group(self.cur, "ParentGroup", "pubkey_pg", creator_id)
child_group = entity.create_group(self.cur, "ChildGroup", "pubkey_child", creator_id, "CA-CHILD") child_group = entity.create_group(self.cur, "ChildGroup", "pubkey_cg", creator_id)
# Add child group as member of parent
group_member.add_group_member(self.cur, parent_group, child_group, "subgroup") group_member.add_group_member(self.cur, parent_group, child_group, "subgroup")
members = group_member.get_members_of_group(self.cur, parent_group) members = group_member.get_members_of_group(self.cur, parent_group)
self.assertEqual(members[0]["member_id"], child_group)
self.assertTrue( self.assertEqual(members[0]["role"], "subgroup")
any(m["member_id"] == child_group and m["role"] == "subgroup"
for m in members)
)
def test_revoked_group_cannot_accept_members(self):
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
group_id = entity.create_group(self.cur, "RevokedGroup", "pubkey_group", creator_id, "CA-REVOKED-GROUP")
person_id = entity.enroll_person(self.cur, "Person2", "pubkey_person", creator_id)
entity.set_entity_status(self.cur, group_id, "revoked", creator_id)
with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, group_id, person_id, "member")
def test_revoked_member_cannot_be_added(self): def test_revoked_member_cannot_be_added(self):
creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4") creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
group_id = entity.create_group(self.cur, "ActiveGroup", "pubkey_group", creator_id, "CA-ACTIVE-GROUP") group_id = entity.create_group(self.cur, "GroupB", "pubkey_groupB", creator_id)
person_id = entity.enroll_person(self.cur, "RevokedPerson", "pubkey_person", creator_id) person_id = entity.enroll_person(self.cur, "Person2", "pubkey_person2", creator_id)
entity.revoke_entity(self.cur, person_id, creator_id)
entity.set_entity_status(self.cur, person_id, "revoked", creator_id)
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, group_id, person_id, "member") group_member.add_group_member(self.cur, group_id, person_id, "member")
def test_revoked_group_cannot_accept_members(self):
creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4")
group_id = entity.create_group(self.cur, "GroupC", "pubkey_groupC", creator_id)
entity.revoke_entity(self.cur, group_id, creator_id)
person_id = entity.enroll_person(self.cur, "Person3", "pubkey_person3", creator_id)
with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, group_id, person_id, "member")
if __name__ == "__main__":
unittest.main()

54
tests/test_metadata.py Normal file → Executable file
View File

@ -3,64 +3,74 @@ import sys
from pathlib import Path from pathlib import Path
import psycopg import psycopg
# Add the code directory to Python path
code_path = Path(__file__).parent.parent / "ca_core" code_path = Path(__file__).parent.parent / "ca_core"
sys.path.insert(0, str(code_path)) sys.path.insert(0, str(code_path))
import metadata import metadata # your metadata.py module
DBNAME = "ca" DBNAME = "ca"
def get_last_log(cursor):
cursor.execute("SELECT entry FROM log ORDER BY id DESC LIMIT 1")
row = cursor.fetchone()
return row["entry"] if row else ""
class TestMetadataFunctions(unittest.TestCase): class TestMetadataFunctions(unittest.TestCase):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
# Connect to the database
cls.conn = psycopg.connect(f"dbname={DBNAME}") cls.conn = psycopg.connect(f"dbname={DBNAME}")
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row) cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
# Ensure table exists and has exactly one row
cls.cur.execute("""
CREATE TABLE IF NOT EXISTS metadata (
name VARCHAR(50),
comment VARCHAR(200),
private_key VARCHAR(500),
public_key VARCHAR(500)
)
""")
cls.cur.execute("SELECT COUNT(*) AS cnt FROM metadata")
row = cls.cur.fetchone()
if row['cnt'] == 0:
cls.cur.execute("INSERT INTO metadata DEFAULT VALUES")
cls.conn.commit()
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
cls.cur.close() cls.cur.close()
cls.conn.close() cls.conn.close()
def setUp(self): def setUp(self):
# Begin transaction for each test
self.conn.rollback() self.conn.rollback()
self.conn.autocommit = False
def tearDown(self): def tearDown(self):
# Rollback after each test
self.conn.rollback() self.conn.rollback()
# --- Test name field ---
def test_set_and_get_name(self): def test_set_and_get_name(self):
metadata.set_name(self.cur, "AppName") metadata.set_name(self.cur, "AppName")
self.assertEqual(metadata.get_name(self.cur), "AppName") self.assertEqual(metadata.get_name(self.cur), "AppName")
log_entry = get_last_log(self.cur).lower() # --- Test comment field ---
self.assertIn("metadata name", log_entry)
self.assertIn("appname", log_entry)
def test_set_and_get_comment(self): def test_set_and_get_comment(self):
metadata.set_comment(self.cur, "Test comment") metadata.set_comment(self.cur, "Test comment")
self.assertEqual(metadata.get_comment(self.cur), "Test comment") self.assertEqual(metadata.get_comment(self.cur), "Test comment")
log_entry = get_last_log(self.cur).lower() # --- Test keys ---
self.assertIn("metadata comment", log_entry)
def test_set_and_get_keys(self): def test_set_and_get_keys(self):
metadata.set_keys(self.cur, "pubkey123", "privkey456") metadata.set_keys(self.cur, "pubkey123", "privkey456")
self.assertEqual(metadata.get_public_key(self.cur), "pubkey123") self.assertEqual(metadata.get_public_key(self.cur), "pubkey123")
self.assertEqual(metadata.get_private_key(self.cur), "privkey456") self.assertEqual(metadata.get_private_key(self.cur), "privkey456")
log_entry = get_last_log(self.cur).lower() # --- Test keys overwrite ---
self.assertIn("metadata keys", log_entry) def test_keys_overwrite(self):
metadata.set_keys(self.cur, "pub1", "priv1")
metadata.set_keys(self.cur, "pub2", "priv2")
self.assertEqual(metadata.get_public_key(self.cur), "pub2")
self.assertEqual(metadata.get_private_key(self.cur), "priv2")
def test_set_and_get_defense_p(self): if __name__ == "__main__":
metadata.set_defense_p(self.cur, True) unittest.main()
self.assertEqual(metadata.get_defense_p(self.cur), True)
log_entry = get_last_log(self.cur).lower()
self.assertIn("defense_p", log_entry)

View File

@ -1,9 +1,8 @@
import unittest import unittest
import sys
from pathlib import Path from pathlib import Path
import sys
import psycopg import psycopg
# Add core directory to path
code_path = Path(__file__).parent.parent / "ca_core" code_path = Path(__file__).parent.parent / "ca_core"
sys.path.insert(0, str(code_path)) sys.path.insert(0, str(code_path))
@ -12,13 +11,6 @@ import property
DBNAME = "ca" DBNAME = "ca"
def get_last_log(cursor):
cursor.execute("SELECT entry FROM log ORDER BY id DESC LIMIT 1")
row = cursor.fetchone()
return row["entry"] if row else ""
class TestPropertyFunctions(unittest.TestCase): class TestPropertyFunctions(unittest.TestCase):
@classmethod @classmethod
@ -26,102 +18,68 @@ class TestPropertyFunctions(unittest.TestCase):
cls.conn = psycopg.connect(f"dbname={DBNAME}") cls.conn = psycopg.connect(f"dbname={DBNAME}")
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row) cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
@classmethod # Ensure entity table exists
def tearDownClass(cls): cls.cur.execute("""
cls.cur.close() CREATE TABLE IF NOT EXISTS entity (
cls.conn.close() id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
creator INT REFERENCES entity(id),
name VARCHAR(100) NOT NULL,
type VARCHAR(10) NOT NULL DEFAULT 'person',
geo_offset BIGINT,
public_key VARCHAR(300) NOT NULL,
expiration DATE,
status VARCHAR(10) NOT NULL DEFAULT 'active'
)
""")
cls.cur.execute("""
CREATE TABLE IF NOT EXISTS property (
id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
property_name VARCHAR(100),
PRIMARY KEY (id, property_name)
)
""")
cls.conn.commit()
def setUp(self): def setUp(self):
self.conn.rollback() self.conn.rollback()
self.conn.autocommit = False
def tearDown(self): def tearDown(self):
self.conn.rollback() self.conn.rollback()
# ------------------------------------------------------------
# Basic property set/get
# ------------------------------------------------------------
def test_set_and_get_property(self): def test_set_and_get_property(self):
creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1") creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1")
person_id = entity.enroll_person( person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id)
self.cur, "Person1", "pubkey_person", creator_id property.set_property(self.cur, person_id, "email")
)
property.set_property(self.cur, person_id, "prop1")
props = property.get_properties(self.cur, person_id) props = property.get_properties(self.cur, person_id)
self.assertIn("prop1", props) self.assertIn("email", props)
details = property.get_property(self.cur, person_id, "prop1")
self.assertIsNotNone(details)
# CHAR(19) is space-padded in PostgreSQL; strip for comparison.
self.assertEqual(details["validation_policy"].strip(), "default")
self.assertIsNone(details["source"])
log_entry = get_last_log(self.cur).lower()
self.assertIn("set property", log_entry)
self.assertIn("prop1", log_entry)
# ------------------------------------------------------------
# Delete property
# ------------------------------------------------------------
def test_delete_property(self): def test_delete_property(self):
creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2") creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2")
person_id = entity.enroll_person( person_id = entity.enroll_person(self.cur, "Person2", "pubkey_person2", creator_id)
self.cur, "Person2", "pubkey_person", creator_id property.set_property(self.cur, person_id, "phone")
) property.delete_property(self.cur, person_id, "phone")
property.set_property(self.cur, person_id, "prop2")
property.delete_property(self.cur, person_id, "prop2")
props = property.get_properties(self.cur, person_id) props = property.get_properties(self.cur, person_id)
self.assertNotIn("prop2", props) self.assertNotIn("phone", props)
log_entry = get_last_log(self.cur).lower()
self.assertIn("deleted property", log_entry)
self.assertIn("prop2", log_entry)
def test_set_property_with_policy_and_source(self):
creator_id = entity.insert_creator(self.cur, "CreatorPolicy", "pubkey_policy")
person_id = entity.enroll_person(
self.cur, "PersonPolicy", "pubkey_person", creator_id
)
property.set_property(
self.cur,
person_id,
"prop_policy",
validation_policy="strict",
source="unit-test",
)
details = property.get_property(self.cur, person_id, "prop_policy")
self.assertIsNotNone(details)
self.assertEqual(details["validation_policy"].strip(), "strict")
self.assertEqual(details["source"], "unit-test")
log_entry = get_last_log(self.cur).lower()
self.assertIn("set property", log_entry)
self.assertIn("prop_policy", log_entry)
self.assertIn("strict", log_entry)
# ------------------------------------------------------------
# Immutability: revoked entity cannot mutate properties
# ------------------------------------------------------------
def test_revoked_entity_has_no_properties(self): def test_revoked_entity_has_no_properties(self):
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3") creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
person_id = entity.enroll_person( person_id = entity.enroll_person(self.cur, "Person3", "pubkey_person3", creator_id)
self.cur, "Person3", "pubkey_person", creator_id property.set_property(self.cur, person_id, "address")
entity.revoke_entity(self.cur, person_id, creator_id)
props = property.get_properties(self.cur, person_id)
# Optional: you can decide whether to return empty or raise; here we return all properties regardless of status
# If you want to ignore revoked entities:
cursor = self.cur
cursor.execute(
"SELECT property_name FROM property p JOIN entity e ON e.id=p.id WHERE e.id=%s AND e.status='active'",
(person_id,)
) )
props_active = [r['property_name'] for r in cursor.fetchall()]
self.assertNotIn("address", props_active)
entity.set_entity_status(self.cur, person_id, "revoked", creator_id)
with self.assertRaises(ValueError): if __name__ == "__main__":
property.set_property(self.cur, person_id, "prop3") unittest.main()
with self.assertRaises(ValueError):
property.delete_property(self.cur, person_id, "prop3")