Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Distribute an embarrassingly-parallel workload across a worker cluster. Dependency-free!
#!/usr/bin/env perl
# Author: Jamie Davis <davisjam@vt.edu>
# Description:
# - Perform a bunch of operations, using a cluster of worker nodes.
# - To cleanly exit early, check the first few lines of stderr for a file to touch.
# Dependencies.
use strict;
use warnings;
use threads;
use threads::shared;
use JSON::PP;
use Getopt::Long;
use Time::HiRes qw( usleep );
use Carp;
# Globals.
# Validated settings; filled in from the return value of &processArgs below.
my %globals;
# Taken by &log before printing, so log lines from different threads do not interleave.
my $LOG_LOCK : shared; # Logging.
# Set to 1 by &runWorkers once every worker thread has been joined;
# polled by &thread_exitEarly so that thread knows when to stop.
my $WORKERS_DONE : shared; # Worker management.
$WORKERS_DONE = 0;
# Taken by &loadTasks and &getNextTask.
my $TASK_LOCK : shared; # Getting and delivering tasks.
# Set by &loadTasks, asserted by &getNextTask. Note: not declared shared.
my $tasksLoaded = 0;
# Queue of pending tasks; &getNextTask shifts entries off the front.
my $TASKS : shared; # array ref
# Taken by &openResultFile and &emitResult.
my $RESULT_LOCK : shared; # Emitting results.
# Handle for the result file. Note: not declared shared.
my $RESULT_FH;
my $resultFHOpened : shared;
$resultFHOpened = 0;
# &thread_exitEarly polls for this file; once it exists, $EXIT_EARLY is set
# and each worker thread returns before fetching another task.
my $exitEarlyFile = "/tmp/distributed-work-exit-early-pid$$"; # External signaling.
# Remove any stale file left at this path so we do not exit immediately.
unlink $exitEarlyFile;
my $EXIT_EARLY : shared;
$EXIT_EARLY = 0;
# Sentinel returned by &getNextTask when the queue is empty.
my $NO_TASKS_LEFT = -1;
# Process args.
# Capture the command line before GetOptions consumes @ARGV.
my $invocation = "$0 " . join(" ", @ARGV);
my %args;
GetOptions(\%args,
"cluster=s",
"workScript=s",
"taskFile=s",
"resultFile=s",
"workers=s@",
"notWorkers=s@",
"unusedCores=i",
"copy=s",
"PATHprefix=s",
"verbose",
"help",
) or die "Error parsing args\n";
# &processArgs prints usage and exits on --help, on no args, or on invalid args.
%globals = &processArgs(%args);
&log("Invocation: $invocation");
my @workerNames = map { $_->{host} } @{$globals{cluster}};
&log(scalar(@{$globals{cluster}}) . " workers: <@workerNames>");
# Load tasks.
&log("Loading tasks from $globals{taskFile}");
# shared_clone makes the task list visible to (and mutable by) every worker thread.
$TASKS = shared_clone([&loadTasks("taskFile"=>$globals{taskFile})]);
&log("Opening resultFile $globals{resultFile}");
# Opened before any worker thread is created.
$RESULT_FH = &openResultFile($globals{resultFile});
# Start and await workers.
&log("Starting workers");
&log("To cleanly exit early, run the following command:\n touch $exitEarlyFile");
# Blocks until every worker thread and the exit-early watcher have been joined.
&runWorkers("cluster"=>$globals{cluster});
&log("See results in $globals{resultFile}");
&noMoreResults();
# Write out any remaining tasks.
# $EXIT_EARLY is only set by &thread_exitEarly, i.e. when the exit-early file was seen.
if ($EXIT_EARLY) {
&log("Exited early, writing out any remaining tasks");
&writeOutRemainingTasks();
}
exit(0);
# Launch one worker thread per usable core on every cluster node, start the
# exit-early watcher thread, and block until all of them have finished.
# If --copy was given, the copy is propagated to every node first.
#
# input: %args keys: cluster
#   cluster: array ref of node hashrefs (host user port logicalCores [unusedCores])
# output: ()
sub runWorkers {
    my (%args) = @_;
    &assertUsage("runWorkers: Error, usage: hash with keys: cluster", $args{cluster});

    # Propagate copy
    if ($globals{copy_src} and $globals{copy_dest}) {
        &log("runWorkers: Propagating copy");
        &propagateCopyDir("src"=>$globals{copy_src}, "dest"=>$globals{copy_dest}, "destAccessCreds"=>$args{cluster});
    }

    # Start workers
    &log("runWorkers: Starting workers");
    my @my_threads;
    for my $worker (@{$args{cluster}}) {
        # A per-node unusedCores overrides the global recommendation.
        # Test with 'defined' so that an explicit 0 on a node is honoured.
        my $unusedCores = defined($worker->{unusedCores})
            ? $worker->{unusedCores}
            : $globals{unusedCores};

        # Keep the value within [0, logicalCores] so the process count is sane.
        $unusedCores = &min($unusedCores, $worker->{logicalCores});
        $unusedCores = 0 if ($unusedCores < 0);

        my $usedCores = $worker->{logicalCores} - $unusedCores;
        &log("runWorkers: Starting $usedCores processes on worker $worker->{host}, leaving $unusedCores unused cores");
        for my $core (1 .. $usedCores) {
            # Each thread gets its own copy of the node, tagged with a per-node id.
            my %workerWithID = %{$worker};
            $workerWithID{id} = $core;
            &log("runWorkers: $workerWithID{host}:$workerWithID{id}");
            push @my_threads, threads->create(\&thread_worker, \%workerWithID);
        }
    }

    my $thread_exitEarly = threads->create(\&thread_exitEarly, ($exitEarlyFile));

    &log("runWorkers: Waiting on my " . scalar(@my_threads) . " threads");
    for my $thr (@my_threads) {
        # Log only after the join has returned, i.e. once the thread really is done.
        $thr->join();
        &log("runWorkers: thread finished!");
    }

    # If not already done, signal $thread_exitEarly.
    { lock($WORKERS_DONE);
      $WORKERS_DONE = 1;
    }
    &log("runWorkers: Waiting on thread_exitEarly");
    $thread_exitEarly->join();
    &log("runWorkers: Done");
    return;
}
# Drain the task queue and return everything that was still pending.
#
# input: ()
# output: (@remainingTasks) tasks as created by &task_create, possibly empty
sub getRemainingTasks {
    my @leftovers;
    for (;;) {
        my $next = &getNextTask();
        if ($next eq $NO_TASKS_LEFT) {
            last;
        }
        push @leftovers, $next;
    }

    my @descriptions;
    for my $task (@leftovers) {
        push @descriptions, &task_toString($task);
    }
    my $count = scalar(@leftovers);
    &log("getRemainingTasks: Got $count remaining tasks: <@descriptions>");
    return @leftovers;
}
# Drain the task queue and, if anything was left, save it one JSON task per
# line in a pid-specific file under /tmp.
#
# input: ()
# output: ($anyRemainingTasks) true if at least one task was still pending
sub writeOutRemainingTasks {
    my $outFile = "/tmp/distributed-work-remainingWork-pid$$\.txt";
    unlink $outFile;

    my @pending = &getRemainingTasks();
    my $numPending = scalar(@pending);
    if ($numPending) {
        # Re-encode only the original task payload, not the id wrapper.
        my $contents = join("\n", map { encode_json($_->{task}) } @pending);
        &writeToFile("file"=>$outFile, "contents"=>$contents);
        &log("$numPending tasks remaining, see $outFile");
    }
    return (0 < $numPending);
}
# Thread body for one worker slot.
# Repeatedly: stop if an early exit was requested, otherwise take the next
# task, run it via &work and emit the result. Returns when the queue is empty.
#
# input: ($worker) worker from &getClusterInfo, with extra key 'id'
# output: ()
sub thread_worker {
    my ($worker) = @_;
    my $workerStr = "$worker->{host}:$worker->{id}";
    my $logPref = $workerStr;

    while (1) {
        # Should we EXIT_EARLY?
        {
            lock($EXIT_EARLY);
            if ($EXIT_EARLY) {
                &log("$logPref: exiting early");
                return;
            }
        }

        # Get and complete a task.
        my $job = &getNextTask();
        if ($job eq $NO_TASKS_LEFT) {
            &log("$logPref: No tasks left");
            last;
        }
        my $jobStr = &task_toString($job);
        &log("$logPref: task " . $jobStr);

        my $output = &work("worker"=>$worker, "task"=>$job);
        &log("$logPref: completed task $job->{id}. Output: $output");

        &emitResult({ "task"=>$job, "worker"=>$workerStr, "output"=>$output });
    }
    return;
}
# Copy a local file to a worker with scp.
#
# input: %args: keys: src dest worker
#   src:    local path
#   dest:   path on the worker
#   worker: hashref with keys: port user host
# output: $dest
#
# A failed scp is logged but does not abort; the caller still receives $dest.
sub transferFile {
    my %args = @_;
    &assertUsage("transferFile: usage: src dest worker", $args{src}, $args{dest}, $args{worker});

    my $worker = $args{worker};
    my ($out, $rc) = &cmd("scp -P$worker->{port} $args{src} $worker->{user}\@$worker->{host}:$args{dest}");
    if ($rc) {
        # Previously the exit status was dropped, hiding transfer failures.
        my $details = defined($out) ? $out : "";
        &log("transferFile: Error, scp of $args{src} to $worker->{host}:$args{dest} failed (rc $rc): $details");
    }
    return $args{dest};
}
# Create (or truncate) a file and write the given contents to it.
#
# input: %args: keys: file contents
# output: $file
# dies (confess) if the file cannot be opened, written or closed
sub writeToFile {
    my %args = @_;
    &assertUsage("writeToFile: usage: file contents", $args{file}, $args{contents});

    open(my $fh, '>', $args{file})
        or confess "writeToFile: Error, could not open <$args{file}> for writing: $!\n";
    print {$fh} $args{contents}
        or confess "writeToFile: Error, could not write to <$args{file}>: $!\n";
    # Buffered write errors only surface at close, so check it too.
    close($fh)
        or confess "writeToFile: Error, could not close <$args{file}>: $!\n";
    return $args{file};
}
# Run one task on one worker: write the task as JSON to a local temp file,
# copy it to the worker, run workScript on it over ssh, and return the output.
#
# input: (%args) keys: worker task
#   worker: entry from &getClusterInfo
#   task:   created by &task_create
# output: ($out) output of the remote command
sub work {
    my %args = @_;
    &assertUsage("work: usage: worker task", $args{worker}, $args{task});
    my ($worker, $task) = @args{qw(worker task)};

    # Temp file names include pid and thread id so concurrent workers do not collide.
    my $tid = threads->tid();
    my $localTaskFile = "/tmp/parallel-process-repo-$$\_$tid-LOCAL.json";
    my $remoteTaskFile = "/tmp/parallel-process-repo-$$\_$tid-REMOTE.json";

    # Ship the task to the worker; the local copy is not needed afterwards.
    my $encodedTask = encode_json($task->{task});
    &writeToFile("file"=>$localTaskFile, "contents"=>$encodedTask);
    &transferFile("src"=>$localTaskFile, "dest"=>$remoteTaskFile, "worker"=>$worker);
    unlink $localTaskFile;

    # Process task remotely; the remote command removes its own task file.
    my $PATHprefix = "";
    if ($globals{PATHprefix}) {
        $PATHprefix = "PATH=$globals{PATHprefix}:\$PATH";
    }
    my $remoteCmd = "$PATHprefix $globals{workScript} $remoteTaskFile 2>/dev/null; rm $remoteTaskFile";
    my $out = &remoteCommand("accessCreds"=>$worker, "command"=>$remoteCmd);

    # NOTE(review): this unlink runs on the manager, not on the worker.
    unlink $remoteTaskFile;
    return $out;
}
# Thread body for the exit-early watcher.
# Polls every 100 ms. If $exitEarlyFile exists, sets $EXIT_EARLY and returns.
# If the workers have finished ($WORKERS_DONE), returns without setting it.
#
# input: ($exitEarlyFile)
# output: ()
sub thread_exitEarly {
    my ($exitEarlyFile) = @_;

    for (;;) {
        # Exit early?
        if (-f $exitEarlyFile) {
            lock($EXIT_EARLY);
            $EXIT_EARLY = 1;
            return;
        }

        # Workers done? Read the flag under its lock, test it afterwards.
        my $workersDone = do { lock($WORKERS_DONE); $WORKERS_DONE };
        return if ($workersDone);

        usleep(100*1000); # 100 ms
    }
}
# Load and validate the cluster description.
#
# input: ($clusterFile) path to a .json file holding an array of node objects
# output: @cluster: list of node objects with keys: host user port logicalCores
#
# Nodes without a "logicalCores" key are queried over ssh with nproc.
# dies if the file is missing, unreadable, not valid JSON, not a JSON array,
# contains an invalid node, or if a node's core count cannot be determined.
sub getClusterInfo {
    my ($clusterFile) = @_;
    &assertUsage("getClusterInfo: Error, usage: (clusterFile)", $clusterFile);
    if (not -f $clusterFile or $clusterFile !~ m/\.json$/i) {
        die "getClusterInfo: Error, invalid clusterFile <$clusterFile>\n";
    }

    # Read the file directly rather than via a shell: safe for any file name,
    # and $! then describes the real failure.
    open(my $fh, '<', $clusterFile)
        or die "getClusterInfo: Error, could not read clusterFile <$clusterFile>: $!\n";
    my $json = do { local $/; <$fh> };
    close($fh);
    $json = "" if (not defined $json);

    my $cluster = eval {
        return decode_json($json);
    };
    if ($@) {
        die "getClusterInfo: Error parsing clusterFile <$clusterFile>: $@\n";
    }
    if (ref($cluster) ne 'ARRAY') {
        die "getClusterInfo: Error, clusterFile <$clusterFile> must contain a JSON array of nodes\n";
    }

    # Confirm nodes are valid.
    my $i = 0;
    for my $node (@$cluster) {
        if (not &_isClusterNodeValid($node)) {
            die "getClusterInfo: Error, node $i is invalid (0-indexed)\n";
        }
        $i++;
    }

    # Augment with a "logicalCores" field if none provided
    for my $node (@$cluster) {
        if (not defined $node->{logicalCores}) {
            my $out = &remoteCommand("accessCreds"=>$node, "command"=>"nproc");
            if (defined $out and $out =~ m/^(\d+)$/) {
                # Use the capture so trailing whitespace never reaches int().
                $node->{logicalCores} = int($1);
            }
            else {
                my $details = defined($out) ? $out : "";
                die "getClusterInfo: Error, could not get logical cores for node $node->{host}:\n$details\n";
            }
        }
        &log("$node->{host} has $node->{logicalCores} logical cores");
    }
    return @$cluster;
}
# Check that a decoded cluster node has the fields needed to reach it.
#
# input: ($clusterNode) anything decoded from the cluster file
# output: ($isValid) 1 if it is a hashref with true values for host, port
#         and user; 0 otherwise (including undef, strings and array refs)
sub _isClusterNodeValid {
    my ($node) = @_;

    # Reject non-hashes up front; dereferencing them below would die under strict refs.
    return 0 if (ref($node) ne 'HASH');

    for my $key ("host", "port", "user") {
        return 0 if (not $node->{$key});
    }
    return 1;
}
###
# Usage message, arg parsing.
###
# Build the short usage synopsis that both help variants print.
#
# input: ()
# output: ($usage) multi-line, newline-terminated string naming $0 and all options
sub getTerseUsage {
    return <<"END_TERSE_USAGE";
Usage: $0 --cluster C.json --workScript W --taskFile F
[--resultFile R] [--workers w1,...] [--notWorkers w1,...]
[--unusedCores N]
[--copy src:dest] [--PATHprefix dir1:...]
[--verbose] [--help]
END_TERSE_USAGE
}
# Print the terse usage synopsis to stdout and terminate with status 0.
#
# input: ()
# output: does not return
sub shortUsage {
    my $usage = &getTerseUsage();
    print $usage;
    exit(0);
}
# Print the full help text (description, synopsis, per-option notes) to stdout.
# Does not exit; the caller decides what happens next.
#
# input: ()
sub longUsage {
    my $terseUsage = &getTerseUsage();
    print <<"END_LONG_USAGE";
Description: Distribute tasks across workers
$terseUsage
--cluster C.json JSON-formatted cluster of workers
Should be an array of \"node\" objects with minimal keys: host port user [logicalCores] [unusedCores]
host, port, user: suitable for passwordless ssh
[logicalCores]: skip query of node for # logical cores
[unusedCores]: override global --unusedCores
--workScript W Script to execute against each task
**Must exist on every worker**
Argument is a filename, its stdout is saved in resultFile
--taskFile F One task per line, JSON-encoded.
[--resultFile R] One result per line, NOT guaranteed in the same order
JSON-encoded.
[--workers w1,... | --notWorkers w1,... ] Workers to use | workers not to use
[--unusedCores N] Cores to leave available on a worker
[--copy src:dest] Copy src to dest on every worker before running workScript
Must be that src != dest. Can be a file or a dir.
[--PATHprefix dir1:...] Prefix PATH with this string when executing workScript
[--verbose]
[--help]
END_LONG_USAGE
}
# Process args after GetOptions, ensure validity, etc.
# Prints usage and exits 0 when called with no args or with --help.
# Logs every problem found, then prints usage and exits 1 if any arg is invalid.
#
# input: (%args) from GetOptions
# output: (%globals) with keys:
#   cluster      listref of hashrefs representing nodes to use in the worker cluster
#   workScript   script to execute
#   taskFile     one task per line
#   resultFile   one result per line
#   unusedCores  cores to leave idle on each worker
#   [copy_src    dir on manager]
#   [copy_dest   dir on worker]
#   [PATHprefix  prefix for PATH when invoking workScript]
#   verbose      extra loud
sub processArgs {
    my %args = @_;
    my $invalidArgs = 0;

    # Bail out on no args or help.
    if (not scalar(keys %args)) {
        &shortUsage();
        exit 0;
    }
    if ($args{help}) {
        &longUsage();
        exit 0;
    }

    # cluster
    # Only *.json is accepted, matching what &getClusterInfo itself requires.
    my @cluster;
    if ($args{cluster} and -f $args{cluster} and $args{cluster} =~ m/\.json$/i) {
        &log("Cluster file $args{cluster}");
        @cluster = &getClusterInfo($args{cluster});

        # workers
        if ($args{workers}) {
            $args{workers} = [split(",", join(",", @{$args{workers}}))]; # --w x,y --w z becomes x,y,z
        }
        else {
            $args{workers} = [map { $_->{host} } @cluster]; # Default to include all
        }

        # Filter in workers
        &log("Filtering in workers <@{$args{workers}}>");
        @cluster = grep { &listContains($args{workers}, $_->{host}) } @cluster;

        # notWorkers
        if ($args{notWorkers}) {
            $args{notWorkers} = [split(",", join(",", @{$args{notWorkers}}))]; # --w x,y --w z becomes x,y,z
        }
        else {
            $args{notWorkers} = []; # Default to exclude none
        }

        # Filter out notWorkers
        &log("Filtering out workers <@{$args{notWorkers}}>");
        @cluster = grep { not &listContains($args{notWorkers}, $_->{host}) } @cluster;

        my @survivingNames = map { $_->{host} } @cluster;
        if (@survivingNames) {
            &log("Using " . scalar(@cluster) . " workers <@survivingNames>");
        }
        else {
            &log("Error, no workers");
            $invalidArgs = 1;
        }
    }
    else {
        &log("Error, invalid cluster file");
        $invalidArgs = 1;
    }

    # workScript
    if ($args{workScript}) {
        &log("Using workScript $args{workScript}");
        # Can't use -f.
        # workScript might not exist *anywhere* yet before we honor --copy.
        # And this node might not be a worker so -f would still fail.
    }
    else {
        # The usage lists --workScript as mandatory; without it no task can run.
        &log("Error, missing workScript");
        $invalidArgs = 1;
    }

    # taskFile
    if ($args{taskFile} and -f $args{taskFile}) {
        &log("Using taskFile $args{taskFile}");
    }
    else {
        &log("Error, invalid taskFile");
        $invalidArgs = 1;
    }

    # resultFile
    if (not $args{resultFile}) {
        $args{resultFile} = "/tmp/parallel-work-pid$$-results";
    }
    unlink $args{resultFile};
    &log("Using resultFile $args{resultFile}");

    # unusedCores
    if (not defined $args{unusedCores}) {
        $args{unusedCores} = 0;
    }
    if (0 <= $args{unusedCores}) {
        &log("Using unusedCores $args{unusedCores}");
    }
    else {
        &log("Error, invalid unusedCores $args{unusedCores}");
        $invalidArgs = 1;
    }

    # copy
    my ($copySrc, $copyDest);
    if (defined $args{copy}) {
        my @spl = split(":", $args{copy});
        if (scalar(@spl) == 2) {
            my ($src, $dest) = @spl;
            if ($src ne $dest and -e $src) {
                &log("Using copySrc $src copyDest $dest");
                $copySrc = $src;
                $copyDest = $dest;
            }
            else {
                &log("Error, invalid copy src $src dest $dest. Must have src != dest, and src must exist");
                $invalidArgs = 1;
            }
        }
        else {
            &log("Error, malformed copy. Must be src:dest");
            $invalidArgs = 1;
        }
    }

    # PATHprefix
    if (not defined $args{PATHprefix}) {
        $args{PATHprefix} = "";
    }

    # verbose
    if (not defined $args{verbose}) {
        $args{verbose} = 0;
    }

    # Error out on invalid args.
    if ($invalidArgs) {
        # &shortUsage exits with status 0, so print the usage directly here
        # in order to report the failure with a non-zero status.
        print &getTerseUsage();
        exit 1;
    }

    my %globals = ("cluster" => \@cluster,
                   "workScript" => $args{workScript},
                   "taskFile" => $args{taskFile},
                   "resultFile" => $args{resultFile},
                   "unusedCores" => $args{unusedCores},
                   "copy_src" => $copySrc,
                   "copy_dest" => $copyDest,
                   "PATHprefix" => $args{PATHprefix},
                   "verbose" => $args{verbose},
                  );
    return %globals;
}
###
# Task management.
###
# Read the task file and turn every non-blank line into a task.
# Only the first call loads anything; later calls return an empty list.
#
# input: (%args) keys: taskFile
# output: (@tasks)
#   Each elt contains a task from &task_create,
#   where the 'task' is a json-decoded version of one of the lines from the taskFile.
#   Task ids are assigned sequentially starting at 1.
# dies (confess) if the file cannot be opened or a line is not valid JSON
sub loadTasks {
    my %args = @_;
    &assertUsage("Usage: taskFile", $args{taskFile});
    lock($TASK_LOCK);
    return if ($tasksLoaded);
    $tasksLoaded = 1;

    # Read the file directly rather than via a shell, so any file name is safe
    # and a failure to read is reported instead of yielding zero tasks.
    open(my $fh, '<', $args{taskFile})
        or confess "loadTasks: Error, could not open taskFile <$args{taskFile}>: $!\n";
    my @taskLines = <$fh>;
    close($fh);
    chomp @taskLines;

    my $_id = 1; # 1-indexed for sanity
    my $lineNum = 0;
    my @tasks;
    for my $line (@taskLines) {
        $lineNum++;
        # Tolerate blank lines (e.g. a trailing empty line) instead of dying in decode_json.
        next if ($line =~ m/^\s*$/);

        &log("Task line: <$line>");
        my $task = eval {
            return decode_json($line);
        };
        if ($@) {
            confess "loadTasks: Error, line $lineNum of taskFile <$args{taskFile}> is not valid JSON: $@\n";
        }
        push @tasks, &task_create("id"=>$_id, "task"=>$task);
        $_id++;
    }
    return @tasks;
}
# Remove and return the task at the front of the queue in $TASKS.
# Holds $TASK_LOCK for the duration of the call.
#
# input: ()
# output: ($task) or $NO_TASKS_LEFT
#   $task as created by &task_create.
sub getNextTask {
    lock($TASK_LOCK);
    &assert(($tasksLoaded), "getNextTask: Tasks never loaded");

    return $NO_TASKS_LEFT unless (@{$TASKS});

    my $task = shift @{$TASKS};
    my $numLeft = scalar(@{$TASKS});
    my $taskStr = &task_toString($task);
    &log("getNextTask: got <$taskStr>, $numLeft tasks remaining");
    return $task;
}
# Wrap a decoded task payload together with its id.
#
# input: (%args) keys: id task (both must be defined)
# output: ($task) a ref with keys: id task
sub task_create {
    my (%args) = @_;
    &assertUsage("createTask: usage: id task", $args{id}, $args{task});

    my %task = (
        "id"   => $args{id},
        "task" => $args{task},
    );
    return \%task;
}
# Render a task for log messages as "<id>: <JSON of payload>".
#
# input: ($task) ref with keys: id task
# output: ($string)
sub task_toString {
    my ($task) = @_;
    my $payload = encode_json($task->{task});
    return join(": ", $task->{id}, $payload);
}
###
# Result management.
###
# Open the result file for writing (truncating it) the first time this is
# called; later calls just return the handle that is already open.
#
# input: ($resultFile)
# output: ($FH) the handle stored in $RESULT_FH
# dies (confess) if the file cannot be opened
sub openResultFile {
    my ($resultFile) = @_;
    &assertUsage("openResultFile: usage: (resultFile)", $resultFile);
    lock($RESULT_LOCK);
    if (not $resultFHOpened) {
        # Log under this function's own name (the message used to say emitResult).
        &log("openResultFile: Opening $resultFile for results");
        open($RESULT_FH, ">", $resultFile) or confess "Error, could not open resultFile <$resultFile>: $!\n";
        $resultFHOpened = 1;
    }
    return $RESULT_FH;
}
# Append one result to the result file as a single JSON line.
#
# input: ($result) result object with keys: task worker output
#   task:   from &getNextTask
#   worker: "host:id" string of the worker that ran it
#   output: command-line output
# output: ()
# dies (confess) if &openResultFile was never called
sub emitResult {
    my ($result) = @_;
    lock($RESULT_LOCK);
    if (not $resultFHOpened) {
        confess "Error, must call openResultFile first\n";
    }
    my $encodedResult = encode_json($result);
    &log("emitResult: Emitting: <$encodedResult>");

    # $RESULT_FH is not a shared variable, so every thread writes through its
    # own copy of the handle. Turn on autoflush so that each line reaches the
    # file while $RESULT_LOCK is still held; otherwise buffered output from
    # different threads could be flushed mid-line and interleave.
    my $previousFH = select($RESULT_FH);
    $| = 1;
    select($previousFH);

    print {$RESULT_FH} "$encodedResult\n"
        or &log("emitResult: Error, writing result failed: $!");
    return;
}
# Close the result file handle, if one was ever opened.
# A failing close is logged, not fatal.
#
# input: ()
sub noMoreResults {
    return unless ($RESULT_FH);
    close($RESULT_FH) or &log("Error, closing result_fh failed: $!");
}
###
# Handle copy
###
# Copy src to dest on every given node, one helper thread per node, and wait
# for all of them.
#
# input: (%args) keys: src dest destAccessCreds
#   src/dest: files or dirs, as accepted by &remoteCopy
#   destAccessCreds: array ref of hashrefs of destAccessCreds for use with remoteCopy
# output: ()
# dies (confess) if any copy fails or any helper thread does not return a result
sub propagateCopyDir {
    my %args = @_;
    &assertUsage("propagateCopyDir: usage: src dest destAccessCreds", $args{src}, $args{dest}, $args{destAccessCreds});

    # Parallel propagation.
    # Use the src/dest that were passed in rather than reaching into %globals.
    my @helpers;
    for my $cred (@{$args{destAccessCreds}}) {
        my $thr = threads->create(sub {
            my ($src, $dest, $destCred) = @_;
            return &remoteCopy("src"=>$src, "dest"=>$dest, "destAccessCreds"=>$destCred);
        }, $args{src}, $args{dest}, $cred);
        push @helpers, $thr;
    }

    # Confirm that all propagation succeeded.
    my @results;
    for my $helper (@helpers) {
        my $rc = $helper->join();
        # A helper that died returns nothing; record that as a failure
        # instead of silently dropping it from the results.
        push @results, (defined($rc) ? $rc : "died");
    }
    if (grep { $_ ne 0 } @results) {
        confess "Error, at least one copy failure: <@results>\n";
    }
    return;
}
###
# Utility
###
# String-equality membership test.
#
# input: (\@list, $e)
# output: 1 if some element of @list is 'eq' to $e, else 0
sub listContains {
    my ($list, $e) = @_;
    for my $i (0 .. $#{$list}) {
        return 1 if ($list->[$i] eq $e);
    }
    return 0;
}
# Numeric minimum.
#
# input: (@nums)
# output: ($min) smallest value, or undef for an empty list
sub min {
    my ($smallest, @others) = @_;
    for my $candidate (@others) {
        $smallest = $candidate if ($candidate < $smallest);
    }
    return $smallest;
}
# Numeric maximum.
#
# input: (@nums)
# output: ($max) largest value, or undef for an empty list
sub max {
    my ($largest, @others) = @_;
    for my $candidate (@others) {
        $largest = $candidate if ($largest < $candidate);
    }
    return $largest;
}
# Terminate the program if a condition does not hold.
#
# input: ($cond, $msg)
# output: returns normally when $cond is true; otherwise prints
#         "ERROR: $msg" to stdout and exits with status 1
sub assert {
    my ($cond, $msg) = @_;
    return if ($cond);

    print "ERROR: $msg\n";
    exit 1;
}
# Assert that every given value is defined; used to validate sub arguments.
#
# input: ($msg, @varsThatShouldBeDefined)
# output: () -- via &assert, prints $msg and exits if any value is undef
sub assertUsage {
    my ($msg, @shouldBeDefined) = @_;

    my $numUndefined = 0;
    for my $value (@shouldBeDefined) {
        $numUndefined++ unless (defined $value);
    }
    &assert(($numUndefined == 0), $msg);
}
# Log a shell command, run it, and capture its combined stdout and stderr.
#
# input: ($cmd)
# output: ($out, $rc) captured output, and $? shifted right by 8 bits
sub cmd {
    my ($cmd) = @_;
    &log($cmd);

    my $output = qx{$cmd 2>&1};
    my $exitCode = $? >> 8;
    return ($output, $exitCode);
}
# Write a timestamped message to stderr.
# $LOG_LOCK is held while printing so lines from different threads stay whole.
# Shares its name with a Perl builtin, so it must be called as &log(...).
#
# input: ($msg)
sub log {
    my ($msg) = @_;
    my $timestamp = localtime;

    lock($LOG_LOCK);
    print STDERR $timestamp . ": " . $msg . "\n";
}
# Run a command on a remote host over ssh and return what it printed.
#
# input: (%args) keys: accessCreds command
#   accessCreds: hashref, keys: port user host
#     hint: a worker object can be used as accessCreds
#   command: string to execute over ssh
# output: ($out) combined stdout and stderr as captured by &cmd
sub remoteCommand {
    my %args = @_;
    &assertUsage("remoteCommand: Error, usage: hash with keys: accessCreds command", $args{accessCreds}, $args{command});

    my $creds = $args{accessCreds};

    # The command is passed to the local shell inside single quotes. Rewrite
    # each embedded ' as '\'' so it cannot end the quoting early and reaches
    # the remote side intact.
    (my $quotedCommand = $args{command}) =~ s/'/'\\''/g;

    my $cmd = "ssh -p$creds->{port} $creds->{user}\@$creds->{host} '$quotedCommand'";
    # The exit status is not returned; callers only consume the output.
    my ($out, $rc) = &cmd($cmd);
    return ($out);
}
# Copy a local file or directory (recursively) to a remote host with scp -r.
#
# input: (%args) keys: src dest destAccessCreds
#   src/dest: files or dirs
#   destAccessCreds: hashref, keys: port user host
# output: ($rc) 0 success, non-zero failure
sub remoteCopy {
    my %args = @_;
    my ($creds, $src, $dest) = @args{qw(destAccessCreds src dest)};

    my $target = "$creds->{user}\@$creds->{host}:$dest";
    # The captured output is discarded; only the exit status is reported.
    my ($out, $rc) = &cmd("scp -r -P$creds->{port} $src $target");
    return ($rc);
}
@davisjam

This comment has been minimized.

Copy link
Owner Author

davisjam commented Jan 6, 2018

I've made a few extensions, haven't figured out how to push yet.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.