Compare commits
No commits in common. "a1be210f589a7ef1d12b987942968cf431c8dac2" and "2678737d5ebc891e62dbb419ac5d1c630a7f8c9b" have entirely different histories.
a1be210f58
...
2678737d5e
|
|
@ -1,167 +0,0 @@
|
||||||
CA/PKI Backend Project Context
|
|
||||||
Stack
|
|
||||||
|
|
||||||
Python 3 + psycopg (dict_row cursors)
|
|
||||||
|
|
||||||
PostgreSQL database: ca
|
|
||||||
|
|
||||||
Unit tests: unittest (python3 -m unittest discover)
|
|
||||||
|
|
||||||
Database Schema (current assumptions)
|
|
||||||
entity
|
|
||||||
|
|
||||||
id INT identity PK
|
|
||||||
|
|
||||||
creation_ts TIMESTAMPTZ default now()
|
|
||||||
|
|
||||||
creator INT FK → entity(id) (the entity that created this one; nullable)
|
|
||||||
|
|
||||||
name VARCHAR(100) NOT NULL
|
|
||||||
|
|
||||||
type VARCHAR(...) NOT NULL (e.g. person, group, device, alias)
|
|
||||||
|
|
||||||
public_key VARCHAR(300) NOT NULL
|
|
||||||
|
|
||||||
symmetrical_key VARCHAR(100) NULL
|
|
||||||
|
|
||||||
status VARCHAR(...) NOT NULL default 'active' (values: 'active', 'revoked')
|
|
||||||
|
|
||||||
expiration DATE NULL
|
|
||||||
|
|
||||||
Index on entity(name) (and other indexes as needed)
|
|
||||||
|
|
||||||
group_member
|
|
||||||
|
|
||||||
group_id INT FK → entity(id) ON DELETE CASCADE
|
|
||||||
|
|
||||||
member_id INT FK → entity(id) ON DELETE CASCADE
|
|
||||||
|
|
||||||
role VARCHAR(10)
|
|
||||||
|
|
||||||
PK (group_id, member_id)
|
|
||||||
|
|
||||||
Index (member_id, group_id)
|
|
||||||
|
|
||||||
Groups can contain any entity type, including other groups and devices.
|
|
||||||
|
|
||||||
property
|
|
||||||
|
|
||||||
Columns: (id INT FK → entity(id), property_name VARCHAR(100))
|
|
||||||
|
|
||||||
PK (id, property_name)
|
|
||||||
|
|
||||||
Used for flags/roles such as "creator"
|
|
||||||
|
|
||||||
metadata
|
|
||||||
|
|
||||||
Intended “singleton row” table, enforced at application level
|
|
||||||
|
|
||||||
Columns: name, comment, private_key, public_key
|
|
||||||
|
|
||||||
log
|
|
||||||
|
|
||||||
id SERIAL PK
|
|
||||||
|
|
||||||
ts TIMESTAMPTZ default now()
|
|
||||||
|
|
||||||
entry TEXT NOT NULL
|
|
||||||
|
|
||||||
Every API mutation must log one row here.
|
|
||||||
|
|
||||||
Core Business Rules
|
|
||||||
|
|
||||||
Creators are not an entity type
|
|
||||||
|
|
||||||
creator is a property (property_name='creator') on a person entity.
|
|
||||||
|
|
||||||
insert_creator() creates a person entity and inserts the creator property.
|
|
||||||
|
|
||||||
Revoked entities are immutable
|
|
||||||
|
|
||||||
Any mutation on an entity requires ensure_entity_active(cursor, entity_id).
|
|
||||||
|
|
||||||
Revoked entities cannot:
|
|
||||||
|
|
||||||
Join groups or accept members
|
|
||||||
|
|
||||||
Add/delete properties
|
|
||||||
|
|
||||||
Change keys (public_key, symmetrical_key)
|
|
||||||
|
|
||||||
Change status again
|
|
||||||
|
|
||||||
Logging
|
|
||||||
|
|
||||||
All changes to entity, group_member, property, metadata must call log_change(cursor, "...")
|
|
||||||
|
|
||||||
Logging happens inside the same transaction (no extra commits).
|
|
||||||
|
|
||||||
Python Modules (current structure)
|
|
||||||
|
|
||||||
ca_core/entity.py
|
|
||||||
|
|
||||||
Must provide:
|
|
||||||
|
|
||||||
ensure_entity_active(cursor, entity_id)
|
|
||||||
|
|
||||||
insert_creator(cursor, name, public_key)
|
|
||||||
|
|
||||||
enroll_person(cursor, name, public_key, creator_id)
|
|
||||||
|
|
||||||
create_group(cursor, name, public_key, creator_id)
|
|
||||||
|
|
||||||
create_alias(cursor, target_entity_id)
|
|
||||||
|
|
||||||
get_entity(cursor, entity_id)
|
|
||||||
|
|
||||||
set_entity_status(cursor, entity_id, status, changed_by) (requires active entity)
|
|
||||||
|
|
||||||
set_entity_keys(cursor, entity_id, public_key, changed_by) (active-only)
|
|
||||||
|
|
||||||
set_symmetrical_key(cursor, entity_id, key, changed_by) (active-only)
|
|
||||||
|
|
||||||
get_symmetrical_key(cursor, entity_id)
|
|
||||||
|
|
||||||
ca_core/group_member.py
|
|
||||||
|
|
||||||
Uses member_id (not person_id)
|
|
||||||
|
|
||||||
Must prevent adding revoked groups/members (via ensure_entity_active)
|
|
||||||
|
|
||||||
Logs add/remove membership
|
|
||||||
|
|
||||||
ca_core/property.py
|
|
||||||
|
|
||||||
Table is property(id, property_name) (NOT entity_id/name)
|
|
||||||
|
|
||||||
Must reject mutations if entity revoked (immutability)
|
|
||||||
|
|
||||||
Logs set/delete property
|
|
||||||
|
|
||||||
ca_core/metadata.py
|
|
||||||
|
|
||||||
Updates metadata fields and logs changes
|
|
||||||
|
|
||||||
ca_core/db_logging.py
|
|
||||||
|
|
||||||
log_change(cursor, message: str) inserts into log(entry)
|
|
||||||
|
|
||||||
Tests
|
|
||||||
|
|
||||||
tests/test_entity.py, tests/test_group.py, tests/test_property.py, tests/test_metadata.py
|
|
||||||
|
|
||||||
Tests verify:
|
|
||||||
|
|
||||||
Core behaviors (create, enroll, group membership, revoke immutability)
|
|
||||||
|
|
||||||
Log entry is created for mutations (case-insensitive substring checks)
|
|
||||||
|
|
||||||
Run via:
|
|
||||||
|
|
||||||
python3 -m unittest discover
|
|
||||||
|
|
||||||
Known gotchas
|
|
||||||
|
|
||||||
Avoid naming a module logging.py (conflicts with stdlib). Use db_logging.py.
|
|
||||||
|
|
||||||
Schema and code must stay aligned (e.g., property.id/property_name, group_member.member_id).
|
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -1,9 +0,0 @@
|
||||||
# db_logging.py
|
|
||||||
|
|
||||||
def log_change(cursor, message: str):
|
|
||||||
"""Insert a log entry into the log table."""
|
|
||||||
cursor.execute(
|
|
||||||
"INSERT INTO log (entry) VALUES (%s)",
|
|
||||||
(message,)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
@ -1,132 +1,169 @@
|
||||||
from db_logging import log_change
|
import random
|
||||||
|
import string
|
||||||
|
|
||||||
|
# ------------------------
|
||||||
def ensure_entity_active(cursor, entity_id):
|
# Helper for ownership checks
|
||||||
"""
|
# ------------------------
|
||||||
Ensure an entity exists and is active.
|
def _verify_ownership(cursor, entity_id, requesting_creator_id):
|
||||||
Revoked entities are immutable.
|
cursor.execute(
|
||||||
"""
|
"SELECT id, creator, type, status FROM entity WHERE id=%s", (entity_id,)
|
||||||
cursor.execute("SELECT status FROM entity WHERE id = %s", (entity_id,))
|
)
|
||||||
row = cursor.fetchone()
|
row = cursor.fetchone()
|
||||||
if row is None:
|
if not row or row["status"] != "active":
|
||||||
raise ValueError("Entity does not exist")
|
raise ValueError("Entity not found or inactive")
|
||||||
if row["status"] != "active":
|
owner_id = row["creator"]
|
||||||
raise ValueError("Entity is not active")
|
entity_type = row["type"]
|
||||||
|
entity_id_db = row["id"]
|
||||||
|
|
||||||
def _validate_ca_reference_for_group(ca_reference):
|
if entity_type == "creator":
|
||||||
if ca_reference is None:
|
if requesting_creator_id != entity_id_db:
|
||||||
raise ValueError("ca_reference is required for groups")
|
raise ValueError("Creator ID does not match entity owner")
|
||||||
if not isinstance(ca_reference, str):
|
else:
|
||||||
raise ValueError("ca_reference must be a string")
|
if requesting_creator_id != owner_id:
|
||||||
if len(ca_reference) == 0:
|
raise ValueError("Creator ID does not match entity owner")
|
||||||
raise ValueError("ca_reference cannot be empty")
|
|
||||||
if len(ca_reference) > 100:
|
|
||||||
raise ValueError("ca_reference must be at most 100 characters")
|
|
||||||
|
|
||||||
|
|
||||||
|
# ------------------------
|
||||||
|
# Insertions
|
||||||
|
# ------------------------
|
||||||
def insert_creator(cursor, name, public_key):
|
def insert_creator(cursor, name, public_key):
|
||||||
"""
|
|
||||||
Creators are persons with property 'creator' in the property table.
|
|
||||||
"""
|
|
||||||
cursor.execute(
|
cursor.execute(
|
||||||
"""
|
"""
|
||||||
INSERT INTO entity (name, type, public_key, status, ca_reference)
|
INSERT INTO entity (name, type, public_key, creator, status)
|
||||||
VALUES (%s, 'person', %s, 'active', NULL)
|
VALUES (%s, 'creator', %s, NULL, 'active')
|
||||||
RETURNING id
|
RETURNING id
|
||||||
""",
|
""",
|
||||||
(name, public_key),
|
(name, public_key)
|
||||||
)
|
)
|
||||||
creator_id = cursor.fetchone()["id"]
|
return cursor.fetchone()["id"]
|
||||||
|
|
||||||
# Mark as creator via property table (schema: property(id, property_name))
|
|
||||||
cursor.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO property (id, property_name)
|
|
||||||
VALUES (%s, %s)
|
|
||||||
ON CONFLICT (id, property_name) DO NOTHING
|
|
||||||
""",
|
|
||||||
(creator_id, "creator"),
|
|
||||||
)
|
|
||||||
|
|
||||||
log_change(cursor, f"Created creator entity {creator_id} with name {name}")
|
|
||||||
return creator_id
|
|
||||||
|
|
||||||
|
|
||||||
def enroll_person(cursor, name, public_key, creator_id):
|
def enroll_person(cursor, name, public_key, creator_id):
|
||||||
ensure_entity_active(cursor, creator_id)
|
|
||||||
|
|
||||||
cursor.execute(
|
cursor.execute(
|
||||||
"""
|
"SELECT type, status FROM entity WHERE id=%s", (creator_id,)
|
||||||
INSERT INTO entity (name, type, public_key, creator, status, ca_reference)
|
|
||||||
VALUES (%s, 'person', %s, %s, 'active', NULL)
|
|
||||||
RETURNING id
|
|
||||||
""",
|
|
||||||
(name, public_key, creator_id),
|
|
||||||
)
|
)
|
||||||
person_id = cursor.fetchone()["id"]
|
|
||||||
|
|
||||||
log_change(cursor, f"Enrolled person {person_id} under creator {creator_id}")
|
|
||||||
return person_id
|
|
||||||
|
|
||||||
|
|
||||||
def create_group(cursor, name, public_key, creator_id, ca_reference):
|
|
||||||
ensure_entity_active(cursor, creator_id)
|
|
||||||
_validate_ca_reference_for_group(ca_reference)
|
|
||||||
|
|
||||||
cursor.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO entity (name, type, public_key, creator, status, ca_reference)
|
|
||||||
VALUES (%s, 'group', %s, %s, 'active', %s)
|
|
||||||
RETURNING id
|
|
||||||
""",
|
|
||||||
(name, public_key, creator_id, ca_reference),
|
|
||||||
)
|
|
||||||
group_id = cursor.fetchone()["id"]
|
|
||||||
|
|
||||||
log_change(
|
|
||||||
cursor,
|
|
||||||
f"Created group {group_id} under creator {creator_id} with ca_reference {ca_reference}",
|
|
||||||
)
|
|
||||||
return group_id
|
|
||||||
|
|
||||||
|
|
||||||
def get_entity(cursor, entity_id):
|
|
||||||
cursor.execute("SELECT * FROM entity WHERE id = %s", (entity_id,))
|
|
||||||
return cursor.fetchone()
|
|
||||||
|
|
||||||
|
|
||||||
def set_entity_status(cursor, entity_id, status, changed_by):
|
|
||||||
"""
|
|
||||||
Only active entities can change status. Once revoked, immutable.
|
|
||||||
"""
|
|
||||||
ensure_entity_active(cursor, entity_id)
|
|
||||||
|
|
||||||
cursor.execute("UPDATE entity SET status = %s WHERE id = %s", (status, entity_id))
|
|
||||||
log_change(cursor, f"Set status of entity {entity_id} to {status} by {changed_by}")
|
|
||||||
|
|
||||||
|
|
||||||
def set_symmetrical_key(cursor, entity_id, key_value, changed_by):
|
|
||||||
ensure_entity_active(cursor, entity_id)
|
|
||||||
|
|
||||||
cursor.execute(
|
|
||||||
"UPDATE entity SET symmetrical_key = %s WHERE id = %s",
|
|
||||||
(key_value, entity_id),
|
|
||||||
)
|
|
||||||
log_change(cursor, f"Set symmetrical_key for entity {entity_id} by {changed_by}")
|
|
||||||
|
|
||||||
|
|
||||||
def get_symmetrical_key(cursor, entity_id):
|
|
||||||
cursor.execute("SELECT symmetrical_key FROM entity WHERE id = %s", (entity_id,))
|
|
||||||
row = cursor.fetchone()
|
row = cursor.fetchone()
|
||||||
return row["symmetrical_key"] if row else None
|
if not row or row["type"] != "creator" or row["status"] != "active":
|
||||||
|
raise ValueError("Provided creator_id does not correspond to a valid active creator")
|
||||||
|
|
||||||
def set_entity_keys(cursor, entity_id, public_key, changed_by):
|
|
||||||
ensure_entity_active(cursor, entity_id)
|
|
||||||
|
|
||||||
cursor.execute(
|
cursor.execute(
|
||||||
"UPDATE entity SET public_key = %s WHERE id = %s",
|
"""
|
||||||
(public_key, entity_id),
|
INSERT INTO entity (name, type, public_key, creator, status)
|
||||||
|
VALUES (%s, 'person', %s, %s, 'active')
|
||||||
|
RETURNING id
|
||||||
|
""",
|
||||||
|
(name, public_key, creator_id)
|
||||||
)
|
)
|
||||||
log_change(cursor, f"Updated public key for entity {entity_id} by {changed_by}")
|
return cursor.fetchone()["id"]
|
||||||
|
|
||||||
|
|
||||||
|
def create_group(cursor, name, public_key, creator_id):
|
||||||
|
cursor.execute(
|
||||||
|
"SELECT type, status FROM entity WHERE id=%s", (creator_id,)
|
||||||
|
)
|
||||||
|
row = cursor.fetchone()
|
||||||
|
if not row or row["type"] != "creator" or row["status"] != "active":
|
||||||
|
raise ValueError("Provided creator_id does not correspond to a valid active creator")
|
||||||
|
|
||||||
|
cursor.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO entity (name, type, public_key, creator, status)
|
||||||
|
VALUES (%s, 'group', %s, %s, 'active')
|
||||||
|
RETURNING id
|
||||||
|
""",
|
||||||
|
(name, public_key, creator_id)
|
||||||
|
)
|
||||||
|
return cursor.fetchone()["id"]
|
||||||
|
|
||||||
|
|
||||||
|
def create_alias(cursor, person_id):
|
||||||
|
cursor.execute(
|
||||||
|
"SELECT id, type, public_key, status FROM entity WHERE id=%s", (person_id,)
|
||||||
|
)
|
||||||
|
row = cursor.fetchone()
|
||||||
|
if not row or row["status"] != "active":
|
||||||
|
raise ValueError("Person not found or inactive")
|
||||||
|
if row["type"] != "person":
|
||||||
|
raise ValueError("Only persons can create aliases")
|
||||||
|
|
||||||
|
random_name = "".join(random.choices(string.ascii_letters + string.digits, k=8))
|
||||||
|
|
||||||
|
cursor.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO entity (name, type, public_key, creator, status)
|
||||||
|
VALUES (%s, 'person', %s, %s, 'active')
|
||||||
|
RETURNING id
|
||||||
|
""",
|
||||||
|
(random_name, row["public_key"], person_id)
|
||||||
|
)
|
||||||
|
return cursor.fetchone()["id"]
|
||||||
|
|
||||||
|
|
||||||
|
# ------------------------
|
||||||
|
# Soft-delete / revocation
|
||||||
|
# ------------------------
|
||||||
|
def revoke_entity(cursor, entity_id, requesting_creator_id):
|
||||||
|
_verify_ownership(cursor, entity_id, requesting_creator_id)
|
||||||
|
cursor.execute(
|
||||||
|
"UPDATE entity SET status=%s WHERE id=%s", ("revoked", entity_id)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ------------------------
|
||||||
|
# Getters / Setters
|
||||||
|
# ------------------------
|
||||||
|
def get_entity(cursor, entity_id):
|
||||||
|
cursor.execute(
|
||||||
|
"SELECT * FROM entity WHERE id=%s AND status='active'", (entity_id,)
|
||||||
|
)
|
||||||
|
row = cursor.fetchone()
|
||||||
|
if not row:
|
||||||
|
raise ValueError("Entity not found or inactive")
|
||||||
|
return row
|
||||||
|
|
||||||
|
|
||||||
|
def get_entity_id(cursor, name):
|
||||||
|
cursor.execute(
|
||||||
|
"SELECT id FROM entity WHERE name=%s AND status='active'", (name,)
|
||||||
|
)
|
||||||
|
row = cursor.fetchone()
|
||||||
|
if not row:
|
||||||
|
raise ValueError("Entity not found or inactive")
|
||||||
|
return row["id"]
|
||||||
|
|
||||||
|
|
||||||
|
def get_entity_public_key(cursor, entity_id):
|
||||||
|
cursor.execute(
|
||||||
|
"SELECT public_key FROM entity WHERE id=%s AND status='active'", (entity_id,)
|
||||||
|
)
|
||||||
|
row = cursor.fetchone()
|
||||||
|
if not row:
|
||||||
|
raise ValueError("Entity not found or inactive")
|
||||||
|
return row["public_key"]
|
||||||
|
|
||||||
|
|
||||||
|
def get_entity_name(cursor, entity_id):
|
||||||
|
cursor.execute(
|
||||||
|
"SELECT name FROM entity WHERE id=%s AND status='active'", (entity_id,)
|
||||||
|
)
|
||||||
|
row = cursor.fetchone()
|
||||||
|
if not row:
|
||||||
|
raise ValueError("Entity not found or inactive")
|
||||||
|
return row["name"]
|
||||||
|
|
||||||
|
|
||||||
|
def set_entity_name(cursor, entity_id, new_name, requesting_creator_id):
|
||||||
|
_verify_ownership(cursor, entity_id, requesting_creator_id)
|
||||||
|
cursor.execute("UPDATE entity SET name=%s WHERE id=%s", (new_name, entity_id))
|
||||||
|
|
||||||
|
|
||||||
|
def set_entity_public_key(cursor, entity_id, public_key, requesting_creator_id):
|
||||||
|
_verify_ownership(cursor, entity_id, requesting_creator_id)
|
||||||
|
cursor.execute(
|
||||||
|
"UPDATE entity SET public_key=%s WHERE id=%s", (public_key, entity_id)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def set_entity_keys(cursor, entity_id, public_key, requesting_creator_id):
|
||||||
|
set_entity_public_key(cursor, entity_id, public_key, requesting_creator_id)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,32 +1,42 @@
|
||||||
from db_logging import log_change
|
# ca_core/group_member.py
|
||||||
from entity import ensure_entity_active
|
|
||||||
|
|
||||||
|
def add_group_member(cursor, group_id: int, member_id: int, role: str):
|
||||||
|
# Verify group exists and is active
|
||||||
|
cursor.execute("SELECT type, status FROM entity WHERE id=%s", (group_id,))
|
||||||
|
row = cursor.fetchone()
|
||||||
|
if not row or row["status"] != "active" or row["type"] != "group":
|
||||||
|
raise ValueError("Invalid or inactive group")
|
||||||
|
|
||||||
def add_group_member(cursor, group_id, member_id, role):
|
# Verify member exists and is active
|
||||||
ensure_entity_active(cursor, group_id)
|
cursor.execute("SELECT status FROM entity WHERE id=%s", (member_id,))
|
||||||
ensure_entity_active(cursor, member_id)
|
row = cursor.fetchone()
|
||||||
|
if not row or row["status"] != "active":
|
||||||
|
raise ValueError("Invalid or inactive member")
|
||||||
|
|
||||||
cursor.execute(
|
cursor.execute(
|
||||||
"""
|
"INSERT INTO group_member (group_id, member_id, role) VALUES (%s, %s, %s)",
|
||||||
INSERT INTO group_member (group_id, member_id, role)
|
|
||||||
VALUES (%s, %s, %s)
|
|
||||||
""",
|
|
||||||
(group_id, member_id, role)
|
(group_id, member_id, role)
|
||||||
)
|
)
|
||||||
|
|
||||||
log_change(
|
|
||||||
cursor,
|
def remove_group_member(cursor, group_id: int, member_id: int):
|
||||||
f"Added member {member_id} to group {group_id} as {role}"
|
cursor.execute(
|
||||||
|
"DELETE FROM group_member WHERE group_id=%s AND member_id=%s",
|
||||||
|
(group_id, member_id)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_members_of_group(cursor, group_id):
|
def get_groups_for_member(cursor, member_id: int):
|
||||||
cursor.execute(
|
cursor.execute(
|
||||||
"""
|
"SELECT group_id, role FROM group_member WHERE member_id=%s",
|
||||||
SELECT member_id, role
|
(member_id,)
|
||||||
FROM group_member
|
)
|
||||||
WHERE group_id = %s
|
return cursor.fetchall()
|
||||||
""",
|
|
||||||
|
|
||||||
|
def get_members_of_group(cursor, group_id: int):
|
||||||
|
cursor.execute(
|
||||||
|
"SELECT member_id, role FROM group_member WHERE group_id=%s",
|
||||||
(group_id,)
|
(group_id,)
|
||||||
)
|
)
|
||||||
return cursor.fetchall()
|
return cursor.fetchall()
|
||||||
|
|
|
||||||
|
|
@ -1,77 +1,40 @@
|
||||||
from db_logging import log_change
|
# ca_core/metadata.py
|
||||||
|
|
||||||
|
|
||||||
def set_name(cursor, name):
|
|
||||||
cursor.execute("DELETE FROM metadata")
|
|
||||||
cursor.execute(
|
|
||||||
"INSERT INTO metadata (name) VALUES (%s)",
|
|
||||||
(name,)
|
|
||||||
)
|
|
||||||
log_change(cursor, f"Updated metadata name to {name}")
|
|
||||||
|
|
||||||
|
|
||||||
def get_name(cursor):
|
def get_name(cursor):
|
||||||
cursor.execute("SELECT name FROM metadata LIMIT 1")
|
cursor.execute("SELECT name FROM metadata LIMIT 1")
|
||||||
row = cursor.fetchone()
|
row = cursor.fetchone()
|
||||||
return row["name"] if row else None
|
return row['name'] if row else None
|
||||||
|
|
||||||
|
|
||||||
def set_comment(cursor, comment):
|
def set_name(cursor, value):
|
||||||
cursor.execute("DELETE FROM metadata")
|
cursor.execute("UPDATE metadata SET name=%s", (value,))
|
||||||
cursor.execute(
|
|
||||||
"INSERT INTO metadata (comment) VALUES (%s)",
|
|
||||||
(comment,)
|
|
||||||
)
|
|
||||||
log_change(cursor, f"Updated metadata comment to {comment}")
|
|
||||||
|
|
||||||
|
|
||||||
def get_comment(cursor):
|
def get_comment(cursor):
|
||||||
cursor.execute("SELECT comment FROM metadata LIMIT 1")
|
cursor.execute("SELECT comment FROM metadata LIMIT 1")
|
||||||
row = cursor.fetchone()
|
row = cursor.fetchone()
|
||||||
return row["comment"] if row else None
|
return row['comment'] if row else None
|
||||||
|
|
||||||
|
|
||||||
def set_keys(cursor, public_key, private_key):
|
def set_comment(cursor, value):
|
||||||
cursor.execute("DELETE FROM metadata")
|
cursor.execute("UPDATE metadata SET comment=%s", (value,))
|
||||||
cursor.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO metadata (public_key, private_key)
|
|
||||||
VALUES (%s, %s)
|
|
||||||
""",
|
|
||||||
(public_key, private_key)
|
|
||||||
)
|
|
||||||
log_change(cursor, "Updated metadata keys")
|
|
||||||
|
|
||||||
|
|
||||||
def get_public_key(cursor):
|
def get_public_key(cursor):
|
||||||
cursor.execute("SELECT public_key FROM metadata LIMIT 1")
|
cursor.execute("SELECT public_key FROM metadata LIMIT 1")
|
||||||
row = cursor.fetchone()
|
row = cursor.fetchone()
|
||||||
return row["public_key"] if row else None
|
return row['public_key'] if row else None
|
||||||
|
|
||||||
|
|
||||||
def get_private_key(cursor):
|
def get_private_key(cursor):
|
||||||
cursor.execute("SELECT private_key FROM metadata LIMIT 1")
|
cursor.execute("SELECT private_key FROM metadata LIMIT 1")
|
||||||
row = cursor.fetchone()
|
row = cursor.fetchone()
|
||||||
return row["private_key"] if row else None
|
return row['private_key'] if row else None
|
||||||
|
|
||||||
|
|
||||||
def set_defense_p(cursor, defense_p: bool):
|
def set_keys(cursor, public_key, private_key):
|
||||||
"""Set the metadata defense_p flag.
|
|
||||||
|
|
||||||
This table is treated as a singleton row at the application level.
|
|
||||||
Current convention in this codebase is to wipe and re-insert.
|
|
||||||
"""
|
|
||||||
cursor.execute("DELETE FROM metadata")
|
|
||||||
cursor.execute(
|
cursor.execute(
|
||||||
"INSERT INTO metadata (defense_p) VALUES (%s)",
|
"UPDATE metadata SET public_key=%s, private_key=%s",
|
||||||
(defense_p,)
|
(public_key, private_key)
|
||||||
)
|
)
|
||||||
log_change(cursor, f"Updated metadata defense_p to {defense_p}")
|
|
||||||
|
|
||||||
|
|
||||||
def get_defense_p(cursor) -> bool:
|
|
||||||
cursor.execute("SELECT defense_p FROM metadata LIMIT 1")
|
|
||||||
row = cursor.fetchone()
|
|
||||||
if not row or row["defense_p"] is None:
|
|
||||||
return False
|
|
||||||
return bool(row["defense_p"])
|
|
||||||
|
|
|
||||||
|
|
@ -1,104 +1,29 @@
|
||||||
from db_logging import log_change
|
# ca_core/property.py
|
||||||
from entity import ensure_entity_active
|
|
||||||
|
|
||||||
|
|
||||||
def _validate_validation_policy(validation_policy: str) -> str:
|
|
||||||
if validation_policy is None:
|
|
||||||
return "default"
|
|
||||||
if not isinstance(validation_policy, str):
|
|
||||||
raise TypeError("validation_policy must be a string")
|
|
||||||
vp = validation_policy.strip()
|
|
||||||
if not vp:
|
|
||||||
raise ValueError("validation_policy cannot be empty")
|
|
||||||
if len(vp) > 19:
|
|
||||||
raise ValueError("validation_policy must be at most 19 characters")
|
|
||||||
return vp
|
|
||||||
|
|
||||||
|
|
||||||
def _validate_source(source):
|
|
||||||
if source is None:
|
|
||||||
return None
|
|
||||||
if not isinstance(source, str):
|
|
||||||
raise TypeError("source must be a string or None")
|
|
||||||
s = source.strip()
|
|
||||||
if len(s) > 150:
|
|
||||||
raise ValueError("source must be at most 150 characters")
|
|
||||||
# Allow empty string -> treat as NULL for cleanliness
|
|
||||||
return s if s else None
|
|
||||||
|
|
||||||
|
|
||||||
def set_property(cursor, entity_id, property_name, validation_policy="default", source=None):
|
|
||||||
"""
|
|
||||||
Revoked entities are immutable: cannot add/update properties.
|
|
||||||
|
|
||||||
Schema: property(id, property_name, validation_policy, source)
|
|
||||||
- validation_policy: CHAR(19) NOT NULL DEFAULT 'default'
|
|
||||||
- source: VARCHAR(150) NULL
|
|
||||||
"""
|
|
||||||
ensure_entity_active(cursor, entity_id)
|
|
||||||
|
|
||||||
if not isinstance(property_name, str) or not property_name.strip():
|
|
||||||
raise ValueError("property_name must be a non-empty string")
|
|
||||||
if len(property_name) > 100:
|
|
||||||
raise ValueError("property_name must be at most 100 characters")
|
|
||||||
|
|
||||||
vp = _validate_validation_policy(validation_policy)
|
|
||||||
src = _validate_source(source)
|
|
||||||
|
|
||||||
|
def set_property(cursor, entity_id: int, property_name: str):
|
||||||
cursor.execute(
|
cursor.execute(
|
||||||
"""
|
"""
|
||||||
INSERT INTO property (id, property_name, validation_policy, source)
|
INSERT INTO property (id, property_name)
|
||||||
VALUES (%s, %s, %s, %s)
|
VALUES (%s, %s)
|
||||||
ON CONFLICT (id, property_name)
|
ON CONFLICT (id, property_name) DO NOTHING
|
||||||
DO UPDATE SET
|
|
||||||
validation_policy = EXCLUDED.validation_policy,
|
|
||||||
source = EXCLUDED.source
|
|
||||||
""",
|
""",
|
||||||
(entity_id, property_name, vp, src),
|
(entity_id, property_name)
|
||||||
)
|
|
||||||
log_change(
|
|
||||||
cursor,
|
|
||||||
f"Set property '{property_name}' for entity {entity_id} "
|
|
||||||
f"(validation_policy={vp}, source={src})",
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_properties(cursor, entity_id):
|
def delete_property(cursor, entity_id: int, property_name: str):
|
||||||
"""
|
|
||||||
Returns a list of property_name values for the entity.
|
|
||||||
"""
|
|
||||||
cursor.execute(
|
cursor.execute(
|
||||||
"SELECT property_name FROM property WHERE id = %s",
|
"DELETE FROM property WHERE id=%s AND property_name=%s",
|
||||||
(entity_id,),
|
(entity_id, property_name)
|
||||||
)
|
)
|
||||||
rows = cursor.fetchall()
|
if cursor.rowcount == 0:
|
||||||
return [r["property_name"] for r in rows]
|
raise ValueError("Property not found")
|
||||||
|
|
||||||
|
|
||||||
def get_property(cursor, entity_id, property_name):
|
def get_properties(cursor, entity_id: int):
|
||||||
"""
|
|
||||||
Returns a dict_row with keys: property_name, validation_policy, source
|
|
||||||
or None if not found.
|
|
||||||
"""
|
|
||||||
cursor.execute(
|
cursor.execute(
|
||||||
"""
|
"SELECT property_name FROM property WHERE id=%s",
|
||||||
SELECT property_name, validation_policy, source
|
(entity_id,)
|
||||||
FROM property
|
|
||||||
WHERE id = %s AND property_name = %s
|
|
||||||
""",
|
|
||||||
(entity_id, property_name),
|
|
||||||
)
|
)
|
||||||
return cursor.fetchone()
|
return [row['property_name'] for row in cursor.fetchall()]
|
||||||
|
|
||||||
|
|
||||||
def delete_property(cursor, entity_id, property_name):
|
|
||||||
"""
|
|
||||||
Revoked entities are immutable: cannot delete properties.
|
|
||||||
"""
|
|
||||||
ensure_entity_active(cursor, entity_id)
|
|
||||||
|
|
||||||
cursor.execute(
|
|
||||||
"DELETE FROM property WHERE id = %s AND property_name = %s",
|
|
||||||
(entity_id, property_name),
|
|
||||||
)
|
|
||||||
log_change(cursor, f"Deleted property '{property_name}' for entity {entity_id}")
|
|
||||||
|
|
|
||||||
|
|
@ -1,77 +1,62 @@
|
||||||
-- ------------------------
|
-- ------------------------
|
||||||
-- Metadata (singleton row; enforced at application level)
|
-- Metadata table (singleton)
|
||||||
-- ------------------------
|
-- ------------------------
|
||||||
DROP TABLE IF EXISTS metadata;
|
DROP TABLE IF EXISTS metadata;
|
||||||
|
|
||||||
CREATE TABLE metadata(
|
CREATE TABLE metadata (
|
||||||
name VARCHAR(50),
|
name VARCHAR(50),
|
||||||
comment VARCHAR(200),
|
comment VARCHAR(200),
|
||||||
private_key VARCHAR(500),
|
private_key VARCHAR(500),
|
||||||
public_key VARCHAR(500),
|
public_key VARCHAR(500)
|
||||||
defense_p BOOLEAN NOT NULL DEFAULT false
|
|
||||||
);
|
);
|
||||||
|
|
||||||
|
INSERT INTO metadata DEFAULT VALUES;
|
||||||
|
|
||||||
-- ------------------------
|
-- ------------------------
|
||||||
-- Entity
|
-- Entity table
|
||||||
-- ------------------------
|
-- ------------------------
|
||||||
DROP TABLE IF EXISTS entity CASCADE;
|
DROP TABLE IF EXISTS entity CASCADE;
|
||||||
|
|
||||||
CREATE TABLE entity(
|
CREATE TABLE entity (
|
||||||
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
||||||
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
|
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||||
creator INT REFERENCES entity(id),
|
creator INT REFERENCES entity(id),
|
||||||
name VARCHAR(100) NOT NULL,
|
name VARCHAR(100) NOT NULL,
|
||||||
type VARCHAR(20) NOT NULL, -- person, group, device
|
type VARCHAR(10) NOT NULL DEFAULT 'person', -- 'creator', 'person', 'group', 'device'
|
||||||
symmetrical_key VARCHAR(100),
|
geo_offset BIGINT,
|
||||||
public_key VARCHAR(300) NOT NULL,
|
public_key VARCHAR(300) NOT NULL,
|
||||||
ca_reference VARCHAR(100),
|
|
||||||
status VARCHAR(20) NOT NULL DEFAULT 'active',
|
|
||||||
expiration DATE,
|
expiration DATE,
|
||||||
CONSTRAINT entity_ca_reference_check CHECK (
|
status VARCHAR(10) NOT NULL DEFAULT 'active'
|
||||||
(type = 'group' AND ca_reference IS NOT NULL)
|
|
||||||
OR
|
|
||||||
(type <> 'group' AND ca_reference IS NULL)
|
|
||||||
)
|
|
||||||
);
|
);
|
||||||
|
|
||||||
|
-- Indexes
|
||||||
CREATE INDEX idx_entity_name ON entity(name);
|
CREATE INDEX idx_entity_name ON entity(name);
|
||||||
|
CREATE INDEX idx_entity_expiration ON entity(expiration);
|
||||||
|
|
||||||
|
ALTER TABLE entity ADD CONSTRAINT entity_name_unique UNIQUE (name);
|
||||||
|
|
||||||
-- ------------------------
|
-- ------------------------
|
||||||
-- Group Member
|
-- Group Member table
|
||||||
-- ------------------------
|
-- ------------------------
|
||||||
DROP TABLE IF EXISTS group_member;
|
DROP TABLE IF EXISTS group_member;
|
||||||
|
|
||||||
CREATE TABLE group_member(
|
CREATE TABLE group_member (
|
||||||
group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
|
group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
|
||||||
member_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
|
member_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
|
||||||
role VARCHAR(10),
|
role VARCHAR(10),
|
||||||
PRIMARY KEY (group_id, member_id)
|
PRIMARY KEY (group_id, member_id)
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE INDEX idx_group_member ON group_member(member_id, group_id);
|
CREATE INDEX idx_group_member_member_group ON group_member(member_id, group_id);
|
||||||
|
|
||||||
-- ------------------------
|
-- ------------------------
|
||||||
-- Property
|
-- Property table
|
||||||
-- ------------------------
|
-- ------------------------
|
||||||
DROP TABLE IF EXISTS property;
|
DROP TABLE IF EXISTS property;
|
||||||
|
|
||||||
CREATE TABLE property(
|
CREATE TABLE property (
|
||||||
id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
|
id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
|
||||||
property_name VARCHAR(100) NOT NULL,
|
property_name VARCHAR(100),
|
||||||
validation_policy CHAR(19) NOT NULL DEFAULT 'default',
|
|
||||||
source VARCHAR(150),
|
|
||||||
PRIMARY KEY (id, property_name)
|
PRIMARY KEY (id, property_name)
|
||||||
);
|
);
|
||||||
|
|
||||||
-- ------------------------
|
|
||||||
-- Log Table
|
|
||||||
-- ------------------------
|
|
||||||
DROP TABLE IF EXISTS log;
|
|
||||||
|
|
||||||
CREATE TABLE log(
|
|
||||||
id SERIAL PRIMARY KEY,
|
|
||||||
ts TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
||||||
entry TEXT NOT NULL
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE INDEX idx_log_ts ON log(ts);
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,48 @@
|
||||||
|
drop table metadata;
|
||||||
|
|
||||||
|
create table metadata(
|
||||||
|
name varchar(50),
|
||||||
|
comment varchar(200),
|
||||||
|
private_key varchar(500),
|
||||||
|
public_key varchar(500)
|
||||||
|
);
|
||||||
|
|
||||||
|
insert into metadata default vALUES;
|
||||||
|
|
||||||
|
drop table entity cascade;
|
||||||
|
|
||||||
|
create table entity(
|
||||||
|
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
||||||
|
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||||
|
creator INT REFERENCES entity(id),
|
||||||
|
name varchar(100) NOT NULL,
|
||||||
|
group_p BOOLEAN NOT NULL,
|
||||||
|
geo_offset BIGINT,
|
||||||
|
--private_key VARCHAR(300) NOT NULL,
|
||||||
|
public_key VARCHAR(300) NOT NULL,
|
||||||
|
expiration DATE,
|
||||||
|
status VARCHAR(10) NOT NULL DEFAULT 'active'
|
||||||
|
|
||||||
|
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX idx_entity_name ON entity(name);
|
||||||
|
|
||||||
|
drop table group_member;
|
||||||
|
|
||||||
|
create table group_member(
|
||||||
|
group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
|
||||||
|
person_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
|
||||||
|
role VARCHAR(10),
|
||||||
|
PRIMARY KEY (group_id,person_id)
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX idx_group_member_person_group ON group_member(person_id, group_id);
|
||||||
|
|
||||||
|
drop table property;
|
||||||
|
|
||||||
|
create table property(
|
||||||
|
id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
|
||||||
|
property_name VARCHAR(100),
|
||||||
|
PRIMARY KEY (id, property_name)
|
||||||
|
);
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -1,20 +1,15 @@
|
||||||
import unittest
|
import unittest
|
||||||
import sys
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
import sys
|
||||||
import psycopg
|
import psycopg
|
||||||
|
|
||||||
|
# Add code folder to path
|
||||||
code_path = Path(__file__).parent.parent / "ca_core"
|
code_path = Path(__file__).parent.parent / "ca_core"
|
||||||
sys.path.insert(0, str(code_path))
|
sys.path.insert(0, str(code_path))
|
||||||
|
import entity # the rewritten entity.py module
|
||||||
import entity
|
|
||||||
|
|
||||||
DBNAME = "ca"
|
DBNAME = "ca"
|
||||||
|
|
||||||
def get_last_log(cursor):
|
|
||||||
cursor.execute("SELECT entry FROM log ORDER BY id DESC LIMIT 1")
|
|
||||||
row = cursor.fetchone()
|
|
||||||
return row["entry"] if row else ""
|
|
||||||
|
|
||||||
|
|
||||||
class TestEntityFunctions(unittest.TestCase):
|
class TestEntityFunctions(unittest.TestCase):
|
||||||
|
|
||||||
|
|
@ -23,62 +18,56 @@ class TestEntityFunctions(unittest.TestCase):
|
||||||
cls.conn = psycopg.connect(f"dbname={DBNAME}")
|
cls.conn = psycopg.connect(f"dbname={DBNAME}")
|
||||||
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
|
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
|
||||||
|
|
||||||
@classmethod
|
# Ensure table exists
|
||||||
def tearDownClass(cls):
|
cls.cur.execute("""
|
||||||
cls.cur.close()
|
CREATE TABLE IF NOT EXISTS entity (
|
||||||
cls.conn.close()
|
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
||||||
|
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||||
|
creator INT REFERENCES entity(id),
|
||||||
|
name VARCHAR(100) NOT NULL,
|
||||||
|
type VARCHAR(10) NOT NULL DEFAULT 'person',
|
||||||
|
geo_offset BIGINT,
|
||||||
|
public_key VARCHAR(300) NOT NULL,
|
||||||
|
expiration DATE,
|
||||||
|
status VARCHAR(10) NOT NULL DEFAULT 'active'
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
cls.conn.commit()
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.conn.rollback()
|
self.conn.rollback()
|
||||||
|
self.conn.autocommit = False
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
self.conn.rollback()
|
self.conn.rollback()
|
||||||
|
|
||||||
|
# --- Insert and read ---
|
||||||
def test_insert_creator_and_get(self):
|
def test_insert_creator_and_get(self):
|
||||||
creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1")
|
creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1")
|
||||||
row = entity.get_entity(self.cur, creator_id)
|
row = entity.get_entity(self.cur, creator_id)
|
||||||
self.assertEqual(row["name"], "Creator1")
|
self.assertEqual(row["name"], "Creator1")
|
||||||
self.assertIsNone(row["ca_reference"])
|
self.assertEqual(row["type"], "creator")
|
||||||
|
|
||||||
log_entry = get_last_log(self.cur).lower()
|
|
||||||
self.assertIn("creator entity", log_entry)
|
|
||||||
self.assertIn(str(creator_id), log_entry)
|
|
||||||
|
|
||||||
def test_enroll_person(self):
|
def test_enroll_person(self):
|
||||||
creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2")
|
creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2")
|
||||||
person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id)
|
person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id)
|
||||||
|
self.assertEqual(entity.get_entity_name(self.cur, person_id), "Person1")
|
||||||
|
self.assertEqual(entity.get_entity(self.cur, person_id)["type"], "person")
|
||||||
|
|
||||||
row = entity.get_entity(self.cur, person_id)
|
def test_create_group(self):
|
||||||
self.assertIsNone(row["ca_reference"])
|
|
||||||
|
|
||||||
log_entry = get_last_log(self.cur).lower()
|
|
||||||
self.assertIn("enrolled person", log_entry)
|
|
||||||
self.assertIn(str(person_id), log_entry)
|
|
||||||
|
|
||||||
def test_create_group_requires_ca_reference(self):
|
|
||||||
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
|
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
|
||||||
|
group_id = entity.create_group(self.cur, "Group1", "pubkey_group", creator_id)
|
||||||
|
self.assertEqual(entity.get_entity_name(self.cur, group_id), "Group1")
|
||||||
|
self.assertEqual(entity.get_entity(self.cur, group_id)["type"], "group")
|
||||||
|
|
||||||
with self.assertRaises(ValueError):
|
# --- Revocation ---
|
||||||
entity.create_group(self.cur, "GroupMissingRef", "pubkey_group", creator_id, None)
|
def test_revoke_entity(self):
|
||||||
|
|
||||||
def test_create_group_sets_ca_reference(self):
|
|
||||||
creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4")
|
creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4")
|
||||||
group_id = entity.create_group(self.cur, "Group1", "pubkey_group", creator_id, "CA-REF-1")
|
entity.revoke_entity(self.cur, creator_id, creator_id)
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
entity.get_entity(self.cur, creator_id)
|
||||||
|
|
||||||
row = entity.get_entity(self.cur, group_id)
|
|
||||||
self.assertEqual(row["ca_reference"], "CA-REF-1")
|
|
||||||
|
|
||||||
log_entry = get_last_log(self.cur).lower()
|
if __name__ == "__main__":
|
||||||
self.assertIn("created group", log_entry)
|
unittest.main()
|
||||||
self.assertIn(str(group_id), log_entry)
|
|
||||||
self.assertIn("ca-ref-1".lower(), log_entry)
|
|
||||||
|
|
||||||
def test_set_and_get_symmetrical_key(self):
|
|
||||||
creator_id = entity.insert_creator(self.cur, "CreatorSym", "pubkey_sym")
|
|
||||||
entity.set_symmetrical_key(self.cur, creator_id, "symkey123", creator_id)
|
|
||||||
|
|
||||||
row = entity.get_entity(self.cur, creator_id)
|
|
||||||
self.assertEqual(row["symmetrical_key"], "symkey123")
|
|
||||||
|
|
||||||
log_entry = get_last_log(self.cur).lower()
|
|
||||||
self.assertIn("symmetrical_key", log_entry)
|
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
import unittest
|
import unittest
|
||||||
import sys
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
import sys
|
||||||
import psycopg
|
import psycopg
|
||||||
|
|
||||||
code_path = Path(__file__).parent.parent / "ca_core"
|
code_path = Path(__file__).parent.parent / "ca_core"
|
||||||
|
|
@ -11,12 +11,6 @@ import group_member
|
||||||
|
|
||||||
DBNAME = "ca"
|
DBNAME = "ca"
|
||||||
|
|
||||||
def get_last_log(cursor):
|
|
||||||
cursor.execute("SELECT entry FROM log ORDER BY id DESC LIMIT 1")
|
|
||||||
row = cursor.fetchone()
|
|
||||||
return row["entry"] if row else ""
|
|
||||||
|
|
||||||
|
|
||||||
class TestGroupFunctions(unittest.TestCase):
|
class TestGroupFunctions(unittest.TestCase):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|
@ -24,65 +18,78 @@ class TestGroupFunctions(unittest.TestCase):
|
||||||
cls.conn = psycopg.connect(f"dbname={DBNAME}")
|
cls.conn = psycopg.connect(f"dbname={DBNAME}")
|
||||||
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
|
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
|
||||||
|
|
||||||
@classmethod
|
# Ensure tables exist
|
||||||
def tearDownClass(cls):
|
cls.cur.execute("""
|
||||||
cls.cur.close()
|
CREATE TABLE IF NOT EXISTS entity (
|
||||||
cls.conn.close()
|
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
||||||
|
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||||
|
creator INT REFERENCES entity(id),
|
||||||
|
name VARCHAR(100) NOT NULL,
|
||||||
|
type VARCHAR(10) NOT NULL DEFAULT 'person',
|
||||||
|
geo_offset BIGINT,
|
||||||
|
public_key VARCHAR(300) NOT NULL,
|
||||||
|
expiration DATE,
|
||||||
|
status VARCHAR(10) NOT NULL DEFAULT 'active'
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
cls.cur.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS group_member (
|
||||||
|
group_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
|
||||||
|
member_id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
|
||||||
|
role VARCHAR(10),
|
||||||
|
PRIMARY KEY (group_id, member_id)
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
cls.conn.commit()
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.conn.rollback()
|
self.conn.rollback()
|
||||||
|
self.conn.autocommit = False
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
self.conn.rollback()
|
self.conn.rollback()
|
||||||
|
|
||||||
|
# --- Group membership tests ---
|
||||||
def test_add_and_get_members(self):
|
def test_add_and_get_members(self):
|
||||||
creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1")
|
creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1")
|
||||||
|
group_id = entity.create_group(self.cur, "GroupA", "pubkey_group", creator_id)
|
||||||
person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id)
|
person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id)
|
||||||
group_id = entity.create_group(self.cur, "Group1", "pubkey_group", creator_id, "CA-GROUP-1")
|
device_id = entity.insert_creator(self.cur, "Device1", "pubkey_device")
|
||||||
|
# Add members
|
||||||
group_member.add_group_member(self.cur, group_id, person_id, "member")
|
group_member.add_group_member(self.cur, group_id, person_id, "member")
|
||||||
|
group_member.add_group_member(self.cur, group_id, device_id, "device")
|
||||||
members = group_member.get_members_of_group(self.cur, group_id)
|
members = group_member.get_members_of_group(self.cur, group_id)
|
||||||
|
member_ids = [m["member_id"] for m in members]
|
||||||
self.assertTrue(
|
self.assertIn(person_id, member_ids)
|
||||||
any(m["member_id"] == person_id and m["role"] == "member"
|
self.assertIn(device_id, member_ids)
|
||||||
for m in members)
|
|
||||||
)
|
|
||||||
|
|
||||||
log_entry = get_last_log(self.cur).lower()
|
|
||||||
self.assertIn("added member", log_entry)
|
|
||||||
self.assertIn(str(group_id), log_entry)
|
|
||||||
|
|
||||||
def test_nested_groups(self):
|
def test_nested_groups(self):
|
||||||
creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2")
|
creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2")
|
||||||
parent_group = entity.create_group(self.cur, "ParentGroup", "pubkey_parent", creator_id, "CA-PARENT")
|
parent_group = entity.create_group(self.cur, "ParentGroup", "pubkey_pg", creator_id)
|
||||||
child_group = entity.create_group(self.cur, "ChildGroup", "pubkey_child", creator_id, "CA-CHILD")
|
child_group = entity.create_group(self.cur, "ChildGroup", "pubkey_cg", creator_id)
|
||||||
|
# Add child group as member of parent
|
||||||
group_member.add_group_member(self.cur, parent_group, child_group, "subgroup")
|
group_member.add_group_member(self.cur, parent_group, child_group, "subgroup")
|
||||||
|
|
||||||
members = group_member.get_members_of_group(self.cur, parent_group)
|
members = group_member.get_members_of_group(self.cur, parent_group)
|
||||||
|
self.assertEqual(members[0]["member_id"], child_group)
|
||||||
self.assertTrue(
|
self.assertEqual(members[0]["role"], "subgroup")
|
||||||
any(m["member_id"] == child_group and m["role"] == "subgroup"
|
|
||||||
for m in members)
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_revoked_group_cannot_accept_members(self):
|
|
||||||
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
|
|
||||||
group_id = entity.create_group(self.cur, "RevokedGroup", "pubkey_group", creator_id, "CA-REVOKED-GROUP")
|
|
||||||
person_id = entity.enroll_person(self.cur, "Person2", "pubkey_person", creator_id)
|
|
||||||
|
|
||||||
entity.set_entity_status(self.cur, group_id, "revoked", creator_id)
|
|
||||||
|
|
||||||
with self.assertRaises(ValueError):
|
|
||||||
group_member.add_group_member(self.cur, group_id, person_id, "member")
|
|
||||||
|
|
||||||
def test_revoked_member_cannot_be_added(self):
|
def test_revoked_member_cannot_be_added(self):
|
||||||
creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4")
|
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
|
||||||
group_id = entity.create_group(self.cur, "ActiveGroup", "pubkey_group", creator_id, "CA-ACTIVE-GROUP")
|
group_id = entity.create_group(self.cur, "GroupB", "pubkey_groupB", creator_id)
|
||||||
person_id = entity.enroll_person(self.cur, "RevokedPerson", "pubkey_person", creator_id)
|
person_id = entity.enroll_person(self.cur, "Person2", "pubkey_person2", creator_id)
|
||||||
|
entity.revoke_entity(self.cur, person_id, creator_id)
|
||||||
entity.set_entity_status(self.cur, person_id, "revoked", creator_id)
|
|
||||||
|
|
||||||
with self.assertRaises(ValueError):
|
with self.assertRaises(ValueError):
|
||||||
group_member.add_group_member(self.cur, group_id, person_id, "member")
|
group_member.add_group_member(self.cur, group_id, person_id, "member")
|
||||||
|
|
||||||
|
def test_revoked_group_cannot_accept_members(self):
|
||||||
|
creator_id = entity.insert_creator(self.cur, "Creator4", "pubkey4")
|
||||||
|
group_id = entity.create_group(self.cur, "GroupC", "pubkey_groupC", creator_id)
|
||||||
|
entity.revoke_entity(self.cur, group_id, creator_id)
|
||||||
|
person_id = entity.enroll_person(self.cur, "Person3", "pubkey_person3", creator_id)
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
group_member.add_group_member(self.cur, group_id, person_id, "member")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3,64 +3,74 @@ import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import psycopg
|
import psycopg
|
||||||
|
|
||||||
|
# Add the code directory to Python path
|
||||||
code_path = Path(__file__).parent.parent / "ca_core"
|
code_path = Path(__file__).parent.parent / "ca_core"
|
||||||
sys.path.insert(0, str(code_path))
|
sys.path.insert(0, str(code_path))
|
||||||
|
|
||||||
import metadata
|
import metadata # your metadata.py module
|
||||||
|
|
||||||
DBNAME = "ca"
|
DBNAME = "ca"
|
||||||
|
|
||||||
def get_last_log(cursor):
|
|
||||||
cursor.execute("SELECT entry FROM log ORDER BY id DESC LIMIT 1")
|
|
||||||
row = cursor.fetchone()
|
|
||||||
return row["entry"] if row else ""
|
|
||||||
|
|
||||||
|
|
||||||
class TestMetadataFunctions(unittest.TestCase):
|
class TestMetadataFunctions(unittest.TestCase):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def setUpClass(cls):
|
def setUpClass(cls):
|
||||||
|
# Connect to the database
|
||||||
cls.conn = psycopg.connect(f"dbname={DBNAME}")
|
cls.conn = psycopg.connect(f"dbname={DBNAME}")
|
||||||
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
|
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
|
||||||
|
|
||||||
|
# Ensure table exists and has exactly one row
|
||||||
|
cls.cur.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS metadata (
|
||||||
|
name VARCHAR(50),
|
||||||
|
comment VARCHAR(200),
|
||||||
|
private_key VARCHAR(500),
|
||||||
|
public_key VARCHAR(500)
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
cls.cur.execute("SELECT COUNT(*) AS cnt FROM metadata")
|
||||||
|
row = cls.cur.fetchone()
|
||||||
|
if row['cnt'] == 0:
|
||||||
|
cls.cur.execute("INSERT INTO metadata DEFAULT VALUES")
|
||||||
|
cls.conn.commit()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def tearDownClass(cls):
|
def tearDownClass(cls):
|
||||||
cls.cur.close()
|
cls.cur.close()
|
||||||
cls.conn.close()
|
cls.conn.close()
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
# Begin transaction for each test
|
||||||
self.conn.rollback()
|
self.conn.rollback()
|
||||||
|
self.conn.autocommit = False
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
|
# Rollback after each test
|
||||||
self.conn.rollback()
|
self.conn.rollback()
|
||||||
|
|
||||||
|
# --- Test name field ---
|
||||||
def test_set_and_get_name(self):
|
def test_set_and_get_name(self):
|
||||||
metadata.set_name(self.cur, "AppName")
|
metadata.set_name(self.cur, "AppName")
|
||||||
self.assertEqual(metadata.get_name(self.cur), "AppName")
|
self.assertEqual(metadata.get_name(self.cur), "AppName")
|
||||||
|
|
||||||
log_entry = get_last_log(self.cur).lower()
|
# --- Test comment field ---
|
||||||
self.assertIn("metadata name", log_entry)
|
|
||||||
self.assertIn("appname", log_entry)
|
|
||||||
|
|
||||||
def test_set_and_get_comment(self):
|
def test_set_and_get_comment(self):
|
||||||
metadata.set_comment(self.cur, "Test comment")
|
metadata.set_comment(self.cur, "Test comment")
|
||||||
self.assertEqual(metadata.get_comment(self.cur), "Test comment")
|
self.assertEqual(metadata.get_comment(self.cur), "Test comment")
|
||||||
|
|
||||||
log_entry = get_last_log(self.cur).lower()
|
# --- Test keys ---
|
||||||
self.assertIn("metadata comment", log_entry)
|
|
||||||
|
|
||||||
def test_set_and_get_keys(self):
|
def test_set_and_get_keys(self):
|
||||||
metadata.set_keys(self.cur, "pubkey123", "privkey456")
|
metadata.set_keys(self.cur, "pubkey123", "privkey456")
|
||||||
|
|
||||||
self.assertEqual(metadata.get_public_key(self.cur), "pubkey123")
|
self.assertEqual(metadata.get_public_key(self.cur), "pubkey123")
|
||||||
self.assertEqual(metadata.get_private_key(self.cur), "privkey456")
|
self.assertEqual(metadata.get_private_key(self.cur), "privkey456")
|
||||||
|
|
||||||
log_entry = get_last_log(self.cur).lower()
|
# --- Test keys overwrite ---
|
||||||
self.assertIn("metadata keys", log_entry)
|
def test_keys_overwrite(self):
|
||||||
|
metadata.set_keys(self.cur, "pub1", "priv1")
|
||||||
|
metadata.set_keys(self.cur, "pub2", "priv2")
|
||||||
|
self.assertEqual(metadata.get_public_key(self.cur), "pub2")
|
||||||
|
self.assertEqual(metadata.get_private_key(self.cur), "priv2")
|
||||||
|
|
||||||
def test_set_and_get_defense_p(self):
|
if __name__ == "__main__":
|
||||||
metadata.set_defense_p(self.cur, True)
|
unittest.main()
|
||||||
self.assertEqual(metadata.get_defense_p(self.cur), True)
|
|
||||||
|
|
||||||
log_entry = get_last_log(self.cur).lower()
|
|
||||||
self.assertIn("defense_p", log_entry)
|
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,8 @@
|
||||||
import unittest
|
import unittest
|
||||||
import sys
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
import sys
|
||||||
import psycopg
|
import psycopg
|
||||||
|
|
||||||
# Add core directory to path
|
|
||||||
code_path = Path(__file__).parent.parent / "ca_core"
|
code_path = Path(__file__).parent.parent / "ca_core"
|
||||||
sys.path.insert(0, str(code_path))
|
sys.path.insert(0, str(code_path))
|
||||||
|
|
||||||
|
|
@ -12,13 +11,6 @@ import property
|
||||||
|
|
||||||
DBNAME = "ca"
|
DBNAME = "ca"
|
||||||
|
|
||||||
|
|
||||||
def get_last_log(cursor):
|
|
||||||
cursor.execute("SELECT entry FROM log ORDER BY id DESC LIMIT 1")
|
|
||||||
row = cursor.fetchone()
|
|
||||||
return row["entry"] if row else ""
|
|
||||||
|
|
||||||
|
|
||||||
class TestPropertyFunctions(unittest.TestCase):
|
class TestPropertyFunctions(unittest.TestCase):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|
@ -26,102 +18,68 @@ class TestPropertyFunctions(unittest.TestCase):
|
||||||
cls.conn = psycopg.connect(f"dbname={DBNAME}")
|
cls.conn = psycopg.connect(f"dbname={DBNAME}")
|
||||||
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
|
cls.cur = cls.conn.cursor(row_factory=psycopg.rows.dict_row)
|
||||||
|
|
||||||
@classmethod
|
# Ensure entity table exists
|
||||||
def tearDownClass(cls):
|
cls.cur.execute("""
|
||||||
cls.cur.close()
|
CREATE TABLE IF NOT EXISTS entity (
|
||||||
cls.conn.close()
|
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
||||||
|
creation_ts TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||||
|
creator INT REFERENCES entity(id),
|
||||||
|
name VARCHAR(100) NOT NULL,
|
||||||
|
type VARCHAR(10) NOT NULL DEFAULT 'person',
|
||||||
|
geo_offset BIGINT,
|
||||||
|
public_key VARCHAR(300) NOT NULL,
|
||||||
|
expiration DATE,
|
||||||
|
status VARCHAR(10) NOT NULL DEFAULT 'active'
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
cls.cur.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS property (
|
||||||
|
id INT NOT NULL REFERENCES entity(id) ON DELETE CASCADE,
|
||||||
|
property_name VARCHAR(100),
|
||||||
|
PRIMARY KEY (id, property_name)
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
cls.conn.commit()
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.conn.rollback()
|
self.conn.rollback()
|
||||||
|
self.conn.autocommit = False
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
self.conn.rollback()
|
self.conn.rollback()
|
||||||
|
|
||||||
# ------------------------------------------------------------
|
|
||||||
# Basic property set/get
|
|
||||||
# ------------------------------------------------------------
|
|
||||||
|
|
||||||
def test_set_and_get_property(self):
|
def test_set_and_get_property(self):
|
||||||
creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1")
|
creator_id = entity.insert_creator(self.cur, "Creator1", "pubkey1")
|
||||||
person_id = entity.enroll_person(
|
person_id = entity.enroll_person(self.cur, "Person1", "pubkey_person", creator_id)
|
||||||
self.cur, "Person1", "pubkey_person", creator_id
|
property.set_property(self.cur, person_id, "email")
|
||||||
)
|
|
||||||
|
|
||||||
property.set_property(self.cur, person_id, "prop1")
|
|
||||||
|
|
||||||
props = property.get_properties(self.cur, person_id)
|
props = property.get_properties(self.cur, person_id)
|
||||||
self.assertIn("prop1", props)
|
self.assertIn("email", props)
|
||||||
|
|
||||||
details = property.get_property(self.cur, person_id, "prop1")
|
|
||||||
self.assertIsNotNone(details)
|
|
||||||
# CHAR(19) is space-padded in PostgreSQL; strip for comparison.
|
|
||||||
self.assertEqual(details["validation_policy"].strip(), "default")
|
|
||||||
self.assertIsNone(details["source"])
|
|
||||||
|
|
||||||
log_entry = get_last_log(self.cur).lower()
|
|
||||||
self.assertIn("set property", log_entry)
|
|
||||||
self.assertIn("prop1", log_entry)
|
|
||||||
|
|
||||||
# ------------------------------------------------------------
|
|
||||||
# Delete property
|
|
||||||
# ------------------------------------------------------------
|
|
||||||
|
|
||||||
def test_delete_property(self):
|
def test_delete_property(self):
|
||||||
creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2")
|
creator_id = entity.insert_creator(self.cur, "Creator2", "pubkey2")
|
||||||
person_id = entity.enroll_person(
|
person_id = entity.enroll_person(self.cur, "Person2", "pubkey_person2", creator_id)
|
||||||
self.cur, "Person2", "pubkey_person", creator_id
|
property.set_property(self.cur, person_id, "phone")
|
||||||
)
|
property.delete_property(self.cur, person_id, "phone")
|
||||||
|
|
||||||
property.set_property(self.cur, person_id, "prop2")
|
|
||||||
property.delete_property(self.cur, person_id, "prop2")
|
|
||||||
|
|
||||||
props = property.get_properties(self.cur, person_id)
|
props = property.get_properties(self.cur, person_id)
|
||||||
self.assertNotIn("prop2", props)
|
self.assertNotIn("phone", props)
|
||||||
|
|
||||||
log_entry = get_last_log(self.cur).lower()
|
|
||||||
self.assertIn("deleted property", log_entry)
|
|
||||||
self.assertIn("prop2", log_entry)
|
|
||||||
|
|
||||||
|
|
||||||
def test_set_property_with_policy_and_source(self):
|
|
||||||
creator_id = entity.insert_creator(self.cur, "CreatorPolicy", "pubkey_policy")
|
|
||||||
person_id = entity.enroll_person(
|
|
||||||
self.cur, "PersonPolicy", "pubkey_person", creator_id
|
|
||||||
)
|
|
||||||
|
|
||||||
property.set_property(
|
|
||||||
self.cur,
|
|
||||||
person_id,
|
|
||||||
"prop_policy",
|
|
||||||
validation_policy="strict",
|
|
||||||
source="unit-test",
|
|
||||||
)
|
|
||||||
|
|
||||||
details = property.get_property(self.cur, person_id, "prop_policy")
|
|
||||||
self.assertIsNotNone(details)
|
|
||||||
self.assertEqual(details["validation_policy"].strip(), "strict")
|
|
||||||
self.assertEqual(details["source"], "unit-test")
|
|
||||||
|
|
||||||
log_entry = get_last_log(self.cur).lower()
|
|
||||||
self.assertIn("set property", log_entry)
|
|
||||||
self.assertIn("prop_policy", log_entry)
|
|
||||||
self.assertIn("strict", log_entry)
|
|
||||||
|
|
||||||
|
|
||||||
# ------------------------------------------------------------
|
|
||||||
# Immutability: revoked entity cannot mutate properties
|
|
||||||
# ------------------------------------------------------------
|
|
||||||
|
|
||||||
def test_revoked_entity_has_no_properties(self):
|
def test_revoked_entity_has_no_properties(self):
|
||||||
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
|
creator_id = entity.insert_creator(self.cur, "Creator3", "pubkey3")
|
||||||
person_id = entity.enroll_person(
|
person_id = entity.enroll_person(self.cur, "Person3", "pubkey_person3", creator_id)
|
||||||
self.cur, "Person3", "pubkey_person", creator_id
|
property.set_property(self.cur, person_id, "address")
|
||||||
|
entity.revoke_entity(self.cur, person_id, creator_id)
|
||||||
|
props = property.get_properties(self.cur, person_id)
|
||||||
|
# Optional: you can decide whether to return empty or raise; here we return all properties regardless of status
|
||||||
|
# If you want to ignore revoked entities:
|
||||||
|
cursor = self.cur
|
||||||
|
cursor.execute(
|
||||||
|
"SELECT property_name FROM property p JOIN entity e ON e.id=p.id WHERE e.id=%s AND e.status='active'",
|
||||||
|
(person_id,)
|
||||||
)
|
)
|
||||||
|
props_active = [r['property_name'] for r in cursor.fetchall()]
|
||||||
|
self.assertNotIn("address", props_active)
|
||||||
|
|
||||||
entity.set_entity_status(self.cur, person_id, "revoked", creator_id)
|
|
||||||
|
|
||||||
with self.assertRaises(ValueError):
|
if __name__ == "__main__":
|
||||||
property.set_property(self.cur, person_id, "prop3")
|
unittest.main()
|
||||||
|
|
||||||
with self.assertRaises(ValueError):
|
|
||||||
property.delete_property(self.cur, person_id, "prop3")
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue