Last active
April 6, 2016 08:15
-
-
Save righ/633a1b4162d7492b765f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
import os | |
from boto.s3 import ( | |
connection as s3conn, | |
key as s3key, | |
) | |
from boto.exception import ( | |
S3ResponseError, | |
) | |
try: | |
basestring | |
except NameError: | |
basestring = str | |
class S3BaseException(Exception): | |
message = 's3 base exception.' | |
class S3NoBucketSelectedError(S3BaseException): | |
message = 'no bucket selected. use s3.use-method, and select bucket.' | |
class S3KeyDoesNotExist(S3BaseException): | |
message = 'remote key does not exist.' | |
class S3(object): | |
def __repr__(self): | |
return '<{}>'.format( | |
self.bucket.name if self.bucket else 'no bucket selected' | |
) | |
def __init__(self, access_key_id, secret_access_key, **option): | |
self.conn = s3conn.S3Connection( | |
access_key_id, secret_access_key, **option) | |
self.bucket = None | |
@property | |
def buckets(self): | |
return {b.name: b | |
for b in self.conn.get_all_buckets()} | |
@property | |
def keys(self): | |
return {k.name: k | |
for k in self.bucket.get_all_keys()} | |
def use( | |
self, bucket_name, validate=True, headers=None, | |
missing_and_create=False, location='', policy=None | |
): | |
""" | |
bucket selection | |
:param bucket_name: target bucket. | |
:param missing_and_create: it creates, when bucket does not exist. | |
:param *etc*: it will be specified, when connecting or creating. | |
""" | |
try: | |
self.bucket = self.conn.get_bucket( | |
bucket_name, validate, headers) | |
except S3ResponseError as e: | |
if missing_and_create: | |
self.bucket = self.conn.create_bucket( | |
bucket_name, headers, location, policy) | |
else: | |
raise e | |
return self | |
def create_key(self, path, body='', directory=False): | |
""" | |
create key by specified byte. | |
:param path: key path. | |
:param body: key body(content). | |
:param directory: directory flag | |
:return: result | |
""" | |
if directory: | |
path += '/' | |
key = s3key.Key(self.bucket, path) | |
size = key.set_contents_from_string(body) | |
return size == len(body) | |
def delete_key(self, key): | |
""" | |
delete key. | |
:param key: target delete key. | |
:return: result | |
""" | |
if isinstance(key, basestring): | |
key = self.bucket.get_key(key) | |
if key is None: | |
raise S3KeyDoesNotExist(S3KeyDoesNotExist.message) | |
deleted_key = key.delete() | |
return deleted_key.size is None | |
def upload_key(self, path, base_path=''): | |
""" | |
create key by local file.(upload) | |
:param path: remote and local common path. | |
:param base_path: base path on local | |
:return: result | |
""" | |
key = s3key.Key(self.bucket, path) | |
path = os.path.relpath(path, base_path) | |
with open(path, 'rb') as f: | |
size = key.set_contents_from_file(f) | |
result = size == f.tell() | |
return result | |
def download_key(self, key, base_path=''): | |
""" | |
download key. | |
:key: target key you want. | |
:base_path: base path on local. | |
:return: result. | |
""" | |
if isinstance(key, basestring): | |
key = self.bucket.get_key(key) | |
if key is None: | |
raise S3KeyDoesNotExist(S3KeyDoesNotExist.message) | |
local_path = os.path.relpath(key.name, base_path) | |
try: | |
os.makedirs(os.path.dirname(local_path)) | |
except OSError: | |
pass | |
with open(local_path, 'wb') as f: | |
key.get_file(f) | |
result = key.size == f.tell() | |
return result |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment