"""Unit tests for ``ZenroomServiceClient`` with the HTTP layer mocked out.

Every test replaces ``urllib.request.urlopen`` with a mock that returns a
canned JSON body, calls one client method, and then checks both the value the
client returned and the request (URL suffix and JSON body) it tried to send.
"""

import json
import sys
import unittest
from pathlib import Path
from unittest import mock

# Make the ``ca_core`` directory itself importable before loading the client.
code_path = Path(__file__).parents[1] / "ca_core"
sys.path.insert(0, str(code_path))

from ca_core.crypto.zenroom_service_client import (  # noqa: E402
    ZenroomServiceClient,
    ZenroomServiceError,
)

_BASE_URL = "http://localhost:3300"

# Patch through the same module path this file imports the client from, so the
# patch resolves against the already-imported module instead of importing the
# same source file a second time under the ``crypto.`` alias.
_URLOPEN_TARGET = "ca_core.crypto.zenroom_service_client.urllib.request.urlopen"


class _FakeHTTPResponse:
    """Minimal stand-in for the object the mocked ``urlopen`` returns.

    It supports ``read()`` and the context-manager protocol, which is all
    these tests provide to the client.
    """

    def __init__(self, body: bytes):
        self._body = body

    def read(self):
        """Return the canned response body."""
        return self._body

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Returning False lets any exception raised inside the ``with`` propagate.
        return False


class TestZenroomServiceClient(unittest.TestCase):
    """Request/response contract tests for each ``ZenroomServiceClient`` method."""

    @staticmethod
    def _client_replying(m_urlopen, payload):
        """Make ``m_urlopen`` answer with ``payload`` as JSON and return a client."""
        body = json.dumps(payload).encode("utf-8")
        m_urlopen.return_value = _FakeHTTPResponse(body)
        return ZenroomServiceClient(base_url=_BASE_URL)

    def _assert_sent(self, m_urlopen, endpoint, expected_body):
        """Check the URL suffix and JSON body of the request given to ``urlopen``.

        Returns the request object so callers can make further assertions.
        """
        request = m_urlopen.call_args[0][0]
        self.assertTrue(
            request.full_url.endswith(endpoint),
            msg=f"unexpected request URL: {request.full_url!r}",
        )
        self.assertEqual(json.loads(request.data.decode("utf-8")), expected_body)
        return request

    @mock.patch(_URLOPEN_TARGET)
    def test_generate_keypair_returns_keyring_and_private_key(self, m_urlopen):
        """``generate_keypair`` exposes name, keyring and private key, no public key."""
        client = self._client_replying(
            m_urlopen,
            {"User123456": {"keyring": {"ecdh": "PRIVKEY"}}},
        )

        res = client.generate_keypair("User123456")

        self.assertEqual(res["my_name"], "User123456")
        self.assertEqual(res["private_key"], "PRIVKEY")
        self.assertEqual(res["keyring"], {"ecdh": "PRIVKEY"})
        self.assertNotIn("public_key", res)

        request = self._assert_sent(
            m_urlopen,
            "/api/Generate-a-keypair,-reading-identity-from-data",
            {"data": {"myName": "User123456"}},
        )
        self.assertEqual(request.method, "POST")

    @mock.patch(_URLOPEN_TARGET)
    def test_generate_public_key_returns_string(self, m_urlopen):
        """``generate_public_key`` returns the ``ecdh_public_key`` value."""
        client = self._client_replying(m_urlopen, {"ecdh_public_key": "PUBKEY"})

        pub = client.generate_public_key({"ecdh": "PRIVKEY"})

        self.assertEqual(pub, "PUBKEY")
        self._assert_sent(
            m_urlopen,
            "/api/Generate-public-key",
            {"data": {"keyring": {"ecdh": "PRIVKEY"}}},
        )

    @mock.patch(_URLOPEN_TARGET)
    def test_symmetric_encrypt_returns_secret_message_dict(self, m_urlopen):
        """``symmetric_encrypt`` returns the ``secret_message`` dict from the reply."""
        client = self._client_replying(
            m_urlopen,
            {
                "secret_message": {
                    "checksum": "C",
                    "header": "H",
                    "iv": "IV",
                    "text": "T",
                }
            },
        )

        sm = client.symmetric_encrypt(
            header="A very important secret",
            message="hello",
            shared_key="myVerySecretPassword",
        )

        self.assertEqual(sm["checksum"], "C")
        self.assertEqual(sm["header"], "H")
        self.assertEqual(sm["iv"], "IV")
        self.assertEqual(sm["text"], "T")
        self._assert_sent(
            m_urlopen,
            "/api/Encrypt-a-message-with-the-password",
            {
                "data": {
                    "header": "A very important secret",
                    "message": "hello",
                    "password": "myVerySecretPassword",
                }
            },
        )

    @mock.patch(_URLOPEN_TARGET)
    def test_symmetric_decrypt_returns_plaintext(self, m_urlopen):
        """``symmetric_decrypt`` returns the ``textDecrypted`` value."""
        client = self._client_replying(m_urlopen, {"textDecrypted": "PLAINTEXT"})

        txt = client.symmetric_decrypt(secret_message={"iv": "x"}, shared_key="k")

        self.assertEqual(txt, "PLAINTEXT")
        self._assert_sent(
            m_urlopen,
            "/api/Decrypt-the-message-with-the-password",
            {"data": {"secret_message": {"iv": "x"}, "password": "k"}},
        )

    @mock.patch(_URLOPEN_TARGET)
    def test_asymmetric_encrypt_returns_secret(self, m_urlopen):
        """``asymmetric_encrypt`` returns the ``secret`` dict from the reply."""
        secret = {"checksum": "C", "header": "H", "iv": "IV", "text": "T"}
        client = self._client_replying(m_urlopen, {"secret": secret})

        sec = client.asymmetric_encrypt(
            receiver_public_key="PUB",
            sender_keyring={"ecdh": "PRIV"},
            header="hdr",
            message="msg",
        )

        self.assertEqual(sec, {"checksum": "C", "header": "H", "iv": "IV", "text": "T"})
        # NOTE(review): the "reciever" spelling is what the client is expected
        # to send; presumably it mirrors the service contract - confirm before
        # "correcting" it.
        self._assert_sent(
            m_urlopen,
            "/api/Encrypt-a-message-for-two-recipients-using-asymmetric-cryptography",
            {
                "data": {
                    "reciever": {"public_key": "PUB"},
                    "sender": {"keyring": {"ecdh": "PRIV"}},
                    "header": "hdr",
                    "message": "msg",
                }
            },
        )

    @mock.patch(_URLOPEN_TARGET)
    def test_asymmetric_decrypt_returns_header_and_text(self, m_urlopen):
        """``asymmetric_decrypt`` returns the header/text pair from the reply."""
        client = self._client_replying(m_urlopen, {"header": "HDR", "text": "TXT"})

        out = client.asymmetric_decrypt(
            sender_public_key="PUB",
            receiver_keyring={"ecdh": "PRIV"},
            secret={"iv": "IV"},
        )

        self.assertEqual(out, {"header": "HDR", "text": "TXT"})
        self._assert_sent(
            m_urlopen,
            "/api/Decrypt-a-message-for-two-recipients-using-asymmetric-cryptography",
            {
                "data": {
                    "sender": {"public_key": "PUB"},
                    "reciever": {"keyring": {"ecdh": "PRIV"}},
                    "secret": {"iv": "IV"},
                }
            },
        )

    @mock.patch(_URLOPEN_TARGET)
    def test_sign_objects_returns_response_and_validates_signatures(self, m_urlopen):
        """``sign_objects`` returns the reply, including ``<name>.signature`` entries."""
        client = self._client_replying(
            m_urlopen,
            {
                "myMessage": "hello",
                "myMessage.signature": {"r": "R", "s": "S"},
            },
        )

        res = client.sign_objects(
            objects={"myMessage": "hello"},
            signer_keyring={"ecdh": "PRIV"},
        )

        self.assertEqual(res["myMessage"], "hello")
        self.assertEqual(res["myMessage.signature"]["r"], "R")
        self._assert_sent(
            m_urlopen,
            "/api/Sign-objects-using-asymmetric-cryptography",
            {
                "data": {
                    "mySecretStuff": {"myMessage": "hello"},
                    "signer": {"keyring": {"ecdh": "PRIV"}},
                }
            },
        )

    @mock.patch(_URLOPEN_TARGET)
    def test_verify_signature_returns_true(self, m_urlopen):
        """``verify_signature`` is truthy when the reply carries the success output."""
        client = self._client_replying(
            m_urlopen,
            {
                "myMessage": "hello",
                "output": ["Zenroom_certifies_that_signature_is_correct!"],
            },
        )

        ok = client.verify_signature(
            message_field="myMessage",
            message_value="hello",
            signature={"r": "R", "s": "S"},
            signer_public_key="PUB",
        )

        self.assertTrue(ok)
        self._assert_sent(
            m_urlopen,
            "/api/Verify-asymmetric-cryptography-signature",
            {
                "data": {
                    "myMessage": "hello",
                    "myMessage.signature": {"r": "R", "s": "S"},
                    "signer": {"public_key": "PUB"},
                }
            },
        )

    @mock.patch(_URLOPEN_TARGET)
    def test_zenroom_error_is_raised(self, m_urlopen):
        """A reply with ``exception``/``zenroom_errors`` raises ``ZenroomServiceError``."""
        client = self._client_replying(
            m_urlopen,
            {"exception": "boom", "zenroom_errors": {"logs": "fail"}},
        )

        with self.assertRaises(ZenroomServiceError):
            client.verify_signature(
                message_field="myMessage",
                message_value="hello",
                signature={"r": "R", "s": "S"},
                signer_public_key="PUB",
            )


if __name__ == "__main__":
    unittest.main()