-
-
Save lstoll/2555230 to your computer and use it in GitHub Desktop.
fork of s3-delete-files.rb -- Used for downloading a bucket with lots of files.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby
#
# Downloads every object in an S3 bucket into a local directory.
#
# Usage: ./downloader.rb KEY SECRET BUCKET OUTDIR
#
# One "lister" thread enumerates the bucket and pushes each file onto a
# queue; a fixed pool of worker threads pops files off the queue and writes
# them under OUTDIR. Files that already exist locally are skipped, so an
# interrupted run can be restarted.
require 'rubygems'
require 'thread'
require 'fileutils'
require 'fog'

# Ensure all four arguments were given; exit with a usage message otherwise.
if ARGV.count < 4
  abort "Specify a key, secret, bucket and output dir (eg ./downloader.rb key secret my-bucket '~/out')"
end

# Set up threads and variables.
bucket_name = ARGV[2]
outdir = File.expand_path(ARGV[3])
# outdir with a trailing separator, used to check that a key resolves to a
# path inside outdir.
outdir_prefix = File.join(outdir, '')
thread_count = 20
threads = []
queue = Queue.new
semaphore = Mutex.new
total_listed = 0
total_fetched = 0
MAX_QUEUE_PENDING = 2000

puts "== Downloading files from '#{bucket_name}' to '#{outdir}' =="

# Create new Fog S3 connection using the credentials passed on the command line.
s3 = Fog::Storage::AWS.new(:aws_access_key_id => ARGV[0], :aws_secret_access_key => ARGV[1])

# List the files in the bucket and feed them to the workers.
threads << Thread.new do
  Thread.current[:name] = "lister"
  puts "...started thread '#{Thread.current[:name]}'...\n"

  begin
    directory = s3.directories.get(bucket_name)
    raise "Bucket '#{bucket_name}' not found" if directory.nil?

    # Get all the files from this bucket. Fog handles pagination internally.
    directory.files.each do |file|
      # Add this file to the queue.
      queue.enq(file)
      total_listed += 1

      # Crude back-pressure: pause the listing while the workers catch up.
      if queue.size > MAX_QUEUE_PENDING
        puts "Queue has more than #{MAX_QUEUE_PENDING} pending files, sleeping 2s"
        sleep 2
      end
    end
  ensure
    # Always send one EOF per worker, even if listing failed, so the
    # download threads terminate instead of blocking on the queue forever.
    thread_count.times { queue.enq(:EOF) }
  end
end

# Download the files in the queue until EOF with N threads.
thread_count.times do |count|
  threads << Thread.new(count) do |number|
    Thread.current[:name] = "get files(#{number})"
    puts "...started thread '#{Thread.current[:name]}'...\n"

    loop do
      # Dequeue the next file. (Will block until one is available.)
      file = queue.deq
      break if file == :EOF

      begin
        key = file.key
        path = File.expand_path(File.join(outdir, key))

        if !path.start_with?(outdir_prefix)
          # Refuse keys (e.g. containing "..") that resolve outside outdir.
          puts "... skipping #{key} (resolves outside output dir)\n"
        elsif key.end_with?('/')
          # Keys ending in "/" are directory placeholders, not files.
          FileUtils.mkdir_p(path)
        elsif File.exist?(path)
          puts "... skipping #{key}\n"
        else
          # Keys may contain "/", so make sure the parent directory exists.
          FileUtils.mkdir_p(File.dirname(path))
          # Binary mode so object contents are written without translation.
          File.open(path, 'wb') do |local_file|
            local_file.write(file.body)
          end
        end
      rescue StandardError => e
        # One bad file should not take the whole worker down.
        warn "Failed to fetch #{file.key}: #{e.class}: #{e.message}"
      end

      # Increment the global synchronized counter and keep the value read
      # under the lock for the progress message.
      fetched = semaphore.synchronize { total_fetched += 1 }
      puts "Fetched #{fetched} out of #{total_listed}\n" if rand(100) == 1
    end
  end
end

# Wait for the threads to finish, reporting any thread that died.
threads.each do |t|
  begin
    t.join
  rescue StandardError => e
    puts "Failure on thread #{t[:name]}: #{e.message}"
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment