Skip to content

Instantly share code, notes, and snippets.

@fsck-mount
Forked from garnaat/download.py
Created October 11, 2017 20:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fsck-mount/9103fa14ceaa4f84dfa80457f85a513a to your computer and use it in GitHub Desktop.
Save fsck-mount/9103fa14ceaa4f84dfa80457f85a513a to your computer and use it in GitHub Desktop.
Use multiprocess to download objects from S3
"""
"""
import multiprocessing
import boto
import os
import sys
import datetime
import logging
import Queue
class Downloader(object):
def __init__(self, download_path, bucket_name, num_processes=2,
log_file=None, log_level=logging.INFO):
self.download_path = download_path
self.bucket_name = bucket_name
self.num_processes = num_processes
if log_file:
boto.set_file_logger('boto-downloader', log_file, log_level)
else:
boto.set_stream_logger('boto-downloader', log_level)
self.task_queue = multiprocessing.JoinableQueue()
self.s3 = boto.connect_s3()
self.bucket = self.s3.lookup(self.bucket_name)
self.n_tasks = 0
def queue_tasks(self):
for key in self.bucket:
self.task_queue.put(key.name)
self.n_tasks += 1
def worker(self, input):
print 'Starting worker'
while 1:
try:
key_name = input.get(True, 1)
except Queue.Empty:
p_name = multiprocessing.current_process().name
boto.log.info('%s has no more tasks' % p_name)
break
key = self.bucket.lookup(key_name)
path = os.path.join(self.download_path, key_name)
try:
key.get_contents_to_filename(path)
input.task_done()
except:
boto.log.error('Unexpected error: %s', sys.exc_info()[0])
boto.log.error('Error processing %s' % path)
def main(self):
self.queue_tasks()
self.start_time = datetime.datetime.now()
for i in range(self.num_processes):
multiprocessing.Process(target=self.worker,
args=(self.task_queue,)).start()
self.task_queue.join()
self.task_queue.close()
self.end_time = datetime.datetime.now()
print 'total time = ', (self.end_time - self.start_time)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment