Skip to content

Instantly share code, notes, and snippets.

@lstoll
Forked from chrisjm/s3-delete-bucket.rb
Created April 30, 2012 03:34
Show Gist options
  • Save lstoll/2555230 to your computer and use it in GitHub Desktop.
Save lstoll/2555230 to your computer and use it in GitHub Desktop.
Fork of s3-delete-files.rb, adapted to download every file from an S3 bucket that contains a large number of objects.
#!/usr/bin/env ruby
require 'rubygems'
require 'thread'
require 'fileutils'
require 'fog'
# Require all four positional arguments: AWS access key, AWS secret key,
# bucket name and output directory. `abort` prints the usage text to stderr
# and exits with status 1, with no backtrace.
if ARGV.count < 4
  abort "Specify a key, secret, bucket and output dir (eg ./downloader.rb key secret my-bucket '~/out')"
end
# Set up state shared by the lister thread and the download workers.
bucket_name = ARGV[2]
outdir = File.expand_path(ARGV[3])
thread_count = 20
threads = []
queue = Queue.new
semaphore = Mutex.new
total_listed = 0
total_fetched = 0
# When more than this many files are pending in the queue, the lister sleeps
# so the workers can catch up.
MAX_QUEUE_PENDING = 2000
puts "== Downloading files from '#{bucket_name}' to '#{outdir}' =="
# Create the output directory up front. Without it, every worker's first write
# fails and the download cannot make progress; this also fails fast when
# outdir is not writable.
FileUtils.mkdir_p(outdir)
# Connect to S3 using the access key and secret passed as the first two
# command-line arguments.
s3 = Fog::Storage::AWS.new(:aws_access_key_id => ARGV[0], :aws_secret_access_key => ARGV[1])
# List every file in the bucket and feed it to the download workers.
threads << Thread.new do
  Thread.current[:name] = "lister"
  puts "...started thread '#{Thread.current[:name]}'...\n"
  begin
    directory = s3.directories.get(bucket_name)
    # A nil result would otherwise surface as an opaque NoMethodError on nil.
    raise "Bucket '#{bucket_name}' not found or not accessible" if directory.nil?
    # Fog handles pagination of the bucket listing internally.
    directory.files.each do |file|
      queue.enq(file)
      total_listed += 1
      # Crude back-pressure: give the workers time to drain a large backlog
      # instead of holding the whole listing in memory.
      if queue.size > MAX_QUEUE_PENDING
        puts "Queue has more than #{MAX_QUEUE_PENDING} pending files, sleeping 2s"
        sleep 2
      end
    end
  ensure
    # Always send one :EOF per worker, even when listing fails, so that the
    # workers stop instead of blocking on an empty queue forever.
    thread_count.times { queue.enq(:EOF) }
  end
end
# Download the queued files with N worker threads. Each worker runs until it
# dequeues an :EOF sentinel. Files that already exist locally are skipped, so
# re-running with the same output directory resumes an interrupted download.
thread_count.times do |count|
  threads << Thread.new(count) do |number|
    Thread.current[:name] = "get files(#{number})"
    puts "...started thread '#{Thread.current[:name]}'...\n"
    # Every local path must lie under this prefix (outdir with a trailing "/").
    root = File.join(outdir, '')
    # Dequeue until EOF; Queue#deq blocks until the lister supplies an item.
    until (file = queue.deq) == :EOF
      key = file.key
      begin
        # expand_path collapses any "." / ".." segments contained in the key.
        path = File.expand_path(File.join(outdir, key))
        if key.end_with?('/')
          # "Folder" placeholder objects have no file content to write.
          puts "... skipping #{key}"
        elsif !path.start_with?(root)
          # Refuse keys such as "../x" that would escape the output directory.
          warn "... refusing to write #{key} outside #{outdir}"
        elsif File.exist?(path)
          puts "... skipping #{key}"
        else
          # Keys may contain "/", so create the parent directories first.
          FileUtils.mkdir_p(File.dirname(path))
          # file.body is evaluated before the target is opened, so a failed
          # request never leaves behind an empty file that a re-run would skip.
          File.binwrite(path, file.body)
        end
        # Increment the global synchronized counter (skipped files count too).
        fetched = semaphore.synchronize { total_fetched += 1 }
        puts "Fetched #{fetched} out of #{total_listed}\n" if rand(100) == 1
      rescue StandardError => e
        # Log and move on: one failed object must not kill this worker.
        warn "Failure fetching #{key} on thread #{Thread.current[:name]}: #{e.message}"
      end
    end
  end
end
# Wait for the lister and all workers to finish. Thread#join re-raises the
# exception that ended a thread. Report it and keep joining the remaining
# threads, then exit non-zero so callers can tell the download was incomplete.
failed_threads = []
threads.each do |t|
  begin
    t.join
  rescue StandardError => e
    failed_threads << t[:name]
    warn "Failure on thread #{t[:name]}: #{e.message}"
  end
end
abort "#{failed_threads.size} thread(s) failed: #{failed_threads.join(', ')}" unless failed_threads.empty?
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment