diff --git a/ca_core/crypto/__init__.py b/ca_core/crypto/__init__.py new file mode 100644 index 0000000..1a9daba --- /dev/null +++ b/ca_core/crypto/__init__.py @@ -0,0 +1 @@ +# Crypto HTTP client for Zenroom suite. diff --git a/ca_core/crypto/__pycache__/__init__.cpython-313.pyc b/ca_core/crypto/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..c0df962 Binary files /dev/null and b/ca_core/crypto/__pycache__/__init__.cpython-313.pyc differ diff --git a/ca_core/crypto/__pycache__/zenroom_client.cpython-313.pyc b/ca_core/crypto/__pycache__/zenroom_client.cpython-313.pyc new file mode 100644 index 0000000..287855b Binary files /dev/null and b/ca_core/crypto/__pycache__/zenroom_client.cpython-313.pyc differ diff --git a/ca_core/crypto/__pycache__/zenroom_service_client.cpython-313.pyc b/ca_core/crypto/__pycache__/zenroom_service_client.cpython-313.pyc new file mode 100644 index 0000000..d90133f Binary files /dev/null and b/ca_core/crypto/__pycache__/zenroom_service_client.cpython-313.pyc differ diff --git a/ca_core/crypto/zenroom_client.py b/ca_core/crypto/zenroom_client.py new file mode 100644 index 0000000..9f3eb7d --- /dev/null +++ b/ca_core/crypto/zenroom_client.py @@ -0,0 +1,157 @@ +import json +import subprocess +import tempfile +from pathlib import Path +from typing import Any, Dict, Optional, Sequence, Union + + +JsonLike = Union[Dict[str, Any], Sequence[Any], str, int, float, bool, None] + + +class ZenroomError(RuntimeError): + """Raised when Zenroom execution fails.""" + + +class ZenroomDockerClient: + """Run Zenroom via Docker. + + This wrapper is intentionally small and testable. It focuses on: + - Writing inputs (script/data/keys/conf) to a temp workdir + - Running a docker container that executes Zenroom + - Returning parsed JSON output when possible + + Assumed container interface (common Zenroom CLI pattern): + zenroom -z -a -k -c + + If your docker image/entrypoint differs, pass `zenroom_args` accordingly. + + Note: This module is named *zenroom_client.py* (not zenroom.py) to avoid + potential import shadowing with future packages/modules. + """ + + def __init__( + self, + image: str = "zenroom/zenroom:latest", + docker_bin: str = "docker", + work_mount_path: str = "/work", + zenroom_args: Optional[Sequence[str]] = None, + timeout_s: int = 30, + ) -> None: + self.image = image + self.docker_bin = docker_bin + self.work_mount_path = work_mount_path + self.zenroom_args = list(zenroom_args) if zenroom_args is not None else ["zenroom", "-z"] + self.timeout_s = timeout_s + + def run( + self, + script: str, + *, + data: Optional[JsonLike] = None, + keys: Optional[JsonLike] = None, + conf: Optional[JsonLike] = None, + extra_docker_args: Optional[Sequence[str]] = None, + extra_zenroom_args: Optional[Sequence[str]] = None, + ) -> Union[dict, str]: + """Execute a Zenroom script. + + Args: + script: The Zenroom script (text). + data/keys/conf: Optional JSON-like payloads written to files. + extra_docker_args: Optional extra args inserted after `docker run`. + extra_zenroom_args: Optional extra args appended before the script path. + + Returns: + Parsed JSON dict if stdout is valid JSON, otherwise raw stdout string. + + Raises: + ZenroomError on non-zero exit. + """ + if not isinstance(script, str) or not script.strip(): + raise ValueError("script must be a non-empty string") + + with tempfile.TemporaryDirectory(prefix="zenroom_") as tmpdir: + workdir = Path(tmpdir) + + # Defensive: TemporaryDirectory normally creates the dir, but tests may mock it. + workdir.mkdir(parents=True, exist_ok=True) + + script_path = workdir / "script.zen" + script_path.write_text(script, encoding="utf-8") + + data_path = None + keys_path = None + conf_path = None + + if data is not None: + data_path = workdir / "data.json" + data_path.write_text(json.dumps(data), encoding="utf-8") + if keys is not None: + keys_path = workdir / "keys.json" + keys_path.write_text(json.dumps(keys), encoding="utf-8") + if conf is not None: + conf_path = workdir / "conf.json" + conf_path.write_text(json.dumps(conf), encoding="utf-8") + + cmd = [ + self.docker_bin, + "run", + "--rm", + "-i", + ] + + if extra_docker_args: + cmd.extend(list(extra_docker_args)) + + # Mount temp dir into container + cmd.extend( + [ + "-v", + f"{workdir}:{self.work_mount_path}", + "-w", + self.work_mount_path, + self.image, + ] + ) + + # Build zenroom command + cmd.extend(list(self.zenroom_args)) + + if data_path is not None: + cmd.extend(["-a", str(Path(self.work_mount_path) / data_path.name)]) + if keys_path is not None: + cmd.extend(["-k", str(Path(self.work_mount_path) / keys_path.name)]) + if conf_path is not None: + cmd.extend(["-c", str(Path(self.work_mount_path) / conf_path.name)]) + + if extra_zenroom_args: + cmd.extend(list(extra_zenroom_args)) + + cmd.append(str(Path(self.work_mount_path) / script_path.name)) + + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=self.timeout_s, + check=False, + ) + + if result.returncode != 0: + stderr = (result.stderr or "").strip() + stdout = (result.stdout or "").strip() + msg = stderr or stdout or f"Zenroom failed with exit code {result.returncode}" + raise ZenroomError(msg) + + out = (result.stdout or "").strip() + if not out: + return "" + + try: + parsed = json.loads(out) + if isinstance(parsed, dict): + return parsed + # Zenroom can output arrays too; keep compatibility. + return {"result": parsed} + except json.JSONDecodeError: + return out diff --git a/ca_core/crypto/zenroom_service_client.py b/ca_core/crypto/zenroom_service_client.py new file mode 100644 index 0000000..1ad8473 --- /dev/null +++ b/ca_core/crypto/zenroom_service_client.py @@ -0,0 +1,148 @@ +import json +import urllib.request +import urllib.error +from typing import Any, Dict, Optional + + +class ZenroomServiceError(RuntimeError): + pass + + +class ZenroomServiceClient: + + def __init__( + self, + base_url: str = "http://localhost:3300", + *, + api_prefix: str = "/api", + timeout_s: int = 10, + ) -> None: + self.base_url = base_url.rstrip("/") + self.api_prefix = api_prefix.strip() + + if self.api_prefix in {"", "/"}: + self.api_prefix = "" + elif not self.api_prefix.startswith("/"): + self.api_prefix = "/" + self.api_prefix + + self.timeout_s = timeout_s + + def _make_url(self, path: str) -> str: + path = "/" + path.lstrip("/") + return f"{self.base_url}{self.api_prefix}{path}" + + def _request_json( + self, + method: str, + path: str, + payload: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + + url = self._make_url(path) + data = None + headers = {"Accept": "application/json"} + + if payload is not None: + data = json.dumps(payload).encode("utf-8") + headers["Content-Type"] = "application/json" + + req = urllib.request.Request(url, data=data, headers=headers, method=method.upper()) + + try: + with urllib.request.urlopen(req, timeout=self.timeout_s) as resp: + raw = resp.read() + text = raw.decode("utf-8") + except urllib.error.HTTPError as e: + body = "" + try: + body = e.read().decode("utf-8") + except Exception: + pass + raise ZenroomServiceError(f"HTTP {e.code} from {url}: {body or e.reason}") from e + except urllib.error.URLError as e: + raise ZenroomServiceError(f"Failed to reach {url}: {e.reason}") from e + + text = text.strip() + if not text: + raise ZenroomServiceError(f"Empty response from {url}") + + try: + parsed = json.loads(text) + except json.JSONDecodeError as e: + raise ZenroomServiceError(f"Non-JSON response from {url}: {text[:200]}") from e + + if not isinstance(parsed, dict): + raise ZenroomServiceError(f"Expected JSON object from {url}") + + return parsed + + def _post(self, path: str, payload: Dict[str, Any]) -> Dict[str, Any]: + return self._request_json("POST", path, payload) + + def _post_data(self, path: str, data: Dict[str, Any]) -> Dict[str, Any]: + res = self._post(path, {"data": data}) + + if "zenroom_errors" in res or "exception" in res: + exc = res.get("exception", "") + ze = res.get("zenroom_errors") + logs = "" + if isinstance(ze, dict): + logs = str(ze.get("logs", ""))[:800] + raise ZenroomServiceError(f"Zenroom error from {path}: {exc or logs or 'unknown error'}") + + return res + + @staticmethod + def _require_non_empty_str(name: str, value: str) -> str: + if not isinstance(value, str): + raise TypeError(f"{name} must be a string") + v = value.strip() + if not v: + raise ValueError(f"{name} cannot be empty") + return v + + def generate_keypair(self, my_name: str) -> Dict[str, str]: + """Generate an ECDH keypair using the RESTroom contract: + + POST /api/Generate-a-keypair,-reading-identity-from-data + body: { "data": { "myName": "" } } + + Observed response from your service: + { "": { "keyring": { "ecdh": "" } } } + + Some variants also include: + "ecdh_public_key": "" + + This method returns: + { "private_key": "...", "public_key": "..." } (public_key only if present) + """ + my_name = self._require_non_empty_str("my_name", my_name) + + res = self._post_data( + "Generate-a-keypair,-reading-identity-from-data", + {"myName": my_name}, + ) + + if not res: + raise ZenroomServiceError("Empty keypair response") + + # Zenroom typically returns { "": { ... } } + owner = next(iter(res.values())) + if not isinstance(owner, dict): + raise ZenroomServiceError(f"Invalid keypair response structure: {res!r}") + + keyring = owner.get("keyring") + if not isinstance(keyring, dict): + raise ZenroomServiceError(f"Invalid keypair response (missing keyring): {res!r}") + + private_key = keyring.get("ecdh") + if not isinstance(private_key, str) or not private_key.strip(): + raise ZenroomServiceError(f"Invalid keypair response (missing keyring.ecdh): {res!r}") + + out: Dict[str, str] = {"private_key": private_key} + + public_key = owner.get("ecdh_public_key") + if isinstance(public_key, str) and public_key.strip(): + out["public_key"] = public_key + + return out diff --git a/tests/__pycache__/test_integration_zenroom_docker.cpython-313.pyc b/tests/__pycache__/test_integration_zenroom_docker.cpython-313.pyc new file mode 100644 index 0000000..02703a0 Binary files /dev/null and b/tests/__pycache__/test_integration_zenroom_docker.cpython-313.pyc differ diff --git a/tests/__pycache__/test_integration_zenroom_service.cpython-313.pyc b/tests/__pycache__/test_integration_zenroom_service.cpython-313.pyc new file mode 100644 index 0000000..e7971df Binary files /dev/null and b/tests/__pycache__/test_integration_zenroom_service.cpython-313.pyc differ diff --git a/tests/__pycache__/test_zenroom_client.cpython-313.pyc b/tests/__pycache__/test_zenroom_client.cpython-313.pyc new file mode 100644 index 0000000..3b57272 Binary files /dev/null and b/tests/__pycache__/test_zenroom_client.cpython-313.pyc differ diff --git a/tests/__pycache__/test_zenroom_service_client.cpython-313.pyc b/tests/__pycache__/test_zenroom_service_client.cpython-313.pyc new file mode 100644 index 0000000..55af9bb Binary files /dev/null and b/tests/__pycache__/test_zenroom_service_client.cpython-313.pyc differ diff --git a/tests/__pycache__/test_zenroom_service_client_clean.cpython-313.pyc b/tests/__pycache__/test_zenroom_service_client_clean.cpython-313.pyc new file mode 100644 index 0000000..5897f9f Binary files /dev/null and b/tests/__pycache__/test_zenroom_service_client_clean.cpython-313.pyc differ diff --git a/tests/integration/__pycache__/test_integration_zenroom_docker.cpython-313.pyc b/tests/integration/__pycache__/test_integration_zenroom_docker.cpython-313.pyc new file mode 100644 index 0000000..b690dd3 Binary files /dev/null and b/tests/integration/__pycache__/test_integration_zenroom_docker.cpython-313.pyc differ diff --git a/tests/integration/__pycache__/test_zenroom_service_client_integration.cpython-313.pyc b/tests/integration/__pycache__/test_zenroom_service_client_integration.cpython-313.pyc new file mode 100644 index 0000000..b892a8d Binary files /dev/null and b/tests/integration/__pycache__/test_zenroom_service_client_integration.cpython-313.pyc differ diff --git a/tests/integration/test_integration_zenroom_docker.py b/tests/integration/test_integration_zenroom_docker.py new file mode 100644 index 0000000..d410324 --- /dev/null +++ b/tests/integration/test_integration_zenroom_docker.py @@ -0,0 +1,88 @@ +import os +import sys +import unittest +import subprocess +from pathlib import Path + +# Make ca_core importable as the module root (so `import crypto...` works) +code_path = Path(__file__).parents[1] / "ca_core" +sys.path.insert(0, str(code_path)) + +from crypto.zenroom_client import ZenroomDockerClient, ZenroomError + + +def _docker_ok(): + """Return (ok, reason).""" + try: + p = subprocess.run( + ["docker", "version"], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + timeout=10, + check=False, + ) + except FileNotFoundError: + return False, "docker CLI not found" + except Exception as e: + return False, f"docker check failed: {e}" + + if p.returncode != 0: + out = (p.stdout or "").strip() + return False, f"docker not usable: {out}" + return True, "" + + +def _image_exists(image: str): + """Return (ok, reason).""" + try: + p = subprocess.run( + ["docker", "image", "inspect", image], + stdout=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + text=True, + timeout=10, + check=False, + ) + except Exception as e: + return False, f"docker image inspect failed: {e}" + + if p.returncode != 0: + return False, f"docker image not found locally: {image}" + return True, "" + + +class TestZenroomDockerIntegration(unittest.TestCase): + """Integration tests for ZenroomDockerClient. + + Enable by setting: + ZENROOM_DOCKER_INTEGRATION=1 + + Image selection: + ZENROOM_IMAGE (default: zenroom) + """ + + @classmethod + def setUpClass(cls): + if not os.getenv("ZENROOM_DOCKER_INTEGRATION"): + raise unittest.SkipTest("Docker integration disabled (set ZENROOM_DOCKER_INTEGRATION=1)") + + ok, reason = _docker_ok() + if not ok: + raise unittest.SkipTest(reason) + + cls.image = os.getenv("ZENROOM_IMAGE", "zenroom") + + ok, reason = _image_exists(cls.image) + if not ok: + raise unittest.SkipTest(reason + " (build it or set ZENROOM_IMAGE)") + + def test_basic_execution(self): + client = ZenroomDockerClient(image=self.image) + out = client.run("print('hello')") + self.assertIn("hello", str(out)) + + def test_nonzero_exit_raises(self): + client = ZenroomDockerClient(image=self.image) + with self.assertRaises(ZenroomError): + client.run("THIS IS NOT VALID ZENCODE") diff --git a/tests/integration/test_zenroom_live.py b/tests/integration/test_zenroom_live.py new file mode 100644 index 0000000..b045cca --- /dev/null +++ b/tests/integration/test_zenroom_live.py @@ -0,0 +1,78 @@ +import os +import unittest +import sys +from pathlib import Path + +# Import from ca_core (same pattern as other tests) +code_path = Path(__file__).parent.parent.parent / "ca_core" +sys.path.insert(0, str(code_path)) + +from crypto.zenroom_service_client import ZenroomServiceClient + + +def _live_enabled() -> bool: + return os.environ.get("RUN_LIVE_ZENROOM", "").strip().lower() in { + "1", "true", "yes" + } + + +@unittest.skipUnless( + _live_enabled(), + "Set RUN_LIVE_ZENROOM=1 to run live Zenroom service smoke tests", +) +class TestZenroomLiveServices(unittest.TestCase): + + @classmethod + def setUpClass(cls): + base_url = os.environ.get("ZENROOM_BASE_URL", "http://localhost:3300").strip() + api_prefix = os.environ.get("ZENROOM_API_PREFIX", "/api").strip() + timeout_s = int(os.environ.get("ZENROOM_TIMEOUT_S", "20")) + + cls.client = ZenroomServiceClient( + base_url=base_url, + api_prefix=api_prefix, + timeout_s=timeout_s, + ) + + def test_keypair_reading_identity_from_data(self): + """ + Tests: + POST /api/Generate-a-keypair,-reading-identity-from-data + Payload wrapped as {"data": {"myName": "..."}} + """ + res = self.client.generate_a_keypair_reading_identity_from_data( + "LiveUser123456" + ) + + self.assertIn("public_key", res) + self.assertIn("private_key", res) + self.assertIsInstance(res["public_key"], str) + self.assertIsInstance(res["private_key"], str) + self.assertTrue(res["public_key"]) + self.assertTrue(res["private_key"]) + + def test_encrypt_decrypt_password_roundtrip(self): + """ + Tests: + POST /api/Encrypt-a-message-with-the-password + POST /api/Decrypt-the-message-with-the-password + """ + plaintext = "Dear Bob, your name is too short, goodbye - Alice." + + encrypted = self.client.encrypt_a_message_with_the_password( + header="A very important secret", + message=plaintext, + password="myVerySecretPassword", + ) + + for k in ("checksum", "header", "iv", "text"): + self.assertIn(k, encrypted) + self.assertIsInstance(encrypted[k], str) + self.assertTrue(encrypted[k]) + + decrypted = self.client.decrypt_the_message_with_the_password( + secret_message=encrypted, + password="myVerySecretPassword", + ) + + self.assertEqual(decrypted.get("textDecrypted"), plaintext) diff --git a/tests/integration/test_zenroom_service_client_integration.py b/tests/integration/test_zenroom_service_client_integration.py new file mode 100644 index 0000000..87c79b3 --- /dev/null +++ b/tests/integration/test_zenroom_service_client_integration.py @@ -0,0 +1,30 @@ +import os +import unittest +import sys +from pathlib import Path + +code_path = Path(__file__).parents[2] / "ca_core" +sys.path.insert(0, str(code_path)) + +from crypto.zenroom_service_client import ZenroomServiceClient + + +class TestZenroomServiceClientIntegration(unittest.TestCase): + + @unittest.skipUnless( + os.getenv("ZENROOM_BASE_URL"), + "No ZENROOM_BASE_URL set", + ) + def test_generate_keypair_real_service(self): + client = ZenroomServiceClient(base_url=os.environ["ZENROOM_BASE_URL"]) + + res = client.generate_keypair("IntegrationUser") + + self.assertIn("private_key", res) + self.assertIsInstance(res["private_key"], str) + self.assertTrue(res["private_key"].strip()) + + # public_key may or may not be returned by this contract variant + if "public_key" in res: + self.assertIsInstance(res["public_key"], str) + self.assertTrue(res["public_key"].strip()) diff --git a/tests/test_integration_zenroom_service.py b/tests/test_integration_zenroom_service.py new file mode 100644 index 0000000..51b8a30 --- /dev/null +++ b/tests/test_integration_zenroom_service.py @@ -0,0 +1,15 @@ +import os +import unittest + +from crypto.zenroom_service_client import ZenroomServiceClient + + +class TestZenroomHTTPIntegration(unittest.TestCase): + + @unittest.skipUnless(os.getenv("ZENROOM_BASE_URL"), "No ZENROOM_BASE_URL set") + def test_sign_and_verify_roundtrip(self): + base_url = os.environ["ZENROOM_BASE_URL"] + client = ZenroomServiceClient(base_url=base_url) + + # You must supply real keys here for full integration + self.assertTrue(True) diff --git a/tests/test_zenroom_client.py b/tests/test_zenroom_client.py new file mode 100644 index 0000000..8168602 --- /dev/null +++ b/tests/test_zenroom_client.py @@ -0,0 +1,90 @@ +import unittest +import sys +from pathlib import Path +from unittest import mock + +# Allow imports from ca_core (same pattern as existing tests) +code_path = Path(__file__).parent.parent / "ca_core" +sys.path.insert(0, str(code_path)) + +from crypto.zenroom_client import ZenroomDockerClient, ZenroomError + + +class TestZenroomDockerClient(unittest.TestCase): + def _fake_completed(self, returncode=0, stdout="", stderr=""): + cp = mock.Mock() + cp.returncode = returncode + cp.stdout = stdout + cp.stderr = stderr + return cp + + @mock.patch("crypto.zenroom_client.subprocess.run") + def test_run_builds_expected_docker_command(self, m_run): + m_run.return_value = self._fake_completed(stdout='{"ok": true}') + client = ZenroomDockerClient(image="zenroom/zenroom:latest") + + # Patch temp dir so we can assert paths deterministically + with mock.patch("crypto.zenroom_client.tempfile.TemporaryDirectory") as m_td: + m_td.return_value.__enter__.return_value = "/tmp/zenroom_test" + m_td.return_value.__exit__.return_value = False + + res = client.run("print('hi')", data={"a": 1}, keys={"k": "v"}, conf={"c": 2}) + + self.assertEqual(res, {"ok": True}) + + args, kwargs = m_run.call_args + cmd = args[0] + self.assertIn("docker", cmd[0]) + self.assertIn("run", cmd) + self.assertIn("zenroom/zenroom:latest", cmd) + + # Mount and workdir + self.assertIn("-v", cmd) + self.assertIn("/tmp/zenroom_test:/work", cmd) + self.assertIn("-w", cmd) + self.assertIn("/work", cmd) + + # Zenroom base args + self.assertIn("zenroom", cmd) + self.assertIn("-z", cmd) + + # Input files flags should be present + self.assertIn("-a", cmd) + self.assertIn("/work/data.json", cmd) + self.assertIn("-k", cmd) + self.assertIn("/work/keys.json", cmd) + self.assertIn("-c", cmd) + self.assertIn("/work/conf.json", cmd) + + # Script at end + self.assertEqual(cmd[-1], "/work/script.zen") + + # subprocess.run called with capture_output/text + self.assertTrue(kwargs.get("capture_output")) + self.assertTrue(kwargs.get("text")) + + @mock.patch("crypto.zenroom_client.subprocess.run") + def test_run_returns_raw_stdout_when_not_json(self, m_run): + m_run.return_value = self._fake_completed(stdout="hello") + client = ZenroomDockerClient() + with mock.patch("crypto.zenroom_client.tempfile.TemporaryDirectory") as m_td: + m_td.return_value.__enter__.return_value = "/tmp/zenroom_test" + m_td.return_value.__exit__.return_value = False + out = client.run("print('hi')") + self.assertEqual(out, "hello") + + @mock.patch("crypto.zenroom_client.subprocess.run") + def test_run_raises_on_nonzero_exit(self, m_run): + m_run.return_value = self._fake_completed(returncode=1, stderr="boom") + client = ZenroomDockerClient() + with mock.patch("crypto.zenroom_client.tempfile.TemporaryDirectory") as m_td: + m_td.return_value.__enter__.return_value = "/tmp/zenroom_test" + m_td.return_value.__exit__.return_value = False + with self.assertRaises(ZenroomError) as ctx: + client.run("print('hi')") + self.assertIn("boom", str(ctx.exception)) + + def test_run_requires_non_empty_script(self): + client = ZenroomDockerClient() + with self.assertRaises(ValueError): + client.run(" ") diff --git a/tests/test_zenroom_service_client.py b/tests/test_zenroom_service_client.py new file mode 100644 index 0000000..24fccdd --- /dev/null +++ b/tests/test_zenroom_service_client.py @@ -0,0 +1,67 @@ +import json +import unittest +from unittest import mock +import sys +from pathlib import Path + +code_path = Path(__file__).parents[1] / "ca_core" +sys.path.insert(0, str(code_path)) + +from crypto.zenroom_service_client import ZenroomServiceClient + + +class _FakeHTTPResponse: + def __init__(self, body: bytes): + self._body = body + + def read(self): + return self._body + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + +class TestZenroomServiceClient(unittest.TestCase): + + @mock.patch("crypto.zenroom_service_client.urllib.request.urlopen") + def test_generate_keypair_unpacks_private_key(self, m_urlopen): + payload = { + "IntegrationUser": { + "keyring": {"ecdh": "PRIVKEY"}, + } + } + + m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8")) + + client = ZenroomServiceClient(base_url="http://localhost:3300") + res = client.generate_keypair("IntegrationUser") + + self.assertEqual(res["private_key"], "PRIVKEY") + self.assertNotIn("public_key", res) + + req = m_urlopen.call_args[0][0] + self.assertEqual(req.method, "POST") + self.assertTrue(req.full_url.endswith("/api/Generate-a-keypair,-reading-identity-from-data")) + + sent = json.loads(req.data.decode("utf-8")) + self.assertEqual(sent, {"data": {"myName": "IntegrationUser"}}) + + @mock.patch("crypto.zenroom_service_client.urllib.request.urlopen") + def test_generate_keypair_includes_public_key_if_present(self, m_urlopen): + payload = { + "IntegrationUser": { + "ecdh_public_key": "PUBKEY", + "keyring": {"ecdh": "PRIVKEY"}, + } + } + + m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8")) + + client = ZenroomServiceClient(base_url="http://localhost:3300") + res = client.generate_keypair("IntegrationUser") + + self.assertEqual(res["private_key"], "PRIVKEY") + self.assertEqual(res["public_key"], "PUBKEY") diff --git a/tests/test_zenroom_service_client_clean.py b/tests/test_zenroom_service_client_clean.py new file mode 100644 index 0000000..08cb623 --- /dev/null +++ b/tests/test_zenroom_service_client_clean.py @@ -0,0 +1,55 @@ +import json +import unittest +from unittest import mock +import sys +from pathlib import Path + +code_path = Path(__file__).parents[1] / "ca_core" +sys.path.insert(0, str(code_path)) + +from crypto.zenroom_service_client import ZenroomServiceClient + + +class _FakeHTTPResponse: + def __init__(self, body: bytes): + self._body = body + + def read(self): + return self._body + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + +class TestZenroomServiceClient(unittest.TestCase): + + @mock.patch("crypto.zenroom_service_client.urllib.request.urlopen") + def test_generate_keypair_unpacks_keys(self, m_urlopen): + + payload = { + "Owner": { + "ecdh_public_key": "PUBKEY", + "keyring": {"ecdh": "PRIVKEY"}, + } + } + + m_urlopen.return_value = _FakeHTTPResponse( + json.dumps(payload).encode("utf-8") + ) + + client = ZenroomServiceClient(base_url="http://localhost:3300") + res = client.generate_keypair("User123") + + self.assertEqual(res["public_key"], "PUBKEY") + self.assertEqual(res["private_key"], "PRIVKEY") + + req = m_urlopen.call_args[0][0] + self.assertEqual(req.method, "POST") + self.assertTrue( + req.full_url.endswith( + "/api/Generate-a-keypair,-reading-identity-from-data" + ) + )