There are many solutions for distributed batch processing like [Hadoop](http://hadoop.apache.org/ Hadoop website). However, in this article I will present much simplier and equally stable approach which might be enough for simple tasks where you don't:
- operate on big data
- don't need data warehouse capabilities
Some time ago I discovered GNU parallel. It allows you to run any of your scripts in parallel. It will:
- Collect your input
- Execute jobs on any list of servers
- Load balance the load according to the number of cores and computation time
- Collect results on standard output keeping care it doesn't get mixed
It is really simple and powerful.
You can run it also locally. The following example will download Hadoop and Parallel websites in parallel. {}
is used as a placeholder that is going to be replaced with one of parameters specified after :::
.
parallel wget {} 2>/dev/null ::: 'http://hadoop.apache.org/' 'http://www.gnu.org/software/parallel/'
I currently work in advertising industry at Fyber so I chose some example from my domain.
Let's imagine you have a file with list of pairs <user_id>:<offer_id>
. Your goal will be to generate list of how many times users clicked offer A after clicking offer B. As a result you expect a list <offerA_id>-<offerB_id>:<count>
. For purpose of this article I created some naive solution for this problem. It consists of 3 scripts:
- divide.rb - divides the input file into smaller files based on user ids. [link] Complexity: O(n)
- aggregate.rb - does the main computation part. Complexity: O(n^2)
- collect.rb - combines the results from computation. Complexity O(n * log n)
(2) is the part that I want to run in parallel. In this article I won't cover how this algorithm actually works.
My input file with initial list is called clicks_10_000_000.txt
. I will perform my processing with this script:
#/bin/bash
INPUT_FILE=$1
NUMBER_OF_JOBS=$2
rm input_*.txt # Remove old input files
# List of servers on which jobs will be run. ':' points to local machine.
SERVERS=":,192.168.1.10,sandbox2.carriles.pl"
for server in $SERVERS; do
scp aggregate.rb $SERVER:$HOME # Transfers job script to the servers
done
ruby divide.rb $INPUT_FILE $NUMBER_OF_JOBS \
&& parallel \
--eta \
--transfer \
-S $SERVERS \
ruby aggregate.rb \
::: input_*.txt \
| ruby collect.rb \
| sort --field-separator=':' --numeric-sort --key=2 \
> output_parallel.txt
Firstly, I need to transfer script to all servers on which I want to run computation. All dependencies for the script must already be installed there (in my case ruby).
for server in $SERVERS; do
scp aggregate.rb $SERVER:$HOME # Transfer script to the servers
done
Secondly, I prepare the set of input files. divide.rb
script will create series of input_*.txt
with clusters of users from $INPUT_FILE
ruby divide.rb $INPUT_FILE $NUMBER_OF_JOBS
Fun starts here. Parallel application is executed.
parallel \
--eta \
--transfer \
-S $SERVERS \
ruby aggregate.rb \
::: input_*.txt \
- ssh connections to each server is established and the number of cores on each server is detected
- jobs are run on servers and files are being transfered
- after each job is finished its result is sent to standard output
Used options:
--eta
makes parallel display some progress information--transfer
makes parallel upload input files to the server-S
option allows to specify the list of servers on which script should be executedruby aggregate.rb
script that should be executed on remote server::: <list of files>
input files that should be processed one by one byaggregate.rb
script
As the last step I need to reaggregate the results from all jobs as I am interested in final counts and not those from clusters. collect.rb
will receive all cluster results on standard input.
ruby collect.rb
I sort the results to get nicer output:
sort --field-separator=':' --numeric-sort --key=2
The nice feature of parallel is that the output of each job is sent to standard output as soon as job is finished. On the other hand, outputs from different jobs are never mixed.
Load is correctly distributed. Each server will get new job only if it has a free "CPU slot".
Because parallel writes to standard output I can use pipe operator to execute even more computations in parallel. Parallel can accept input on standard input which makes it possible to run a few parallel processes which pipes information to each other.
In my benchmark I used my desktop (4 cores), laptop (4 cores / LAN connection) and remote server (8 cores / DSL connection). Parallel script was executed on the desktop.
Let's see the run without parallel. I execute aggregate.rb
on the whole set of data before clustering. This way I don't have to reaggregate.
➜ parallel git:(master) ✗ time ruby aggregate.rb clicks_10_000_000.txt | sort --field-separator=':' --numeric-sort --key=2 > output_non_parallel.txt
ruby aggregate.rb clicks_10_000_000.txt 1321,47s user 1,27s system 99% cpu 22:03,42 total
sort --field-separator=':' --numeric-sort --key=2 > output_non_parallel.txt 0,09s user 0,01s system 0% cpu 22:03,50 total
With parallel
➜ parallel git:(master) ✗ time ./run.sh clicks_10_000_000.txt 500
Computers / CPU cores / Max jobs to run
1:192.168.1.10 / 4 / 4
2:local / 4 / 4
3:sandbox2.carriles.pl / 8 / 8
Computer:jobs running/jobs completed/%of started jobs/Average seconds to complete
ETA: 0s 0left 0.40avg 192.168.1.10:0/85/17%/2.4s local:0/232/46%/0.9s sandbox2.carriles.pl:0/183/36%/1.1s
./run.sh clicks_10_000_000.txt 500 728,61s user 33,93s system 369% cpu 3:26,53 total
Does it work?
➜ parallel git:(master) ✗ diff -s output_parallel.txt output_non_parallel.txt
Files output_parallel.txt and output_non_parallel.txt are identical
It works! The point of this article is not to provide accurate benchmark. However, you can see expected speedup ~ 6 times.
I find GNU parallel very useful in my professional life. Typical use case for me is ad hoc information retrieval from log files. I usually use group of testing servers that nearly always stay idle.
The first selling point for me is almost no dependencies. Parallel is written in perl. I managed to run it just by copying parallel script from desktop to server without installation in all the cases so far.
Secondly, simplicity is mind blowing. Parallel is shipped with tutorial man parallel_tutorial
that you can go through in one hour. Majority of use cases can be implemented as one liners. No scripts and configuration is needed. What is more, your jobs can be written in any language as it uses shell to execute them. Whole interaction with jobs is done with shell arguments and standard input / output.