integration zemroo, image
This commit is contained in:
parent
35752bce6b
commit
c560407c74
|
|
@ -0,0 +1 @@
|
|||
# Crypto HTTP client for Zenroom suite.
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -0,0 +1,157 @@
|
|||
import json
|
||||
import subprocess
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional, Sequence, Union
|
||||
|
||||
|
||||
JsonLike = Union[Dict[str, Any], Sequence[Any], str, int, float, bool, None]
|
||||
|
||||
|
||||
class ZenroomError(RuntimeError):
|
||||
"""Raised when Zenroom execution fails."""
|
||||
|
||||
|
||||
class ZenroomDockerClient:
|
||||
"""Run Zenroom via Docker.
|
||||
|
||||
This wrapper is intentionally small and testable. It focuses on:
|
||||
- Writing inputs (script/data/keys/conf) to a temp workdir
|
||||
- Running a docker container that executes Zenroom
|
||||
- Returning parsed JSON output when possible
|
||||
|
||||
Assumed container interface (common Zenroom CLI pattern):
|
||||
zenroom -z -a <data.json> -k <keys.json> -c <conf.json> <script.zen>
|
||||
|
||||
If your docker image/entrypoint differs, pass `zenroom_args` accordingly.
|
||||
|
||||
Note: This module is named *zenroom_client.py* (not zenroom.py) to avoid
|
||||
potential import shadowing with future packages/modules.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
image: str = "zenroom/zenroom:latest",
|
||||
docker_bin: str = "docker",
|
||||
work_mount_path: str = "/work",
|
||||
zenroom_args: Optional[Sequence[str]] = None,
|
||||
timeout_s: int = 30,
|
||||
) -> None:
|
||||
self.image = image
|
||||
self.docker_bin = docker_bin
|
||||
self.work_mount_path = work_mount_path
|
||||
self.zenroom_args = list(zenroom_args) if zenroom_args is not None else ["zenroom", "-z"]
|
||||
self.timeout_s = timeout_s
|
||||
|
||||
def run(
|
||||
self,
|
||||
script: str,
|
||||
*,
|
||||
data: Optional[JsonLike] = None,
|
||||
keys: Optional[JsonLike] = None,
|
||||
conf: Optional[JsonLike] = None,
|
||||
extra_docker_args: Optional[Sequence[str]] = None,
|
||||
extra_zenroom_args: Optional[Sequence[str]] = None,
|
||||
) -> Union[dict, str]:
|
||||
"""Execute a Zenroom script.
|
||||
|
||||
Args:
|
||||
script: The Zenroom script (text).
|
||||
data/keys/conf: Optional JSON-like payloads written to files.
|
||||
extra_docker_args: Optional extra args inserted after `docker run`.
|
||||
extra_zenroom_args: Optional extra args appended before the script path.
|
||||
|
||||
Returns:
|
||||
Parsed JSON dict if stdout is valid JSON, otherwise raw stdout string.
|
||||
|
||||
Raises:
|
||||
ZenroomError on non-zero exit.
|
||||
"""
|
||||
if not isinstance(script, str) or not script.strip():
|
||||
raise ValueError("script must be a non-empty string")
|
||||
|
||||
with tempfile.TemporaryDirectory(prefix="zenroom_") as tmpdir:
|
||||
workdir = Path(tmpdir)
|
||||
|
||||
# Defensive: TemporaryDirectory normally creates the dir, but tests may mock it.
|
||||
workdir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
script_path = workdir / "script.zen"
|
||||
script_path.write_text(script, encoding="utf-8")
|
||||
|
||||
data_path = None
|
||||
keys_path = None
|
||||
conf_path = None
|
||||
|
||||
if data is not None:
|
||||
data_path = workdir / "data.json"
|
||||
data_path.write_text(json.dumps(data), encoding="utf-8")
|
||||
if keys is not None:
|
||||
keys_path = workdir / "keys.json"
|
||||
keys_path.write_text(json.dumps(keys), encoding="utf-8")
|
||||
if conf is not None:
|
||||
conf_path = workdir / "conf.json"
|
||||
conf_path.write_text(json.dumps(conf), encoding="utf-8")
|
||||
|
||||
cmd = [
|
||||
self.docker_bin,
|
||||
"run",
|
||||
"--rm",
|
||||
"-i",
|
||||
]
|
||||
|
||||
if extra_docker_args:
|
||||
cmd.extend(list(extra_docker_args))
|
||||
|
||||
# Mount temp dir into container
|
||||
cmd.extend(
|
||||
[
|
||||
"-v",
|
||||
f"{workdir}:{self.work_mount_path}",
|
||||
"-w",
|
||||
self.work_mount_path,
|
||||
self.image,
|
||||
]
|
||||
)
|
||||
|
||||
# Build zenroom command
|
||||
cmd.extend(list(self.zenroom_args))
|
||||
|
||||
if data_path is not None:
|
||||
cmd.extend(["-a", str(Path(self.work_mount_path) / data_path.name)])
|
||||
if keys_path is not None:
|
||||
cmd.extend(["-k", str(Path(self.work_mount_path) / keys_path.name)])
|
||||
if conf_path is not None:
|
||||
cmd.extend(["-c", str(Path(self.work_mount_path) / conf_path.name)])
|
||||
|
||||
if extra_zenroom_args:
|
||||
cmd.extend(list(extra_zenroom_args))
|
||||
|
||||
cmd.append(str(Path(self.work_mount_path) / script_path.name))
|
||||
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=self.timeout_s,
|
||||
check=False,
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
stderr = (result.stderr or "").strip()
|
||||
stdout = (result.stdout or "").strip()
|
||||
msg = stderr or stdout or f"Zenroom failed with exit code {result.returncode}"
|
||||
raise ZenroomError(msg)
|
||||
|
||||
out = (result.stdout or "").strip()
|
||||
if not out:
|
||||
return ""
|
||||
|
||||
try:
|
||||
parsed = json.loads(out)
|
||||
if isinstance(parsed, dict):
|
||||
return parsed
|
||||
# Zenroom can output arrays too; keep compatibility.
|
||||
return {"result": parsed}
|
||||
except json.JSONDecodeError:
|
||||
return out
|
||||
|
|
@ -0,0 +1,148 @@
|
|||
import json
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
|
||||
class ZenroomServiceError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
class ZenroomServiceClient:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_url: str = "http://localhost:3300",
|
||||
*,
|
||||
api_prefix: str = "/api",
|
||||
timeout_s: int = 10,
|
||||
) -> None:
|
||||
self.base_url = base_url.rstrip("/")
|
||||
self.api_prefix = api_prefix.strip()
|
||||
|
||||
if self.api_prefix in {"", "/"}:
|
||||
self.api_prefix = ""
|
||||
elif not self.api_prefix.startswith("/"):
|
||||
self.api_prefix = "/" + self.api_prefix
|
||||
|
||||
self.timeout_s = timeout_s
|
||||
|
||||
def _make_url(self, path: str) -> str:
|
||||
path = "/" + path.lstrip("/")
|
||||
return f"{self.base_url}{self.api_prefix}{path}"
|
||||
|
||||
def _request_json(
|
||||
self,
|
||||
method: str,
|
||||
path: str,
|
||||
payload: Optional[Dict[str, Any]] = None,
|
||||
) -> Dict[str, Any]:
|
||||
|
||||
url = self._make_url(path)
|
||||
data = None
|
||||
headers = {"Accept": "application/json"}
|
||||
|
||||
if payload is not None:
|
||||
data = json.dumps(payload).encode("utf-8")
|
||||
headers["Content-Type"] = "application/json"
|
||||
|
||||
req = urllib.request.Request(url, data=data, headers=headers, method=method.upper())
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=self.timeout_s) as resp:
|
||||
raw = resp.read()
|
||||
text = raw.decode("utf-8")
|
||||
except urllib.error.HTTPError as e:
|
||||
body = ""
|
||||
try:
|
||||
body = e.read().decode("utf-8")
|
||||
except Exception:
|
||||
pass
|
||||
raise ZenroomServiceError(f"HTTP {e.code} from {url}: {body or e.reason}") from e
|
||||
except urllib.error.URLError as e:
|
||||
raise ZenroomServiceError(f"Failed to reach {url}: {e.reason}") from e
|
||||
|
||||
text = text.strip()
|
||||
if not text:
|
||||
raise ZenroomServiceError(f"Empty response from {url}")
|
||||
|
||||
try:
|
||||
parsed = json.loads(text)
|
||||
except json.JSONDecodeError as e:
|
||||
raise ZenroomServiceError(f"Non-JSON response from {url}: {text[:200]}") from e
|
||||
|
||||
if not isinstance(parsed, dict):
|
||||
raise ZenroomServiceError(f"Expected JSON object from {url}")
|
||||
|
||||
return parsed
|
||||
|
||||
def _post(self, path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
|
||||
return self._request_json("POST", path, payload)
|
||||
|
||||
def _post_data(self, path: str, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
res = self._post(path, {"data": data})
|
||||
|
||||
if "zenroom_errors" in res or "exception" in res:
|
||||
exc = res.get("exception", "")
|
||||
ze = res.get("zenroom_errors")
|
||||
logs = ""
|
||||
if isinstance(ze, dict):
|
||||
logs = str(ze.get("logs", ""))[:800]
|
||||
raise ZenroomServiceError(f"Zenroom error from {path}: {exc or logs or 'unknown error'}")
|
||||
|
||||
return res
|
||||
|
||||
@staticmethod
|
||||
def _require_non_empty_str(name: str, value: str) -> str:
|
||||
if not isinstance(value, str):
|
||||
raise TypeError(f"{name} must be a string")
|
||||
v = value.strip()
|
||||
if not v:
|
||||
raise ValueError(f"{name} cannot be empty")
|
||||
return v
|
||||
|
||||
def generate_keypair(self, my_name: str) -> Dict[str, str]:
|
||||
"""Generate an ECDH keypair using the RESTroom contract:
|
||||
|
||||
POST /api/Generate-a-keypair,-reading-identity-from-data
|
||||
body: { "data": { "myName": "<identity>" } }
|
||||
|
||||
Observed response from your service:
|
||||
{ "<identity>": { "keyring": { "ecdh": "<private_b64>" } } }
|
||||
|
||||
Some variants also include:
|
||||
"ecdh_public_key": "<public_b64>"
|
||||
|
||||
This method returns:
|
||||
{ "private_key": "...", "public_key": "..." } (public_key only if present)
|
||||
"""
|
||||
my_name = self._require_non_empty_str("my_name", my_name)
|
||||
|
||||
res = self._post_data(
|
||||
"Generate-a-keypair,-reading-identity-from-data",
|
||||
{"myName": my_name},
|
||||
)
|
||||
|
||||
if not res:
|
||||
raise ZenroomServiceError("Empty keypair response")
|
||||
|
||||
# Zenroom typically returns { "<identity>": { ... } }
|
||||
owner = next(iter(res.values()))
|
||||
if not isinstance(owner, dict):
|
||||
raise ZenroomServiceError(f"Invalid keypair response structure: {res!r}")
|
||||
|
||||
keyring = owner.get("keyring")
|
||||
if not isinstance(keyring, dict):
|
||||
raise ZenroomServiceError(f"Invalid keypair response (missing keyring): {res!r}")
|
||||
|
||||
private_key = keyring.get("ecdh")
|
||||
if not isinstance(private_key, str) or not private_key.strip():
|
||||
raise ZenroomServiceError(f"Invalid keypair response (missing keyring.ecdh): {res!r}")
|
||||
|
||||
out: Dict[str, str] = {"private_key": private_key}
|
||||
|
||||
public_key = owner.get("ecdh_public_key")
|
||||
if isinstance(public_key, str) and public_key.strip():
|
||||
out["public_key"] = public_key
|
||||
|
||||
return out
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -0,0 +1,88 @@
|
|||
import os
|
||||
import sys
|
||||
import unittest
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
# Make ca_core importable as the module root (so `import crypto...` works)
|
||||
code_path = Path(__file__).parents[1] / "ca_core"
|
||||
sys.path.insert(0, str(code_path))
|
||||
|
||||
from crypto.zenroom_client import ZenroomDockerClient, ZenroomError
|
||||
|
||||
|
||||
def _docker_ok():
|
||||
"""Return (ok, reason)."""
|
||||
try:
|
||||
p = subprocess.run(
|
||||
["docker", "version"],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
text=True,
|
||||
timeout=10,
|
||||
check=False,
|
||||
)
|
||||
except FileNotFoundError:
|
||||
return False, "docker CLI not found"
|
||||
except Exception as e:
|
||||
return False, f"docker check failed: {e}"
|
||||
|
||||
if p.returncode != 0:
|
||||
out = (p.stdout or "").strip()
|
||||
return False, f"docker not usable: {out}"
|
||||
return True, ""
|
||||
|
||||
|
||||
def _image_exists(image: str):
|
||||
"""Return (ok, reason)."""
|
||||
try:
|
||||
p = subprocess.run(
|
||||
["docker", "image", "inspect", image],
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.STDOUT,
|
||||
text=True,
|
||||
timeout=10,
|
||||
check=False,
|
||||
)
|
||||
except Exception as e:
|
||||
return False, f"docker image inspect failed: {e}"
|
||||
|
||||
if p.returncode != 0:
|
||||
return False, f"docker image not found locally: {image}"
|
||||
return True, ""
|
||||
|
||||
|
||||
class TestZenroomDockerIntegration(unittest.TestCase):
|
||||
"""Integration tests for ZenroomDockerClient.
|
||||
|
||||
Enable by setting:
|
||||
ZENROOM_DOCKER_INTEGRATION=1
|
||||
|
||||
Image selection:
|
||||
ZENROOM_IMAGE (default: zenroom)
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
if not os.getenv("ZENROOM_DOCKER_INTEGRATION"):
|
||||
raise unittest.SkipTest("Docker integration disabled (set ZENROOM_DOCKER_INTEGRATION=1)")
|
||||
|
||||
ok, reason = _docker_ok()
|
||||
if not ok:
|
||||
raise unittest.SkipTest(reason)
|
||||
|
||||
cls.image = os.getenv("ZENROOM_IMAGE", "zenroom")
|
||||
|
||||
ok, reason = _image_exists(cls.image)
|
||||
if not ok:
|
||||
raise unittest.SkipTest(reason + " (build it or set ZENROOM_IMAGE)")
|
||||
|
||||
def test_basic_execution(self):
|
||||
client = ZenroomDockerClient(image=self.image)
|
||||
out = client.run("print('hello')")
|
||||
self.assertIn("hello", str(out))
|
||||
|
||||
def test_nonzero_exit_raises(self):
|
||||
client = ZenroomDockerClient(image=self.image)
|
||||
with self.assertRaises(ZenroomError):
|
||||
client.run("THIS IS NOT VALID ZENCODE")
|
||||
|
|
@ -0,0 +1,78 @@
|
|||
import os
|
||||
import unittest
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Import from ca_core (same pattern as other tests)
|
||||
code_path = Path(__file__).parent.parent.parent / "ca_core"
|
||||
sys.path.insert(0, str(code_path))
|
||||
|
||||
from crypto.zenroom_service_client import ZenroomServiceClient
|
||||
|
||||
|
||||
def _live_enabled() -> bool:
|
||||
return os.environ.get("RUN_LIVE_ZENROOM", "").strip().lower() in {
|
||||
"1", "true", "yes"
|
||||
}
|
||||
|
||||
|
||||
@unittest.skipUnless(
|
||||
_live_enabled(),
|
||||
"Set RUN_LIVE_ZENROOM=1 to run live Zenroom service smoke tests",
|
||||
)
|
||||
class TestZenroomLiveServices(unittest.TestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
base_url = os.environ.get("ZENROOM_BASE_URL", "http://localhost:3300").strip()
|
||||
api_prefix = os.environ.get("ZENROOM_API_PREFIX", "/api").strip()
|
||||
timeout_s = int(os.environ.get("ZENROOM_TIMEOUT_S", "20"))
|
||||
|
||||
cls.client = ZenroomServiceClient(
|
||||
base_url=base_url,
|
||||
api_prefix=api_prefix,
|
||||
timeout_s=timeout_s,
|
||||
)
|
||||
|
||||
def test_keypair_reading_identity_from_data(self):
|
||||
"""
|
||||
Tests:
|
||||
POST /api/Generate-a-keypair,-reading-identity-from-data
|
||||
Payload wrapped as {"data": {"myName": "..."}}
|
||||
"""
|
||||
res = self.client.generate_a_keypair_reading_identity_from_data(
|
||||
"LiveUser123456"
|
||||
)
|
||||
|
||||
self.assertIn("public_key", res)
|
||||
self.assertIn("private_key", res)
|
||||
self.assertIsInstance(res["public_key"], str)
|
||||
self.assertIsInstance(res["private_key"], str)
|
||||
self.assertTrue(res["public_key"])
|
||||
self.assertTrue(res["private_key"])
|
||||
|
||||
def test_encrypt_decrypt_password_roundtrip(self):
|
||||
"""
|
||||
Tests:
|
||||
POST /api/Encrypt-a-message-with-the-password
|
||||
POST /api/Decrypt-the-message-with-the-password
|
||||
"""
|
||||
plaintext = "Dear Bob, your name is too short, goodbye - Alice."
|
||||
|
||||
encrypted = self.client.encrypt_a_message_with_the_password(
|
||||
header="A very important secret",
|
||||
message=plaintext,
|
||||
password="myVerySecretPassword",
|
||||
)
|
||||
|
||||
for k in ("checksum", "header", "iv", "text"):
|
||||
self.assertIn(k, encrypted)
|
||||
self.assertIsInstance(encrypted[k], str)
|
||||
self.assertTrue(encrypted[k])
|
||||
|
||||
decrypted = self.client.decrypt_the_message_with_the_password(
|
||||
secret_message=encrypted,
|
||||
password="myVerySecretPassword",
|
||||
)
|
||||
|
||||
self.assertEqual(decrypted.get("textDecrypted"), plaintext)
|
||||
|
|
@ -0,0 +1,30 @@
|
|||
import os
|
||||
import unittest
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
code_path = Path(__file__).parents[2] / "ca_core"
|
||||
sys.path.insert(0, str(code_path))
|
||||
|
||||
from crypto.zenroom_service_client import ZenroomServiceClient
|
||||
|
||||
|
||||
class TestZenroomServiceClientIntegration(unittest.TestCase):
|
||||
|
||||
@unittest.skipUnless(
|
||||
os.getenv("ZENROOM_BASE_URL"),
|
||||
"No ZENROOM_BASE_URL set",
|
||||
)
|
||||
def test_generate_keypair_real_service(self):
|
||||
client = ZenroomServiceClient(base_url=os.environ["ZENROOM_BASE_URL"])
|
||||
|
||||
res = client.generate_keypair("IntegrationUser")
|
||||
|
||||
self.assertIn("private_key", res)
|
||||
self.assertIsInstance(res["private_key"], str)
|
||||
self.assertTrue(res["private_key"].strip())
|
||||
|
||||
# public_key may or may not be returned by this contract variant
|
||||
if "public_key" in res:
|
||||
self.assertIsInstance(res["public_key"], str)
|
||||
self.assertTrue(res["public_key"].strip())
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
import os
|
||||
import unittest
|
||||
|
||||
from crypto.zenroom_service_client import ZenroomServiceClient
|
||||
|
||||
|
||||
class TestZenroomHTTPIntegration(unittest.TestCase):
|
||||
|
||||
@unittest.skipUnless(os.getenv("ZENROOM_BASE_URL"), "No ZENROOM_BASE_URL set")
|
||||
def test_sign_and_verify_roundtrip(self):
|
||||
base_url = os.environ["ZENROOM_BASE_URL"]
|
||||
client = ZenroomServiceClient(base_url=base_url)
|
||||
|
||||
# You must supply real keys here for full integration
|
||||
self.assertTrue(True)
|
||||
|
|
@ -0,0 +1,90 @@
|
|||
import unittest
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest import mock
|
||||
|
||||
# Allow imports from ca_core (same pattern as existing tests)
|
||||
code_path = Path(__file__).parent.parent / "ca_core"
|
||||
sys.path.insert(0, str(code_path))
|
||||
|
||||
from crypto.zenroom_client import ZenroomDockerClient, ZenroomError
|
||||
|
||||
|
||||
class TestZenroomDockerClient(unittest.TestCase):
|
||||
def _fake_completed(self, returncode=0, stdout="", stderr=""):
|
||||
cp = mock.Mock()
|
||||
cp.returncode = returncode
|
||||
cp.stdout = stdout
|
||||
cp.stderr = stderr
|
||||
return cp
|
||||
|
||||
@mock.patch("crypto.zenroom_client.subprocess.run")
|
||||
def test_run_builds_expected_docker_command(self, m_run):
|
||||
m_run.return_value = self._fake_completed(stdout='{"ok": true}')
|
||||
client = ZenroomDockerClient(image="zenroom/zenroom:latest")
|
||||
|
||||
# Patch temp dir so we can assert paths deterministically
|
||||
with mock.patch("crypto.zenroom_client.tempfile.TemporaryDirectory") as m_td:
|
||||
m_td.return_value.__enter__.return_value = "/tmp/zenroom_test"
|
||||
m_td.return_value.__exit__.return_value = False
|
||||
|
||||
res = client.run("print('hi')", data={"a": 1}, keys={"k": "v"}, conf={"c": 2})
|
||||
|
||||
self.assertEqual(res, {"ok": True})
|
||||
|
||||
args, kwargs = m_run.call_args
|
||||
cmd = args[0]
|
||||
self.assertIn("docker", cmd[0])
|
||||
self.assertIn("run", cmd)
|
||||
self.assertIn("zenroom/zenroom:latest", cmd)
|
||||
|
||||
# Mount and workdir
|
||||
self.assertIn("-v", cmd)
|
||||
self.assertIn("/tmp/zenroom_test:/work", cmd)
|
||||
self.assertIn("-w", cmd)
|
||||
self.assertIn("/work", cmd)
|
||||
|
||||
# Zenroom base args
|
||||
self.assertIn("zenroom", cmd)
|
||||
self.assertIn("-z", cmd)
|
||||
|
||||
# Input files flags should be present
|
||||
self.assertIn("-a", cmd)
|
||||
self.assertIn("/work/data.json", cmd)
|
||||
self.assertIn("-k", cmd)
|
||||
self.assertIn("/work/keys.json", cmd)
|
||||
self.assertIn("-c", cmd)
|
||||
self.assertIn("/work/conf.json", cmd)
|
||||
|
||||
# Script at end
|
||||
self.assertEqual(cmd[-1], "/work/script.zen")
|
||||
|
||||
# subprocess.run called with capture_output/text
|
||||
self.assertTrue(kwargs.get("capture_output"))
|
||||
self.assertTrue(kwargs.get("text"))
|
||||
|
||||
@mock.patch("crypto.zenroom_client.subprocess.run")
|
||||
def test_run_returns_raw_stdout_when_not_json(self, m_run):
|
||||
m_run.return_value = self._fake_completed(stdout="hello")
|
||||
client = ZenroomDockerClient()
|
||||
with mock.patch("crypto.zenroom_client.tempfile.TemporaryDirectory") as m_td:
|
||||
m_td.return_value.__enter__.return_value = "/tmp/zenroom_test"
|
||||
m_td.return_value.__exit__.return_value = False
|
||||
out = client.run("print('hi')")
|
||||
self.assertEqual(out, "hello")
|
||||
|
||||
@mock.patch("crypto.zenroom_client.subprocess.run")
|
||||
def test_run_raises_on_nonzero_exit(self, m_run):
|
||||
m_run.return_value = self._fake_completed(returncode=1, stderr="boom")
|
||||
client = ZenroomDockerClient()
|
||||
with mock.patch("crypto.zenroom_client.tempfile.TemporaryDirectory") as m_td:
|
||||
m_td.return_value.__enter__.return_value = "/tmp/zenroom_test"
|
||||
m_td.return_value.__exit__.return_value = False
|
||||
with self.assertRaises(ZenroomError) as ctx:
|
||||
client.run("print('hi')")
|
||||
self.assertIn("boom", str(ctx.exception))
|
||||
|
||||
def test_run_requires_non_empty_script(self):
|
||||
client = ZenroomDockerClient()
|
||||
with self.assertRaises(ValueError):
|
||||
client.run(" ")
|
||||
|
|
@ -0,0 +1,67 @@
|
|||
import json
|
||||
import unittest
|
||||
from unittest import mock
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
code_path = Path(__file__).parents[1] / "ca_core"
|
||||
sys.path.insert(0, str(code_path))
|
||||
|
||||
from crypto.zenroom_service_client import ZenroomServiceClient
|
||||
|
||||
|
||||
class _FakeHTTPResponse:
|
||||
def __init__(self, body: bytes):
|
||||
self._body = body
|
||||
|
||||
def read(self):
|
||||
return self._body
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
|
||||
class TestZenroomServiceClient(unittest.TestCase):
|
||||
|
||||
@mock.patch("crypto.zenroom_service_client.urllib.request.urlopen")
|
||||
def test_generate_keypair_unpacks_private_key(self, m_urlopen):
|
||||
payload = {
|
||||
"IntegrationUser": {
|
||||
"keyring": {"ecdh": "PRIVKEY"},
|
||||
}
|
||||
}
|
||||
|
||||
m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8"))
|
||||
|
||||
client = ZenroomServiceClient(base_url="http://localhost:3300")
|
||||
res = client.generate_keypair("IntegrationUser")
|
||||
|
||||
self.assertEqual(res["private_key"], "PRIVKEY")
|
||||
self.assertNotIn("public_key", res)
|
||||
|
||||
req = m_urlopen.call_args[0][0]
|
||||
self.assertEqual(req.method, "POST")
|
||||
self.assertTrue(req.full_url.endswith("/api/Generate-a-keypair,-reading-identity-from-data"))
|
||||
|
||||
sent = json.loads(req.data.decode("utf-8"))
|
||||
self.assertEqual(sent, {"data": {"myName": "IntegrationUser"}})
|
||||
|
||||
@mock.patch("crypto.zenroom_service_client.urllib.request.urlopen")
|
||||
def test_generate_keypair_includes_public_key_if_present(self, m_urlopen):
|
||||
payload = {
|
||||
"IntegrationUser": {
|
||||
"ecdh_public_key": "PUBKEY",
|
||||
"keyring": {"ecdh": "PRIVKEY"},
|
||||
}
|
||||
}
|
||||
|
||||
m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8"))
|
||||
|
||||
client = ZenroomServiceClient(base_url="http://localhost:3300")
|
||||
res = client.generate_keypair("IntegrationUser")
|
||||
|
||||
self.assertEqual(res["private_key"], "PRIVKEY")
|
||||
self.assertEqual(res["public_key"], "PUBKEY")
|
||||
|
|
@ -0,0 +1,55 @@
|
|||
import json
|
||||
import unittest
|
||||
from unittest import mock
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
code_path = Path(__file__).parents[1] / "ca_core"
|
||||
sys.path.insert(0, str(code_path))
|
||||
|
||||
from crypto.zenroom_service_client import ZenroomServiceClient
|
||||
|
||||
|
||||
class _FakeHTTPResponse:
|
||||
def __init__(self, body: bytes):
|
||||
self._body = body
|
||||
|
||||
def read(self):
|
||||
return self._body
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
|
||||
class TestZenroomServiceClient(unittest.TestCase):
|
||||
|
||||
@mock.patch("crypto.zenroom_service_client.urllib.request.urlopen")
|
||||
def test_generate_keypair_unpacks_keys(self, m_urlopen):
|
||||
|
||||
payload = {
|
||||
"Owner": {
|
||||
"ecdh_public_key": "PUBKEY",
|
||||
"keyring": {"ecdh": "PRIVKEY"},
|
||||
}
|
||||
}
|
||||
|
||||
m_urlopen.return_value = _FakeHTTPResponse(
|
||||
json.dumps(payload).encode("utf-8")
|
||||
)
|
||||
|
||||
client = ZenroomServiceClient(base_url="http://localhost:3300")
|
||||
res = client.generate_keypair("User123")
|
||||
|
||||
self.assertEqual(res["public_key"], "PUBKEY")
|
||||
self.assertEqual(res["private_key"], "PRIVKEY")
|
||||
|
||||
req = m_urlopen.call_args[0][0]
|
||||
self.assertEqual(req.method, "POST")
|
||||
self.assertTrue(
|
||||
req.full_url.endswith(
|
||||
"/api/Generate-a-keypair,-reading-identity-from-data"
|
||||
)
|
||||
)
|
||||
Loading…
Reference in New Issue