Skip to content

Instantly share code, notes, and snippets.

@phil-monroe
Last active May 10, 2019 15:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save phil-monroe/4477190 to your computer and use it in GitHub Desktop.
Save phil-monroe/4477190 to your computer and use it in GitHub Desktop.
Parallel, multi-file remote copy (pmrcp). Saturates the available network bandwidth when copying many files.
#!/bin/bash
# Installs the pmrcp script into ~/bin and marks it executable.
# Stop at the first failing step so a failed download is never left behind
# as an executable file.
set -euo pipefail
mkdir -p ~/bin/
# -f makes curl exit non-zero on an HTTP error instead of saving the error
# page; -S still prints the reason even though -s silences progress output.
curl -fsSL https://gist.github.com/raw/4477190/pmrcp.rb -o ~/bin/pmrcp
chmod +x ~/bin/pmrcp
#!/usr/bin/env ruby
# Wall-clock start of the run; used for the timing summary printed at the end.
START = Time.now

# Reads an integer setting from the environment and validates it, so that a
# typo fails immediately with a clear message instead of silently becoming 0.
#
# @param name [String] environment variable to read
# @param default [Integer] value returned when the variable is not set
# @param min [Integer] smallest acceptable value
# @return [Integer] the parsed (or default) value
# @raise [ArgumentError] if the value is not a base-10 integer or is below +min+
def env_int(name, default, min)
  raw = ENV[name]
  return default if raw.nil?

  value = begin
    Integer(raw.strip, 10)
  rescue ArgumentError
    raise ArgumentError, "#{name} must be an integer, got #{raw.inspect}"
  end
  raise ArgumentError, "#{name} must be >= #{min}, got #{value}" if value < min

  value
end

# Number of files handed to one copy command (env B, default 20). Must be at
# least 1 because it is used as a slice size.
BATCH_SIZE = env_int('B', 20, 1)
# Number of worker threads started for the transfer (env C, default 15).
# 0 is accepted: the script then starts with no workers.
CONCURRENCY = env_int('C', 15, 0)
# Commands used to run a remote shell and to copy files; overridable via the
# RSH and RCP environment variables.
RSH = ENV['RSH'] || 'rsh'
RCP = ENV['RCP'] || 'rcp'
require 'pp'
require 'shellwords'
require 'thread'
# A resizable pool of worker threads fed from one shared job queue.
#
# Jobs are blocks queued with #schedule; each worker repeatedly takes one job
# and calls it with the arguments it was scheduled with. Workers can be added
# (#inc), retired (#dec) or all stopped (#shutdown) while jobs are still
# queued; jobs that have not been started stay in the queue.
#
# No Mutex is used, so #inc, #dec and #shutdown remain callable from signal
# handlers.
class Pool
  # Seconds an idle worker waits before looking at the queue again.
  IDLE_POLL = 0.05
  # Seconds #schedule and #join wait between checks.
  WAIT_POLL = 0.1

  attr_accessor :pool, :queue, :queue_limit

  # @param size [Integer] number of workers to start immediately
  # @param limit [Numeric] queue length above which #schedule applies
  #   back-pressure
  def initialize(size, limit)
    @queue = Queue.new
    @queue_limit = limit
    @pool = Array.new(size) { |i| spawn(i) }
  end

  # @return [Integer] number of workers that are still alive
  def size
    pool.count(&:alive?)
  end

  # Starts +n+ additional workers.
  def inc(n = 1)
    n.times { pool << spawn(pool.size) }
  end

  # Asks one running worker to exit once it has finished its current job.
  # Does nothing when no worker is left to retire.
  def dec
    worker = pool.find { |t| t.alive? && !t[:stop] }
    worker[:stop] = true if worker
  end

  # Queues +block+ to be called later with +args+ on a worker thread.
  #
  # While the queue is longer than +queue_limit+ the caller is held back, but
  # only as long as at least one worker is alive to drain it; with no workers
  # the wait could never end, so the job is queued immediately.
  #
  # @raise [ArgumentError] if no block is given (a nil job could only fail
  #   later, inside a worker)
  def schedule(*args, &block)
    raise ArgumentError, 'schedule needs a block to run as the job' unless block

    sleep WAIT_POLL while queue.size > queue_limit && size > 0
    queue << [block, args]
  end

  # Blocks until the queue is empty and no worker is busy, then stops all
  # workers. Waiting for busy workers as well means a job that re-queues work
  # while it runs is not cut off.
  def join
    sleep WAIT_POLL until drained?
    shutdown
  end

  # Stops every worker after its current job and empties the worker list.
  # Jobs still waiting in the queue are kept.
  def shutdown
    workers = pool.dup
    workers.each { |t| t[:stop] = true }
    # A thread cannot join itself, so skip the caller if it is a worker.
    workers.each { |t| t.join unless t.equal?(Thread.current) }
    pool.clear
  end

  private

  # True when nothing is queued and every live worker is idle.
  def drained?
    queue.empty? && pool.none? { |t| t.alive? && t[:state] != :idle }
  end

  # Starts one worker thread. The thread-local :state is :popping while the
  # worker looks for a job, :processing while it runs one, and :idle while it
  # waits for work to arrive.
  def spawn(index)
    Thread.new do
      t = Thread.current
      t[:id] = index
      # :stop is deliberately not initialised here: it starts out nil (falsy),
      # and writing false could overwrite a stop request made before this
      # thread began running.
      until t[:stop]
        begin
          # Leave :idle *before* taking a job so #join never sees an empty
          # queue while a job is in flight but not yet flagged.
          t[:state] = :popping
          job, args = queue.pop(true)
        rescue ThreadError
          # Non-blocking pop found nothing: rest briefly, then re-check :stop.
          t[:state] = :idle
          sleep IDLE_POLL
          next
        end

        begin
          t[:state] = :processing
          job.call(*args)
        rescue => e
          # A failing job must not take the worker down with it.
          STDERR.puts "ERROR - #{Thread.current[:id]} - #{e.inspect}"
        end
      end
    end
  end
end
puts "Starting with pid #{Process.pid}"
puts

# The pool starts empty; workers are added only after every batch has been
# queued. With nothing draining the queue a finite limit would block the
# scheduling phase forever once it was exceeded, so the queue is unbounded.
pool = Pool.new 0, Float::INFINITY

# Setup signals to alter pool size
# TTIN: add one worker.
Signal.trap("TTIN") do
  puts "Received signal TTIN: Adding another worker to the pool."
  pool.inc
end

# TTOU: retire one worker; ignored when no worker is running.
Signal.trap("TTOU") do
  puts "Received signal TTOU: Removing a worker from the pool."
  pool.dec if pool.size > 0
end

# USR1: dump the pool state for debugging.
Signal.trap("USR1") do
  pp pool
end

# Pause: stop all workers once their current batch is done.
# SIGSTOP can never be caught (the OS refuses the handler and Ruby raises
# Errno::EINVAL), so the catchable terminal-stop signal TSTP is used instead.
Signal.trap("TSTP") do
  pool.shutdown
end

# Resume: bring the pool back up to CONCURRENCY workers. Only the missing
# workers are added, because SIGCONT can also arrive without a prior pause.
Signal.trap("CONT") do
  missing = CONCURRENCY - pool.size
  pool.inc missing if missing > 0
end
# Guards log output so lines written by different worker threads do not
# interleave.
LOG_MUTEX = Mutex.new

# Writes one log line tagged with the calling thread's :id thread-local.
#
# @param str [#to_s] message text
# @param io [IO] stream to write to (STDOUT unless given)
def worker_log(str, io = STDOUT)
  line = "WORKER[#{Thread.current[:id]}] - #{str}"
  LOG_MUTEX.synchronize { io.puts line }
end
# Command line: SOURCE_DIR HOST:DEST_DIR
# Check each argument before using it, so a missing one produces the intended
# message rather than a TypeError / NoMethodError from the call that uses it.
src_arg, dst_arg = ARGV
src_arg || raise("Need a source directory")
src_dir = File.expand_path(src_arg)
# Split on the first colon only: the destination path may contain colons.
dst_hst, dst_dir = dst_arg.to_s.split(':', 2)
raise("Need a destination host") if dst_hst.nil? || dst_hst.empty?
raise("Need a destination directory") if dst_dir.nil? || dst_dir.empty?
puts "--- Copying Folder Structure ---"
# The source directory itself plus every directory below it.
src_folders = Dir["#{src_dir}/**/**"].select { |f| File.directory? f }.unshift(src_dir)

# Map each local folder onto the destination by swapping only the leading
# src_dir prefix (a plain gsub would also rewrite later occurrences of the
# same text inside the path).
src_prefix = /\A#{Regexp.escape(src_dir)}/
sub_paths = src_folders.map { |d| d.sub(src_prefix, '') }
dst_folders = sub_paths.map { |below| dst_dir + below }

# The part below dst_dir comes from local folder names, so it is escaped for
# the remote shell. dst_dir itself is passed through as typed so that forms
# such as ~/dir are still expanded on the remote side.
mkdir_args = sub_paths.map do |below|
  below.empty? ? dst_dir : dst_dir + Shellwords.escape(below)
end
remote_cmd = "mkdir -p #{mkdir_args.join(' ')}"
# Single-quote the whole remote command for the local shell; an embedded
# single quote has to close the quoting, add an escaped quote, and reopen it.
quoted_remote_cmd = "'" + remote_cmd.gsub("'") { "'\\''" } + "'"
cmd = "#{RSH} #{dst_hst} #{quoted_remote_cmd}"
puts cmd
system(cmd) || raise("failed to create folder structure")
puts

# EXCLUDE is a regular expression; matching folders are dropped from the list
# used for file transfers. The filter runs after the mkdir above, so excluded
# folders are still created on the destination.
if ENV['EXCLUDE']
  exclude = Regexp.new(ENV['EXCLUDE'])
  src_folders.reject! { |f| f =~ exclude }
end
puts "--- Scheduling File Transfers ---"

# Total number of times one batch is attempted before it is given up on, so a
# permanently failing batch cannot be re-queued forever.
max_attempts = 3
# Matches only the leading src_dir of a path (a plain gsub would also rewrite
# later occurrences of the same text inside the path).
src_prefix = /\A#{Regexp.escape(src_dir)}/

# Job run on a worker thread: copies one batch of files into remote_folder and
# re-queues itself when the copy command fails. It is a named lambda so that
# the retry can pass the job itself back to the pool.
copy_batch = lambda do |file_batch, remote_folder, attempt|
  # Random delay so the workers do not all start copying at the same moment.
  sleep rand(CONCURRENCY)
  s = Time.now
  # Files may have disappeared since the batch was scheduled.
  file_batch.select! { |f| File.exist? f }
  worker_log "Starting Batch: Batch Size: #{file_batch.size} \t Time: #{s} \t Queue Size: #{pool.queue.size}"
  unless file_batch.empty?
    # Escape for the local shell so names with spaces or quotes stay intact.
    # NOTE(review): whether the remote side needs further escaping of the
    # target path depends on the copy tool in use - confirm for your RCP.
    sources = file_batch.map { |f| Shellwords.escape(f) }.join(' ')
    target = Shellwords.escape("#{dst_hst}:#{remote_folder}")
    cmd = "#{RCP} #{sources} #{target}"
    worker_log cmd
    unless system cmd
      worker_log "ERROR: #{cmd}"
      worker_log "ERROR: #{cmd}", STDERR
      if attempt < max_attempts
        pool.schedule(file_batch, remote_folder, attempt + 1, &copy_batch)
      else
        worker_log "ERROR: giving up after #{attempt} attempts: #{cmd}", STDERR
      end
    end
  end
  worker_log "Finished batch in: #{Time.now - s}"
end

src_folders.each do |src_folder|
  dst_folder = dst_dir + src_folder.sub(src_prefix, '')
  # Plain entries only; sub-directories (including . and ..) are handled as
  # folders of their own.
  src_files = Dir.entries(src_folder)
                 .map { |f| "#{src_folder}/#{f}" }
                 .reject { |path| File.directory?(path) }
  src_files.each_slice(BATCH_SIZE) do |batch|
    puts "Scheduling #{src_folder} => #{dst_folder} - #{batch.size}"
    pool.schedule(batch, dst_folder, 1, &copy_batch)
  end
end
# Report how many batches are waiting, start the workers, and block until the
# pool has worked through the queue.
puts "", "Number of RCP jobs: #{pool.queue.size}", ""
puts "--- Starting file transfer... ---", ""
pool.inc(CONCURRENCY)
pool.join
puts "--- Finished file transfer ---"

# Timing summary. The elapsed figure is in hours (seconds divided by 3600).
DONE = Time.now
elapsed_hours = (DONE - START) / 3600
puts "Started: #{START}", "Finished: #{DONE}", "It took: #{elapsed_hours}"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment