225 lines
8.9 KiB
Python
225 lines
8.9 KiB
Python
import json
|
|
import unittest
|
|
from unittest import mock
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
code_path = Path(__file__).parents[1] / "ca_core"
|
|
sys.path.insert(0, str(code_path))
|
|
|
|
from crypto.zenroom_service_client import ZenroomServiceClient, ZenroomServiceError
|
|
|
|
|
|
class _FakeHTTPResponse:
|
|
def __init__(self, body: bytes):
|
|
self._body = body
|
|
|
|
def read(self):
|
|
return self._body
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb):
|
|
return False
|
|
|
|
|
|
class TestZenroomServiceClient(unittest.TestCase):
|
|
|
|
@mock.patch("crypto.zenroom_service_client.urllib.request.urlopen")
|
|
def test_generate_keypair_returns_keyring_and_private_key(self, m_urlopen):
|
|
payload = {
|
|
"User123456": {"keyring": {"ecdh": "PRIVKEY"}}
|
|
}
|
|
m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8"))
|
|
|
|
client = ZenroomServiceClient(base_url="http://localhost:3300")
|
|
res = client.generate_keypair("User123456")
|
|
|
|
self.assertEqual(res["my_name"], "User123456")
|
|
self.assertEqual(res["private_key"], "PRIVKEY")
|
|
self.assertEqual(res["keyring"], {"ecdh": "PRIVKEY"})
|
|
self.assertNotIn("public_key", res)
|
|
|
|
req = m_urlopen.call_args[0][0]
|
|
self.assertEqual(req.method, "POST")
|
|
self.assertTrue(req.full_url.endswith("/api/Generate-a-keypair,-reading-identity-from-data"))
|
|
sent = json.loads(req.data.decode("utf-8"))
|
|
self.assertEqual(sent, {"data": {"myName": "User123456"}})
|
|
|
|
@mock.patch("crypto.zenroom_service_client.urllib.request.urlopen")
|
|
def test_generate_public_key_returns_string(self, m_urlopen):
|
|
payload = {"ecdh_public_key": "PUBKEY"}
|
|
m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8"))
|
|
|
|
client = ZenroomServiceClient(base_url="http://localhost:3300")
|
|
pub = client.generate_public_key({"ecdh": "PRIVKEY"})
|
|
self.assertEqual(pub, "PUBKEY")
|
|
|
|
req = m_urlopen.call_args[0][0]
|
|
self.assertTrue(req.full_url.endswith("/api/Generate-public-key"))
|
|
sent = json.loads(req.data.decode("utf-8"))
|
|
self.assertEqual(sent, {"data": {"keyring": {"ecdh": "PRIVKEY"}}})
|
|
|
|
@mock.patch("crypto.zenroom_service_client.urllib.request.urlopen")
|
|
def test_symmetric_encrypt_returns_secret_message_dict(self, m_urlopen):
|
|
payload = {
|
|
"secret_message": {
|
|
"checksum": "C",
|
|
"header": "H",
|
|
"iv": "IV",
|
|
"text": "T",
|
|
}
|
|
}
|
|
m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8"))
|
|
|
|
client = ZenroomServiceClient(base_url="http://localhost:3300")
|
|
sm = client.symmetric_encrypt(
|
|
header="A very important secret",
|
|
message="hello",
|
|
shared_key="myVerySecretPassword",
|
|
)
|
|
self.assertEqual(sm["checksum"], "C")
|
|
self.assertEqual(sm["header"], "H")
|
|
self.assertEqual(sm["iv"], "IV")
|
|
self.assertEqual(sm["text"], "T")
|
|
|
|
req = m_urlopen.call_args[0][0]
|
|
self.assertTrue(req.full_url.endswith("/api/Encrypt-a-message-with-the-password"))
|
|
sent = json.loads(req.data.decode("utf-8"))
|
|
self.assertEqual(
|
|
sent,
|
|
{"data": {"header": "A very important secret", "message": "hello", "password": "myVerySecretPassword"}},
|
|
)
|
|
|
|
@mock.patch("crypto.zenroom_service_client.urllib.request.urlopen")
|
|
def test_symmetric_decrypt_returns_plaintext(self, m_urlopen):
|
|
payload = {"textDecrypted": "PLAINTEXT"}
|
|
m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8"))
|
|
|
|
client = ZenroomServiceClient(base_url="http://localhost:3300")
|
|
txt = client.symmetric_decrypt(secret_message={"iv": "x"}, shared_key="k")
|
|
self.assertEqual(txt, "PLAINTEXT")
|
|
|
|
req = m_urlopen.call_args[0][0]
|
|
self.assertTrue(req.full_url.endswith("/api/Decrypt-the-message-with-the-password"))
|
|
sent = json.loads(req.data.decode("utf-8"))
|
|
self.assertEqual(sent, {"data": {"secret_message": {"iv": "x"}, "password": "k"}})
|
|
|
|
@mock.patch("crypto.zenroom_service_client.urllib.request.urlopen")
|
|
def test_asymmetric_encrypt_returns_secret(self, m_urlopen):
|
|
payload = {"secret": {"checksum": "C", "header": "H", "iv": "IV", "text": "T"}}
|
|
m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8"))
|
|
|
|
client = ZenroomServiceClient(base_url="http://localhost:3300")
|
|
sec = client.asymmetric_encrypt(
|
|
receiver_public_key="PUB",
|
|
sender_keyring={"ecdh": "PRIV"},
|
|
header="hdr",
|
|
message="msg",
|
|
)
|
|
self.assertEqual(sec, {"checksum": "C", "header": "H", "iv": "IV", "text": "T"})
|
|
|
|
req = m_urlopen.call_args[0][0]
|
|
self.assertTrue(req.full_url.endswith("/api/Encrypt-a-message-for-two-recipients-using-asymmetric-cryptography"))
|
|
sent = json.loads(req.data.decode("utf-8"))
|
|
self.assertEqual(
|
|
sent,
|
|
{
|
|
"data": {
|
|
"reciever": {"public_key": "PUB"},
|
|
"sender": {"keyring": {"ecdh": "PRIV"}},
|
|
"header": "hdr",
|
|
"message": "msg",
|
|
}
|
|
},
|
|
)
|
|
|
|
@mock.patch("crypto.zenroom_service_client.urllib.request.urlopen")
|
|
def test_asymmetric_decrypt_returns_header_and_text(self, m_urlopen):
|
|
payload = {"header": "HDR", "text": "TXT"}
|
|
m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8"))
|
|
|
|
client = ZenroomServiceClient(base_url="http://localhost:3300")
|
|
out = client.asymmetric_decrypt(
|
|
sender_public_key="PUB",
|
|
receiver_keyring={"ecdh": "PRIV"},
|
|
secret={"iv": "IV"},
|
|
)
|
|
self.assertEqual(out, {"header": "HDR", "text": "TXT"})
|
|
|
|
req = m_urlopen.call_args[0][0]
|
|
self.assertTrue(req.full_url.endswith("/api/Decrypt-a-message-for-two-recipients-using-asymmetric-cryptography"))
|
|
sent = json.loads(req.data.decode("utf-8"))
|
|
self.assertEqual(
|
|
sent,
|
|
{
|
|
"data": {
|
|
"sender": {"public_key": "PUB"},
|
|
"reciever": {"keyring": {"ecdh": "PRIV"}},
|
|
"secret": {"iv": "IV"},
|
|
}
|
|
},
|
|
)
|
|
|
|
@mock.patch("crypto.zenroom_service_client.urllib.request.urlopen")
|
|
def test_sign_objects_returns_response_and_validates_signatures(self, m_urlopen):
|
|
payload = {
|
|
"myMessage": "hello",
|
|
"myMessage.signature": {"r": "R", "s": "S"},
|
|
}
|
|
m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8"))
|
|
|
|
client = ZenroomServiceClient(base_url="http://localhost:3300")
|
|
res = client.sign_objects(objects={"myMessage": "hello"}, signer_keyring={"ecdh": "PRIV"})
|
|
|
|
self.assertEqual(res["myMessage"], "hello")
|
|
self.assertEqual(res["myMessage.signature"]["r"], "R")
|
|
|
|
req = m_urlopen.call_args[0][0]
|
|
self.assertTrue(req.full_url.endswith("/api/Sign-objects-using-asymmetric-cryptography"))
|
|
sent = json.loads(req.data.decode("utf-8"))
|
|
self.assertEqual(
|
|
sent,
|
|
{"data": {"mySecretStuff": {"myMessage": "hello"}, "signer": {"keyring": {"ecdh": "PRIV"}}}},
|
|
)
|
|
|
|
@mock.patch("crypto.zenroom_service_client.urllib.request.urlopen")
|
|
def test_verify_signature_returns_true(self, m_urlopen):
|
|
payload = {
|
|
"myMessage": "hello",
|
|
"output": ["Zenroom_certifies_that_signature_is_correct!"],
|
|
}
|
|
m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8"))
|
|
|
|
client = ZenroomServiceClient(base_url="http://localhost:3300")
|
|
ok = client.verify_signature(
|
|
message_field="myMessage",
|
|
message_value="hello",
|
|
signature={"r": "R", "s": "S"},
|
|
signer_public_key="PUB",
|
|
)
|
|
self.assertTrue(ok)
|
|
|
|
req = m_urlopen.call_args[0][0]
|
|
self.assertTrue(req.full_url.endswith("/api/Verify-asymmetric-cryptography-signature"))
|
|
sent = json.loads(req.data.decode("utf-8"))
|
|
self.assertEqual(
|
|
sent,
|
|
{"data": {"myMessage": "hello", "myMessage.signature": {"r": "R", "s": "S"}, "signer": {"public_key": "PUB"}}},
|
|
)
|
|
|
|
@mock.patch("crypto.zenroom_service_client.urllib.request.urlopen")
|
|
def test_zenroom_error_is_raised(self, m_urlopen):
|
|
payload = {"exception": "boom", "zenroom_errors": {"logs": "fail"}}
|
|
m_urlopen.return_value = _FakeHTTPResponse(json.dumps(payload).encode("utf-8"))
|
|
|
|
client = ZenroomServiceClient(base_url="http://localhost:3300")
|
|
with self.assertRaises(ZenroomServiceError):
|
|
client.verify_signature(
|
|
message_field="myMessage",
|
|
message_value="hello",
|
|
signature={"r": "R", "s": "S"},
|
|
signer_public_key="PUB",
|
|
)
|