Compare commits

...

2 Commits

Author SHA1 Message Date
Morten V. Christiansen c560407c74 integration zemroo, image 2026-03-02 16:22:51 +01:00
Morten V. Christiansen 35752bce6b refactors 2026-02-27 07:27:19 +01:00
35 changed files with 1131 additions and 82 deletions

View File

@ -1,34 +1,61 @@
CA/PKI Backend Project Context
CA/PKI Backend Project Context (Updated)
Stack
Python 3 + psycopg (dict_row cursors)
PostgreSQL database: ca
Unit tests: unittest (python3 -m unittest discover)
Unit tests: unittest
Run via: python3 -m unittest discover
Current test count: 17
Database Schema (current assumptions)
entity
id INT identity PK
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY
creation_ts TIMESTAMPTZ default now()
creation_ts TIMESTAMPTZ DEFAULT now()
creator INT FK → entity(id) (the entity that created this one; nullable)
creator INT FK → entity(id) (nullable)
name VARCHAR(100) NOT NULL
type VARCHAR(...) NOT NULL (e.g. person, group, device, alias)
type VARCHAR(...) NOT NULL
Allowed types: person, group, device
public_key VARCHAR(300) NOT NULL
symmetrical_key VARCHAR(100) NULL
status VARCHAR(...) NOT NULL default 'active' (values: 'active', 'revoked')
status VARCHAR(...) NOT NULL DEFAULT 'active'
Values: 'active', 'revoked'
expiration DATE NULL
Index on entity(name) (and other indexes as needed)
ca_reference VARCHAR(100) NULL
Constraint
CHECK (
(type = 'group' AND ca_reference IS NOT NULL)
OR
(type <> 'group' AND ca_reference IS NULL)
)
Rule:
Groups MUST have a ca_reference
All other entity types MUST have ca_reference IS NULL
Indexes:
Index on entity(name)
Other indexes as needed
group_member
@ -38,69 +65,169 @@ member_id INT FK → entity(id) ON DELETE CASCADE
role VARCHAR(10)
PK (group_id, member_id)
PRIMARY KEY (group_id, member_id)
Index (member_id, group_id)
Groups can contain any entity type, including other groups and devices.
Groups can contain:
persons
devices
other groups
property
Columns: (id INT FK → entity(id), property_name VARCHAR(100))
id INT FK → entity(id) ON DELETE CASCADE
PK (id, property_name)
property_name VARCHAR(100) NOT NULL
Used for flags/roles such as "creator"
validation_policy CHAR(19) NOT NULL DEFAULT 'default'
source VARCHAR(150) NULL
PRIMARY KEY (id, property_name)
Notes:
validation_policy is CHAR(19) and padded by PostgreSQL.
Used for flags/roles such as "creator".
metadata
Intended “singleton row” table, enforced at application level
Singleton row table (enforced at application level).
Columns: name, comment, private_key, public_key
Columns:
name
comment
private_key
public_key
defense_p BOOLEAN NOT NULL DEFAULT false
defense_p:
Global system flag.
Logged on change.
log
id SERIAL PK
id SERIAL PRIMARY KEY
ts TIMESTAMPTZ default now()
ts TIMESTAMPTZ DEFAULT now()
entry TEXT NOT NULL
Every API mutation must log one row here.
Every API mutation must insert exactly one row here.
Core Business Rules
Creators are NOT an entity type
Creators are not an entity type
"creator" is a property (property_name='creator') on a person.
creator is a property (property_name='creator') on a person entity.
insert_creator():
insert_creator() creates a person entity and inserts the creator property.
Creates a person
Adds "creator" property
Revoked entities are immutable
Any mutation on an entity requires ensure_entity_active(cursor, entity_id).
All entity mutations must call:
Revoked entities cannot:
ensure_entity_active(cursor, entity_id)
Join groups or accept members
Revoked entities CANNOT:
Join groups
Accept members
Add/delete properties
Change keys (public_key, symmetrical_key)
Change public_key
Change symmetrical_key
Change status again
Logging
Group CA Reference Rule
All changes to entity, group_member, property, metadata must call log_change(cursor, "...")
group entities must include a non-null ca_reference.
Logging happens inside the same transaction (no extra commits).
person and device must not define ca_reference.
Python Modules (current structure)
Enforced at:
Database level (CHECK constraint)
Python validation level
Property Metadata
Each property includes:
validation_policy
source
Defaults:
validation_policy = 'default'
source = NULL
Property mutations:
Require entity to be active
Must log changes
Metadata Defense Flag
defense_p:
Boolean system-wide flag
Default: false
Must be logged when changed
Logging Rules
All changes to:
entity
group_member
property
metadata
Must call:
log_change(cursor, "...")
Logging:
Happens inside same transaction
No extra commits
Exactly one log row per mutation
Python Modules
ca_core/entity.py
Must provide:
Provides:
ensure_entity_active(cursor, entity_id)
@ -108,60 +235,94 @@ insert_creator(cursor, name, public_key)
enroll_person(cursor, name, public_key, creator_id)
create_group(cursor, name, public_key, creator_id)
create_alias(cursor, target_entity_id)
create_group(cursor, name, public_key, creator_id, ca_reference)
get_entity(cursor, entity_id)
set_entity_status(cursor, entity_id, status, changed_by) (requires active entity)
set_entity_status(cursor, entity_id, status, changed_by)
set_entity_keys(cursor, entity_id, public_key, changed_by) (active-only)
set_entity_keys(cursor, entity_id, public_key, changed_by)
set_symmetrical_key(cursor, entity_id, key, changed_by) (active-only)
set_symmetrical_key(cursor, entity_id, key, changed_by)
get_symmetrical_key(cursor, entity_id)
ca_core/group_member.py
Uses member_id (not person_id)
Uses member_id
Must prevent adding revoked groups/members (via ensure_entity_active)
Prevents adding revoked groups/members
Logs add/remove membership
Logs membership add/remove
ca_core/property.py
Table is property(id, property_name) (NOT entity_id/name)
Table:
Must reject mutations if entity revoked (immutability)
property(id, property_name, validation_policy, source)
Logs set/delete property
Rules:
Reject mutations if entity revoked
Logs set/delete
Default policy 'default'
validation_policy is CHAR(19)
ca_core/metadata.py
Updates metadata fields and logs changes
Updates metadata fields
Manages defense_p
Logs changes
ca_core/db_logging.py
log_change(cursor, message: str)
log_change(cursor, message: str) inserts into log(entry)
Inserts into log(entry).
Tests
tests/test_entity.py, tests/test_group.py, tests/test_property.py, tests/test_metadata.py
tests/test_entity.py
tests/test_group.py
tests/test_property.py
tests/test_metadata.py
Tests verify:
Core behaviors (create, enroll, group membership, revoke immutability)
Creation and enrollment
Log entry is created for mutations (case-insensitive substring checks)
Group membership
Revocation immutability
CA reference enforcement
Property metadata fields
defense_p behavior
Log entry creation (case-insensitive substring checks)
Run via:
python3 -m unittest discover
Known Gotchas
Known gotchas
Do NOT name module logging.py (conflicts with stdlib)
Avoid naming a module logging.py (conflicts with stdlib). Use db_logging.py.
Schema and code must stay aligned:
Schema and code must stay aligned (e.g., property.id/property_name, group_member.member_id).
property.id (NOT entity_id)
group_member.member_id
entity.ca_reference constraint
CHAR(19) pads values with spaces

View File

@ -0,0 +1 @@
# Crypto HTTP client for Zenroom suite.

Binary file not shown.

View File

@ -0,0 +1,157 @@
import json
import subprocess
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional, Sequence, Union
JsonLike = Union[Dict[str, Any], Sequence[Any], str, int, float, bool, None]
class ZenroomError(RuntimeError):
"""Raised when Zenroom execution fails."""
class ZenroomDockerClient:
"""Run Zenroom via Docker.
This wrapper is intentionally small and testable. It focuses on:
- Writing inputs (script/data/keys/conf) to a temp workdir
- Running a docker container that executes Zenroom
- Returning parsed JSON output when possible
Assumed container interface (common Zenroom CLI pattern):
zenroom -z -a <data.json> -k <keys.json> -c <conf.json> <script.zen>
If your docker image/entrypoint differs, pass `zenroom_args` accordingly.
Note: This module is named *zenroom_client.py* (not zenroom.py) to avoid
potential import shadowing with future packages/modules.
"""
def __init__(
self,
image: str = "zenroom/zenroom:latest",
docker_bin: str = "docker",
work_mount_path: str = "/work",
zenroom_args: Optional[Sequence[str]] = None,
timeout_s: int = 30,
) -> None:
self.image = image
self.docker_bin = docker_bin
self.work_mount_path = work_mount_path
self.zenroom_args = list(zenroom_args) if zenroom_args is not None else ["zenroom", "-z"]
self.timeout_s = timeout_s
def run(
self,
script: str,
*,
data: Optional[JsonLike] = None,
keys: Optional[JsonLike] = None,
conf: Optional[JsonLike] = None,
extra_docker_args: Optional[Sequence[str]] = None,
extra_zenroom_args: Optional[Sequence[str]] = None,
) -> Union[dict, str]:
"""Execute a Zenroom script.
Args:
script: The Zenroom script (text).
data/keys/conf: Optional JSON-like payloads written to files.
extra_docker_args: Optional extra args inserted after `docker run`.
extra_zenroom_args: Optional extra args appended before the script path.
Returns:
Parsed JSON dict if stdout is valid JSON, otherwise raw stdout string.
Raises:
ZenroomError on non-zero exit.
"""
if not isinstance(script, str) or not script.strip():
raise ValueError("script must be a non-empty string")
with tempfile.TemporaryDirectory(prefix="zenroom_") as tmpdir:
workdir = Path(tmpdir)
# Defensive: TemporaryDirectory normally creates the dir, but tests may mock it.
workdir.mkdir(parents=True, exist_ok=True)
script_path = workdir / "script.zen"
script_path.write_text(script, encoding="utf-8")
data_path = None
keys_path = None
conf_path = None
if data is not None:
data_path = workdir / "data.json"
data_path.write_text(json.dumps(data), encoding="utf-8")
if keys is not None:
keys_path = workdir / "keys.json"
keys_path.write_text(json.dumps(keys), encoding="utf-8")
if conf is not None:
conf_path = workdir / "conf.json"
conf_path.write_text(json.dumps(conf), encoding="utf-8")
cmd = [
self.docker_bin,
"run",
"--rm",
"-i",
]
if extra_docker_args:
cmd.extend(list(extra_docker_args))
# Mount temp dir into container
cmd.extend(
[
"-v",
f"{workdir}:{self.work_mount_path}",
"-w",
self.work_mount_path,
self.image,
]
)
# Build zenroom command
cmd.extend(list(self.zenroom_args))
if data_path is not None:
cmd.extend(["-a", str(Path(self.work_mount_path) / data_path.name)])
if keys_path is not None:
cmd.extend(["-k", str(Path(self.work_mount_path) / keys_path.name)])
if conf_path is not None:
cmd.extend(["-c", str(Path(self.work_mount_path) / conf_path.name)])
if extra_zenroom_args:
cmd.extend(list(extra_zenroom_args))
cmd.append(str(Path(self.work_mount_path) / script_path.name))
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=self.timeout_s,
check=False,
)
if result.returncode != 0:
stderr = (result.stderr or "").strip()
stdout = (result.stdout or "").strip()
msg = stderr or stdout or f"Zenroom failed with exit code {result.returncode}"
raise ZenroomError(msg)
out = (result.stdout or "").strip()
if not out:
return ""
try:
parsed = json.loads(out)
if isinstance(parsed, dict):
return parsed
# Zenroom can output arrays too; keep compatibility.
return {"result": parsed}
except json.JSONDecodeError:
return out

View File

@ -0,0 +1,148 @@
import json
import urllib.request
import urllib.error
from typing import Any, Dict, Optional
class ZenroomServiceError(RuntimeError):
pass
class ZenroomServiceClient:
def __init__(
self,
base_url: str = "http://localhost:3300",
*,
api_prefix: str = "/api",
timeout_s: int = 10,
) -> None:
self.base_url = base_url.rstrip("/")
self.api_prefix = api_prefix.strip()
if self.api_prefix in {"", "/"}:
self.api_prefix = ""
elif not self.api_prefix.startswith("/"):
self.api_prefix = "/" + self.api_prefix
self.timeout_s = timeout_s
def _make_url(self, path: str) -> str:
path = "/" + path.lstrip("/")
return f"{self.base_url}{self.api_prefix}{path}"
def _request_json(
self,
method: str,
path: str,
payload: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
url = self._make_url(path)
data = None
headers = {"Accept": "application/json"}
if payload is not None:
data = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=data, headers=headers, method=method.upper())
try:
with urllib.request.urlopen(req, timeout=self.timeout_s) as resp:
raw = resp.read()
text = raw.decode("utf-8")
except urllib.error.HTTPError as e:
body = ""
try:
body = e.read().decode("utf-8")
except Exception:
pass
raise ZenroomServiceError(f"HTTP {e.code} from {url}: {body or e.reason}") from e
except urllib.error.URLError as e:
raise ZenroomServiceError(f"Failed to reach {url}: {e.reason}") from e
text = text.strip()
if not text:
raise ZenroomServiceError(f"Empty response from {url}")
try:
parsed = json.loads(text)
except json.JSONDecodeError as e:
raise ZenroomServiceError(f"Non-JSON response from {url}: {text[:200]}") from e
if not isinstance(parsed, dict):
raise ZenroomServiceError(f"Expected JSON object from {url}")
return parsed
def _post(self, path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
return self._request_json("POST", path, payload)
def _post_data(self, path: str, data: Dict[str, Any]) -> Dict[str, Any]:
res = self._post(path, {"data": data})
if "zenroom_errors" in res or "exception" in res:
exc = res.get("exception", "")
ze = res.get("zenroom_errors")
logs = ""
if isinstance(ze, dict):
logs = str(ze.get("logs", ""))[:800]
raise ZenroomServiceError(f"Zenroom error from {path}: {exc or logs or 'unknown error'}")
return res
@staticmethod
def _require_non_empty_str(name: str, value: str) -> str:
if not isinstance(value, str):
raise TypeError(f"{name} must be a string")
v = value.strip()
if not v:
raise ValueError(f"{name} cannot be empty")
return v
def generate_keypair(self, my_name: str) -> Dict[str, str]:
"""Generate an ECDH keypair using the RESTroom contract:
POST /api/Generate-a-keypair,-reading-identity-from-data
body: { "data": { "myName": "<identity>" } }
Observed response from your service:
{ "<identity>": { "keyring": { "ecdh": "<private_b64>" } } }
Some variants also include:
"ecdh_public_key": "<public_b64>"
This method returns:
{ "private_key": "...", "public_key": "..." } (public_key only if present)
"""
my_name = self._require_non_empty_str("my_name", my_name)
res = self._post_data(
"Generate-a-keypair,-reading-identity-from-data",
{"myName": my_name},
)
if not res:
raise ZenroomServiceError("Empty keypair response")
# Zenroom typically returns { "<identity>": { ... } }
owner = next(iter(res.values()))
if not isinstance(owner, dict):
raise ZenroomServiceError(f"Invalid keypair response structure: {res!r}")
keyring = owner.get("keyring")
if not isinstance(keyring, dict):
raise ZenroomServiceError(f"Invalid keypair response (missing keyring): {res!r}")
private_key = keyring.get("ecdh")
if not isinstance(private_key, str) or not private_key.strip():
raise ZenroomServiceError(f"Invalid keypair response (missing keyring.ecdh): {res!r}")
out: Dict[str, str] = {"private_key": private_key}
public_key = owner.get("ecdh_public_key")
if isinstance(public_key, str) and public_key.strip():
out["public_key"] = public_key
return out

View File

@ -4,6 +4,7 @@ from db_logging import log_change
def ensure_entity_active(cursor, entity_id):
"""
Ensure an entity exists and is active.
Revoked entities are immutable.
"""
cursor.execute("SELECT status FROM entity WHERE id = %s", (entity_id,))
@ -25,8 +26,43 @@ def _validate_ca_reference_for_group(ca_reference):
raise ValueError("ca_reference must be at most 100 characters")
def is_creator(cursor, entity_id):
"""
Return True if the entity is a creator.
A creator is:
- an entity of type 'person'
- with a row in property where property_name = 'creator'
"""
cursor.execute("SELECT type FROM entity WHERE id = %s", (entity_id,))
row = cursor.fetchone()
if row is None:
return False
if row["type"] != "person":
return False
cursor.execute(
"SELECT 1 FROM property WHERE id = %s AND property_name = %s",
(entity_id, "creator"),
)
return cursor.fetchone() is not None
def ensure_creator(cursor, creator_id):
"""
Ensure creator_id exists, is active, and references a creator.
A creator is a 'person' entity that has the 'creator' property.
"""
ensure_entity_active(cursor, creator_id)
if not is_creator(cursor, creator_id):
raise ValueError("creator_id must reference a creator")
def insert_creator(cursor, name, public_key):
"""
Create a creator.
Creators are persons with property 'creator' in the property table.
"""
cursor.execute(
@ -54,7 +90,12 @@ def insert_creator(cursor, name, public_key):
def enroll_person(cursor, name, public_key, creator_id):
ensure_entity_active(cursor, creator_id)
"""
Enroll a new person under a creator.
creator_id must refer to an active creator (person + 'creator' property).
"""
ensure_creator(cursor, creator_id)
cursor.execute(
"""
@ -71,7 +112,13 @@ def enroll_person(cursor, name, public_key, creator_id):
def create_group(cursor, name, public_key, creator_id, ca_reference):
ensure_entity_active(cursor, creator_id)
"""
Create a group under a creator.
creator_id must refer to an active creator (person + 'creator' property).
Groups must define a non-empty ca_reference.
"""
ensure_creator(cursor, creator_id)
_validate_ca_reference_for_group(ca_reference)
cursor.execute(
@ -98,6 +145,8 @@ def get_entity(cursor, entity_id):
def set_entity_status(cursor, entity_id, status, changed_by):
"""
Update entity status.
Only active entities can change status. Once revoked, immutable.
"""
ensure_entity_active(cursor, entity_id)

View File

@ -2,32 +2,67 @@ from db_logging import log_change
from entity import ensure_entity_active
def _get_entity_type(cursor, entity_id):
cursor.execute("SELECT type FROM entity WHERE id = %s", (entity_id,))
row = cursor.fetchone()
return row["type"] if row else None
def _validate_role(role):
if not isinstance(role, str):
raise TypeError("role must be a string")
r = role.strip()
if not r:
raise ValueError("role must be a non-empty string")
if len(r) > 10:
raise ValueError("role must be at most 10 characters")
return r
def add_group_member(cursor, group_id, member_id, role):
"""Add a member to a group.
Rules:
- group_id must reference an active entity of type 'group'
- member_id must reference an active entity (person/device/group)
- role must be a non-empty string with max length 10
- duplicates are rejected
"""
ensure_entity_active(cursor, group_id)
ensure_entity_active(cursor, member_id)
if _get_entity_type(cursor, group_id) != "group":
raise ValueError("group_id must reference an entity of type 'group'")
r = _validate_role(role)
cursor.execute(
"SELECT 1 FROM group_member WHERE group_id = %s AND member_id = %s",
(group_id, member_id),
)
if cursor.fetchone() is not None:
raise ValueError("Member is already in the group")
cursor.execute(
"""
INSERT INTO group_member (group_id, member_id, role)
VALUES (%s, %s, %s)
""",
(group_id, member_id, role)
(group_id, member_id, r),
)
log_change(
cursor,
f"Added member {member_id} to group {group_id} as {role}"
)
log_change(cursor, f"Added member {member_id} to group {group_id} as {r}")
def get_members_of_group(cursor, group_id):
ensure_entity_active(cursor, group_id)
cursor.execute(
"""
SELECT member_id, role
FROM group_member
WHERE group_id = %s
ORDER BY member_id
""",
(group_id,)
(group_id,),
)
return cursor.fetchall()

View File

@ -1,12 +1,29 @@
from db_logging import log_change
def _ensure_singleton_row(cursor):
"""Ensure exactly one metadata row exists.
The metadata table is treated as a singleton at the application level.
Setters must ONLY update the relevant column(s) and must not wipe others.
If the table is empty, a single default row is inserted.
If the table contains more than one row, we raise to avoid ambiguous reads.
"""
cursor.execute("SELECT COUNT(*) AS cnt FROM metadata")
row = cursor.fetchone()
cnt = int(row["cnt"]) if row and row["cnt"] is not None else 0
if cnt == 0:
# Rely on column defaults (e.g., defense_p default false) and nullable columns.
cursor.execute("INSERT INTO metadata DEFAULT VALUES")
elif cnt > 1:
raise ValueError("metadata table must contain exactly one row")
def set_name(cursor, name):
cursor.execute("DELETE FROM metadata")
cursor.execute(
"INSERT INTO metadata (name) VALUES (%s)",
(name,)
)
_ensure_singleton_row(cursor)
cursor.execute("UPDATE metadata SET name = %s", (name,))
log_change(cursor, f"Updated metadata name to {name}")
@ -17,11 +34,8 @@ def get_name(cursor):
def set_comment(cursor, comment):
cursor.execute("DELETE FROM metadata")
cursor.execute(
"INSERT INTO metadata (comment) VALUES (%s)",
(comment,)
)
_ensure_singleton_row(cursor)
cursor.execute("UPDATE metadata SET comment = %s", (comment,))
log_change(cursor, f"Updated metadata comment to {comment}")
@ -32,13 +46,10 @@ def get_comment(cursor):
def set_keys(cursor, public_key, private_key):
cursor.execute("DELETE FROM metadata")
_ensure_singleton_row(cursor)
cursor.execute(
"""
INSERT INTO metadata (public_key, private_key)
VALUES (%s, %s)
""",
(public_key, private_key)
"UPDATE metadata SET public_key = %s, private_key = %s",
(public_key, private_key),
)
log_change(cursor, "Updated metadata keys")
@ -59,13 +70,10 @@ def set_defense_p(cursor, defense_p: bool):
"""Set the metadata defense_p flag.
This table is treated as a singleton row at the application level.
Current convention in this codebase is to wipe and re-insert.
This setter updates ONLY defense_p and preserves all other columns.
"""
cursor.execute("DELETE FROM metadata")
cursor.execute(
"INSERT INTO metadata (defense_p) VALUES (%s)",
(defense_p,)
)
_ensure_singleton_row(cursor)
cursor.execute("UPDATE metadata SET defense_p = %s", (defense_p,))
log_change(cursor, f"Updated metadata defense_p to {defense_p}")

View File

@ -68,7 +68,7 @@ def get_properties(cursor, entity_id):
Returns a list of property_name values for the entity.
"""
cursor.execute(
"SELECT property_name FROM property WHERE id = %s",
"SELECT property_name FROM property WHERE id = %s ORDER BY property_name",
(entity_id,),
)
rows = cursor.fetchall()

Binary file not shown.

View File

@ -0,0 +1,88 @@
import os
import sys
import unittest
import subprocess
from pathlib import Path
# Make ca_core importable as the module root (so `import crypto...` works)
code_path = Path(__file__).parents[1] / "ca_core"
sys.path.insert(0, str(code_path))
from crypto.zenroom_client import ZenroomDockerClient, ZenroomError
def _docker_ok():
"""Return (ok, reason)."""
try:
p = subprocess.run(
["docker", "version"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
timeout=10,
check=False,
)
except FileNotFoundError:
return False, "docker CLI not found"
except Exception as e:
return False, f"docker check failed: {e}"
if p.returncode != 0:
out = (p.stdout or "").strip()
return False, f"docker not usable: {out}"
return True, ""
def _image_exists(image: str):
"""Return (ok, reason)."""
try:
p = subprocess.run(
["docker", "image", "inspect", image],
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT,
text=True,
timeout=10,
check=False,
)
except Exception as e:
return False, f"docker image inspect failed: {e}"
if p.returncode != 0:
return False, f"docker image not found locally: {image}"
return True, ""
class TestZenroomDockerIntegration(unittest.TestCase):
"""Integration tests for ZenroomDockerClient.
Enable by setting:
ZENROOM_DOCKER_INTEGRATION=1
Image selection:
ZENROOM_IMAGE (default: zenroom)
"""
@classmethod
def setUpClass(cls):
if not os.getenv("ZENROOM_DOCKER_INTEGRATION"):
raise unittest.SkipTest("Docker integration disabled (set ZENROOM_DOCKER_INTEGRATION=1)")
ok, reason = _docker_ok()
if not ok:
raise unittest.SkipTest(reason)
cls.image = os.getenv("ZENROOM_IMAGE", "zenroom")
ok, reason = _image_exists(cls.image)
if not ok:
raise unittest.SkipTest(reason + " (build it or set ZENROOM_IMAGE)")
def test_basic_execution(self):
client = ZenroomDockerClient(image=self.image)
out = client.run("print('hello')")
self.assertIn("hello", str(out))
def test_nonzero_exit_raises(self):
client = ZenroomDockerClient(image=self.image)
with self.assertRaises(ZenroomError):
client.run("THIS IS NOT VALID ZENCODE")

View File

@ -0,0 +1,78 @@
import os
import unittest
import sys
from pathlib import Path
# Import from ca_core (same pattern as other tests)
code_path = Path(__file__).parent.parent.parent / "ca_core"
sys.path.insert(0, str(code_path))
from crypto.zenroom_service_client import ZenroomServiceClient
def _live_enabled() -> bool:
return os.environ.get("RUN_LIVE_ZENROOM", "").strip().lower() in {
"1", "true", "yes"
}
@unittest.skipUnless(
_live_enabled(),
"Set RUN_LIVE_ZENROOM=1 to run live Zenroom service smoke tests",
)
class TestZenroomLiveServices(unittest.TestCase):
@classmethod
def setUpClass(cls):
base_url = os.environ.get("ZENROOM_BASE_URL", "http://localhost:3300").strip()
api_prefix = os.environ.get("ZENROOM_API_PREFIX", "/api").strip()
timeout_s = int(os.environ.get("ZENROOM_TIMEOUT_S", "20"))
cls.client = ZenroomServiceClient(
base_url=base_url,
api_prefix=api_prefix,
timeout_s=timeout_s,
)
def test_keypair_reading_identity_from_data(self):
"""
Tests:
POST /api/Generate-a-keypair,-reading-identity-from-data
Payload wrapped as {"data": {"myName": "..."}}
"""
res = self.client.generate_a_keypair_reading_identity_from_data(
"LiveUser123456"
)
self.assertIn("public_key", res)
self.assertIn("private_key", res)
self.assertIsInstance(res["public_key"], str)
self.assertIsInstance(res["private_key"], str)
self.assertTrue(res["public_key"])
self.assertTrue(res["private_key"])
def test_encrypt_decrypt_password_roundtrip(self):
"""
Tests:
POST /api/Encrypt-a-message-with-the-password
POST /api/Decrypt-the-message-with-the-password
"""
plaintext = "Dear Bob, your name is too short, goodbye - Alice."
encrypted = self.client.encrypt_a_message_with_the_password(
header="A very important secret",
message=plaintext,
password="myVerySecretPassword",
)
for k in ("checksum", "header", "iv", "text"):
self.assertIn(k, encrypted)
self.assertIsInstance(encrypted[k], str)
self.assertTrue(encrypted[k])
decrypted = self.client.decrypt_the_message_with_the_password(
secret_message=encrypted,
password="myVerySecretPassword",
)
self.assertEqual(decrypted.get("textDecrypted"), plaintext)

View File

@ -0,0 +1,30 @@
import os
import unittest
import sys
from pathlib import Path
code_path = Path(__file__).parents[2] / "ca_core"
sys.path.insert(0, str(code_path))
from crypto.zenroom_service_client import ZenroomServiceClient
class TestZenroomServiceClientIntegration(unittest.TestCase):
@unittest.skipUnless(
os.getenv("ZENROOM_BASE_URL"),
"No ZENROOM_BASE_URL set",
)
def test_generate_keypair_real_service(self):
client = ZenroomServiceClient(base_url=os.environ["ZENROOM_BASE_URL"])
res = client.generate_keypair("IntegrationUser")
self.assertIn("private_key", res)
self.assertIsInstance(res["private_key"], str)
self.assertTrue(res["private_key"].strip())
# public_key may or may not be returned by this contract variant
if "public_key" in res:
self.assertIsInstance(res["public_key"], str)
self.assertTrue(res["public_key"].strip())

View File

@ -82,3 +82,32 @@ class TestEntityFunctions(unittest.TestCase):
log_entry = get_last_log(self.cur).lower()
self.assertIn("symmetrical_key", log_entry)
def test_is_creator_true(self):
creator_id = entity.insert_creator(self.cur, "CreatorCheck", "pubkeyX")
self.assertTrue(entity.is_creator(self.cur, creator_id))
def test_is_creator_false_for_normal_person(self):
creator_id = entity.insert_creator(self.cur, "CreatorBase", "pubkeyBase")
person_id = entity.enroll_person(self.cur, "NormalPerson", "pubkeyP", creator_id)
self.assertFalse(entity.is_creator(self.cur, person_id))
def test_is_creator_false_for_group(self):
creator_id = entity.insert_creator(self.cur, "CreatorGroup", "pubkeyG")
group_id = entity.create_group(self.cur, "GroupX", "pubkeyGX", creator_id, "CA-REF-X")
self.assertFalse(entity.is_creator(self.cur, group_id))
def test_enroll_person_requires_creator(self):
creator_id = entity.insert_creator(self.cur, "CreatorY", "pubkeyY")
person_id = entity.enroll_person(self.cur, "PersonZ", "pubkeyZ", creator_id)
with self.assertRaises(ValueError):
entity.enroll_person(self.cur, "IllegalEnroll", "pubkeyBad", person_id)
def test_create_group_requires_creator(self):
creator_id = entity.insert_creator(self.cur, "CreatorZ", "pubkeyZ")
person_id = entity.enroll_person(self.cur, "PersonA", "pubkeyA", creator_id)
with self.assertRaises(ValueError):
entity.create_group(self.cur, "BadGroup", "pubkeyBG", person_id, "CA-REF-BAD")

View File

@ -86,3 +86,33 @@ class TestGroupFunctions(unittest.TestCase):
with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, group_id, person_id, "member")
def test_group_id_must_be_group(self):
creator_id = entity.insert_creator(self.cur, "CreatorType", "pubkeyT")
person_id = entity.enroll_person(self.cur, "PersonType", "pubkeyPT", creator_id)
other_person = entity.enroll_person(self.cur, "PersonType2", "pubkeyPT2", creator_id)
with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, person_id, other_person, "member")
def test_role_validation(self):
creator_id = entity.insert_creator(self.cur, "CreatorRole", "pubkeyR")
group_id = entity.create_group(self.cur, "GroupRole", "pubkeyGR", creator_id, "CA-ROLE")
person_id = entity.enroll_person(self.cur, "PersonRole", "pubkeyPR", creator_id)
with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, group_id, person_id, "")
with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, group_id, person_id, "x" * 11)
def test_duplicate_membership_rejected(self):
creator_id = entity.insert_creator(self.cur, "CreatorDup", "pubkeyD")
group_id = entity.create_group(self.cur, "GroupDup", "pubkeyGD", creator_id, "CA-DUP")
person_id = entity.enroll_person(self.cur, "PersonDup", "pubkeyPD", creator_id)
group_member.add_group_member(self.cur, group_id, person_id, "member")
with self.assertRaises(ValueError):
group_member.add_group_member(self.cur, group_id, person_id, "member")

View File

@ -0,0 +1,15 @@
import os
import unittest
from crypto.zenroom_service_client import ZenroomServiceClient
class TestZenroomHTTPIntegration(unittest.TestCase):
@unittest.skipUnless(os.getenv("ZENROOM_BASE_URL"), "No ZENROOM_BASE_URL set")
def test_sign_and_verify_roundtrip(self):
base_url = os.environ["ZENROOM_BASE_URL"]
client = ZenroomServiceClient(base_url=base_url)
# You must supply real keys here for full integration
self.assertTrue(True)

View File

@ -64,3 +64,11 @@ class TestMetadataFunctions(unittest.TestCase):
log_entry = get_last_log(self.cur).lower()
self.assertIn("defense_p", log_entry)
def test_metadata_setters_do_not_wipe_other_fields(self):
metadata.set_name(self.cur, "PreserveMe")
metadata.set_defense_p(self.cur, True)
# defense_p should be updated, and name should still be present.
self.assertTrue(metadata.get_defense_p(self.cur))
self.assertEqual(metadata.get_name(self.cur), "PreserveMe")

View File

@ -0,0 +1,90 @@
import unittest
import sys
from pathlib import Path
from unittest import mock
# Allow imports from ca_core (same pattern as existing tests)
code_path = Path(__file__).parent.parent / "ca_core"
sys.path.insert(0, str(code_path))
from crypto.zenroom_client import ZenroomDockerClient, ZenroomError
class TestZenroomDockerClient(unittest.TestCase):
def _fake_completed(self, returncode=0, stdout="", stderr=""):
cp = mock.Mock()
cp.returncode = returncode
cp.stdout = stdout
cp.stderr = stderr
return cp
@mock.patch("crypto.zenroom_client.subprocess.run")
def test_run_builds_expected_docker_command(self, m_run):
m_run.return_value = self._fake_completed(stdout='{"ok": true}')
client = ZenroomDockerClient(image="zenroom/zenroom:latest")
# Patch temp dir so we can assert paths deterministically
with mock.patch("crypto.zenroom_client.tempfile.TemporaryDirectory") as m_td:
m_td.return_value.__enter__.return_value = "/tmp/zenroom_test"
m_td.return_value.__exit__.return_value = False
res = client.run("print('hi')", data={"a": 1}, keys={"k": "v"}, conf={"c": 2})
self.assertEqual(res, {"ok": True})
args, kwargs = m_run.call_args
cmd = args[0]
self.assertIn("docker", cmd[0])
self.assertIn("run", cmd)
self.assertIn("zenroom/zenroom:latest", cmd)
# Mount and workdir
self.assertIn("-v", cmd)
self.assertIn("/tmp/zenroom_test:/work", cmd)
self.assertIn("-w", cmd)
self.assertIn("/work", cmd)
# Zenroom base args
self.assertIn("zenroom", cmd)
self.assertIn("-z", cmd)
# Input files flags should be present
self.assertIn("-a", cmd)
self.assertIn("/work/data.json", cmd)
self.assertIn("-k", cmd)
self.assertIn("/work/keys.json", cmd)
self.assertIn("-c", cmd)
self.assertIn("/work/conf.json", cmd)
# Script at end
self.assertEqual(cmd[-1], "/work/script.zen")
# subprocess.run called with capture_output/text
self.assertTrue(kwargs.get("capture_output"))
self.assertTrue(kwargs.get("text"))
@mock.patch("crypto.zenroom_client.subprocess.run")
def test_run_returns_raw_stdout_when_not_json(self, m_run):
m_run.return_value = self._fake_completed(stdout="hello")
client = ZenroomDockerClient()
with mock.patch("crypto.zenroom_client.tempfile.TemporaryDirectory") as m_td:
m_td.return_value.__enter__.return_value = "/tmp/zenroom_test"
m_td.return_value.__exit__.return_value = False
out = client.run("print('hi')")
self.assertEqual(out, "hello")
@mock.patch("crypto.zenroom_client.subprocess.run")
def test_run_raises_on_nonzero_exit(self, m_run):
m_run.return_value = self._fake_completed(returncode=1, stderr="boom")
client = ZenroomDockerClient()
with mock.patch("crypto.zenroom_client.tempfile.TemporaryDirectory") as m_td:
m_td.return_value.__enter__.return_value = "/tmp/zenroom_test"
m_td.return_value.__exit__.return_value = False
with self.assertRaises(ZenroomError) as ctx:
client.run("print('hi')")
self.assertIn("boom", str(ctx.exception))
def test_run_requires_non_empty_script(self):
client = ZenroomDockerClient()
with self.assertRaises(ValueError):
client.run(" ")

View File

@ -0,0 +1,67 @@
import json
import unittest
from unittest import mock
import sys
from pathlib import Path
code_path = Path(__file__).parents[1] / "ca_core"
sys.path.insert(0, str(code_path))
from crypto.zenroom_service_client import ZenroomServiceClient
class _FakeHTTPResponse:
def __init__(self, body: bytes):
self._body = body
def read(self):
return self._body
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
class TestZenroomServiceClient(unittest.TestCase):
@mock.patch("crypto.zenroom_service_client.urllib.request.urlopen")
def test_generate_keypair_unpacks_private_key(self, m_urlopen):
payload = {
"IntegrationUser": {
"keyring": {"ecdh": "PRIVKEY"},
}
}
m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8"))
client = ZenroomServiceClient(base_url="http://localhost:3300")
res = client.generate_keypair("IntegrationUser")
self.assertEqual(res["private_key"], "PRIVKEY")
self.assertNotIn("public_key", res)
req = m_urlopen.call_args[0][0]
self.assertEqual(req.method, "POST")
self.assertTrue(req.full_url.endswith("/api/Generate-a-keypair,-reading-identity-from-data"))
sent = json.loads(req.data.decode("utf-8"))
self.assertEqual(sent, {"data": {"myName": "IntegrationUser"}})
@mock.patch("crypto.zenroom_service_client.urllib.request.urlopen")
def test_generate_keypair_includes_public_key_if_present(self, m_urlopen):
payload = {
"IntegrationUser": {
"ecdh_public_key": "PUBKEY",
"keyring": {"ecdh": "PRIVKEY"},
}
}
m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8"))
client = ZenroomServiceClient(base_url="http://localhost:3300")
res = client.generate_keypair("IntegrationUser")
self.assertEqual(res["private_key"], "PRIVKEY")
self.assertEqual(res["public_key"], "PUBKEY")

View File

@ -0,0 +1,55 @@
import json
import unittest
from unittest import mock
import sys
from pathlib import Path
code_path = Path(__file__).parents[1] / "ca_core"
sys.path.insert(0, str(code_path))
from crypto.zenroom_service_client import ZenroomServiceClient
class _FakeHTTPResponse:
def __init__(self, body: bytes):
self._body = body
def read(self):
return self._body
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
class TestZenroomServiceClient(unittest.TestCase):
@mock.patch("crypto.zenroom_service_client.urllib.request.urlopen")
def test_generate_keypair_unpacks_keys(self, m_urlopen):
payload = {
"Owner": {
"ecdh_public_key": "PUBKEY",
"keyring": {"ecdh": "PRIVKEY"},
}
}
m_urlopen.return_value = _FakeHTTPResponse(
json.dumps(payload).encode("utf-8")
)
client = ZenroomServiceClient(base_url="http://localhost:3300")
res = client.generate_keypair("User123")
self.assertEqual(res["public_key"], "PUBKEY")
self.assertEqual(res["private_key"], "PRIVKEY")
req = m_urlopen.call_args[0][0]
self.assertEqual(req.method, "POST")
self.assertTrue(
req.full_url.endswith(
"/api/Generate-a-keypair,-reading-identity-from-data"
)
)