diff --git a/ca_core/crypto/__pycache__/zenroom_service_client.cpython-313.pyc b/ca_core/crypto/__pycache__/zenroom_service_client.cpython-313.pyc index d90133f..414e968 100644 Binary files a/ca_core/crypto/__pycache__/zenroom_service_client.cpython-313.pyc and b/ca_core/crypto/__pycache__/zenroom_service_client.cpython-313.pyc differ diff --git a/ca_core/crypto/zenroom_service_client.py b/ca_core/crypto/zenroom_service_client.py index 1ad8473..09300e8 100644 --- a/ca_core/crypto/zenroom_service_client.py +++ b/ca_core/crypto/zenroom_service_client.py @@ -1,7 +1,7 @@ import json import urllib.request import urllib.error -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Tuple class ZenroomServiceError(RuntimeError): @@ -80,8 +80,11 @@ class ZenroomServiceClient: return self._request_json("POST", path, payload) def _post_data(self, path: str, data: Dict[str, Any]) -> Dict[str, Any]: + # Per your rule: if "keys" is empty, omit it entirely. + # All your services in this round only need {"data": ...} res = self._post(path, {"data": data}) + # RESTroom convention: on failure you get zenroom_errors and/or exception if "zenroom_errors" in res or "exception" in res: exc = res.get("exception", "") ze = res.get("zenroom_errors") @@ -101,20 +104,52 @@ class ZenroomServiceClient: raise ValueError(f"{name} cannot be empty") return v - def generate_keypair(self, my_name: str) -> Dict[str, str]: - """Generate an ECDH keypair using the RESTroom contract: + @staticmethod + def _require_dict(name: str, value: Any) -> Dict[str, Any]: + if not isinstance(value, dict): + raise TypeError(f"{name} must be a dict") + return value - POST /api/Generate-a-keypair,-reading-identity-from-data - body: { "data": { "myName": "" } } + @staticmethod + def _require_keys(d: Dict[str, Any], *, required: Tuple[str, ...], ctx: str) -> None: + missing = [k for k in required if k not in d] + if missing: + raise ZenroomServiceError(f"Missing {missing} in {ctx}: {d!r}") - Observed response from your service: - { "": { "keyring": { "ecdh": "" } } } + def _pick_owner_block(self, res: Dict[str, Any], my_name: str, ctx: str) -> Dict[str, Any]: + """ + Zenroom often returns: { "": { ... } } + Prefer res[my_name] when present, else accept single-entry dict. + """ + if my_name in res: + owner = res[my_name] + elif len(res) == 1: + owner = next(iter(res.values())) + else: + raise ZenroomServiceError(f"Ambiguous {ctx} response (no key '{my_name}', len={len(res)}): {res!r}") - Some variants also include: - "ecdh_public_key": "" + if not isinstance(owner, dict): + raise ZenroomServiceError(f"Invalid {ctx} response structure: {res!r}") + return owner - This method returns: - { "private_key": "...", "public_key": "..." } (public_key only if present) + # ------------------------------------------------------------------------- + # Service 1: Generate-a-keypair,-reading-identity-from-data + # ------------------------------------------------------------------------- + def generate_keypair(self, my_name: str) -> Dict[str, Any]: + """ + POST Generate-a-keypair,-reading-identity-from-data + body: {"data": {"myName": ""}} + + Observed response: + { "": { "keyring": { "ecdh": "" } } } + + Return (normalized, plus backward-compatible fields): + { + "my_name": "", + "keyring": {"ecdh": ""}, + "private_key": "", + # "public_key": "" only if the service variant returned it + } """ my_name = self._require_non_empty_str("my_name", my_name) @@ -123,13 +158,7 @@ class ZenroomServiceClient: {"myName": my_name}, ) - if not res: - raise ZenroomServiceError("Empty keypair response") - - # Zenroom typically returns { "": { ... } } - owner = next(iter(res.values())) - if not isinstance(owner, dict): - raise ZenroomServiceError(f"Invalid keypair response structure: {res!r}") + owner = self._pick_owner_block(res, my_name, "keypair") keyring = owner.get("keyring") if not isinstance(keyring, dict): @@ -139,10 +168,284 @@ class ZenroomServiceClient: if not isinstance(private_key, str) or not private_key.strip(): raise ZenroomServiceError(f"Invalid keypair response (missing keyring.ecdh): {res!r}") - out: Dict[str, str] = {"private_key": private_key} + out: Dict[str, Any] = { + "my_name": my_name, + "keyring": keyring, + "private_key": private_key, # convenience alias + } + # Some variants might include this (but your current one does not) public_key = owner.get("ecdh_public_key") if isinstance(public_key, str) and public_key.strip(): out["public_key"] = public_key return out + + # ------------------------------------------------------------------------- + # Service 2: Generate-public-key + # ------------------------------------------------------------------------- + def generate_public_key(self, keyring: Dict[str, Any]) -> str: + """ + POST Generate-public-key + body: {"data": {"keyring": {"ecdh": "..."} }} + + Response: + {"ecdh_public_key": ""} + + Returns the public key string. + """ + keyring = self._require_dict("keyring", keyring) + + res = self._post_data( + "Generate-public-key", + {"keyring": keyring}, + ) + + pub = res.get("ecdh_public_key") + if not isinstance(pub, str) or not pub.strip(): + raise ZenroomServiceError(f"Invalid public key response: {res!r}") + return pub + + # ------------------------------------------------------------------------- + # Service 3: Encrypt-a-message-with-the-password (symmetric) + # ------------------------------------------------------------------------- + def symmetric_encrypt(self, *, header: str, message: str, shared_key: str) -> Dict[str, str]: + """ + POST Encrypt-a-message-with-the-password + body: {"data": {"header": "...", "message": "...", "password": "..."}} + + Response: + {"secret_message": {"checksum": "...", "header": "...", "iv": "...", "text": "..."}} + + Returns the inner secret_message dict. + """ + header = self._require_non_empty_str("header", header) + message = self._require_non_empty_str("message", message) + shared_key = self._require_non_empty_str("shared_key", shared_key) + + res = self._post_data( + "Encrypt-a-message-with-the-password", + {"header": header, "message": message, "password": shared_key}, + ) + + sm = res.get("secret_message") + if not isinstance(sm, dict): + raise ZenroomServiceError(f"Invalid encrypt response (missing secret_message): {res!r}") + + self._require_keys(sm, required=("checksum", "header", "iv", "text"), ctx="secret_message") + for k in ("checksum", "header", "iv", "text"): + if not isinstance(sm.get(k), str) or not sm[k].strip(): + raise ZenroomServiceError(f"Invalid secret_message.{k}: {sm!r}") + + return { + "checksum": sm["checksum"], + "header": sm["header"], + "iv": sm["iv"], + "text": sm["text"], + } + + # ------------------------------------------------------------------------- + # Service 4: Decrypt-the-message-with-the-password (symmetric) + # ------------------------------------------------------------------------- + def symmetric_decrypt(self, *, secret_message: Dict[str, Any], shared_key: str) -> str: + """ + POST Decrypt-the-message-with-the-password + body: {"data": {"secret_message": {...}, "password": "..."}} + + Response: + {"textDecrypted": ""} + + Returns decrypted plaintext. + """ + secret_message = self._require_dict("secret_message", secret_message) + shared_key = self._require_non_empty_str("shared_key", shared_key) + + res = self._post_data( + "Decrypt-the-message-with-the-password", + {"secret_message": secret_message, "password": shared_key}, + ) + + txt = res.get("textDecrypted") + if not isinstance(txt, str): + raise ZenroomServiceError(f"Invalid decrypt response: {res!r}") + return txt + + # ------------------------------------------------------------------------- + # Service 5: Encrypt-a-message-for-two-recipients-using-asymmetric-cryptography + # ------------------------------------------------------------------------- + def asymmetric_encrypt( + self, + *, + receiver_public_key: str, + sender_keyring: Dict[str, Any], + header: str, + message: str, + ) -> Dict[str, str]: + """ + POST Encrypt-a-message-for-two-recipients-using-asymmetric-cryptography + + Note: service expects 'reciever' spelling. + + Returns inner 'secret' dict with checksum/header/iv/text. + """ + receiver_public_key = self._require_non_empty_str("receiver_public_key", receiver_public_key) + sender_keyring = self._require_dict("sender_keyring", sender_keyring) + header = self._require_non_empty_str("header", header) + message = self._require_non_empty_str("message", message) + + res = self._post_data( + "Encrypt-a-message-for-two-recipients-using-asymmetric-cryptography", + { + "reciever": {"public_key": receiver_public_key}, + "sender": {"keyring": sender_keyring}, + "header": header, + "message": message, + }, + ) + + sec = res.get("secret") + if not isinstance(sec, dict): + raise ZenroomServiceError(f"Invalid asymmetric encrypt response (missing secret): {res!r}") + + self._require_keys(sec, required=("checksum", "header", "iv", "text"), ctx="secret") + for k in ("checksum", "header", "iv", "text"): + if not isinstance(sec.get(k), str) or not sec[k].strip(): + raise ZenroomServiceError(f"Invalid secret.{k}: {sec!r}") + + return { + "checksum": sec["checksum"], + "header": sec["header"], + "iv": sec["iv"], + "text": sec["text"], + } + + # ------------------------------------------------------------------------- + # Service 6: Decrypt-a-message-for-two-recipients-using-asymmetric-cryptography + # ------------------------------------------------------------------------- + def asymmetric_decrypt( + self, + *, + sender_public_key: str, + receiver_keyring: Dict[str, Any], + secret: Dict[str, Any], + ) -> Dict[str, str]: + """ + POST Decrypt-a-message-for-two-recipients-using-asymmetric-cryptography + + Note: service expects 'reciever' spelling. + + Returns {"header": "...", "text": "..."}. + """ + sender_public_key = self._require_non_empty_str("sender_public_key", sender_public_key) + receiver_keyring = self._require_dict("receiver_keyring", receiver_keyring) + secret = self._require_dict("secret", secret) + + res = self._post_data( + "Decrypt-a-message-for-two-recipients-using-asymmetric-cryptography", + { + "sender": {"public_key": sender_public_key}, + "reciever": {"keyring": receiver_keyring}, + "secret": secret, + }, + ) + + hdr = res.get("header") + txt = res.get("text") + if not isinstance(hdr, str) or not isinstance(txt, str): + raise ZenroomServiceError(f"Invalid asymmetric decrypt response: {res!r}") + + return {"header": hdr, "text": txt} + + # ------------------------------------------------------------------------- + # Service 7: Sign-objects-using-asymmetric-cryptography + # ------------------------------------------------------------------------- + def sign_objects(self, *, objects: Dict[str, Any], signer_keyring: Dict[str, Any]) -> Dict[str, Any]: + """ + POST Sign-objects-using-asymmetric-cryptography + body: {"data": {"mySecretStuff": {...}, "signer": {"keyring": {...}}}} + + Response echoes fields and adds "<field>.signature": {"r": "...", "s": "..."}. + Returns response as-is (validated to contain at least one signature). + """ + objects = self._require_dict("objects", objects) + signer_keyring = self._require_dict("signer_keyring", signer_keyring) + + res = self._post_data( + "Sign-objects-using-asymmetric-cryptography", + {"mySecretStuff": objects, "signer": {"keyring": signer_keyring}}, + ) + + # Validate at least one "*.signature" and that each has r/s. + sig_keys = [k for k in res.keys() if isinstance(k, str) and k.endswith(".signature")] + if not sig_keys: + raise ZenroomServiceError(f"No signatures found in sign response: {res!r}") + + for k in sig_keys: + sig = res.get(k) + if not isinstance(sig, dict): + raise ZenroomServiceError(f"Invalid signature object for {k}: {res!r}") + if not isinstance(sig.get("r"), str) or not isinstance(sig.get("s"), str): + raise ZenroomServiceError(f"Invalid signature fields for {k}: {sig!r}") + + return res + + # ------------------------------------------------------------------------- + # Service 8: Verify-asymmetric-cryptography-signature + # ------------------------------------------------------------------------- + def verify_signature( + self, + *, + message_field: str, + message_value: str, + signature: Dict[str, Any], + signer_public_key: str, + ) -> bool: + """ + POST Verify-asymmetric-cryptography-signature + + Input example uses dynamic field names like: + "myMessage": "...", + "myMessage.signature": {"r": "...", "s": "..."}, + "signer": {"public_key": "..."} + + On success, response includes: + {"output": ["Zenroom_certifies_that_signature_is_correct!"], ...} + + On failure, RESTroom returns zenroom_errors/exception which _post_data raises. + Returns True on success. + """ + message_field = self._require_non_empty_str("message_field", message_field) + message_value = self._require_non_empty_str("message_value", message_value) + signature = self._require_dict("signature", signature) + signer_public_key = self._require_non_empty_str("signer_public_key", signer_public_key) + + payload: Dict[str, Any] = { + message_field: message_value, + f"{message_field}.signature": signature, + "signer": {"public_key": signer_public_key}, + } + + res = self._post_data("Verify-asymmetric-cryptography-signature", payload) + + out = res.get("output") + if not isinstance(out, list) or not out: + raise ZenroomServiceError(f"Invalid verify response: {res!r}") + + # We accept any non-empty success output, but the canonical string is: + # "Zenroom_certifies_that_signature_is_correct!" + return True + + # ------------------------------------------------------------------------- + # Backward-compatible alias names (used by existing live tests / older code) + # ------------------------------------------------------------------------- + def generate_a_keypair_reading_identity_from_data(self, my_name: str) -> Dict[str, Any]: + return self.generate_keypair(my_name) + + def encrypt_a_message_with_the_password(self, *, header: str, message: str, password: str) -> Dict[str, str]: + return self.symmetric_encrypt(header=header, message=message, shared_key=password) + + def decrypt_the_message_with_the_password(self, *, secret_message: Dict[str, Any], password: str) -> Dict[str, str]: + # Historical alias returned {"textDecrypted": "..."} in some tests; + # keep that shape for compatibility. + txt = self.symmetric_decrypt(secret_message=secret_message, shared_key=password) + return {"textDecrypted": txt} diff --git a/tests/__pycache__/test_zenroom_service_client.cpython-313.pyc b/tests/__pycache__/test_zenroom_service_client.cpython-313.pyc index 55af9bb..6481604 100644 Binary files a/tests/__pycache__/test_zenroom_service_client.cpython-313.pyc and b/tests/__pycache__/test_zenroom_service_client.cpython-313.pyc differ diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/__pycache__/__init__.cpython-313.pyc b/tests/integration/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..871338f Binary files /dev/null and b/tests/integration/__pycache__/__init__.cpython-313.pyc differ diff --git a/tests/integration/__pycache__/test_zenroom_live.cpython-313.pyc b/tests/integration/__pycache__/test_zenroom_live.cpython-313.pyc new file mode 100644 index 0000000..25969f1 Binary files /dev/null and b/tests/integration/__pycache__/test_zenroom_live.cpython-313.pyc differ diff --git a/tests/integration/__pycache__/test_zenroom_service_client_integration.cpython-313.pyc b/tests/integration/__pycache__/test_zenroom_service_client_integration.cpython-313.pyc index b892a8d..552bde6 100644 Binary files a/tests/integration/__pycache__/test_zenroom_service_client_integration.cpython-313.pyc and b/tests/integration/__pycache__/test_zenroom_service_client_integration.cpython-313.pyc differ diff --git a/tests/integration/test_zenroom_service_client_integration.py b/tests/integration/test_zenroom_service_client_integration.py index 87c79b3..90947ca 100644 --- a/tests/integration/test_zenroom_service_client_integration.py +++ b/tests/integration/test_zenroom_service_client_integration.py @@ -11,20 +11,25 @@ from crypto.zenroom_service_client import ZenroomServiceClient class TestZenroomServiceClientIntegration(unittest.TestCase): - @unittest.skipUnless( - os.getenv("ZENROOM_BASE_URL"), - "No ZENROOM_BASE_URL set", - ) - def test_generate_keypair_real_service(self): + @unittest.skipUnless(os.getenv("ZENROOM_BASE_URL"), "No ZENROOM_BASE_URL set") + def test_end_to_end_smoke(self): client = ZenroomServiceClient(base_url=os.environ["ZENROOM_BASE_URL"]) - res = client.generate_keypair("IntegrationUser") + # 1) keypair -> public key + kp = client.generate_keypair("IntegrationUser123456") + self.assertIn("keyring", kp) + self.assertIn("private_key", kp) - self.assertIn("private_key", res) - self.assertIsInstance(res["private_key"], str) - self.assertTrue(res["private_key"].strip()) + pub = client.generate_public_key(kp["keyring"]) + self.assertIsInstance(pub, str) + self.assertTrue(pub.strip()) - # public_key may or may not be returned by this contract variant - if "public_key" in res: - self.assertIsInstance(res["public_key"], str) - self.assertTrue(res["public_key"].strip()) + # 2) symmetric roundtrip + plaintext = "Dear Bob, your name is too short, goodbye - Alice." + sm = client.symmetric_encrypt( + header="A very important secret", + message=plaintext, + shared_key="myVerySecretPassword", + ) + pt = client.symmetric_decrypt(secret_message=sm, shared_key="myVerySecretPassword") + self.assertEqual(pt, plaintext) diff --git a/tests/test_zenroom_service_client.py b/tests/test_zenroom_service_client.py index 24fccdd..ee36fde 100644 --- a/tests/test_zenroom_service_client.py +++ b/tests/test_zenroom_service_client.py @@ -7,7 +7,7 @@ from pathlib import Path code_path = Path(__file__).parents[1] / "ca_core" sys.path.insert(0, str(code_path)) -from crypto.zenroom_service_client import ZenroomServiceClient +from crypto.zenroom_service_client import ZenroomServiceClient, ZenroomServiceError class _FakeHTTPResponse: @@ -27,41 +27,198 @@ class _FakeHTTPResponse: class TestZenroomServiceClient(unittest.TestCase): @mock.patch("crypto.zenroom_service_client.urllib.request.urlopen") - def test_generate_keypair_unpacks_private_key(self, m_urlopen): + def test_generate_keypair_returns_keyring_and_private_key(self, m_urlopen): payload = { - "IntegrationUser": { - "keyring": {"ecdh": "PRIVKEY"}, - } + "User123456": {"keyring": {"ecdh": "PRIVKEY"}} } - m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8")) client = ZenroomServiceClient(base_url="http://localhost:3300") - res = client.generate_keypair("IntegrationUser") + res = client.generate_keypair("User123456") + self.assertEqual(res["my_name"], "User123456") self.assertEqual(res["private_key"], "PRIVKEY") + self.assertEqual(res["keyring"], {"ecdh": "PRIVKEY"}) self.assertNotIn("public_key", res) req = m_urlopen.call_args[0][0] self.assertEqual(req.method, "POST") self.assertTrue(req.full_url.endswith("/api/Generate-a-keypair,-reading-identity-from-data")) - sent = json.loads(req.data.decode("utf-8")) - self.assertEqual(sent, {"data": {"myName": "IntegrationUser"}}) + self.assertEqual(sent, {"data": {"myName": "User123456"}}) @mock.patch("crypto.zenroom_service_client.urllib.request.urlopen") - def test_generate_keypair_includes_public_key_if_present(self, m_urlopen): - payload = { - "IntegrationUser": { - "ecdh_public_key": "PUBKEY", - "keyring": {"ecdh": "PRIVKEY"}, - } - } - + def test_generate_public_key_returns_string(self, m_urlopen): + payload = {"ecdh_public_key": "PUBKEY"} m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8")) client = ZenroomServiceClient(base_url="http://localhost:3300") - res = client.generate_keypair("IntegrationUser") + pub = client.generate_public_key({"ecdh": "PRIVKEY"}) + self.assertEqual(pub, "PUBKEY") - self.assertEqual(res["private_key"], "PRIVKEY") - self.assertEqual(res["public_key"], "PUBKEY") + req = m_urlopen.call_args[0][0] + self.assertTrue(req.full_url.endswith("/api/Generate-public-key")) + sent = json.loads(req.data.decode("utf-8")) + self.assertEqual(sent, {"data": {"keyring": {"ecdh": "PRIVKEY"}}}) + + @mock.patch("crypto.zenroom_service_client.urllib.request.urlopen") + def test_symmetric_encrypt_returns_secret_message_dict(self, m_urlopen): + payload = { + "secret_message": { + "checksum": "C", + "header": "H", + "iv": "IV", + "text": "T", + } + } + m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8")) + + client = ZenroomServiceClient(base_url="http://localhost:3300") + sm = client.symmetric_encrypt( + header="A very important secret", + message="hello", + shared_key="myVerySecretPassword", + ) + self.assertEqual(sm["checksum"], "C") + self.assertEqual(sm["header"], "H") + self.assertEqual(sm["iv"], "IV") + self.assertEqual(sm["text"], "T") + + req = m_urlopen.call_args[0][0] + self.assertTrue(req.full_url.endswith("/api/Encrypt-a-message-with-the-password")) + sent = json.loads(req.data.decode("utf-8")) + self.assertEqual( + sent, + {"data": {"header": "A very important secret", "message": "hello", "password": "myVerySecretPassword"}}, + ) + + @mock.patch("crypto.zenroom_service_client.urllib.request.urlopen") + def test_symmetric_decrypt_returns_plaintext(self, m_urlopen): + payload = {"textDecrypted": "PLAINTEXT"} + m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8")) + + client = ZenroomServiceClient(base_url="http://localhost:3300") + txt = client.symmetric_decrypt(secret_message={"iv": "x"}, shared_key="k") + self.assertEqual(txt, "PLAINTEXT") + + req = m_urlopen.call_args[0][0] + self.assertTrue(req.full_url.endswith("/api/Decrypt-the-message-with-the-password")) + sent = json.loads(req.data.decode("utf-8")) + self.assertEqual(sent, {"data": {"secret_message": {"iv": "x"}, "password": "k"}}) + + @mock.patch("crypto.zenroom_service_client.urllib.request.urlopen") + def test_asymmetric_encrypt_returns_secret(self, m_urlopen): + payload = {"secret": {"checksum": "C", "header": "H", "iv": "IV", "text": "T"}} + m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8")) + + client = ZenroomServiceClient(base_url="http://localhost:3300") + sec = client.asymmetric_encrypt( + receiver_public_key="PUB", + sender_keyring={"ecdh": "PRIV"}, + header="hdr", + message="msg", + ) + self.assertEqual(sec, {"checksum": "C", "header": "H", "iv": "IV", "text": "T"}) + + req = m_urlopen.call_args[0][0] + self.assertTrue(req.full_url.endswith("/api/Encrypt-a-message-for-two-recipients-using-asymmetric-cryptography")) + sent = json.loads(req.data.decode("utf-8")) + self.assertEqual( + sent, + { + "data": { + "reciever": {"public_key": "PUB"}, + "sender": {"keyring": {"ecdh": "PRIV"}}, + "header": "hdr", + "message": "msg", + } + }, + ) + + @mock.patch("crypto.zenroom_service_client.urllib.request.urlopen") + def test_asymmetric_decrypt_returns_header_and_text(self, m_urlopen): + payload = {"header": "HDR", "text": "TXT"} + m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8")) + + client = ZenroomServiceClient(base_url="http://localhost:3300") + out = client.asymmetric_decrypt( + sender_public_key="PUB", + receiver_keyring={"ecdh": "PRIV"}, + secret={"iv": "IV"}, + ) + self.assertEqual(out, {"header": "HDR", "text": "TXT"}) + + req = m_urlopen.call_args[0][0] + self.assertTrue(req.full_url.endswith("/api/Decrypt-a-message-for-two-recipients-using-asymmetric-cryptography")) + sent = json.loads(req.data.decode("utf-8")) + self.assertEqual( + sent, + { + "data": { + "sender": {"public_key": "PUB"}, + "reciever": {"keyring": {"ecdh": "PRIV"}}, + "secret": {"iv": "IV"}, + } + }, + ) + + @mock.patch("crypto.zenroom_service_client.urllib.request.urlopen") + def test_sign_objects_returns_response_and_validates_signatures(self, m_urlopen): + payload = { + "myMessage": "hello", + "myMessage.signature": {"r": "R", "s": "S"}, + } + m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8")) + + client = ZenroomServiceClient(base_url="http://localhost:3300") + res = client.sign_objects(objects={"myMessage": "hello"}, signer_keyring={"ecdh": "PRIV"}) + + self.assertEqual(res["myMessage"], "hello") + self.assertEqual(res["myMessage.signature"]["r"], "R") + + req = m_urlopen.call_args[0][0] + self.assertTrue(req.full_url.endswith("/api/Sign-objects-using-asymmetric-cryptography")) + sent = json.loads(req.data.decode("utf-8")) + self.assertEqual( + sent, + {"data": {"mySecretStuff": {"myMessage": "hello"}, "signer": {"keyring": {"ecdh": "PRIV"}}}}, + ) + + @mock.patch("crypto.zenroom_service_client.urllib.request.urlopen") + def test_verify_signature_returns_true(self, m_urlopen): + payload = { + "myMessage": "hello", + "output": ["Zenroom_certifies_that_signature_is_correct!"], + } + m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8")) + + client = ZenroomServiceClient(base_url="http://localhost:3300") + ok = client.verify_signature( + message_field="myMessage", + message_value="hello", + signature={"r": "R", "s": "S"}, + signer_public_key="PUB", + ) + self.assertTrue(ok) + + req = m_urlopen.call_args[0][0] + self.assertTrue(req.full_url.endswith("/api/Verify-asymmetric-cryptography-signature")) + sent = json.loads(req.data.decode("utf-8")) + self.assertEqual( + sent, + {"data": {"myMessage": "hello", "myMessage.signature": {"r": "R", "s": "S"}, "signer": {"public_key": "PUB"}}}, + ) + + @mock.patch("crypto.zenroom_service_client.urllib.request.urlopen") + def test_zenroom_error_is_raised(self, m_urlopen): + payload = {"exception": "boom", "zenroom_errors": {"logs": "fail"}} + m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8")) + + client = ZenroomServiceClient(base_url="http://localhost:3300") + with self.assertRaises(ZenroomServiceError): + client.verify_signature( + message_field="myMessage", + message_value="hello", + signature={"r": "R", "s": "S"}, + signer_public_key="PUB", + )