Forked from robhudson/storage.py
storage.py
Python
import re
from mimetypes import guess_type

from django.core.exceptions import ImproperlyConfigured
from django.core.files.storage import Storage
from django.utils.encoding import iri_to_uri
 
try:
    import S3
except ImportError:
    raise ImproperlyConfigured(
        "Could not load Amazon's S3 bindings.\n"
        "See http://developer.amazonwebservices.com/connect/entry.jspa?externalID=134")
 
try:
    from settings import AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_STORAGE_BUCKET_NAME
except ImportError:
    raise ImproperlyConfigured(
        "AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_STORAGE_BUCKET_NAME "
        "required in settings.py.")
 
try:
    from settings import AWS_CALLING_FORMAT
except ImportError:
    AWS_CALLING_FORMAT = S3.CallingFormat.PATH
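
# The three settings above must be present in settings.py; AWS_CALLING_FORMAT
# is optional. A minimal sketch with placeholder (not real) values:
#
#   AWS_ACCESS_KEY_ID = 'your-access-key-id'
#   AWS_SECRET_ACCESS_KEY = 'your-secret-access-key'
#   AWS_STORAGE_BUCKET_NAME = 'your-bucket-name'
#   AWS_CALLING_FORMAT = S3.CallingFormat.PATH  # the default when unset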
 
class S3StorageException(Exception):
    """Exception raised when an S3 error occurs."""
 
class S3Storage(Storage):
    """
    Storage backend for Amazon Simple Storage Service.
    """

    def __init__(self, access_key=AWS_ACCESS_KEY_ID, secret_key=AWS_SECRET_ACCESS_KEY,
                 bucket=AWS_STORAGE_BUCKET_NAME, acl='public-read',
                 calling_format=AWS_CALLING_FORMAT):
        self.bucket = bucket
        self.acl = acl
        self.connection = S3.AWSAuthConnection(access_key, secret_key,
                                               calling_format=calling_format)
        self.generator = S3.QueryStringAuthGenerator(access_key, secret_key,
                                                     calling_format=calling_format,
                                                     is_secure=False)

    def is_valid(self):
        """
        Tests whether the connection is valid by hitting the basic
        REST GET endpoint (a listing of all buckets).
        """
        response = self.connection.list_all_my_buckets().http_response
        return response.status == 200

    def open(self, name, mode='rb', mixin=None):
        file = self.connection.get(self.bucket, name).object.data
        if mixin:
            # Dynamically add the mixin as a parent class of the object
            # returned from storage, so callers can layer behavior onto it.
            file.__class__ = type(mixin.__name__, (mixin, file.__class__), {})
        return file

    def save(self, name, content):
        # The S3 key name can itself be a path.
        if name is None:
            name = content.name
        headers = {
            'x-amz-acl': self.acl,
            # Not every File exposes content_type, so fall back to guessing
            # from the name (e.g. guess_type('a.txt')[0] == 'text/plain').
            'Content-Type': (getattr(content, 'content_type', None)
                             or guess_type(name)[0]
                             or 'application/octet-stream'),
            'Content-Disposition': 'filename="' + iri_to_uri(content.name) + '";',
        }
        response = self.connection.put(self.bucket, name, content.read(), headers)
        if response.http_response.status != 200:
            raise S3StorageException(response.message)
        return name

    def delete(self, name):
        self.connection.delete(self.bucket, name)

    def copy(self, old_name, name):
        try:
            filename = name.rsplit('/', 1)[1]
        except IndexError:
            filename = name
        # Carry the current Content-Type over to the copy.
        response = self.connection._make_request('HEAD', self.bucket, old_name)
        content_type = response.getheader('Content-Type')
        headers = {
            'x-amz-copy-source': '%s/%s' % (self.bucket, old_name),
            # REPLACE so the Content-Disposition reflects the new name.
            'x-amz-metadata-directive': 'REPLACE',
            # Without an explicit ACL the copy would be reset to private.
            'x-amz-acl': self.acl,
            'Content-Type': content_type,
            'Content-Disposition': 'filename="' + iri_to_uri(filename) + '";',
        }
        response = self.connection.copy(self.bucket, name, headers)
        if response.http_response.status != 200:
            raise S3StorageException(response.message)
        return name

    def move(self, old_name, name):
        # A move on S3 is a copy followed by a delete of the original.
        self.copy(old_name, name)
        self.delete(old_name)
 
    def exists(self, name):
        # Mirrors the HEAD request used in copy() above: a 200 response
        # means the key is present in the bucket.
        response = self.connection._make_request('HEAD', self.bucket, name)
        return response.status == 200

    def listbucket(self):
        response = self.connection.list_bucket(self.bucket)
        return [entry.key for entry in response.entries]
 
    def listdir(self, path):
        """
        Lists the contents of the specified path, returning a 2-tuple of
        lists; the first item being directories, the second item being files.
        """
        directories, files = [], []
        entries = self.listbucket()

        if path:
            # Escape the path so any regex metacharacters in it are literal.
            path_re = re.compile('^(?P<path>%s/)(?P<subpath>.*)' % re.escape(path))
        else:  # root directory
            path_re = re.compile('^(?P<path>)(?P<subpath>.*)')
        for entry in entries:
            match = path_re.match(entry)
            if match:
                subpieces = match.group('subpath').split('/', 1)
                try:
                    # Two pieces: this key is nested inside a directory.
                    directory, more_path = subpieces
                    if directory not in directories:
                        directories.append(directory)
                except ValueError:
                    # One piece: this is a file with no further path.
                    files.append(subpieces[0])

        return directories, files
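
    # A worked example with hypothetical keys: given bucket contents
    # ['docs/a.txt', 'docs/img/b.png', 'c.txt'], listdir('docs') returns
    # (['img'], ['a.txt']) and listdir('') returns (['docs'], ['c.txt']).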
 
    def size(self, name):
        # Note: this downloads the whole object just to measure it.
        data = self.connection.get(self.bucket, name).object.data
        return len(data)
 
    def url(self, name):
        return self.generator.make_bare_url(self.bucket, name)
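
A minimal usage sketch (not part of the gist): it assumes this module is
importable as `storage`, that the AWS settings above point at a real bucket,
and that the S3 bindings respond as the methods expect; the key names are
hypothetical. The backend can also be wired in globally via
DEFAULT_FILE_STORAGE = 'storage.S3Storage' in settings.py.

from django.core.files.base import ContentFile
from storage import S3Storage

storage = S3Storage()  # credentials and bucket come from settings.py
if storage.is_valid():
    content = ContentFile('Hello, S3!')
    content.name = 'hello.txt'  # save() reads content.name for the headers
    name = storage.save('uploads/hello.txt', content)
    print storage.url(name)           # unauthenticated "bare" URL
    print storage.listdir('uploads')  # ([], ['hello.txt']) in an empty bucket
    storage.move(name, 'archive/hello.txt')
    storage.delete('archive/hello.txt')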
