Created
November 26, 2012 03:30
-
-
Save tzangms/4146466 to your computer and use it in GitHub Desktop.
Private content signing for CloudFront in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Crypto.Hash.SHA | |
import Crypto.PublicKey.RSA | |
import Crypto.Signature.PKCS1_v1_5 | |
import base64 | |
import time | |
# Placeholder key pair ID; cloudfront() passes it on so it ends up as the
# Key-Pair-Id query parameter of the signed URL. Replace with your own value.
KEY_PAIR_ID = "your_key_pair_id"
# Placeholder path of the private key file that cloudfront() reads on every
# call. Replace with the path to your own .pem file.
PRIV_KEY_FILE = "pk-xxxxxxxxxx.pem"
def get_signature(message, private_key):
    """Sign *message* with an RSA key using PKCS#1 v1.5 over a SHA-1 digest.

    Args:
        message: The data to sign. Bytes are hashed as-is; text is encoded
            as UTF-8 first.
        private_key: Key material accepted by
            ``Crypto.PublicKey.RSA.importKey``.

    Returns:
        Whatever the PKCS1_v1_5 signer's ``sign`` method returns for the
        SHA-1 hash of the message.
    """
    # bytes(text) raises TypeError on Python 3 (no encoding given), so text
    # is encoded explicitly. On Python 2, ``bytes`` is ``str`` and such input
    # passes through untouched, exactly as before.
    if isinstance(message, bytes):
        data = message
    elif hasattr(message, "encode"):
        data = message.encode("utf-8")
    else:
        # Other bytes-like objects keep the original conversion.
        data = bytes(message)

    key = Crypto.PublicKey.RSA.importKey(private_key)
    signer = Crypto.Signature.PKCS1_v1_5.new(key)
    sha1_hash = Crypto.Hash.SHA.new()
    sha1_hash.update(data)
    return signer.sign(sha1_hash)
def aws_url_base64_encode(msg):
    """Base64-encode *msg* and swap the characters that are unsafe in a URL.

    After standard Base64 encoding, ``+`` becomes ``-``, ``=`` becomes ``_``
    and ``/`` becomes ``~``.

    Args:
        msg: The data to encode. Text is encoded as UTF-8 first; bytes are
            used as-is.

    Returns:
        The encoded value as a native ``str``.
    """
    # Accept text as well as bytes; b64encode itself rejects text on Python 3.
    if not isinstance(msg, bytes) and hasattr(msg, "encode"):
        msg = msg.encode("utf-8")

    encoded = base64.b64encode(msg)
    # On Python 3 b64encode returns bytes, and bytes.replace() rejects str
    # arguments; decode so the substitutions below work and the result can be
    # %-formatted into a URL without a b'...' wrapper. On Python 2 the result
    # is already a native str and is left alone.
    if not isinstance(encoded, str):
        encoded = encoded.decode("ascii")

    for unsafe, safe in (("+", "-"), ("=", "_"), ("/", "~")):
        encoded = encoded.replace(unsafe, safe)
    return encoded
def create_url(url, encoded_signature, key_pair_id, expires):
    """Append the Expires, Signature and Key-Pair-Id parameters to *url*.

    Args:
        url: The base URL, with or without an existing query string.
        encoded_signature: The URL-safe encoded signature.
        key_pair_id: Value for the ``Key-Pair-Id`` parameter.
        expires: Value for the ``Expires`` parameter.

    Returns:
        The URL with the three parameters appended, in that order.
    """
    # A URL that already has a query string must be extended with "&";
    # adding a second "?" would produce a malformed URL.
    if "?" not in url:
        separator = "?"
    elif url.endswith(("?", "&")):
        separator = ""
    else:
        separator = "&"

    signed_url = (
        "%(url)s%(separator)sExpires=%(expires)s"
        "&Signature=%(encoded_signature)s&Key-Pair-Id=%(key_pair_id)s"
    ) % {
        'url': url,
        'separator': separator,
        'expires': expires,
        'encoded_signature': encoded_signature,
        'key_pair_id': key_pair_id,
    }
    return signed_url
def get_canned_policy_url(url, priv_key_string, key_pair_id, expires):
    """Build a signed URL for *url* from a canned policy.

    Args:
        url: The resource URL; used both as the policy's ``Resource`` and as
            the base of the returned URL.
        priv_key_string: Private key material handed to ``get_signature``.
        key_pair_id: Value for the ``Key-Pair-Id`` query parameter.
        expires: Value placed in the policy's ``AWS:EpochTime`` field and in
            the ``Expires`` query parameter.

    Returns:
        The signed URL produced by ``create_url``.
    """
    # The policy is assembled by hand (not via a JSON serializer) so the
    # exact text that gets signed is fixed by this template.
    canned_policy = (
        '{"Statement":[{"Resource":"%(url)s","Condition":'
        '{"DateLessThan":{"AWS:EpochTime":%(expires)s}}}]}'
    ) % {'url': url, 'expires': expires}

    # Sign the raw policy text, then make the signature URL-safe.
    signature = get_signature(canned_policy, priv_key_string)
    encoded_signature = aws_url_base64_encode(signature)

    # Attach the signature and its companion parameters to the URL.
    return create_url(url, encoded_signature, key_pair_id, expires)
def encode_query_param(resource):
    """Percent-encode the ``?``, ``=`` and ``&`` characters in *resource*.

    Args:
        resource: The string to encode.

    Returns:
        A copy of *resource* with ``?`` replaced by ``%3F``, ``=`` by ``%3D``
        and ``&`` by ``%26``; all other characters are left unchanged.
    """
    enc = resource
    # None of the replacement texts contains a character that is itself
    # replaced, so the order of the substitutions does not matter.
    for char, escaped in (('?', '%3F'), ('=', '%3D'), ('&', '%26')):
        enc = enc.replace(char, escaped)
    return enc
def cloudfront(resource, secs=10):
    """Return a signed URL for *resource* that expires *secs* seconds from now.

    The private key is read from ``PRIV_KEY_FILE`` on every call and the key
    pair ID is taken from ``KEY_PAIR_ID``.

    Args:
        resource: The URL to sign.
        secs: Lifetime of the signed URL in seconds, added to the current
            epoch time. Defaults to 10.

    Returns:
        The signed URL produced by ``get_canned_policy_url``.
    """
    expires = int(time.time()) + secs

    # Use a context manager so the key file is closed even if reading fails.
    with open(PRIV_KEY_FILE) as key_file:
        priv_key_string = key_file.read()

    signed_url = get_canned_policy_url(resource, priv_key_string, KEY_PAIR_ID, expires)
    # NOTE: Flash player doesn't like query params, so for a streaming
    # distribution pass the result through encode_query_param() before use.
    return signed_url
if __name__ == '__main__':
    # Example usage: sign a placeholder URL and print the result.
    url = "http://xxxxxxxxxx.cloudfront.net/your_file"
    # Passed as cloudfront()'s ``secs`` argument: a lifetime in seconds, not
    # an absolute timestamp.
    expires = 30
    # Call form of print works on both Python 2 and Python 3; the bare
    # ``print x`` statement is a SyntaxError on Python 3.
    print(cloudfront(url, expires))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment