Create a gist now

Instantly share code, notes, and snippets.

@kazjote /aggregate.rb Secret
Last active Aug 29, 2015

What would you like to do?
Article on GNU Parallel
user_offers = Hash.new { |h, k| h[k] = [] }
File.new(ARGV[0]).each do |line|
user, offer = line.split(':').map(&:to_i)
user_offers[user] << offer
end
# { offer1 => { preceeding_offer1 => number_of_times_preceeding_offer1_prceseeds_offer1,
# preceeding_offer2 => ...
# ...
# },
# ...
# }
preceedings = Hash.new { |h, k| h[k] = Hash.new(0) }
user_offers.each do |user, offers|
offers.each_with_index do |offer, index|
0.upto(index - 1) do |preceeding_offer_index|
preceedings[offer][offers[preceeding_offer_index]] += 1
end
end
end
output = preceedings.each_with_object([]) do |(offer, preceeding_offers), array|
preceeding_offers.each do |preceeding_offer, count|
array << [offer, preceeding_offer, count]
end
end
output.each { |l| puts "#{l[0]}-#{l[1]}:#{l[2]}" }

There are many solutions for distributed batch processing like [Hadoop](http://hadoop.apache.org/ Hadoop website). However, in this article I will present much simplier and equally stable approach which might be enough for simple tasks where you don't:

  • operate on big data
  • don't need data warehouse capabilities

GNU Parallel

Some time ago I discovered GNU parallel. It allows you to run any of your scripts in parallel. It will:

  • Collect your input
  • Execute jobs on any list of servers
  • Load balance the load according to the number of cores and computation time
  • Collect results on standard output keeping care it doesn't get mixed

It is really simple and powerful.

You can run it also locally. The following example will download Hadoop and Parallel websites in parallel. {} is used as a placeholder that is going to be replaced with one of parameters specified after :::.

parallel wget {} 2>/dev/null ::: 'http://hadoop.apache.org/' 'http://www.gnu.org/software/parallel/'

Problem

I currently work in advertising industry at Fyber so I chose some example from my domain.

Let's imagine you have a file with list of pairs <user_id>:<offer_id>. Your goal will be to generate list of how many times users clicked offer A after clicking offer B. As a result you expect a list <offerA_id>-<offerB_id>:<count>. For purpose of this article I created some naive solution for this problem. It consists of 3 scripts:

  1. divide.rb - divides the input file into smaller files based on user ids. [link] Complexity: O(n)
  2. aggregate.rb - does the main computation part. Complexity: O(n^2)
  3. collect.rb - combines the results from computation. Complexity O(n * log n)

(2) is the part that I want to run in parallel. In this article I won't cover how this algorithm actually works.

Solution with GNU Parallel

My input file with initial list is called clicks_10_000_000.txt. I will perform my processing with this script:

#/bin/bash

INPUT_FILE=$1
NUMBER_OF_JOBS=$2

rm input_*.txt # Remove old input files

# List of servers on which jobs will be run. ':' points to local machine.
SERVERS=":,192.168.1.10,sandbox2.carriles.pl"

for server in $SERVERS; do
  scp aggregate.rb $SERVER:$HOME # Transfers job script to the servers
done

ruby divide.rb $INPUT_FILE $NUMBER_OF_JOBS \
&& parallel \
    --eta \
    --transfer \
    -S $SERVERS \
    ruby aggregate.rb \
    ::: input_*.txt \
| ruby collect.rb \
| sort --field-separator=':' --numeric-sort --key=2 \
> output_parallel.txt

What is what

Firstly, I need to transfer script to all servers on which I want to run computation. All dependencies for the script must already be installed there (in my case ruby).

for server in $SERVERS; do
  scp aggregate.rb $SERVER:$HOME # Transfer script to the servers
done

Secondly, I prepare the set of input files. divide.rb script will create series of input_*.txt with clusters of users from $INPUT_FILE

ruby divide.rb $INPUT_FILE $NUMBER_OF_JOBS

Fun starts here. Parallel application is executed.

parallel \
  --eta \
  --transfer \
  -S $SERVERS \
  ruby aggregate.rb \
  ::: input_*.txt \
  1. ssh connections to each server is established and the number of cores on each server is detected
  2. jobs are run on servers and files are being transfered
  3. after each job is finished its result is sent to standard output

Used options:

  • --eta makes parallel display some progress information
  • --transfer makes parallel upload input files to the server
  • -S option allows to specify the list of servers on which script should be executed
  • ruby aggregate.rb script that should be executed on remote server
  • ::: <list of files> input files that should be processed one by one by aggregate.rb script

As the last step I need to reaggregate the results from all jobs as I am interested in final counts and not those from clusters. collect.rb will receive all cluster results on standard input.

ruby collect.rb

I sort the results to get nicer output:

sort --field-separator=':' --numeric-sort --key=2

The nice feature of parallel is that the output of each job is sent to standard output as soon as job is finished. On the other hand, outputs from different jobs are never mixed.

Load is correctly distributed. Each server will get new job only if it has a free "CPU slot".

Because parallel writes to standard output I can use pipe operator to execute even more computations in parallel. Parallel can accept input on standard input which makes it possible to run a few parallel processes which pipes information to each other.

Showtime

In my benchmark I used my desktop (4 cores), laptop (4 cores / LAN connection) and remote server (8 cores / DSL connection). Parallel script was executed on the desktop.

Let's see the run without parallel. I execute aggregate.rb on the whole set of data before clustering. This way I don't have to reaggregate.

➜  parallel git:(master) ✗ time ruby aggregate.rb clicks_10_000_000.txt | sort --field-separator=':' --numeric-sort --key=2 > output_non_parallel.txt
ruby aggregate.rb clicks_10_000_000.txt  1321,47s user 1,27s system 99% cpu 22:03,42 total
sort --field-separator=':' --numeric-sort --key=2 > output_non_parallel.txt  0,09s user 0,01s system 0% cpu 22:03,50 total

With parallel

➜  parallel git:(master) ✗ time ./run.sh clicks_10_000_000.txt 500

Computers / CPU cores / Max jobs to run
1:192.168.1.10 / 4 / 4
2:local / 4 / 4
3:sandbox2.carriles.pl / 8 / 8

Computer:jobs running/jobs completed/%of started jobs/Average seconds to complete
ETA: 0s 0left 0.40avg  192.168.1.10:0/85/17%/2.4s  local:0/232/46%/0.9s  sandbox2.carriles.pl:0/183/36%/1.1s    
./run.sh clicks_10_000_000.txt 500  728,61s user 33,93s system 369% cpu 3:26,53 total

Does it work?

➜  parallel git:(master) ✗ diff -s output_parallel.txt output_non_parallel.txt
Files output_parallel.txt and output_non_parallel.txt are identical

It works! The point of this article is not to provide accurate benchmark. However, you can see expected speedup ~ 6 times.

Conclusion

I find GNU parallel very useful in my professional life. Typical use case for me is ad hoc information retrieval from log files. I usually use group of testing servers that nearly always stay idle.

The first selling point for me is almost no dependencies. Parallel is written in perl. I managed to run it just by copying parallel script from desktop to server without installation in all the cases so far.

Secondly, simplicity is mind blowing. Parallel is shipped with tutorial man parallel_tutorial that you can go through in one hour. Majority of use cases can be implemented as one liners. No scripts and configuration is needed. What is more, your jobs can be written in any language as it uses shell to execute them. Whole interaction with jobs is done with shell arguments and standard input / output.

collected = Hash.new(0)
$stdin.each do |line|
key, value = line.split(':')
collected[key] += value.to_i
end
collected.each do |key, value|
puts "#{key}:#{value}"
end
number_of_processes = ARGV[1].to_i
files = Array.new(number_of_processes) do |i|
File.new("input_#{i}.txt", "w")
end
File.new(ARGV[0]).each do |line|
user = line.to_i
files[user % number_of_processes].puts line
end
class RandomGaussian
def initialize(mean, stddev, rand_helper = lambda { Kernel.rand })
@rand_helper = rand_helper
@mean = mean
@stddev = stddev
@valid = false
@next = 0
end
def rand
if @valid then
@valid = false
return @next
else
@valid = true
x, y = self.class.gaussian(@mean, @stddev, @rand_helper)
@next = y
return x
end
end
private
def self.gaussian(mean, stddev, rand)
theta = 2 * Math::PI * rand.call
rho = Math.sqrt(-2 * Math.log(1 - rand.call))
scale = stddev * rho
x = mean + scale * Math.cos(theta)
y = mean + scale * Math.sin(theta)
return x, y
end
end
random_user = RandomGaussian.new(5_000, 2_000)
random_offer = RandomGaussian.new(100, 50)
ARGV[0].to_i.times do
user = random_user.rand.floor
offer = random_offer.rand.floor
next unless user > 0 && offer > 0
puts "#{user}:#{offer}"
end
#/bin/bash
INPUT_FILE=$1
NUMBER_OF_JOBS=$2
rm input_*.txt # Remove old input files
SERVERS=":,192.168.1.10,sandbox2.carriles.pl"
for server in $SERVERS; do
scp aggregate.rb $SERVER:$HOME # Transfer script to the servers
done
ruby divide.rb $INPUT_FILE $NUMBER_OF_JOBS \
&& parallel \
--eta \
--transfer \
-S $SERVERS \
ruby aggregate.rb \
::: input_*.txt \
| ruby collect.rb \
| sort --field-separator=':' --numeric-sort --key=2 \
> output_parallel.txt
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment