Skip to content

Instantly share code, notes, and snippets.

Created Dec 18, 2017
What would you like to do?
Lambda Function for Signing POST request to S3(Direct Uploads)
import base64
import hashlib
import hmac
import json
import logging
import os
logger = logging.getLogger(__name__)
ACCESS_KEY = os.environ.get('ACCESS_KEY')
SECRET_KEY = os.environ.get('SECRET_KEY')
# Key derivation functions. See:
def sign(key, msg):
return, msg.encode("utf-8"), hashlib.sha256).digest()
def getSignatureKey(key, date_stamp, regionName, serviceName):
kDate = sign(('AWS4' + key).encode('utf-8'), date_stamp)
kRegion = sign(kDate, regionName)
kService = sign(kRegion, serviceName)
kSigning = sign(kService, 'aws4_request')
return kSigning
def sign_policy(policy, credential):
""" Sign and return the policy document for a simple upload. """
base64_policy = base64.b64encode(policy)
parts = credential.split('/')
date_stamp = parts[1]
region = parts[2]
service = parts[3]
signedKey = getSignatureKey(SECRET_KEY, date_stamp, region, service)
signature =, base64_policy, hashlib.sha256).hexdigest()
base64_policy = base64_policy.decode('utf-8')
return {'policy': base64_policy, 'signature': signature}
def sign_headers(headers):
""" Sign and return the headers for a chunked upload. """
# headers = str(bytearray(headers, 'utf-8')) # hmac doesn't want unicode
parts = headers.split('\n')
canonical_request = ('\n'.join(parts[3:]))
algorithm = parts[0]
amz_date = parts[1]
credential_scope = parts[2]
string_to_sign = algorithm + '\n' + amz_date + '\n' + credential_scope + '\n' + hashlib.sha256(
cred_parts = credential_scope.split('/')
date_stamp = cred_parts[0]
region = cred_parts[1]
service = cred_parts[2]
signed_key = getSignatureKey(SECRET_KEY, date_stamp, region, service)
signature =, (string_to_sign).encode('utf-8'), hashlib.sha256).hexdigest()
return {'signature': signature}
def index(event, context):
""" Route for signing the policy document or REST headers. """
print('event: ', event)
request_payload = event
if request_payload.get('headers'):
response_data = sign_headers(request_payload['headers'])
credential = list([c for c in request_payload['conditions'] if 'x-amz-credential' in c][0].values())[0]
print(credential, type(credential))
credential = str(credential)
response_data = sign_policy(json.dumps(event).encode('utf-8'), credential)
return response_data
Copy link

Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment