Created
January 20, 2010 11:50
-
-
Save simonw/281793 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django import forms | |
import datetime | |
import base64 | |
import hmac, sha, simplejson | |
from django.utils.safestring import mark_safe | |
""" | |
http://developer.amazonwebservices.com/connect/entry.jspa?externalID=1434 | |
<input type="hidden" name="key" value="uploads/${filename}"> | |
<input type="hidden" name="AWSAccessKeyId" value="YOUR_AWS_ACCESS_KEY"> | |
<input type="hidden" name="acl" value="private"> | |
<input type="hidden" name="success_action_redirect" value="http://localhost/"> | |
<input type="hidden" name="policy" value="YOUR_POLICY_DOCUMENT_BASE64_ENCODED"> | |
<input type="hidden" name="signature" value="YOUR_CALCULATED_SIGNATURE"> | |
<input type="hidden" name="Content-Type" value="image/jpeg"> | |
Suggested usage: | |
def upload_s3(request): | |
form = S3UploadForm( | |
aws_access_key = AWS_ACCESS_KEY, | |
aws_secret_key = AWS_SECRET_KEY, | |
bucket = 'my-bucket', | |
key = str(uuid.uuid1()), | |
expires_after = datetime.timedelta(minutes = 10), | |
acl = 'private', | |
success_action_redirect = 'http://127.0.0.1:8000/upload/', | |
max_size = 1024 * 100, | |
) | |
bucket = request.GET.get('bucket', '') | |
key = request.GET.get('key', '') | |
etag = request.GET.get('etag', '') | |
message = 'Please upload your file' | |
if bucket and key and etag: | |
if form.confirm_key_exists(bucket, key, etag): | |
message = 'Thank you for your file' | |
else: | |
message = 'Invalid upload, please try again' | |
return render('upload.html', { | |
'form': form, | |
'message': message, | |
}) | |
""" | |
class S3UploadForm(forms.Form): | |
key = forms.CharField(widget = forms.HiddenInput) | |
AWSAccessKeyId = forms.CharField(widget = forms.HiddenInput) | |
acl = forms.CharField(widget = forms.HiddenInput) | |
success_action_redirect = forms.CharField(widget = forms.HiddenInput) | |
policy = forms.CharField(widget = forms.HiddenInput) | |
signature = forms.CharField(widget = forms.HiddenInput) | |
Content_Type = forms.CharField(widget = forms.HiddenInput) | |
file = forms.FileField() | |
def __init__(self, aws_access_key, aws_secret_key, bucket, key, | |
expires_after = datetime.timedelta(days = 30), | |
acl = 'public-read', | |
success_action_redirect = None, | |
content_type = '', | |
min_size = 0, | |
max_size = None | |
): | |
self.aws_access_key = aws_access_key | |
self.aws_secret_key = aws_secret_key | |
self.bucket = bucket | |
self.key = key | |
self.expires_after = expires_after | |
self.acl = acl | |
self.success_action_redirect = success_action_redirect | |
self.content_type = content_type | |
self.min_size = min_size | |
self.max_size = max_size | |
policy = base64.b64encode(self.calculate_policy()) | |
signature = self.sign_policy(policy) | |
initial = { | |
'key': self.key, | |
'AWSAccessKeyId': self.aws_access_key, | |
'acl': self.acl, | |
'policy': policy, | |
'signature': signature, | |
'Content-Type': self.content_type, | |
} | |
if self.success_action_redirect: | |
initial['success_action_redirect'] = self.success_action_redirect | |
super(S3UploadForm, self).__init__(initial = initial) | |
# We need to manually rename the Content_Type field to Content-Type | |
self.fields['Content-Type'] = self.fields['Content_Type'] | |
del self.fields['Content_Type'] | |
# Don't show success_action_redirect if it's not being used | |
if not self.success_action_redirect: | |
del self.fields['success_action_redirect'] | |
def as_html(self): | |
""" | |
Use this instead of as_table etc, because S3 requires the file field | |
come AFTER the hidden fields, but Django's normal form display methods | |
position the visible fields BEFORE the hidden fields. | |
""" | |
html = ''.join(map(unicode, self.hidden_fields())) | |
html += unicode(self['file']) | |
return html | |
def as_form_html(self, prefix='', suffix=''): | |
html = """ | |
<form action="%s" method="post" enctype="multipart/form-data"> | |
<p>%s <input type="submit" value="Upload"></p> | |
</form> | |
""".strip() % (self.action(), self.as_html()) | |
return mark_safe(html) | |
def is_multipart(self): | |
return True | |
def action(self): | |
return 'https://%s.s3.amazonaws.com/' % self.bucket | |
def calculate_policy(self): | |
conditions = [ | |
{'bucket': self.bucket}, | |
{'acl': self.acl}, | |
['starts-with', '$key', self.key.replace('${filename}', '')], | |
["starts-with", "$Content-Type", self.content_type], | |
] | |
if self.max_size: | |
conditions.append( | |
['content-length-range', self.min_size, self.max_size] | |
) | |
if self.success_action_redirect: | |
conditions.append( | |
{'success_action_redirect': self.success_action_redirect}, | |
) | |
policy_document = { | |
"expiration": ( | |
datetime.datetime.now() + self.expires_after | |
).isoformat().split('.')[0] + 'Z', | |
"conditions": conditions, | |
} | |
return simplejson.dumps(policy_document, indent=2) | |
def sign_policy(self, policy): | |
return base64.b64encode( | |
hmac.new(self.aws_secret_key, policy, sha).digest() | |
) | |
def confirm_key_exists(self, bucket, key, etag): | |
import boto, boto.s3 | |
conn = boto.s3.connection.S3Connection( | |
self.aws_access_key, | |
self.aws_secret_key | |
) | |
try: | |
b = conn.get_bucket(bucket) | |
except boto.exception.S3ResponseError: | |
return False | |
k = b.get_key(key) | |
if k is None: | |
return False | |
return k.etag == etag |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment