Created
August 12, 2015 23:06
-
-
Save ariestiyansyah/f164d9ba9afffd866893 to your computer and use it in GitHub Desktop.
Manage S3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Usage: bkt <command> [options] | |
bkt handles file uploads to S3. | |
Commands: | |
ls List all buckets or all files in a bucket | |
get Download a file from a bucket | |
mk Create a new bucket | |
put Upload a file to a bucket | |
rm Delete a bucket or files from a bucket | |
""" | |
import fnmatch | |
import os | |
import shlex | |
import subprocess | |
import sys | |
try: | |
import configparser | |
except ImportError: | |
import ConfigParser as configparser | |
from boto.exception import S3CreateError, S3ResponseError | |
from boto.s3.connection import S3Connection, OrdinaryCallingFormat | |
from boto.s3.key import Key | |
PROGNAME = os.path.basename(sys.argv[0]) | |
CONFIG_DIR = os.getenv('XDG_CONFIG_HOME', os.path.expanduser("~/.config")) | |
CONFIG_FILE = os.path.join(CONFIG_DIR, PROGNAME, 'config.ini') | |
HIDE_CURSOR = '\x1b[?25l' | |
SHOW_CURSOR = '\x1b[?25h' | |
class Command(object): | |
def __init__(self): | |
self.setup_config() | |
self.setup_auth() | |
self.setup_connection() | |
self.args = sys.argv[2:] | |
def error(self, msg, error=1): | |
sys.stderr.write('%s: %s%s\n' % (PROGNAME, msg, SHOW_CURSOR)) | |
sys.exit(error) | |
def eval(self, cmd): | |
cmd = shlex.split(cmd) | |
devnull = open(os.devnull, 'wb') | |
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=devnull) | |
return proc.communicate()[0].strip() | |
def get_local_filename(self, fn): | |
count = 0 | |
new_fn = fn | |
while True: | |
if count: | |
new_fn = '%s.%d' % (fn, count) | |
if os.path.exists(new_fn): | |
count += 1 | |
continue | |
return new_fn | |
def help(self): | |
base = 'Usage: %s %s\n' % (PROGNAME, self.__doc__.strip()) | |
sys.stderr.write(base) | |
sys.exit() | |
def progress(self, cur, total): | |
if cur >= total: | |
finished = True | |
cur = total | |
else: | |
finished = False | |
percent = cur * 100 / total | |
sys.stderr.write('%s\r%s: %d%%' % (HIDE_CURSOR, self._fn, percent)) | |
sys.stderr.flush() | |
if finished: | |
sys.stderr.write('%s\n' % SHOW_CURSOR) | |
def setup_auth(self): | |
self.access_key = None | |
self.secret_key = None | |
for option in ['access_key', 'secret_key']: | |
# try to get the regular options | |
if self.config.has_option('auth', option): | |
value = self.config.get('auth', option).decode('utf-8') | |
setattr(self, option, value) | |
# try to get the `_eval` option | |
if self.config.has_option('auth', option + '_eval'): | |
opt_eval = self.config.get('auth', option + '_eval') | |
value = self.eval(opt_eval).decode('utf-8') | |
setattr(self, option, value) | |
if getattr(self, option, None) is None: | |
self.error("Missing config option '%s'." % option) | |
def setup_config(self): | |
try: | |
self.config = configparser.ConfigParser() | |
self.config.readfp(open(CONFIG_FILE)) | |
except IOError: | |
self.error("Couldn't read your config file.") | |
def setup_connection(self): | |
self.conn = S3Connection(self.access_key, | |
self.secret_key, | |
calling_format=OrdinaryCallingFormat()) | |
class GetCommand(Command): | |
""" | |
get <bucket> <file> [...] | |
""" | |
name = 'get' | |
def run(self): | |
if len(self.args) < 1: | |
self.help() | |
try: | |
bucket = self.conn.get_bucket(self.args[0], validate=False) | |
except S3ResponseError as e: | |
self.error(e.error_message) | |
keys = dict([(key.name, key) for key in bucket.list()]) | |
for fn in self.args[1:]: | |
for name in fnmatch.filter(keys.keys(), fn): | |
basename = os.path.basename(name) | |
try: | |
key = keys[name] | |
self._fn = basename | |
fn = self.get_local_filename(basename) | |
fh = open(os.path.join(os.getcwd(), fn), 'w') | |
key.get_file(fh, cb=self.progress) | |
self._fn = None | |
except IOError: | |
self.error("Could not write to local file '%s'" % fn) | |
class LsCommand(Command): | |
""" | |
ls [<bucket>] | |
""" | |
name = 'ls' | |
def run(self): | |
# listing all buckets | |
if len(self.args) == 0: | |
for b in self.conn.get_all_buckets(): | |
sys.stdout.write(b.name + '\n') | |
return | |
# list contents of a bucket | |
try: | |
bucket = self.conn.get_bucket(self.args[0], validate=False) | |
for key in bucket.list(): | |
sys.stdout.write(key.name + '\n') | |
except S3ResponseError as e: | |
self.error(e.error_message) | |
class RmCommand(Command): | |
""" | |
rm [-r] <bucket> [<file>, ...] | |
""" | |
name = 'rm' | |
def run(self): | |
if self.args[0] in ['-r', '--recurse']: | |
recurse = True | |
del self.args[0] | |
else: | |
recurse = False | |
if len(self.args) < 1: | |
self.help() | |
if recurse: | |
for arg in self.args: | |
try: | |
bucket = self.conn.get_bucket(arg, validate=False) | |
for key in bucket.list(): | |
bucket.delete_key(key.name) | |
self.conn.delete_bucket(bucket) | |
except S3ResponseError as e: | |
self.error(e.error_message) | |
else: | |
try: | |
bucket = self.conn.get_bucket(self.args[0], validate=False) | |
if len(self.args) == 1: | |
self.conn.delete_bucket(self.args[0]) | |
else: | |
for fn in self.args[1:]: | |
bucket.delete_key(fn) | |
except S3ResponseError as e: | |
self.error(e.error_message) | |
class MkCommand(Command): | |
""" | |
mk <bucket> [...] | |
""" | |
name = 'mk' | |
def run(self): | |
if len(self.args) < 1: | |
self.help() | |
for bucket in self.args: | |
try: | |
self.conn.create_bucket(bucket) | |
except (S3CreateError, S3ResponseError) as e: | |
self.error(e.error_message) | |
class PutCommand(Command): | |
""" | |
put <bucket> <file> [...] | |
""" | |
name = 'put' | |
def run(self): | |
if len(self.args) < 2: | |
self.help() | |
try: | |
bucket = self.conn.get_bucket(self.args[0], validate=False) | |
for fn in self.args[1:]: | |
self._fn = os.path.basename(fn) | |
k = Key(bucket) | |
k.name = os.path.basename(fn) | |
k.set_contents_from_filename(fn, cb=self.progress, num_cb=100) | |
self._fn = None | |
except S3ResponseError as e: | |
self.error(e.error_message) | |
def main(): | |
# no commands or arguments, print the help message | |
if len(sys.argv) <= 1 or sys.argv[1] in ('-h', '--help', 'help'): | |
sys.stderr.write(__doc__.lstrip()) | |
sys.exit() | |
for c in Command.__subclasses__(): | |
if c.name == sys.argv[1]: | |
command = c() | |
command.run() | |
sys.exit() | |
# unknown command | |
sys.stderr.write(__doc__.lstrip()) | |
sys.exit(1) | |
if __name__ == '__main__': | |
try: | |
main() | |
except KeyboardInterrupt: | |
sys.stderr.write('\nAborting...%s\n' % SHOW_CURSOR) | |
sys.exit(2) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment