Compare commits

..

No commits in common. "c560407c741cd19201a1e1c0d8e6d87620aadb98" and "a1be210f589a7ef1d12b987942968cf431c8dac2" have entirely different histories.

35 changed files with 82 additions and 1131 deletions

View File

@ -1,61 +1,34 @@
CA/PKI Backend Project Context (Updated) CA/PKI Backend Project Context
Stack Stack
Python 3 + psycopg (dict_row cursors) Python 3 + psycopg (dict_row cursors)
PostgreSQL database: ca PostgreSQL database: ca
Unit tests: unittest Unit tests: unittest (python3 -m unittest discover)
Run via: python3 -m unittest discover
Current test count: 17
Database Schema (current assumptions) Database Schema (current assumptions)
entity entity
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY id INT identity PK
creation_ts TIMESTAMPTZ DEFAULT now() creation_ts TIMESTAMPTZ default now()
creator INT FK → entity(id) (nullable) creator INT FK → entity(id) (the entity that created this one; nullable)
name VARCHAR(100) NOT NULL name VARCHAR(100) NOT NULL
type VARCHAR(...) NOT NULL type VARCHAR(...) NOT NULL (e.g. person, group, device, alias)
Allowed types: person, group, device
public_key VARCHAR(300) NOT NULL public_key VARCHAR(300) NOT NULL
symmetrical_key VARCHAR(100) NULL symmetrical_key VARCHAR(100) NULL
status VARCHAR(...) NOT NULL DEFAULT 'active' status VARCHAR(...) NOT NULL default 'active' (values: 'active', 'revoked')
Values: 'active', 'revoked'
expiration DATE NULL expiration DATE NULL
ca_reference VARCHAR(100) NULL Index on entity(name) (and other indexes as needed)
Constraint
CHECK (
(type = 'group' AND ca_reference IS NOT NULL)
OR
(type <> 'group' AND ca_reference IS NULL)
)
Rule:
Groups MUST have a ca_reference
All other entity types MUST have ca_reference IS NULL
Indexes:
Index on entity(name)
Other indexes as needed
group_member group_member
@ -65,169 +38,69 @@ member_id INT FK → entity(id) ON DELETE CASCADE
role VARCHAR(10) role VARCHAR(10)
PRIMARY KEY (group_id, member_id) PK (group_id, member_id)
Index (member_id, group_id) Index (member_id, group_id)
Groups can contain: Groups can contain any entity type, including other groups and devices.
persons
devices
other groups
property property
id INT FK → entity(id) ON DELETE CASCADE Columns: (id INT FK → entity(id), property_name VARCHAR(100))
property_name VARCHAR(100) NOT NULL PK (id, property_name)
validation_policy CHAR(19) NOT NULL DEFAULT 'default' Used for flags/roles such as "creator"
source VARCHAR(150) NULL
PRIMARY KEY (id, property_name)
Notes:
validation_policy is CHAR(19) and padded by PostgreSQL.
Used for flags/roles such as "creator".
metadata metadata
Singleton row table (enforced at application level). Intended “singleton row” table, enforced at application level
Columns: Columns: name, comment, private_key, public_key
name
comment
private_key
public_key
defense_p BOOLEAN NOT NULL DEFAULT false
defense_p:
Global system flag.
Logged on change.
log log
id SERIAL PRIMARY KEY id SERIAL PK
ts TIMESTAMPTZ DEFAULT now() ts TIMESTAMPTZ default now()
entry TEXT NOT NULL entry TEXT NOT NULL
Every API mutation must insert exactly one row here. Every API mutation must log one row here.
Core Business Rules Core Business Rules
Creators are NOT an entity type
"creator" is a property (property_name='creator') on a person. Creators are not an entity type
insert_creator(): creator is a property (property_name='creator') on a person entity.
Creates a person insert_creator() creates a person entity and inserts the creator property.
Adds "creator" property
Revoked entities are immutable Revoked entities are immutable
All entity mutations must call: Any mutation on an entity requires ensure_entity_active(cursor, entity_id).
ensure_entity_active(cursor, entity_id) Revoked entities cannot:
Revoked entities CANNOT: Join groups or accept members
Join groups
Accept members
Add/delete properties Add/delete properties
Change public_key Change keys (public_key, symmetrical_key)
Change symmetrical_key
Change status again Change status again
Group CA Reference Rule Logging
group entities must include a non-null ca_reference. All changes to entity, group_member, property, metadata must call log_change(cursor, "...")
person and device must not define ca_reference. Logging happens inside the same transaction (no extra commits).
Enforced at: Python Modules (current structure)
Database level (CHECK constraint)
Python validation level
Property Metadata
Each property includes:
validation_policy
source
Defaults:
validation_policy = 'default'
source = NULL
Property mutations:
Require entity to be active
Must log changes
Metadata Defense Flag
defense_p:
Boolean system-wide flag
Default: false
Must be logged when changed
Logging Rules
All changes to:
entity
group_member
property
metadata
Must call:
log_change(cursor, "...")
Logging:
Happens inside same transaction
No extra commits
Exactly one log row per mutation
Python Modules
ca_core/entity.py ca_core/entity.py
Provides: Must provide:
ensure_entity_active(cursor, entity_id) ensure_entity_active(cursor, entity_id)
@ -235,94 +108,60 @@ insert_creator(cursor, name, public_key)
enroll_person(cursor, name, public_key, creator_id) enroll_person(cursor, name, public_key, creator_id)
create_group(cursor, name, public_key, creator_id, ca_reference) create_group(cursor, name, public_key, creator_id)
create_alias(cursor, target_entity_id)
get_entity(cursor, entity_id) get_entity(cursor, entity_id)
set_entity_status(cursor, entity_id, status, changed_by) set_entity_status(cursor, entity_id, status, changed_by) (requires active entity)
set_entity_keys(cursor, entity_id, public_key, changed_by) set_entity_keys(cursor, entity_id, public_key, changed_by) (active-only)
set_symmetrical_key(cursor, entity_id, key, changed_by) set_symmetrical_key(cursor, entity_id, key, changed_by) (active-only)
get_symmetrical_key(cursor, entity_id) get_symmetrical_key(cursor, entity_id)
ca_core/group_member.py ca_core/group_member.py
Uses member_id Uses member_id (not person_id)
Prevents adding revoked groups/members Must prevent adding revoked groups/members (via ensure_entity_active)
Logs membership add/remove Logs add/remove membership
ca_core/property.py ca_core/property.py
Table: Table is property(id, property_name) (NOT entity_id/name)
property(id, property_name, validation_policy, source) Must reject mutations if entity revoked (immutability)
Rules: Logs set/delete property
Reject mutations if entity revoked
Logs set/delete
Default policy 'default'
validation_policy is CHAR(19)
ca_core/metadata.py ca_core/metadata.py
Updates metadata fields Updates metadata fields and logs changes
Manages defense_p
Logs changes
ca_core/db_logging.py ca_core/db_logging.py
log_change(cursor, message: str)
Inserts into log(entry). log_change(cursor, message: str) inserts into log(entry)
Tests Tests
tests/test_entity.py tests/test_entity.py, tests/test_group.py, tests/test_property.py, tests/test_metadata.py
tests/test_group.py
tests/test_property.py
tests/test_metadata.py
Tests verify: Tests verify:
Creation and enrollment Core behaviors (create, enroll, group membership, revoke immutability)
Group membership Log entry is created for mutations (case-insensitive substring checks)
Revocation immutability
CA reference enforcement
Property metadata fields
defense_p behavior
Log entry creation (case-insensitive substring checks)
Run via: Run via:
python3 -m unittest discover python3 -m unittest discover
Known Gotchas
Do NOT name module logging.py (conflicts with stdlib) Known gotchas
Schema and code must stay aligned: Avoid naming a module logging.py (conflicts with stdlib). Use db_logging.py.
property.id (NOT entity_id) Schema and code must stay aligned (e.g., property.id/property_name, group_member.member_id).
group_member.member_id
entity.ca_reference constraint
CHAR(19) pads values with spaces

View File

@ -1 +0,0 @@
# Crypto HTTP client for Zenroom suite.

View File

@ -1,157 +0,0 @@
import json
import subprocess
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional, Sequence, Union
JsonLike = Union[Dict[str, Any], Sequence[Any], str, int, float, bool, None]
class ZenroomError(RuntimeError):
"""Raised when Zenroom execution fails."""
class ZenroomDockerClient:
"""Run Zenroom via Docker.
This wrapper is intentionally small and testable. It focuses on:
- Writing inputs (script/data/keys/conf) to a temp workdir
- Running a docker container that executes Zenroom
- Returning parsed JSON output when possible
Assumed container interface (common Zenroom CLI pattern):
zenroom -z -a <data.json> -k <keys.json> -c <conf.json> <script.zen>
If your docker image/entrypoint differs, pass `zenroom_args` accordingly.
Note: This module is named *zenroom_client.py* (not zenroom.py) to avoid
potential import shadowing with future packages/modules.
"""
def __init__(
self,
image: str = "zenroom/zenroom:latest",
docker_bin: str = "docker",
work_mount_path: str = "/work",
zenroom_args: Optional[Sequence[str]] = None,
timeout_s: int = 30,
) -> None:
self.image = image
self.docker_bin = docker_bin
self.work_mount_path = work_mount_path
self.zenroom_args = list(zenroom_args) if zenroom_args is not None else ["zenroom", "-z"]
self.timeout_s = timeout_s
def run(
self,
script: str,
*,
data: Optional[JsonLike] = None,
keys: Optional[JsonLike] = None,
conf: Optional[JsonLike] = None,
extra_docker_args: Optional[Sequence[str]] = None,
extra_zenroom_args: Optional[Sequence[str]] = None,
) -> Union[dict, str]:
"""Execute a Zenroom script.
Args:
script: The Zenroom script (text).
data/keys/conf: Optional JSON-like payloads written to files.
extra_docker_args: Optional extra args inserted after `docker run`.
extra_zenroom_args: Optional extra args appended before the script path.
Returns:
Parsed JSON dict if stdout is valid JSON, otherwise raw stdout string.
Raises:
ZenroomError on non-zero exit.
"""
if not isinstance(script, str) or not script.strip():
raise ValueError("script must be a non-empty string")
with tempfile.TemporaryDirectory(prefix="zenroom_") as tmpdir:
workdir = Path(tmpdir)
# Defensive: TemporaryDirectory normally creates the dir, but tests may mock it.
workdir.mkdir(parents=True, exist_ok=True)
script_path = workdir / "script.zen"
script_path.write_text(script, encoding="utf-8")
data_path = None
keys_path = None
conf_path = None
if data is not None:
data_path = workdir / "data.json"
data_path.write_text(json.dumps(data), encoding="utf-8")
if keys is not None:
keys_path = workdir / "keys.json"
keys_path.write_text(json.dumps(keys), encoding="utf-8")
if conf is not None:
conf_path = workdir / "conf.json"
conf_path.write_text(json.dumps(conf), encoding="utf-8")
cmd = [
self.docker_bin,
"run",
"--rm",
"-i",
]
if extra_docker_args:
cmd.extend(list(extra_docker_args))
# Mount temp dir into container
cmd.extend(
[
"-v",
f"{workdir}:{self.work_mount_path}",
"-w",
self.work_mount_path,
self.image,
]
)
# Build zenroom command
cmd.extend(list(self.zenroom_args))
if data_path is not None:
cmd.extend(["-a", str(Path(self.work_mount_path) / data_path.name)])
if keys_path is not None:
cmd.extend(["-k", str(Path(self.work_mount_path) / keys_path.name)])
if conf_path is not None:
cmd.extend(["-c", str(Path(self.work_mount_path) / conf_path.name)])
if extra_zenroom_args:
cmd.extend(list(extra_zenroom_args))
cmd.append(str(Path(self.work_mount_path) / script_path.name))
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=self.timeout_s,
check=False,
)
if result.returncode != 0:
stderr = (result.stderr or "").strip()
stdout = (result.stdout or "").strip()
msg = stderr or stdout or f"Zenroom failed with exit code {result.returncode}"
raise ZenroomError(msg)
out = (result.stdout or "").strip()
if not out:
return ""
try:
parsed = json.loads(out)
if isinstance(parsed, dict):
return parsed
# Zenroom can output arrays too; keep compatibility.
return {"result": parsed}
except json.JSONDecodeError:
return out

