-
-
Save simoleone/5f1fc81713ca5f9c174c3198c4f9a159 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import json | |
import time | |
import urllib.parse | |
# NB: using pip packages: requests and pycryptodome | |
import requests | |
from Crypto.Cipher import AES, PKCS1_v1_5 | |
from Crypto.PublicKey import RSA | |
from Crypto.Random import get_random_bytes | |
AUTH_TOKEN = 'XXX' | |
BASE_URL = 'https://api.sandbox.hummingbird.co/' | |
# for example purposes send an empty JSON object which will fail validation AFTER decryption but prove decryption works | |
ALERT_DATA = {} | |
# POST /alert_uploads | |
###################### | |
post_response = requests.request( | |
'POST', | |
urllib.parse.urljoin(BASE_URL, '/alert_uploads'), | |
headers={ | |
'Accept': 'application/json', | |
'Authorization': f'Bearer {AUTH_TOKEN}' | |
}, | |
json={} | |
) | |
post_response = post_response.json() | |
print(post_response) | |
# Encrypt an Alert | |
###################### | |
session_key = get_random_bytes(32) | |
# NB: Ruby and python differ on what they call this. Ruby calls it IV and Python nonce for AES-GCM. | |
# Further, Ruby will only accept 12 byte IVs for AES-GCM. | |
iv = get_random_bytes(12) | |
cipher = AES.new(session_key, AES.MODE_GCM, nonce=iv) | |
ciphertext, auth_tag = cipher.encrypt_and_digest(bytes(json.dumps(ALERT_DATA), 'UTF-8')) | |
# key wrapping | |
recipient_key = RSA.import_key(post_response['alert_upload']['encryption']['public_key']['key']) | |
# NB: PKCS1_V1_5 versus PCKS1_OAEP. Our ruby implementation defaulted to PKCS1_V1_5 but there is nothing | |
# preventing us from using PCKS1_OAEP if you would prefer. | |
cipher_rsa = PKCS1_v1_5.new(recipient_key) | |
wrapped_session_key = cipher_rsa.encrypt(session_key) | |
header = { | |
'version': 1, | |
'key_id': post_response['alert_upload']['encryption']['public_key']['id'], | |
'encrypted_session_key': base64.b64encode(wrapped_session_key).decode('utf-8'), | |
'iv': base64.b64encode(iv).decode('utf-8'), | |
'auth_tag': base64.b64encode(auth_tag).decode('utf-8') | |
} | |
# NB: this needs to be assembled as bytes instead of a string to avoid encoding issues. | |
envelope = bytearray(json.dumps(header).encode('utf-8')) # convert to bytes | |
envelope.append(0x00) # delimiter | |
envelope.extend(ciphertext) # already bytes | |
# POST S3 | |
s3_response = requests.request( | |
'POST', | |
post_response['alert_upload']['upload']['url'], | |
headers={}, | |
data=post_response['alert_upload']['upload']['fields'], | |
files=[ | |
('file', ('encrypted_envelope', envelope, 'application/octet-stream')) | |
] | |
) | |
print(s3_response) | |
print(s3_response.text) | |
# PUT /alert_uploads/:token/complete | |
put_response = requests.request( | |
'PUT', | |
urllib.parse.urljoin(BASE_URL, f"/alert_uploads/{post_response['alert_upload']['token']}/complete"), | |
headers={ | |
'Accept': 'application/json', | |
'Authorization': f'Bearer {AUTH_TOKEN}' | |
} | |
) | |
print(put_response) | |
print(put_response.json()) | |
# Poll for a result | |
print("Waiting for results....") | |
for i in range(0, 5): | |
get_response = requests.request( | |
'GET', | |
urllib.parse.urljoin(BASE_URL, f"/alert_uploads/{post_response['alert_upload']['token']}"), | |
headers={ | |
'Accept': 'application/json', | |
'Authorization': f'Bearer {AUTH_TOKEN}' | |
} | |
) | |
print(get_response) | |
response_json = get_response.json() | |
print(response_json) | |
if response_json['alert_upload']['status'] == 'UPLOAD_ERROR': | |
break | |
time.sleep(5) | |
# expected result is an error about the decrypted contents | |
# {'alert_upload': {'token': 'ZvYZiSvsvC7NnQCDjp9iEvuQ', 'updated_at': '2021-12-02T19:04:48.371Z', 'created_at': '2021-12-02T19:04:48.169Z', 'alert_processing_started_at': None, 'failed_at': '2021-12-02T19:04:48.368Z', 'error_message': '#/components/schemas/AlertRequest missing required parameters: alerts', 'status': 'UPLOAD_ERROR', 'alerts': []}, 'success': True} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment