Skip to content

Instantly share code, notes, and snippets.

@simoleone
Last active December 2, 2021 19:22
Show Gist options
  • Save simoleone/5f1fc81713ca5f9c174c3198c4f9a159 to your computer and use it in GitHub Desktop.
Save simoleone/5f1fc81713ca5f9c174c3198c4f9a159 to your computer and use it in GitHub Desktop.
import base64
import json
import time
import urllib.parse
# NB: using pip packages: requests and pycryptodome
import requests
from Crypto.Cipher import AES, PKCS1_v1_5
from Crypto.PublicKey import RSA
from Crypto.Random import get_random_bytes
# Bearer token sent in the Authorization header of every API request below.
# 'XXX' is a placeholder; substitute a real token before running.
AUTH_TOKEN = 'XXX'
# Root URL that every API path in this script is joined onto (sandbox host).
BASE_URL = 'https://api.sandbox.hummingbird.co/'
# The payload that gets encrypted and uploaded.
# for example purposes send an empty JSON object which will fail validation AFTER decryption but prove decryption works
ALERT_DATA = {}
# POST /alert_uploads
######################
# Create a new alert upload. The parsed JSON reply (post_response) is read by
# the later steps for the RSA public key, the upload URL and form fields, and
# the upload token.
create_response = requests.post(
    urllib.parse.urljoin(BASE_URL, '/alert_uploads'),
    headers={
        'Accept': 'application/json',
        'Authorization': f'Bearer {AUTH_TOKEN}',
    },
    json={},
    # requests waits forever by default; bound the wait so the script cannot hang.
    timeout=30,
)
# Stop on an HTTP error here instead of failing later with a KeyError on the
# missing 'alert_upload' keys. Print the body first since it may explain the
# failure.
if not create_response.ok:
    print(create_response.text)
    create_response.raise_for_status()
post_response = create_response.json()
print(post_response)
# Encrypt an Alert
######################
# Serialize the alert to JSON and encrypt it with AES-GCM under a freshly
# generated random 32-byte session key.
plaintext = json.dumps(ALERT_DATA).encode('UTF-8')
session_key = get_random_bytes(32)
# NB: Ruby calls this value the IV while Python calls it the nonce for AES-GCM.
# Ruby only accepts 12-byte IVs for AES-GCM, hence the length used here.
iv = get_random_bytes(12)
aes_gcm = AES.new(session_key, AES.MODE_GCM, nonce=iv)
ciphertext, auth_tag = aes_gcm.encrypt_and_digest(plaintext)

# Key wrapping: encrypt the session key with the RSA public key returned by
# POST /alert_uploads.
# NB: PKCS1_v1_5 versus PKCS1_OAEP. Our ruby implementation defaulted to
# PKCS1_v1_5 but there is nothing preventing us from using PKCS1_OAEP if you
# would prefer.
public_key_info = post_response['alert_upload']['encryption']['public_key']
rsa_wrapper = PKCS1_v1_5.new(RSA.import_key(public_key_info['key']))
wrapped_session_key = rsa_wrapper.encrypt(session_key)

# Envelope header: format version, id of the wrapping key, then the binary
# values as base64 text so they survive JSON serialization.
header = {'version': 1, 'key_id': public_key_info['id']}
header.update(
    (field, base64.b64encode(raw).decode('utf-8'))
    for field, raw in (
        ('encrypted_session_key', wrapped_session_key),
        ('iv', iv),
        ('auth_tag', auth_tag),
    )
)

# NB: the envelope is assembled as bytes rather than a string to avoid
# encoding issues. Layout: JSON header, a single 0x00 delimiter, ciphertext.
envelope = bytearray(
    b'\x00'.join((json.dumps(header).encode('utf-8'), ciphertext))
)
# POST S3
# Upload the encrypted envelope as a multipart form to the URL returned by
# POST /alert_uploads, sending the returned form fields alongside the file.
s3_response = requests.post(
    post_response['alert_upload']['upload']['url'],
    data=post_response['alert_upload']['upload']['fields'],
    files=[
        ('file', ('encrypted_envelope', envelope, 'application/octet-stream')),
    ],
    # requests waits forever by default; bound the wait so the script cannot hang.
    timeout=30,
)
print(s3_response)
print(s3_response.text)
# Abort if the upload was rejected, so the script does not go on to mark a
# failed upload as complete. Raised after printing so the response body is
# still shown.
s3_response.raise_for_status()
# PUT /alert_uploads/:token/complete
# Call the 'complete' endpoint for the token issued by POST /alert_uploads.
put_response = requests.put(
    urllib.parse.urljoin(BASE_URL, f"/alert_uploads/{post_response['alert_upload']['token']}/complete"),
    headers={
        'Accept': 'application/json',
        'Authorization': f'Bearer {AUTH_TOKEN}',
    },
    # requests waits forever by default; bound the wait so the script cannot hang.
    timeout=30,
)
print(put_response)
print(put_response.json())
# Polling is pointless if this call failed; raise after printing so the
# response body is still shown.
put_response.raise_for_status()
# Poll for a result
print("Waiting for results....")
MAX_POLLS = 5
POLL_INTERVAL_SECONDS = 5
# The status URL does not change between attempts, so build it once.
status_url = urllib.parse.urljoin(BASE_URL, f"/alert_uploads/{post_response['alert_upload']['token']}")
for attempt in range(MAX_POLLS):
    get_response = requests.get(
        status_url,
        headers={
            'Accept': 'application/json',
            'Authorization': f'Bearer {AUTH_TOKEN}',
        },
        # requests waits forever by default; bound the wait so the script cannot hang.
        timeout=30,
    )
    print(get_response)
    response_json = get_response.json()
    print(response_json)
    # This example uploads an empty alert and therefore expects UPLOAD_ERROR;
    # any other status keeps polling until the attempts run out.
    if response_json['alert_upload']['status'] == 'UPLOAD_ERROR':
        break
    # Wait between attempts, but not after the final one.
    if attempt < MAX_POLLS - 1:
        time.sleep(POLL_INTERVAL_SECONDS)
# expected result is an error about the decrypted contents
# {'alert_upload': {'token': 'ZvYZiSvsvC7NnQCDjp9iEvuQ', 'updated_at': '2021-12-02T19:04:48.371Z', 'created_at': '2021-12-02T19:04:48.169Z', 'alert_processing_started_at': None, 'failed_at': '2021-12-02T19:04:48.368Z', 'error_message': '#/components/schemas/AlertRequest missing required parameters: alerts', 'status': 'UPLOAD_ERROR', 'alerts': []}, 'success': True}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment