Skip to content

Instantly share code, notes, and snippets.

@ariestiyansyah
Created August 12, 2015 23:06
Show Gist options
  • Save ariestiyansyah/f164d9ba9afffd866893 to your computer and use it in GitHub Desktop.
Save ariestiyansyah/f164d9ba9afffd866893 to your computer and use it in GitHub Desktop.
Manage S3
#!/usr/bin/env python
"""
Usage: bkt <command> [options]
bkt handles file uploads to S3.
Commands:
ls List all buckets or all files in a bucket
get Download a file from a bucket
mk Create a new bucket
put Upload a file to a bucket
rm Delete a bucket or files from a bucket
"""
import fnmatch
import os
import shlex
import subprocess
import sys
try:
import configparser
except ImportError:
import ConfigParser as configparser
from boto.exception import S3CreateError, S3ResponseError
from boto.s3.connection import S3Connection, OrdinaryCallingFormat
from boto.s3.key import Key
PROGNAME = os.path.basename(sys.argv[0])
CONFIG_DIR = os.getenv('XDG_CONFIG_HOME', os.path.expanduser("~/.config"))
CONFIG_FILE = os.path.join(CONFIG_DIR, PROGNAME, 'config.ini')
HIDE_CURSOR = '\x1b[?25l'
SHOW_CURSOR = '\x1b[?25h'
class Command(object):
def __init__(self):
self.setup_config()
self.setup_auth()
self.setup_connection()
self.args = sys.argv[2:]
def error(self, msg, error=1):
sys.stderr.write('%s: %s%s\n' % (PROGNAME, msg, SHOW_CURSOR))
sys.exit(error)
def eval(self, cmd):
cmd = shlex.split(cmd)
devnull = open(os.devnull, 'wb')
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=devnull)
return proc.communicate()[0].strip()
def get_local_filename(self, fn):
count = 0
new_fn = fn
while True:
if count:
new_fn = '%s.%d' % (fn, count)
if os.path.exists(new_fn):
count += 1
continue
return new_fn
def help(self):
base = 'Usage: %s %s\n' % (PROGNAME, self.__doc__.strip())
sys.stderr.write(base)
sys.exit()
def progress(self, cur, total):
if cur >= total:
finished = True
cur = total
else:
finished = False
percent = cur * 100 / total
sys.stderr.write('%s\r%s: %d%%' % (HIDE_CURSOR, self._fn, percent))
sys.stderr.flush()
if finished:
sys.stderr.write('%s\n' % SHOW_CURSOR)
def setup_auth(self):
self.access_key = None
self.secret_key = None
for option in ['access_key', 'secret_key']:
# try to get the regular options
if self.config.has_option('auth', option):
value = self.config.get('auth', option).decode('utf-8')
setattr(self, option, value)
# try to get the `_eval` option
if self.config.has_option('auth', option + '_eval'):
opt_eval = self.config.get('auth', option + '_eval')
value = self.eval(opt_eval).decode('utf-8')
setattr(self, option, value)
if getattr(self, option, None) is None:
self.error("Missing config option '%s'." % option)
def setup_config(self):
try:
self.config = configparser.ConfigParser()
self.config.readfp(open(CONFIG_FILE))
except IOError:
self.error("Couldn't read your config file.")
def setup_connection(self):
self.conn = S3Connection(self.access_key,
self.secret_key,
calling_format=OrdinaryCallingFormat())
class GetCommand(Command):
"""
get <bucket> <file> [...]
"""
name = 'get'
def run(self):
if len(self.args) < 1:
self.help()
try:
bucket = self.conn.get_bucket(self.args[0], validate=False)
except S3ResponseError as e:
self.error(e.error_message)
keys = dict([(key.name, key) for key in bucket.list()])
for fn in self.args[1:]:
for name in fnmatch.filter(keys.keys(), fn):
basename = os.path.basename(name)
try:
key = keys[name]
self._fn = basename
fn = self.get_local_filename(basename)
fh = open(os.path.join(os.getcwd(), fn), 'w')
key.get_file(fh, cb=self.progress)
self._fn = None
except IOError:
self.error("Could not write to local file '%s'" % fn)
class LsCommand(Command):
"""
ls [<bucket>]
"""
name = 'ls'
def run(self):
# listing all buckets
if len(self.args) == 0:
for b in self.conn.get_all_buckets():
sys.stdout.write(b.name + '\n')
return
# list contents of a bucket
try:
bucket = self.conn.get_bucket(self.args[0], validate=False)
for key in bucket.list():
sys.stdout.write(key.name + '\n')
except S3ResponseError as e:
self.error(e.error_message)
class RmCommand(Command):
"""
rm [-r] <bucket> [<file>, ...]
"""
name = 'rm'
def run(self):
if self.args[0] in ['-r', '--recurse']:
recurse = True
del self.args[0]
else:
recurse = False
if len(self.args) < 1:
self.help()
if recurse:
for arg in self.args:
try:
bucket = self.conn.get_bucket(arg, validate=False)
for key in bucket.list():
bucket.delete_key(key.name)
self.conn.delete_bucket(bucket)
except S3ResponseError as e:
self.error(e.error_message)
else:
try:
bucket = self.conn.get_bucket(self.args[0], validate=False)
if len(self.args) == 1:
self.conn.delete_bucket(self.args[0])
else:
for fn in self.args[1:]:
bucket.delete_key(fn)
except S3ResponseError as e:
self.error(e.error_message)
class MkCommand(Command):
"""
mk <bucket> [...]
"""
name = 'mk'
def run(self):
if len(self.args) < 1:
self.help()
for bucket in self.args:
try:
self.conn.create_bucket(bucket)
except (S3CreateError, S3ResponseError) as e:
self.error(e.error_message)
class PutCommand(Command):
"""
put <bucket> <file> [...]
"""
name = 'put'
def run(self):
if len(self.args) < 2:
self.help()
try:
bucket = self.conn.get_bucket(self.args[0], validate=False)
for fn in self.args[1:]:
self._fn = os.path.basename(fn)
k = Key(bucket)
k.name = os.path.basename(fn)
k.set_contents_from_filename(fn, cb=self.progress, num_cb=100)
self._fn = None
except S3ResponseError as e:
self.error(e.error_message)
def main():
# no commands or arguments, print the help message
if len(sys.argv) <= 1 or sys.argv[1] in ('-h', '--help', 'help'):
sys.stderr.write(__doc__.lstrip())
sys.exit()
for c in Command.__subclasses__():
if c.name == sys.argv[1]:
command = c()
command.run()
sys.exit()
# unknown command
sys.stderr.write(__doc__.lstrip())
sys.exit(1)
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
sys.stderr.write('\nAborting...%s\n' % SHOW_CURSOR)
sys.exit(2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment