pki_ca/tests/test_zenroom_service_client.py

225 lines
8.9 KiB
Python

import json
import unittest
from unittest import mock
import sys
from pathlib import Path
# Put the "ca_core" directory (a sibling of this file's parent directory) at
# the front of sys.path before importing from the ``crypto`` package.
# NOTE(review): presumably ``crypto`` lives inside ca_core and is not installed
# as a package -- confirm against the repository layout.
code_path = Path(__file__).parents[1] / "ca_core"
sys.path.insert(0, str(code_path))
from crypto.zenroom_service_client import ZenroomServiceClient, ZenroomServiceError
class _FakeHTTPResponse:
    """Minimal stand-in for the object returned by ``urlopen``.

    It implements only what the tests in this module rely on: ``read()``
    and the context-manager protocol.
    """

    def __init__(self, body: bytes) -> None:
        """Store *body*, the raw bytes that ``read()`` will hand back."""
        self._body = body

    def read(self) -> bytes:
        """Return the full stored body (the same bytes on every call)."""
        return self._body

    def __enter__(self) -> "_FakeHTTPResponse":
        """Return the response itself so ``with ... as resp`` works."""
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        """Return ``False`` so an exception raised in the ``with`` body propagates."""
        return False
class TestZenroomServiceClient(unittest.TestCase):
    """Tests for ``ZenroomServiceClient`` with ``urlopen`` replaced by a mock.

    Each test makes the patched ``urlopen`` return a canned JSON body, calls
    one client method, and then checks the value the client returned and
    (where relevant) the request object the client passed to ``urlopen``:
    the endpoint its URL ends with and the JSON document in its body.
    """

    # Patch target: the ``urlopen`` name as looked up from the client module.
    _URLOPEN_TARGET = "crypto.zenroom_service_client.urllib.request.urlopen"
    # Base URL handed to every client built by these tests.
    _BASE_URL = "http://localhost:3300"

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _stub_response(m_urlopen, payload):
        """Make the patched ``urlopen`` return *payload* as UTF-8 encoded JSON."""
        m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8"))

    def _make_client(self):
        """Build a client pointed at ``_BASE_URL``."""
        return ZenroomServiceClient(base_url=self._BASE_URL)

    def _assert_request(self, m_urlopen, endpoint, expected_body, method=None):
        """Check the request object most recently passed to ``urlopen``.

        Args:
            m_urlopen: the mock that replaced ``urlopen``.
            endpoint: suffix the request's ``full_url`` must end with.
            expected_body: object the JSON-decoded request data must equal.
            method: when given, the request's ``method`` must equal it.
        """
        # Fail with a readable message instead of a TypeError on ``None[0]``.
        self.assertTrue(m_urlopen.called, msg="urlopen was never called")
        req = m_urlopen.call_args[0][0]
        if method is not None:
            self.assertEqual(req.method, method)
        self.assertTrue(
            req.full_url.endswith(endpoint),
            msg="request URL %r does not end with %r" % (req.full_url, endpoint),
        )
        sent = json.loads(req.data.decode("utf-8"))
        self.assertEqual(sent, expected_body)

    # -------------------------------------------------------------------- tests

    @mock.patch(_URLOPEN_TARGET)
    def test_generate_keypair_returns_keyring_and_private_key(self, m_urlopen):
        """The result has my_name, private_key and keyring, but no public_key."""
        self._stub_response(m_urlopen, {"User123456": {"keyring": {"ecdh": "PRIVKEY"}}})

        res = self._make_client().generate_keypair("User123456")

        self.assertEqual(res["my_name"], "User123456")
        self.assertEqual(res["private_key"], "PRIVKEY")
        self.assertEqual(res["keyring"], {"ecdh": "PRIVKEY"})
        self.assertNotIn("public_key", res)
        self._assert_request(
            m_urlopen,
            "/api/Generate-a-keypair,-reading-identity-from-data",
            {"data": {"myName": "User123456"}},
            method="POST",
        )

    @mock.patch(_URLOPEN_TARGET)
    def test_generate_public_key_returns_string(self, m_urlopen):
        """The ``ecdh_public_key`` value of the response is returned as-is."""
        self._stub_response(m_urlopen, {"ecdh_public_key": "PUBKEY"})

        pub = self._make_client().generate_public_key({"ecdh": "PRIVKEY"})

        self.assertEqual(pub, "PUBKEY")
        self._assert_request(
            m_urlopen,
            "/api/Generate-public-key",
            {"data": {"keyring": {"ecdh": "PRIVKEY"}}},
        )

    @mock.patch(_URLOPEN_TARGET)
    def test_symmetric_encrypt_returns_secret_message_dict(self, m_urlopen):
        """The ``secret_message`` dict of the response is what the caller gets."""
        self._stub_response(
            m_urlopen,
            {
                "secret_message": {
                    "checksum": "C",
                    "header": "H",
                    "iv": "IV",
                    "text": "T",
                }
            },
        )

        sm = self._make_client().symmetric_encrypt(
            header="A very important secret",
            message="hello",
            shared_key="myVerySecretPassword",
        )

        self.assertEqual(sm["checksum"], "C")
        self.assertEqual(sm["header"], "H")
        self.assertEqual(sm["iv"], "IV")
        self.assertEqual(sm["text"], "T")
        self._assert_request(
            m_urlopen,
            "/api/Encrypt-a-message-with-the-password",
            {
                "data": {
                    "header": "A very important secret",
                    "message": "hello",
                    "password": "myVerySecretPassword",
                }
            },
        )

    @mock.patch(_URLOPEN_TARGET)
    def test_symmetric_decrypt_returns_plaintext(self, m_urlopen):
        """The ``textDecrypted`` value of the response is returned."""
        self._stub_response(m_urlopen, {"textDecrypted": "PLAINTEXT"})

        txt = self._make_client().symmetric_decrypt(secret_message={"iv": "x"}, shared_key="k")

        self.assertEqual(txt, "PLAINTEXT")
        self._assert_request(
            m_urlopen,
            "/api/Decrypt-the-message-with-the-password",
            {"data": {"secret_message": {"iv": "x"}, "password": "k"}},
        )

    @mock.patch(_URLOPEN_TARGET)
    def test_asymmetric_encrypt_returns_secret(self, m_urlopen):
        """The ``secret`` dict of the response is returned."""
        self._stub_response(
            m_urlopen,
            {"secret": {"checksum": "C", "header": "H", "iv": "IV", "text": "T"}},
        )

        sec = self._make_client().asymmetric_encrypt(
            receiver_public_key="PUB",
            sender_keyring={"ecdh": "PRIV"},
            header="hdr",
            message="msg",
        )

        self.assertEqual(sec, {"checksum": "C", "header": "H", "iv": "IV", "text": "T"})
        self._assert_request(
            m_urlopen,
            "/api/Encrypt-a-message-for-two-recipients-using-asymmetric-cryptography",
            {
                "data": {
                    # NOTE: "reciever" (sic) is the key this test expects on the
                    # wire; do not correct the spelling here without changing
                    # the client and the service contract as well.
                    "reciever": {"public_key": "PUB"},
                    "sender": {"keyring": {"ecdh": "PRIV"}},
                    "header": "hdr",
                    "message": "msg",
                }
            },
        )

    @mock.patch(_URLOPEN_TARGET)
    def test_asymmetric_decrypt_returns_header_and_text(self, m_urlopen):
        """A response holding ``header`` and ``text`` is returned as that dict."""
        self._stub_response(m_urlopen, {"header": "HDR", "text": "TXT"})

        out = self._make_client().asymmetric_decrypt(
            sender_public_key="PUB",
            receiver_keyring={"ecdh": "PRIV"},
            secret={"iv": "IV"},
        )

        self.assertEqual(out, {"header": "HDR", "text": "TXT"})
        self._assert_request(
            m_urlopen,
            "/api/Decrypt-a-message-for-two-recipients-using-asymmetric-cryptography",
            {
                "data": {
                    "sender": {"public_key": "PUB"},
                    # "reciever" (sic): see the note in the encrypt test's payload
                    # spelling -- it is the key expected on the wire.
                    "reciever": {"keyring": {"ecdh": "PRIV"}},
                    "secret": {"iv": "IV"},
                }
            },
        )

    @mock.patch(_URLOPEN_TARGET)
    def test_sign_objects_returns_response_and_validates_signatures(self, m_urlopen):
        """The result exposes the signed field and its ``<field>.signature``."""
        self._stub_response(
            m_urlopen,
            {
                "myMessage": "hello",
                "myMessage.signature": {"r": "R", "s": "S"},
            },
        )

        res = self._make_client().sign_objects(
            objects={"myMessage": "hello"},
            signer_keyring={"ecdh": "PRIV"},
        )

        self.assertEqual(res["myMessage"], "hello")
        self.assertEqual(res["myMessage.signature"]["r"], "R")
        self._assert_request(
            m_urlopen,
            "/api/Sign-objects-using-asymmetric-cryptography",
            {
                "data": {
                    "mySecretStuff": {"myMessage": "hello"},
                    "signer": {"keyring": {"ecdh": "PRIV"}},
                }
            },
        )

    @mock.patch(_URLOPEN_TARGET)
    def test_verify_signature_returns_true(self, m_urlopen):
        """A response carrying the service's success output yields a truthy result."""
        self._stub_response(
            m_urlopen,
            {
                "myMessage": "hello",
                "output": ["Zenroom_certifies_that_signature_is_correct!"],
            },
        )

        ok = self._make_client().verify_signature(
            message_field="myMessage",
            message_value="hello",
            signature={"r": "R", "s": "S"},
            signer_public_key="PUB",
        )

        self.assertTrue(ok)
        self._assert_request(
            m_urlopen,
            "/api/Verify-asymmetric-cryptography-signature",
            {
                "data": {
                    "myMessage": "hello",
                    "myMessage.signature": {"r": "R", "s": "S"},
                    "signer": {"public_key": "PUB"},
                }
            },
        )

    @mock.patch(_URLOPEN_TARGET)
    def test_zenroom_error_is_raised(self, m_urlopen):
        """A body with ``exception``/``zenroom_errors`` raises ``ZenroomServiceError``."""
        self._stub_response(m_urlopen, {"exception": "boom", "zenroom_errors": {"logs": "fail"}})
        client = self._make_client()

        with self.assertRaises(ZenroomServiceError):
            client.verify_signature(
                message_field="myMessage",
                message_value="hello",
                signature={"r": "R", "s": "S"},
                signer_public_key="PUB",
            )