Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@chrisdlangton
Last active December 2, 2019 22:56
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save chrisdlangton/25c3f9356ff7ebfb324fb6acc5665b4a to your computer and use it in GitHub Desktop.
Save chrisdlangton/25c3f9356ff7ebfb324fb6acc5665b4a to your computer and use it in GitHub Desktop.
Enforcing Perfect Forward Secrecy for AWS Query Request HTTP API
"""
Based on https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html
"""
from requests.packages.urllib3.util.ssl_ import create_urllib3_context
from requests.adapters import HTTPAdapter
import hmac
import hashlib
import datetime
import base64
import os
import sys
import requests
import socket, ssl
import pprint
# OpenSSL cipher-list string handed to create_urllib3_context() by
# CustomAdapter. Per the OpenSSL cipher-list syntax it allows only suites whose
# key exchange is ephemeral ECDH (kEECDH) or ephemeral DH (kEDH), and excludes
# (!) unauthenticated (aNULL), unencrypted (eNULL), DES, 3DES and RC4 suites.
ciphers = 'kEECDH:kEDH:!aNULL:!eNULL:!DES:!3DES:!RC4'
class CustomAdapter(HTTPAdapter):
    """Transport adapter that pins TLS to the module-level ``ciphers`` list.

    Both the direct connection pool and any proxy manager created by this
    adapter receive an SSL context built from ``ciphers``, so a session that
    mounts the adapter only negotiates the suites that string allows (the
    ephemeral ECDH / DH key exchanges used for perfect forward secrecy).
    """

    def init_poolmanager(self, *args, **kwargs):
        """Build the pool manager with the cipher-restricted SSL context.

        All positional and keyword arguments are forwarded unchanged to
        ``HTTPAdapter.init_poolmanager``; only ``ssl_context`` is overridden.
        """
        kwargs['ssl_context'] = create_urllib3_context(ciphers=ciphers)
        return super(CustomAdapter, self).init_poolmanager(*args, **kwargs)

    def proxy_manager_for(self, *args, **kwargs):
        """Build a proxy manager with the cipher-restricted SSL context.

        All positional and keyword arguments are forwarded unchanged to
        ``HTTPAdapter.proxy_manager_for``; only ``ssl_context`` is overridden.
        """
        kwargs['ssl_context'] = create_urllib3_context(ciphers=ciphers)
        return super(CustomAdapter, self).proxy_manager_for(*args, **kwargs)
# What is being requested: an EC2 DescribeRegions query sent as a GET.
request_parameters = 'Action=DescribeRegions&Version=2013-10-15'
method = 'GET'

# Where it is sent. `service` and `region` go into the credential scope,
# `host` into the signed headers, and `endpoint` into the request URL.
service = 'ec2'
region = 'us-east-1'
host = 'ec2.amazonaws.com'
endpoint = 'https://ec2.amazonaws.com'
def sign(key, msg):
    """Return the raw HMAC-SHA256 digest of ``msg`` keyed with ``key``.

    Args:
        key: The HMAC key, as bytes.
        msg: The message as text; it is UTF-8 encoded before hashing.

    Returns:
        The digest as bytes (not hex), so it can be used directly as the key
        of a further ``sign`` call.
    """
    return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()
def getSignatureKey(key, dateStamp, regionName, serviceName):
    """Derive the Signature Version 4 signing key from a secret access key.

    The key is produced by a chain of HMAC-SHA256 operations, each step keyed
    with the previous step's digest: date, then region, then service, then the
    fixed terminator ``'aws4_request'``.

    Args:
        key: The secret access key, as text.
        dateStamp: Date text used in the credential scope (the caller passes
            a ``YYYYMMDD`` string).
        regionName: Region name text, e.g. ``'us-east-1'``.
        serviceName: Service name text, e.g. ``'ec2'``.

    Returns:
        The derived signing key as raw bytes.
    """
    # The initial key is the secret prefixed with the literal 'AWS4'.
    kDate = sign(('AWS4' + key).encode('utf-8'), dateStamp)
    kRegion = sign(kDate, regionName)
    kService = sign(kRegion, serviceName)
    kSigning = sign(kService, 'aws4_request')
    return kSigning
# ************* READ CREDENTIALS *************
# Credentials are taken from the environment rather than embedded in the file.
access_key = os.environ.get('AWS_ACCESS_KEY_ID')
secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
# An unset or empty value cannot produce a valid signature, so stop early.
if not access_key or not secret_key:
    print('No access key is available.')
    # Non-zero status lets a calling shell tell that no request was sent.
    sys.exit(1)

# One timestamp feeds both the x-amz-date header and the credential scope so
# the two can never disagree.
t = datetime.datetime.utcnow()
amzdate = t.strftime('%Y%m%dT%H%M%SZ')
datestamp = t.strftime('%Y%m%d')

# ************* CREATE THE CANONICAL REQUEST *************
canonical_uri = '/'
canonical_querystring = request_parameters
# Each canonical header is 'name:value' followed by a newline.
canonical_headers = 'host:' + host + '\n' + 'x-amz-date:' + amzdate + '\n'
# Must name exactly the headers listed in canonical_headers.
signed_headers = 'host;x-amz-date'
# A GET carries no body, so the payload hash is the hash of the empty string.
payload_hash = hashlib.sha256(('').encode('utf-8')).hexdigest()
# canonical_headers already ends in '\n', so joining leaves the blank line
# between the header list and signed_headers.
canonical_request = '\n'.join((
    method,
    canonical_uri,
    canonical_querystring,
    canonical_headers,
    signed_headers,
    payload_hash,
))

# ************* CREATE THE STRING TO SIGN *************
algorithm = 'AWS4-HMAC-SHA256'
credential_scope = '/'.join((datestamp, region, service, 'aws4_request'))
string_to_sign = '\n'.join((
    algorithm,
    amzdate,
    credential_scope,
    hashlib.sha256(canonical_request.encode('utf-8')).hexdigest(),
))

# ************* CALCULATE THE SIGNATURE *************
signing_key = getSignatureKey(secret_key, datestamp, region, service)
signature = hmac.new(
    signing_key, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()

# ************* ADD SIGNING INFORMATION TO THE REQUEST *************
authorization_header = (
    algorithm + ' ' + 'Credential=' + access_key + '/' + credential_scope +
    ', ' + 'SignedHeaders=' + signed_headers +
    ', ' + 'Signature=' + signature
)
# The host header is not set here; it is left to requests to send it.
headers = {'x-amz-date': amzdate, 'Authorization': authorization_header}

# ************* SEND THE REQUEST *************
request_url = endpoint + '?' + canonical_querystring
print('\nBEGIN REQUEST++++++++++++++++++++++++++++++++++++')
print('Request URL = ' + request_url)
# The context manager closes the session (and its pooled connections) even if
# the request raises.
with requests.Session() as s:
    # Route every URL under `endpoint` through the cipher-restricted adapter.
    s.mount(endpoint, CustomAdapter())
    r = s.get(request_url, headers=headers)
print('\nRESPONSE++++++++++++++++++++++++++++++++++++')
print('Response code: %d\n' % r.status_code)
print(r.text)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment