diff --git a/tests/integration/__pycache__/test_zenroom_service_client_integration.cpython-313.pyc b/tests/integration/__pycache__/test_zenroom_service_client_integration.cpython-313.pyc index 552bde6..3b8520b 100644 Binary files a/tests/integration/__pycache__/test_zenroom_service_client_integration.cpython-313.pyc and b/tests/integration/__pycache__/test_zenroom_service_client_integration.cpython-313.pyc differ diff --git a/tests/integration/test_zenroom_live.py b/tests/integration/test_zenroom_live.py index b045cca..b07762e 100644 --- a/tests/integration/test_zenroom_live.py +++ b/tests/integration/test_zenroom_live.py @@ -3,7 +3,7 @@ import unittest import sys from pathlib import Path -# Import from ca_core (same pattern as other tests) +# Import from ca_core code_path = Path(__file__).parent.parent.parent / "ca_core" sys.path.insert(0, str(code_path)) @@ -11,15 +11,10 @@ from crypto.zenroom_service_client import ZenroomServiceClient def _live_enabled() -> bool: - return os.environ.get("RUN_LIVE_ZENROOM", "").strip().lower() in { - "1", "true", "yes" - } + return os.environ.get("RUN_LIVE_ZENROOM", "").strip().lower() in {"1", "true", "yes"} -@unittest.skipUnless( - _live_enabled(), - "Set RUN_LIVE_ZENROOM=1 to run live Zenroom service smoke tests", -) +@unittest.skipUnless(_live_enabled(), "Set RUN_LIVE_ZENROOM=1 to run live Zenroom service smoke tests") class TestZenroomLiveServices(unittest.TestCase): @classmethod @@ -34,45 +29,43 @@ class TestZenroomLiveServices(unittest.TestCase): timeout_s=timeout_s, ) - def test_keypair_reading_identity_from_data(self): - """ - Tests: - POST /api/Generate-a-keypair,-reading-identity-from-data - Payload wrapped as {"data": {"myName": "..."}} - """ - res = self.client.generate_a_keypair_reading_identity_from_data( - "LiveUser123456" - ) + def test_end_to_end_8_calls(self): + sender_kp = self.client.generate_keypair("LiveUser123456") + sender_pub = self.client.generate_public_key(sender_kp["keyring"]) - self.assertIn("public_key", res) - self.assertIn("private_key", res) - self.assertIsInstance(res["public_key"], str) - self.assertIsInstance(res["private_key"], str) - self.assertTrue(res["public_key"]) - self.assertTrue(res["private_key"]) - - def test_encrypt_decrypt_password_roundtrip(self): - """ - Tests: - POST /api/Encrypt-a-message-with-the-password - POST /api/Decrypt-the-message-with-the-password - """ plaintext = "Dear Bob, your name is too short, goodbye - Alice." - - encrypted = self.client.encrypt_a_message_with_the_password( + sm = self.client.symmetric_encrypt( header="A very important secret", message=plaintext, - password="myVerySecretPassword", + shared_key="myVerySecretPassword", ) + pt = self.client.symmetric_decrypt(secret_message=sm, shared_key="myVerySecretPassword") + self.assertEqual(pt, plaintext) - for k in ("checksum", "header", "iv", "text"): - self.assertIn(k, encrypted) - self.assertIsInstance(encrypted[k], str) - self.assertTrue(encrypted[k]) - - decrypted = self.client.decrypt_the_message_with_the_password( - secret_message=encrypted, - password="myVerySecretPassword", + secret = self.client.asymmetric_encrypt( + receiver_public_key=sender_pub, + sender_keyring=sender_kp["keyring"], + message="Hello from live test", + header="Live header", ) + dec = self.client.asymmetric_decrypt( + sender_public_key=sender_pub, + receiver_keyring=sender_kp["keyring"], + secret=secret, + ) + self.assertEqual(dec["header"], "Live header") + self.assertEqual(dec["text"], "Hello from live test") - self.assertEqual(decrypted.get("textDecrypted"), plaintext) + signed = self.client.sign_objects( + signer_keyring=sender_kp["keyring"], + objects={"myMessage": "Signed live message"}, + ) + sig = signed["myMessage.signature"] + + ok = self.client.verify_signature( + message_field="myMessage", + message_value=signed["myMessage"], + signature={"r": sig["r"], "s": sig["s"]}, + signer_public_key=sender_pub, + ) + self.assertTrue(ok) diff --git a/tests/integration/test_zenroom_service_client_integration.py b/tests/integration/test_zenroom_service_client_integration.py index 90947ca..cf75970 100644 --- a/tests/integration/test_zenroom_service_client_integration.py +++ b/tests/integration/test_zenroom_service_client_integration.py @@ -3,6 +3,7 @@ import unittest import sys from pathlib import Path +# Import from ca_core code_path = Path(__file__).parents[2] / "ca_core" sys.path.insert(0, str(code_path)) @@ -10,21 +11,39 @@ from crypto.zenroom_service_client import ZenroomServiceClient class TestZenroomServiceClientIntegration(unittest.TestCase): + """Service integration: runs against a live RESTroom/Zenroom HTTP service. + + Enable by setting: + ZENROOM_BASE_URL=http://localhost:3300 + """ @unittest.skipUnless(os.getenv("ZENROOM_BASE_URL"), "No ZENROOM_BASE_URL set") - def test_end_to_end_smoke(self): + def test_end_to_end_smoke_8_calls(self): + """Hits exactly these 8 endpoints once each (in typical service logs): + 1) Generate-a-keypair,-reading-identity-from-data + 2) Generate-public-key + 3) Encrypt-a-message-with-the-password + 4) Decrypt-the-message-with-the-password + 5) Encrypt-a-message-for-two-recipients-using-asymmetric-cryptography + 6) Decrypt-a-message-for-two-recipients-using-asymmetric-cryptography + 7) Sign-objects-using-asymmetric-cryptography + 8) Verify-asymmetric-cryptography-signature + + Note: To keep it to 8 calls, we reuse the same identity as both sender and receiver + for the asymmetric encrypt/decrypt roundtrip. + """ client = ZenroomServiceClient(base_url=os.environ["ZENROOM_BASE_URL"]) - # 1) keypair -> public key - kp = client.generate_keypair("IntegrationUser123456") - self.assertIn("keyring", kp) - self.assertIn("private_key", kp) + # 1) keypair (private) -> 2) public key + sender_kp = client.generate_keypair("IntegrationUser123456") + self.assertIn("keyring", sender_kp) + self.assertIn("private_key", sender_kp) - pub = client.generate_public_key(kp["keyring"]) - self.assertIsInstance(pub, str) - self.assertTrue(pub.strip()) + sender_pub = client.generate_public_key(sender_kp["keyring"]) + self.assertIsInstance(sender_pub, str) + self.assertTrue(sender_pub.strip()) - # 2) symmetric roundtrip + # 3) symmetric encrypt -> 4) symmetric decrypt plaintext = "Dear Bob, your name is too short, goodbye - Alice." sm = client.symmetric_encrypt( header="A very important secret", @@ -33,3 +52,38 @@ class TestZenroomServiceClientIntegration(unittest.TestCase): ) pt = client.symmetric_decrypt(secret_message=sm, shared_key="myVerySecretPassword") self.assertEqual(pt, plaintext) + + # 5) asymmetric encrypt (receiver pub == sender pub to avoid extra keypair/public-key calls) + secret = client.asymmetric_encrypt( + receiver_public_key=sender_pub, + sender_keyring=sender_kp["keyring"], + message="Hello from integration test", + header="Integration header", + ) + + # 6) asymmetric decrypt (sender public key is sender_pub; receiver private key == sender private key) + dec = client.asymmetric_decrypt( + sender_public_key=sender_pub, + receiver_keyring=sender_kp["keyring"], + secret=secret, + ) + self.assertEqual(dec["header"], "Integration header") + self.assertEqual(dec["text"], "Hello from integration test") + + # 7) sign -> 8) verify + signed = client.sign_objects( + signer_keyring=sender_kp["keyring"], + objects={"myMessage": "Signed integration message"}, + ) + self.assertIn("myMessage.signature", signed) + sig = signed["myMessage.signature"] + self.assertIn("r", sig) + self.assertIn("s", sig) + + ok = client.verify_signature( + message_field="myMessage", + message_value=signed["myMessage"], + signature={"r": sig["r"], "s": sig["s"]}, + signer_public_key=sender_pub, + ) + self.assertTrue(ok)