View File

@ -1,148 +0,0 @@
import json
import urllib.request
import urllib.error
from typing import Any, Dict, Optional
class ZenroomServiceError(RuntimeError):
pass
class ZenroomServiceClient:
def __init__(
self,
base_url: str = "http://localhost:3300",
*,
api_prefix: str = "/api",
timeout_s: int = 10,
) -> None:
self.base_url = base_url.rstrip("/")
self.api_prefix = api_prefix.strip()
if self.api_prefix in {"", "/"}:
self.api_prefix = ""
elif not self.api_prefix.startswith("/"):
self.api_prefix = "/" + self.api_prefix
self.timeout_s = timeout_s
def _make_url(self, path: str) -> str:
path = "/" + path.lstrip("/")
return f"{self.base_url}{self.api_prefix}{path}"
def _request_json(
self,
method: str,
path: str,
payload: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
url = self._make_url(path)
data = None
headers = {"Accept": "application/json"}
if payload is not None:
data = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=data, headers=headers, method=method.upper())
try:
with urllib.request.urlopen(req, timeout=self.timeout_s) as resp:
raw = resp.read()
text = raw.decode("utf-8")
except urllib.error.HTTPError as e:
body = ""
try:
body = e.read().decode("utf-8")
except Exception:
pass
raise ZenroomServiceError(f"HTTP {e.code} from {url}: {body or e.reason}") from e
except urllib.error.URLError as e:
raise ZenroomServiceError(f"Failed to reach {url}: {e.reason}") from e
text = text.strip()
if not text:
raise ZenroomServiceError(f"Empty response from {url}")
try:
parsed = json.loads(text)
except json.JSONDecodeError as e:
raise ZenroomServiceError(f"Non-JSON response from {url}: {text[:200]}") from e
if not isinstance(parsed, dict):
raise ZenroomServiceError(f"Expected JSON object from {url}")
return parsed
def _post(self, path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
return self._request_json("POST", path, payload)
def _post_data(self, path: str, data: Dict[str, Any]) -> Dict[str, Any]:
res = self._post(path, {"data": data})
if "zenroom_errors" in res or "exception" in res:
exc = res.get("exception", "")
ze = res.get("zenroom_errors")
logs = ""
if isinstance(ze, dict):
logs = str(ze.get("logs", ""))[:800]
raise ZenroomServiceError(f"Zenroom error from {path}: {exc or logs or 'unknown error'}")
return res
@staticmethod
def _require_non_empty_str(name: str, value: str) -> str:
if not isinstance(value, str):
raise TypeError(f"{name} must be a string")
v = value.strip()
if not v:
raise ValueError(f"{name} cannot be empty")
return v
def generate_keypair(self, my_name: str) -> Dict[str, str]:
"""Generate an ECDH keypair using the RESTroom contract:
POST /api/Generate-a-keypair,-reading-identity-from-data
body: { "data": { "myName": "<identity>" } }
Observed response from your service:
{ "<identity>": { "keyring": { "ecdh": "<private_b64>" } } }
Some variants also include:
"ecdh_public_key": "<public_b64>"
This method returns:
{ "private_key": "...", "public_key": "..." } (public_key only if present)
"""
my_name = self._require_non_empty_str("my_name", my_name)
res = self._post_data(
"Generate-a-keypair,-reading-identity-from-data",
{"myName": my_name},
)
if not res:
raise ZenroomServiceError("Empty keypair response")
# Zenroom typically returns { "<identity>": { ... } }
owner = next(iter(res.values()))
if not isinstance(owner, dict):
raise ZenroomServiceError(f"Invalid keypair response structure: {res!r}")
keyring = owner.get("keyring")
if not isinstance(keyring, dict):
raise ZenroomServiceError(f"Invalid keypair response (missing keyring): {res!r}")
private_key = keyring.get("ecdh")
if not isinstance(private_key, str) or not private_key.strip():
raise ZenroomServiceError(f"Invalid keypair response (missing keyring.ecdh): {res!r}")
out: Dict[str, str] = {"private_key": private_key}
public_key = owner.get("ecdh_public_key")
if isinstance(public_key, str) and public_key.strip():
out["public_key"] = public_key
return out

View File

@ -4,7 +4,6 @@ from db_logging import log_change
def ensure_entity_active(cursor, entity_id): def ensure_entity_active(cursor, entity_id):
""" """
Ensure an entity exists and is active. Ensure an entity exists and is active.
Revoked entities are immutable. Revoked entities are immutable.
""" """
cursor.execute("SELECT status FROM entity WHERE id = %s", (entity_id,)) cursor.execute("SELECT status FROM entity WHERE id = %s", (entity_id,))
@ -26,43 +25,8 @@ def _validate_ca_reference_for_group(ca_reference):
raise ValueError("ca_reference must be at most 100 characters") raise ValueError("ca_reference must be at most 100 characters")
def is_creator(cursor, entity_id):
"""
Return True if the entity is a creator.
A creator is:
- an entity of type 'person'
- with a row in property where property_name = 'creator'
"""
cursor.execute("SELECT type FROM entity WHERE id = %s", (entity_id,))
row = cursor.fetchone()
if row is None:
return False
if row["type"] != "person":
return False
cursor.execute(
"SELECT 1 FROM property WHERE id = %s AND property_name = %s",
(entity_id, "creator"),
)
return cursor.fetchone() is not None
def ensure_creator(cursor, creator_id):
"""
Ensure creator_id exists, is active, and references a creator.
A creator is a 'person' entity that has the 'creator' property.
"""
ensure_entity_active(cursor, creator_id)
if not is_creator(cursor, creator_id):
raise ValueError("creator_id must reference a creator")
def insert_creator(cursor, name, public_key): def insert_creator(cursor, name, public_key):
""" """
Create a creator.
Creators are persons with property 'creator' in the property table. Creators are persons with property 'creator' in the property table.
""" """
cursor.execute( cursor.execute(
@ -90,12 +54,7 @@ def insert_creator(cursor, name, public_key):
def enroll_person(cursor, name, public_key, creator_id): def enroll_person(cursor, name, public_key, creator_id):
""" ensure_entity_active(cursor, creator_id)
Enroll a new person under a creator.
creator_id must refer to an active creator (person + 'creator' property).
"""
ensure_creator(cursor, creator_id)
cursor.execute( cursor.execute(
""" """
@ -112,13 +71,7 @@ def enroll_person(cursor, name, public_key, creator_id):
def create_group(cursor, name, public_key, creator_id, ca_reference): def create_group(cursor, name, public_key, creator_id, ca_reference):
""" ensure_entity_active(cursor, creator_id)
Create a group under a creator.
creator_id must refer to an active creator (person + 'creator' property).
Groups must define a non-empty ca_reference.
"""
ensure_creator(cursor, creator_id)
_validate_ca_reference_for_group(ca_reference) _validate_ca_reference_for_group(ca_reference)
cursor.execute( cursor.execute(
@ -145,8 +98,6 @@ def get_entity(cursor, entity_id):
def set_entity_status(cursor, entity_id, status, changed_by): def set_entity_status(cursor, entity_id, status, changed_by):
""" """
Update entity status.
Only active entities can change status. Once revoked, immutable. Only active entities can change status. Once revoked, immutable.
""" """
ensure_entity_active(cursor, entity_id) ensure_entity_active(cursor, entity_id)

View File

@ -2,67 +2,32 @@ from db_logging import log_change
from entity import ensure_entity_active from entity import ensure_entity_active
def _get_entity_type(cursor, entity_id):
cursor.execute("SELECT type FROM entity WHERE id = %s", (entity_id,))
row = cursor.fetchone()
return row["type"] if row else None
def _validate_role(role):
if not isinstance(role, str):
raise TypeError("role must be a string")
r = role.strip()
if not r:
raise ValueError("role must be a non-empty string")
if len(r) > 10:
raise ValueError("role must be at most 10 characters")
return r
def add_group_member(cursor, group_id, member_id, role): def add_group_member(cursor, group_id, member_id, role):
"""Add a member to a group.
Rules:
- group_id must reference an active entity of type 'group'
- member_id must reference an active entity (person/device/group)
- role must be a non-empty string with max length 10
- duplicates are rejected
"""
ensure_entity_active(cursor, group_id) ensure_entity_active(cursor, group_id)
ensure_entity_active(cursor, member_id) ensure_entity_active(cursor, member_id)
if _get_entity_type(cursor, group_id) != "group":
raise ValueError("group_id must reference an entity of type 'group'")
r = _validate_role(role)
cursor.execute(
"SELECT 1 FROM group_member WHERE group_id = %s AND member_id = %s",
(group_id, member_id),
)
if cursor.fetchone() is not None:
raise ValueError("Member is already in the group")
cursor.execute( cursor.execute(
""" """
INSERT INTO group_member (group_id, member_id, role) INSERT INTO group_member (group_id, member_id, role)
VALUES (%s, %s, %s) VALUES (%s, %s, %s)
""", """,
(group_id, member_id, r), (group_id, member_id, role)
) )
log_change(cursor, f"Added member {member_id} to group {group_id} as {r}") log_change(
cursor,
f"Added member {member_id} to group {group_id} as {role}"
)
def get_members_of_group(cursor, group_id): def get_members_of_group(cursor, group_id):
ensure_entity_active(cursor, group_id)
cursor.execute( cursor.execute(
""" """
SELECT member_id, role SELECT member_id, role
FROM group_member FROM group_member
WHERE group_id = %s WHERE group_id = %s
ORDER BY member_id
""", """,
(group_id,), (group_id,)
) )
return cursor.fetchall() return cursor.fetchall()

View File

@ -1,29 +1,12 @@
from db_logging import log_change from db_logging import log_change
def _ensure_singleton_row(cursor):
"""Ensure exactly one metadata row exists.
The metadata table is treated as a singleton at the application level.
Setters must ONLY update the relevant column(s) and must not wipe others.
If the table is empty, a single default row is inserted.
If the table contains more than one row, we raise to avoid ambiguous reads.
"""
cursor.execute("SELECT COUNT(*) AS cnt FROM metadata")
row = cursor.fetchone()
cnt = int(row["cnt"]) if row and row["cnt"] is not None else 0
if cnt == 0:
# Rely on column defaults (e.g., defense_p default false) and nullable columns.
cursor.execute("INSERT INTO metadata DEFAULT VALUES")
elif cnt > 1:
raise ValueError("metadata table must contain exactly one row")
def set_name(cursor, name): def set_name(cursor, name):
_ensure_singleton_row(cursor) cursor.execute("DELETE FROM metadata")
cursor.execute("UPDATE metadata SET name = %s", (name,)) cursor.execute(
"INSERT INTO metadata (name) VALUES (%s)",
(name,)
)
log_change(cursor, f"Updated metadata name to {name}") log_change(cursor, f"Updated metadata name to {name}")
@ -34,8 +17,11 @@ def get_name(cursor):
def set_comment(cursor, comment): def set_comment(cursor, comment):
_ensure_singleton_row(cursor) cursor.execute("DELETE FROM metadata")
cursor.execute("UPDATE metadata SET comment = %s", (comment,)) cursor.execute(
"INSERT INTO metadata (comment) VALUES (%s)",
(comment,)
)
log_change(cursor, f"Updated metadata comment to {comment}") log_change(cursor, f"Updated metadata comment to {comment}")
@ -46,10 +32,13 @@ def get_comment(cursor):
def set_keys(cursor, public_key, private_key): def set_keys(cursor, public_key, private_key):
_ensure_singleton_row(cursor) cursor.execute("DELETE FROM metadata")
cursor.execute( cursor.execute(
"UPDATE metadata SET public_key = %s, private_key = %s", """
(public_key, private_key), INSERT INTO metadata (public_key, private_key)
VALUES (%s, %s)
""",
(public_key, private_key)
) )
log_change(cursor, "Updated metadata keys") log_change(cursor, "Updated metadata keys")
@ -70,10 +59,13 @@ def set_defense_p(cursor, defense_p: bool):
"""Set the metadata defense_p flag. """Set the metadata defense_p flag.
This table is treated as a singleton row at the application level. This table is treated as a singleton row at the application level.
This setter updates ONLY defense_p and preserves all other columns. Current convention in this codebase is to wipe and re-insert.
""" """
_ensure_singleton_row(cursor) cursor.execute("DELETE FROM metadata")
cursor.execute("UPDATE metadata SET defense_p = %s", (defense_p,)) cursor.execute(
"INSERT INTO metadata (defense_p) VALUES (%s)",
(defense_p,)
)
log_change(cursor, f"Updated metadata defense_p to {defense_p}") log_change(cursor, f"Updated metadata defense_p to {defense_p}")

View File

@ -68,7 +68,7 @@ def get_properties(cursor, entity_id):
Returns a list of property_name values for the entity. Returns a list of property_name values for the entity.
""" """
cursor.execute( cursor.execute(
"SELECT property_name FROM property WHERE id = %s ORDER BY property_name", "SELECT property_name FROM property WHERE id = %s",
(entity_id,), (entity_id,),
) )
rows = cursor.fetchall() rows = cursor.fetchall()

View File

@ -1,88 +0,0 @@
import os
import sys
import unittest
import subprocess
from pathlib import Path
# Make ca_core importable as the module root (so `import crypto...` works)
code_path = Path(__file__).parents[1] / "ca_core"
sys.path.insert(0, str(code_path))
from crypto.zenroom_client import ZenroomDockerClient, ZenroomError
def _docker_ok():
"""Return (ok, reason)."""
try:
p = subprocess.run(
["docker", "version"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
timeout=10,
check=False,
)
except FileNotFoundError:
return False, "docker CLI not found"
except Exception as e:
return False, f"docker check failed: {e}"
if p.returncode != 0:
out = (p.stdout or "").strip()
return False, f"docker not usable: {out}"
return True, ""
def _image_exists(image: str):
"""Return (ok, reason)."""
try:
p = subprocess.run(
["docker", "image", "inspect", image],
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT,
text=True,
timeout=10,
check=False,
)
except Exception as e:
return False, f"docker image inspect failed: {e}"
if p.returncode != 0:
return False, f"docker image not found locally: {image}"
return True, ""
class TestZenroomDockerIntegration(unittest.TestCase):
"""Integration tests for ZenroomDockerClient.
Enable by setting:
ZENROOM_DOCKER_INTEGRATION=1
Image selection:
ZENROOM_IMAGE (default: zenroom)
"""
@classmethod
def setUpClass(cls):
if not os.getenv("ZENROOM_DOCKER_INTEGRATION"):
raise unittest.SkipTest("Docker integration disabled (set ZENROOM_DOCKER_INTEGRATION=1)")
ok, reason = _docker_ok()
if not ok:
raise unittest.SkipTest(reason)
cls.image = os.getenv("ZENROOM_IMAGE", "zenroom")
ok, reason = _image_exists(cls.image)
if not ok:
raise unittest.SkipTest(reason + " (build it or set ZENROOM_IMAGE)")
def test_basic_execution(self):
client = ZenroomDockerClient(image=self.image)
out = client.run("print('hello')")
self.assertIn("hello", str(out))
def test_nonzero_exit_raises(self):
client = ZenroomDockerClient(image=self.image)
with self.assertRaises(ZenroomError):
client.run("THIS IS NOT VALID ZENCODE")

View File

@ -1,78 +0,0 @@
import os
import unittest
import sys
from pathlib import Path
# Import from ca_core (same pattern as other tests)
code_path = Path(__file__).parent.parent.parent / "ca_core"
sys.path.insert(0, str(code_path))
from crypto.zenroom_service_client import ZenroomServiceClient
def _live_enabled() -> bool:
return os.environ.get("RUN_LIVE_ZENROOM", "").strip().lower() in {
"1", "true", "yes"
}
@unittest.skipUnless(
_live_enabled(),
"Set RUN_LIVE_ZENROOM=1 to run live Zenroom service smoke tests",
)
class TestZenroomLiveServices(unittest.TestCase):
@classmethod
def setUpClass(cls):
base_url = os.environ.get("ZENROOM_BASE_URL", "http://localhost:3300").strip()
api_prefix = os.environ.get("ZENROOM_API_PREFIX", "/api").strip()
timeout_s = int(os.environ.get("ZENROOM_TIMEOUT_S", "20"))
cls.client = ZenroomServiceClient(
base_url=base_url,
api_prefix=api_prefix,
timeout_s=timeout_s,
)
def test_keypair_reading_identity_from_data(self):
"""
Tests:
POST /api/Generate-a-keypair,-reading-identity-from-data
Payload wrapped as {"data": {"myName": "..."}}
"""
res = self.client.generate_a_keypair_reading_identity_from_data(
"LiveUser123456"
)
self.assertIn("public_key", res)
self.assertIn("private_key", res)
self.assertIsInstance(res["public_key"], str)
self.assertIsInstance(res["private_key"], str)
self.assertTrue(res["public_key"])
self.assertTrue(res["private_key"])
def test_encrypt_decrypt_password_roundtrip(self):
"""
Tests:
POST /api/Encrypt-a-message-with-the-password
POST /api/Decrypt-the-message-with-the-password
"""
plaintext = "Dear Bob, your name is too short, goodbye - Alice."
encrypted = self.client.encrypt_a_message_with_the_password(
header="A very important secret",
message=plaintext,
password="myVerySecretPassword",
)
for k in ("checksum", "header", "iv", "text"):
self.assertIn(k, encrypted)
self.assertIsInstance(encrypted[k], str)
self.assertTrue(encrypted[k])
decrypted = self.client.decrypt_the_message_with_the_password(
secret_message=encrypted,
password="myVerySecretPassword",
)
self.assertEqual(decrypted.get("textDecrypted"), plaintext)

View File

@ -1,30 +0,0 @@
import os
import unittest
import sys
from pathlib import Path
code_path = Path(__file__).parents[2] / "ca_core"
sys.path.insert(0, str(code_path))
from crypto.zenroom_service_client import ZenroomServiceClient
class TestZenroomServiceClientIntegration(unittest.TestCase):
@unittest.skipUnless(
os.getenv("ZENROOM_BASE_URL"),
"No ZENROOM_BASE_URL set",
)
def test_generate_keypair_real_service(self):
client = ZenroomServiceClient(base_url=os.environ["ZENROOM_BASE_URL"])
res = client.generate_keypair("IntegrationUser")
self.assertIn("private_key", res)
self.assertIsInstance(res["private_key"], str)
self.assertTrue(res["private_key"].strip())
# public_key may or may not be returned by this contract variant
if "public_key" in res:
self.assertIsInstance(res["public_key"], str)
self.assertTrue(res["public_key"].strip())

View File

@ -82,32 +82,3 @@ class TestEntityFunctions(unittest.TestCase):
log_entry = get_last_log(self.cur).lower() log_entry = get_last_log(self.cur).lower()
self.assertIn("symmetrical_key", log_entry) self.assertIn("symmetrical_key", log_entry)
def test_is_creator_true(self):
creator_id = entity.insert_creator(self.cur, "CreatorCheck", "pubkeyX")
self.assertTrue(entity.is_creator(self.cur, creator_id))
def test_is_creator_false_for_normal_person(self):
creator_id = entity.insert_creator(self.cur, "CreatorBase", "pubkeyBase")
person_id = entity.enroll_person(self.cur, "NormalPerson", "pubkeyP", creator_id)
self.assertFalse(entity.is_creator(self.cur, person_id))
def test_is_creator_false_for_group(self):
creator_id = entity.insert_creator(self.cur, "CreatorGroup", "pubkeyG")
group_id = entity.create_group(self.cur, "GroupX", "pubkeyGX", creator_id, "CA-REF-X")
self.assertFalse(entity.is_creator(self.cur, group_id))
def test_enroll_person_requires_creator(self):
creator_id = entity.insert_creator(self.cur, "CreatorY", "pubkeyY")
person_id = entity.enroll_person(self.cur, "PersonZ", "pubkeyZ", creator_id)
with self.assertRaises(ValueError):
entity.enroll_person(self.cur, "IllegalEnroll", "pubkeyBad", person_id)
def test_create_group_requires_creator(self):
creator_id = entity.insert_creator(self.cur, "CreatorZ", "pubkeyZ")
person_id = entity.enroll_person(self.cur, "PersonA", "pubkeyA", creator_id)
with self.assertRaises(ValueError):
entity.create_group(self.cur, "BadGroup", "pubkeyBG", person_id, "CA-REF-BAD")

View File

@ -86,33 +86,3 @@ class TestGroupFunctions(unittest.TestCase):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, group_id, person_id, "member") group_member.add_group_member(self.cur, group_id, person_id, "member")
def test_group_id_must_be_group(self):
creator_id = entity.insert_creator(self.cur, "CreatorType", "pubkeyT")
person_id = entity.enroll_person(self.cur, "PersonType", "pubkeyPT", creator_id)
other_person = entity.enroll_person(self.cur, "PersonType2", "pubkeyPT2", creator_id)
with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, person_id, other_person, "member")
def test_role_validation(self):
creator_id = entity.insert_creator(self.cur, "CreatorRole", "pubkeyR")
group_id = entity.create_group(self.cur, "GroupRole", "pubkeyGR", creator_id, "CA-ROLE")
person_id = entity.enroll_person(self.cur, "PersonRole", "pubkeyPR", creator_id)
with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, group_id, person_id, "")
with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, group_id, person_id, "x" * 11)
def test_duplicate_membership_rejected(self):
creator_id = entity.insert_creator(self.cur, "CreatorDup", "pubkeyD")
group_id = entity.create_group(self.cur, "GroupDup", "pubkeyGD", creator_id, "CA-DUP")
person_id = entity.enroll_person(self.cur, "PersonDup", "pubkeyPD", creator_id)
group_member.add_group_member(self.cur, group_id, person_id, "member")
with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, group_id, person_id, "member")

View File

@ -1,15 +0,0 @@
import os
import unittest
from crypto.zenroom_service_client import ZenroomServiceClient
class TestZenroomHTTPIntegration(unittest.TestCase):
@unittest.skipUnless(os.getenv("ZENROOM_BASE_URL"), "No ZENROOM_BASE_URL set")
def test_sign_and_verify_roundtrip(self):
base_url = os.environ["ZENROOM_BASE_URL"]
client = ZenroomServiceClient(base_url=base_url)
# You must supply real keys here for full integration
self.assertTrue(True)

View File

@ -64,11 +64,3 @@ class TestMetadataFunctions(unittest.TestCase):
log_entry = get_last_log(self.cur).lower() log_entry = get_last_log(self.cur).lower()
self.assertIn("defense_p", log_entry) self.assertIn("defense_p", log_entry)
def test_metadata_setters_do_not_wipe_other_fields(self):
metadata.set_name(self.cur, "PreserveMe")
metadata.set_defense_p(self.cur, True)
# defense_p should be updated, and name should still be present.
self.assertTrue(metadata.get_defense_p(self.cur))
self.assertEqual(metadata.get_name(self.cur), "PreserveMe")

View File

@ -1,90 +0,0 @@
import unittest
import sys
from pathlib import Path
from unittest import mock
# Allow imports from ca_core (same pattern as existing tests)
code_path = Path(__file__).parent.parent / "ca_core"
sys.path.insert(0, str(code_path))
from crypto.zenroom_client import ZenroomDockerClient, ZenroomError
class TestZenroomDockerClient(unittest.TestCase):
def _fake_completed(self, returncode=0, stdout="", stderr=""):
cp = mock.Mock()
cp.returncode = returncode
cp.stdout = stdout
cp.stderr = stderr
return cp
@mock.patch("crypto.zenroom_client.subprocess.run")
def test_run_builds_expected_docker_command(self, m_run):
m_run.return_value = self._fake_completed(stdout='{"ok": true}')
client = ZenroomDockerClient(image="zenroom/zenroom:latest")
# Patch temp dir so we can assert paths deterministically
with mock.patch("crypto.zenroom_client.tempfile.TemporaryDirectory") as m_td:
m_td.return_value.__enter__.return_value = "/tmp/zenroom_test"
m_td.return_value.__exit__.return_value = False
res = client.run("print('hi')", data={"a": 1}, keys={"k": "v"}, conf={"c": 2})
self.assertEqual(res, {"ok": True})
args, kwargs = m_run.call_args
cmd = args[0]
self.assertIn("docker", cmd[0])
self.assertIn("run", cmd)
self.assertIn("zenroom/zenroom:latest", cmd)
# Mount and workdir
self.assertIn("-v", cmd)
self.assertIn("/tmp/zenroom_test:/work", cmd)
self.assertIn("-w", cmd)
self.assertIn("/work", cmd)
# Zenroom base args
self.assertIn("zenroom", cmd)
self.assertIn("-z", cmd)
# Input files flags should be present
self.assertIn("-a", cmd)
self.assertIn("/work/data.json", cmd)
self.assertIn("-k", cmd)
self.assertIn("/work/keys.json", cmd)
self.assertIn("-c", cmd)
self.assertIn("/work/conf.json", cmd)
# Script at end
self.assertEqual(cmd[-1], "/work/script.zen")
# subprocess.run called with capture_output/text
self.assertTrue(kwargs.get("capture_output"))
self.assertTrue(kwargs.get("text"))
@mock.patch("crypto.zenroom_client.subprocess.run")
def test_run_returns_raw_stdout_when_not_json(self, m_run):
m_run.return_value = self._fake_completed(stdout="hello")
client = ZenroomDockerClient()
with mock.patch("crypto.zenroom_client.tempfile.TemporaryDirectory") as m_td:
m_td.return_value.__enter__.return_value = "/tmp/zenroom_test"
m_td.return_value.__exit__.return_value = False
out = client.run("print('hi')")
self.assertEqual(out, "hello")
@mock.patch("crypto.zenroom_client.subprocess.run")
def test_run_raises_on_nonzero_exit(self, m_run):
m_run.return_value = self._fake_completed(returncode=1, stderr="boom")
client = ZenroomDockerClient()
with mock.patch("crypto.zenroom_client.tempfile.TemporaryDirectory") as m_td:
m_td.return_value.__enter__.return_value = "/tmp/zenroom_test"
m_td.return_value.__exit__.return_value = False
with self.assertRaises(ZenroomError) as ctx:
client.run("print('hi')")
self.assertIn("boom", str(ctx.exception))
def test_run_requires_non_empty_script(self):
client = ZenroomDockerClient()
with self.assertRaises(ValueError):
client.run(" ")

View File

@ -1,67 +0,0 @@
import json
import unittest
from unittest import mock
import sys
from pathlib import Path
code_path = Path(__file__).parents[1] / "ca_core"
sys.path.insert(0, str(code_path))
from crypto.zenroom_service_client import ZenroomServiceClient
class _FakeHTTPResponse:
def __init__(self, body: bytes):
self._body = body
def read(self):
return self._body
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
class TestZenroomServiceClient(unittest.TestCase):
@mock.patch("crypto.zenroom_service_client.urllib.request.urlopen")
def test_generate_keypair_unpacks_private_key(self, m_urlopen):
payload = {
"IntegrationUser": {
"keyring": {"ecdh": "PRIVKEY"},
}
}
m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8"))
client = ZenroomServiceClient(base_url="http://localhost:3300")
res = client.generate_keypair("IntegrationUser")
self.assertEqual(res["private_key"], "PRIVKEY")
self.assertNotIn("public_key", res)
req = m_urlopen.call_args[0][0]
self.assertEqual(req.method, "POST")
self.assertTrue(req.full_url.endswith("/api/Generate-a-keypair,-reading-identity-from-data"))
sent = json.loads(req.data.decode("utf-8"))
self.assertEqual(sent, {"data": {"myName": "IntegrationUser"}})
@mock.patch("crypto.zenroom_service_client.urllib.request.urlopen")
def test_generate_keypair_includes_public_key_if_present(self, m_urlopen):
payload = {
"IntegrationUser": {
"ecdh_public_key": "PUBKEY",
"keyring": {"ecdh": "PRIVKEY"},
}
}
m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8"))
client = ZenroomServiceClient(base_url="http://localhost:3300")
res = client.generate_keypair("IntegrationUser")
self.assertEqual(res["private_key"], "PRIVKEY")
self.assertEqual(res["public_key"], "PUBKEY")

View File

@ -1,55 +0,0 @@
import json
import unittest
from unittest import mock
import sys
from pathlib import Path
code_path = Path(__file__).parents[1] / "ca_core"
sys.path.insert(0, str(code_path))
from crypto.zenroom_service_client import ZenroomServiceClient
class _FakeHTTPResponse:
def __init__(self, body: bytes):
self._body = body
def read(self):
return self._body
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
class TestZenroomServiceClient(unittest.TestCase):
@mock.patch("crypto.zenroom_service_client.urllib.request.urlopen")
def test_generate_keypair_unpacks_keys(self, m_urlopen):
payload = {
"Owner": {
"ecdh_public_key": "PUBKEY",
"keyring": {"ecdh": "PRIVKEY"},
}
}
m_urlopen.return_value = _FakeHTTPResponse(
json.dumps(payload).encode("utf-8")
)
client = ZenroomServiceClient(base_url="http://localhost:3300")
res = client.generate_keypair("User123")
self.assertEqual(res["public_key"], "PUBKEY")
self.assertEqual(res["private_key"], "PRIVKEY")
req = m_urlopen.call_args[0][0]
self.assertEqual(req.method, "POST")
self.assertTrue(
req.full_url.endswith(
"/api/Generate-a-keypair,-reading-identity-from-data"
)
)