Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Distribute an embarrassingly-parallel workload across a worker cluster. Dependency-free!
#!/usr/bin/env perl
# Author: Jamie Davis <davisjam@vt.edu>
# Description:
# - Perform a bunch of operations, using a cluster of worker nodes.
# - To cleanly exit early, check the first few lines of stderr for a file to touch.
# Dependencies.
use strict;
use warnings;
use threads;
use threads::shared;
use JSON::PP;
use Getopt::Long;
use Time::HiRes qw( usleep );
use Carp;
# Globals.
# Settings returned by processArgs (assigned below); read by the subs in this file.
my %globals;
# Lock variables and cross-thread state. Those marked ": shared" are
# threads::shared variables; the others are ordinary lexicals.
my $LOG_LOCK : shared; # Logging. Locked by &log around each print to STDERR.
my $WORKERS_DONE : shared; # Worker management. Set by runWorkers, polled by thread_exitEarly.
$WORKERS_DONE = 0;
my $TASK_LOCK : shared; # Getting and delivering tasks. Locked by loadTasks and getNextTask.
my $tasksLoaded = 0; # Set to 1 by loadTasks. Not declared shared.
my $TASKS : shared; # array ref of tasks; getNextTask shifts entries off the front.
my $RESULT_LOCK : shared; # Emitting results. Locked by openResultFile and emitResult.
my $RESULT_FH; # Result file handle, opened by openResultFile. Not declared shared.
my $resultFHOpened : shared;
$resultFHOpened = 0;
my $exitEarlyFile = "/tmp/distributed-work-exit-early-pid$$"; # External signaling.
# Remove any stale signal file so a leftover from an earlier run cannot trigger an exit.
unlink $exitEarlyFile;
my $EXIT_EARLY : shared; # Set to 1 by thread_exitEarly once $exitEarlyFile exists.
$EXIT_EARLY = 0;
my $NO_TASKS_LEFT = -1; # Sentinel returned by getNextTask when the task list is empty.
# Process args.
# Capture the command line before GetOptions consumes @ARGV, so it can be logged.
my $invocation = "$0 " . join(" ", @ARGV);
my %args;
GetOptions(\%args,
"cluster=s",
"workScript=s",
"taskFile=s",
"resultFile=s",
"workers=s@",
"notWorkers=s@",
"unusedCores=i",
"copy=s",
"PATHprefix=s",
"verbose",
"help",
) or die "Error parsing args\n";
# processArgs validates the options and exits on --help, no args, or invalid args.
%globals = &processArgs(%args);
&log("Invocation: $invocation");
my @workerNames = map { $_->{host} } @{$globals{cluster}};
&log(scalar(@{$globals{cluster}}) . " workers: <@workerNames>");
# Load tasks.
&log("Loading tasks from $globals{taskFile}");
# shared_clone makes the task list visible to (and mutable by) every worker thread.
$TASKS = shared_clone([&loadTasks("taskFile"=>$globals{taskFile})]);
&log("Opening resultFile $globals{resultFile}");
$RESULT_FH = &openResultFile($globals{resultFile});
# Start and await workers.
&log("Starting workers");
&log("To cleanly exit early, run the following command:\n touch $exitEarlyFile");
# runWorkers returns only after every worker thread and the exit-early watcher have been joined.
&runWorkers("cluster"=>$globals{cluster});
&log("See results in $globals{resultFile}");
&noMoreResults();
# Write out any remaining tasks.
# If the exit-early file was seen, tasks may still be queued; save them to a file.
if ($EXIT_EARLY) {
&log("Exited early, writing out any remaining tasks");
&writeOutRemainingTasks();
}
exit(0);
# Launch and await workers on all nodes in the cluster.
#
# Optionally propagates the --copy source to every node first, then starts one
# thread_worker thread per used core on each node plus one thread_exitEarly
# watcher thread, and joins them all before returning.
#
# input: %args keys: cluster (listref of node hashrefs)
# output: ()
sub runWorkers {
    my (%args) = @_;
    &assertUsage("runWorkers: Error, usage: hash with keys: cluster", $args{cluster});

    # Propagate copy
    if ($globals{copy_src} and $globals{copy_dest}) {
        &log("runWorkers: Propagating copy");
        &propagateCopyDir("src"=>$globals{copy_src}, "dest"=>$globals{copy_dest}, "destAccessCreds"=>$args{cluster});
    }

    # Start workers
    &log("runWorkers: Starting workers");
    my @my_threads;
    for my $worker (@{$args{cluster}}) {
        # A per-node unusedCores overrides the global recommendation. Test with
        # 'defined' so that an explicit 0 on the node is honored too.
        my $unusedCores = defined($worker->{unusedCores})
            ? $worker->{unusedCores}
            : &min($globals{unusedCores}, $worker->{logicalCores});

        # Never go negative if a node asks to leave more cores idle than it has.
        my $usedCores = &max(0, $worker->{logicalCores} - $unusedCores);

        &log("runWorkers: Starting $usedCores processes on worker $worker->{host}, leaving $unusedCores unused cores");
        for my $core (1 .. $usedCores) {
            # Each thread gets its own copy of the node description, tagged with an id.
            my %workerWithID = %{$worker};
            $workerWithID{id} = $core;
            &log("runWorkers: $workerWithID{host}:$workerWithID{id}");
            push @my_threads, threads->create(\&thread_worker, \%workerWithID);
        }
    }

    my $thread_exitEarly = threads->create(\&thread_exitEarly, ($exitEarlyFile));

    &log("runWorkers: Waiting on my " . scalar(@my_threads) . " threads");
    for my $thr (@my_threads) {
        $thr->join();
        # Log only once the join has returned, i.e. the thread really is finished.
        &log("runWorkers: thread finished!");
    }

    # If not already done, signal $thread_exitEarly.
    {
        lock($WORKERS_DONE);
        $WORKERS_DONE = 1;
    }

    &log("runWorkers: Waiting on thread_exitEarly");
    $thread_exitEarly->join();

    &log("runWorkers: Done");
    return;
}
# Drain the task queue.
#
# input: ()
# output: (@remainingTasks) every task that getNextTask still had to hand out
sub getRemainingTasks {
    my @remainingTasks;

    # Keep pulling until getNextTask reports the sentinel.
    for (my $t = &getNextTask(); $t ne $NO_TASKS_LEFT; $t = &getNextTask()) {
        push @remainingTasks, $t;
    }

    my $summary = join(" ", map { &task_toString($_) } @remainingTasks);
    &log("getRemainingTasks: Got " . scalar(@remainingTasks) . " remaining tasks: <$summary>");

    return @remainingTasks;
}
# Save any tasks still in the queue to a file under /tmp, one JSON-encoded
# task per line, so they can be re-run later.
#
# input: ()
# output: ($anyRemainingTasks) true if at least one task was left over
sub writeOutRemainingTasks {
    # A file to write to. Remove any previous copy first.
    my $remainingWorkFile = "/tmp/distributed-work-remainingWork-pid$$\.txt";
    unlink $remainingWorkFile;

    my @leftover = &getRemainingTasks();
    my $count = scalar(@leftover);

    if ($count > 0) {
        # Convert back to JSON and write out.
        my $contents = join("\n", map { encode_json($_->{task}) } @leftover);
        &writeToFile("file"=>$remainingWorkFile, "contents"=>$contents);
        &log("$count tasks remaining, see $remainingWorkFile");
    }

    return ($count > 0);
}
# Thread body for one worker slot.
# Repeatedly: stop if an early exit was requested, otherwise take the next
# task, run it via &work, and emit the result. Returns when no tasks remain.
#
# input: ($worker) worker from &getClusterInfo, with extra key 'id'
# output: ()
sub thread_worker {
    my ($worker) = @_;
    my $workerStr = "$worker->{host}:$worker->{id}";

    TASK:
    while (1) {
        # Should we EXIT_EARLY?
        {
            lock($EXIT_EARLY);
            if ($EXIT_EARLY) {
                &log("$workerStr: exiting early");
                return;
            }
        }

        # Get a task, or stop when the queue is exhausted.
        my $task = &getNextTask();
        if ($task eq $NO_TASKS_LEFT) {
            &log("$workerStr: No tasks left");
            last TASK;
        }

        # Complete the task and record its output.
        &log("$workerStr: task " . &task_toString($task));
        my $out = &work("worker"=>$worker, "task"=>$task);
        &log("$workerStr: completed task $task->{id}. Output: $out");

        &emitResult({ "task"=>$task, "worker"=>$workerStr, "output"=>$out });
    }

    return;
}
# Transfer the specified src file to dest on a worker, using scp.
#
# input: %args: keys: src dest worker
#   worker: hashref with keys: port user host
# output: $dest
#
# A failed scp is logged (with its exit status and output) rather than ignored,
# so a later failure of the remote command can be traced back to the transfer.
# The return value is $dest in either case.
sub transferFile {
    my %args = @_;
    &assertUsage("transferFile: usage: src dest worker", $args{src}, $args{dest}, $args{worker});

    my $worker = $args{worker};
    my ($out, $rc) = &cmd("scp -P$worker->{port} $args{src} $worker->{user}\@$worker->{host}:$args{dest}");
    if ($rc) {
        &log("transferFile: Error, scp of $args{src} to $worker->{host} failed (rc $rc): $out");
    }

    return $args{dest};
}
# Write a string to a file, replacing any existing contents.
#
# input: %args: keys: file contents
# output: $file
#
# Dies (via confess, with a stack trace) if the file cannot be opened, written
# or closed, instead of silently leaving a missing or truncated file behind.
sub writeToFile {
    my %args = @_;
    &assertUsage("writeToFile: usage: file contents", $args{file}, $args{contents});

    open(my $fh, '>', $args{file})
        or confess "writeToFile: Error, could not open $args{file} for writing: $!\n";
    print {$fh} $args{contents}
        or confess "writeToFile: Error, could not write to $args{file}: $!\n";
    # Buffered write errors only surface at close, so check it too.
    close($fh)
        or confess "writeToFile: Error, could not close $args{file}: $!\n";

    return $args{file};
}
# Run one task on one worker.
# The task is JSON-encoded into a local temp file, copied to the worker, and
# handed to the workScript there; the remote copy is removed afterwards.
#
# input: (%args) keys: worker task
#   worker: entry from &getClusterInfo
#   task: created by &task_create
# output: ($out) output of the remote command
sub work {
    my %args = @_;
    &assertUsage("work: usage: worker task", $args{worker}, $args{task});

    my ($worker, $task) = ($args{worker}, $args{task});

    # File names include the pid and thread id.
    my $tid = threads->tid();
    my $localTaskFile = "/tmp/parallel-process-repo-$$\_$tid-LOCAL.json";
    my $remoteTaskFile = "/tmp/parallel-process-repo-$$\_$tid-REMOTE.json";

    # Create the task file locally, ship it, then drop the local copy.
    my $encodedTask = encode_json($task->{task});
    &writeToFile("file"=>$localTaskFile, "contents"=>$encodedTask);
    &transferFile("src"=>$localTaskFile, "dest"=>$remoteTaskFile, "worker"=>$worker);
    unlink $localTaskFile;

    # Process task remotely, optionally with a PATH prefix.
    my $PATHprefix = "";
    if ($globals{PATHprefix}) {
        $PATHprefix = "PATH=$globals{PATHprefix}:\$PATH";
    }
    my $remoteCmd = join(" ", $PATHprefix, $globals{workScript}, "$remoteTaskFile 2>/dev/null; rm $remoteTaskFile");
    my $out = &remoteCommand("accessCreds"=>$worker, "command"=>$remoteCmd);

    # Also remove any file with the remote name on this machine.
    unlink $remoteTaskFile;

    return $out;
}
# Thread body for the exit-early watcher.
# Polls every 100 ms. When $exitEarlyFile appears, sets $EXIT_EARLY and
# returns. If the workers finish first ($WORKERS_DONE), returns without
# setting it.
#
# input: ($exitEarlyFile)
# output: ()
sub thread_exitEarly {
    my ($exitEarlyFile) = @_;

    until (-f $exitEarlyFile) {
        # Workers done? Read the flag under its lock.
        my $workersFinished = do {
            lock($WORKERS_DONE);
            $WORKERS_DONE;
        };
        return if ($workersFinished);

        usleep(100*1000); # 100 ms
    }

    # The signal file exists: tell the workers to stop.
    {
        lock($EXIT_EARLY);
        $EXIT_EARLY = 1;
    }
    return;
}
# Load and validate the cluster description.
#
# input: ($clusterFile) path to a .json file holding an array of node objects
# output: @cluster: list of node objects with keys: host user port logicalCores
#
# Dies if the file is missing, unreadable, not valid JSON, not a JSON array,
# contains an invalid node, or if a node's core count cannot be determined.
sub getClusterInfo {
    my ($clusterFile) = @_;
    &assertUsage("getClusterInfo: Error, usage: (clusterFile)", $clusterFile);

    if (not -f $clusterFile or $clusterFile !~ m/\.json$/i) {
        die "getClusterInfo: Error, invalid clusterFile <$clusterFile>\n";
    }

    # Read the file directly rather than through a shell, so odd characters in
    # the name are harmless and $! describes the actual failure.
    open(my $fh, '<', $clusterFile)
        or die "getClusterInfo: Error, could not read clusterFile <$clusterFile>: $!\n";
    my $json = do { local $/; <$fh> };
    close($fh);

    my $cluster = eval { decode_json($json) };
    if ($@) {
        die "getClusterInfo: Error parsing clusterFile <$clusterFile>: $@\n";
    }
    if (ref($cluster) ne 'ARRAY') {
        die "getClusterInfo: Error, clusterFile <$clusterFile> must contain a JSON array of nodes\n";
    }

    # Confirm nodes are valid.
    my @cluster = @$cluster;
    my $i = 0;
    for my $node (@cluster) {
        if (not &_isClusterNodeValid($node)) {
            die "getClusterInfo: Error, node $i is invalid (0-indexed)\n";
        }
        $i++;
    }

    # Augment with a "logicalCores" field if none provided
    for my $node (@cluster) {
        if (not defined $node->{logicalCores}) {
            # Ask the node itself how many cores it has.
            my $out = &remoteCommand("accessCreds"=>$node, "command"=>"nproc");
            if ($out =~ m/^(\d+)$/) {
                $node->{logicalCores} = int($1);
            }
            else {
                die "getClusterInfo: Error, could not get logical cores for node $node->{host}:\n$out\n";
            }
        }
        &log("$node->{host} has $node->{logicalCores} logical cores");
    }

    return @cluster;
}
# Check that a cluster node is a hashref with true values for host, port and user.
#
# input: ($clusterNode)
# output: ($isValid) 1 if valid, else 0
#
# Anything that is not a hash reference (undef, a string, an array ref, ...)
# is reported as invalid rather than causing a dereference error.
sub _isClusterNodeValid {
    my ($node) = @_;

    return 0 if (ref($node) ne 'HASH');

    for my $key ("host", "port", "user") {
        return 0 if (not $node->{$key});
    }

    return 1;
}
###
# Usage message, arg parsing.
###
# Build the short usage synopsis.
#
# input: ()
# output: ($terseUsage) multi-line string ending in a newline
sub getTerseUsage {
    my @usageLines = (
        "Usage: $0 --cluster C.json --workScript W --taskFile F",
        '[--resultFile R] [--workers w1,...] [--notWorkers w1,...]',
        '[--unusedCores N]',
        '[--copy src:dest] [--PATHprefix dir1:...]',
        '[--verbose] [--help]',
    );

    return join("\n", @usageLines) . "\n";
}
# Print the short usage synopsis to stdout and exit with status 0.
sub shortUsage {
    my $usage = &getTerseUsage();
    print $usage;
    exit 0;
}
# Print the full help text (description, synopsis, per-option notes) to stdout.
# Does not exit; the caller decides what happens next.
sub longUsage {
    my $synopsis = &getTerseUsage();
    print <<"END_LONG_USAGE";
Description: Distribute tasks across workers
$synopsis
--cluster C.json JSON-formatted cluster of workers
Should be an array of "node" objects with minimal keys: host port user [logicalCores] [unusedCores]
host, port, user: suitable for passwordless ssh
[logicalCores]: skip query of node for # logical cores
[unusedCores]: override global --unusedCores
--workScript W Script to execute against each task
**Must exist on every worker**
Argument is a filename, its stdout is saved in resultFile
--taskFile F One task per line, JSON-encoded.
[--resultFile R] One result per line, NOT guaranteed in the same order
JSON-encoded.
[--workers w1,... | --notWorkers w1,... ] Workers to use | workers not to use
[--unusedCores N] Cores to leave available on a worker
[--copy src:dest] Copy src to dest on every worker before running workScript
Must be that src != dest. Can be a file or a dir.
[--PATHprefix dir1:...] Prefix PATH with this string when executing workScript
[--verbose]
[--help]
END_LONG_USAGE
}
# Process args after GetOptions, ensure validity, etc.
# Prints usage and exits 0 on no args or --help.
# Prints usage and exits 1 if any argument is invalid.
#
# input: (%args) from GetOptions
# output: (%globals) with keys:
#   cluster      listref of hashrefs representing nodes to use in the worker cluster
#   workScript   script to execute
#   taskFile     one task per line
#   resultFile   one result per line
#   unusedCores  cores to leave idle on each worker
#   [copy_src    dir on manager]
#   [copy_dest   dir on worker]
#   [PATHprefix  prefix for PATH when invoking workScript]
#   verbose      extra loud
sub processArgs {
    my %args = @_;
    my $invalidArgs = 0;

    # Bail out on no args or help.
    if (not scalar(keys %args)) {
        &shortUsage();
        exit 0;
    }
    if ($args{help}) {
        &longUsage();
        exit 0;
    }

    # cluster
    # Only accept the extension that getClusterInfo accepts (.json), so a bad
    # name is reported here with the usage message.
    my @cluster;
    if ($args{cluster} and -f $args{cluster} and $args{cluster} =~ m/\.json$/i) {
        &log("Cluster file $args{cluster}");
        @cluster = &getClusterInfo($args{cluster});

        # workers
        if ($args{workers}) {
            $args{workers} = [split(",", join(",", @{$args{workers}}))]; # --w x,y --w z becomes x,y,z
        }
        else {
            $args{workers} = [map { $_->{host} } @cluster]; # Default to include all
        }

        # Filter in workers
        &log("Filtering in workers <@{$args{workers}}>");
        @cluster = grep { &listContains($args{workers}, $_->{host}) } @cluster;

        # notWorkers
        if ($args{notWorkers}) {
            $args{notWorkers} = [split(",", join(",", @{$args{notWorkers}}))]; # --w x,y --w z becomes x,y,z
        }
        else {
            $args{notWorkers} = []; # Default to exclude none
        }

        # Filter out notWorkers
        &log("Filtering out workers <@{$args{notWorkers}}>");
        @cluster = grep { not &listContains($args{notWorkers}, $_->{host}) } @cluster;

        my @survivingNames = map { $_->{host} } @cluster;
        if (@survivingNames) {
            &log("Using " . scalar(@cluster) . " workers <@survivingNames>");
        }
        else {
            &log("Error, no workers");
            $invalidArgs = 1;
        }
    }
    else {
        &log("Error, invalid cluster file");
        $invalidArgs = 1;
    }

    # workScript
    # Required, but its existence cannot be checked with -f:
    # workScript might not exist *anywhere* yet before we honor --copy,
    # and this node might not be a worker so -f would still fail.
    if ($args{workScript}) {
        &log("Using workScript $args{workScript}");
    }
    else {
        &log("Error, missing workScript");
        $invalidArgs = 1;
    }

    # taskFile
    if ($args{taskFile} and -f $args{taskFile}) {
        &log("Using taskFile $args{taskFile}");
    }
    else {
        &log("Error, invalid taskFile");
        $invalidArgs = 1;
    }

    # resultFile
    if (not $args{resultFile}) {
        $args{resultFile} = "/tmp/parallel-work-pid$$-results";
    }
    unlink $args{resultFile};
    &log("Using resultFile $args{resultFile}");

    # unusedCores
    if (not defined $args{unusedCores}) {
        $args{unusedCores} = 0;
    }
    if (0 <= $args{unusedCores}) {
        &log("Using unusedCores $args{unusedCores}");
    }
    else {
        &log("Error, invalid unusedCores $args{unusedCores}");
        $invalidArgs = 1;
    }

    # copy
    my ($copySrc, $copyDest);
    if (defined $args{copy}) {
        my @spl = split(":", $args{copy});
        if (scalar(@spl) == 2) {
            my ($src, $dest) = @spl;
            if ($src ne $dest and -e $src) {
                &log("Using copySrc $src copyDest $dest");
                $copySrc = $src;
                $copyDest = $dest;
            }
            else {
                &log("Error, invalid copy src $src dest $dest. Must have src != dest, and src must exist");
                $invalidArgs = 1;
            }
        }
        else {
            &log("Error, malformed copy. Must be src:dest");
            $invalidArgs = 1;
        }
    }

    # PATHprefix
    if (not defined $args{PATHprefix}) {
        $args{PATHprefix} = "";
    }

    # verbose
    if (not defined $args{verbose}) {
        $args{verbose} = 0;
    }

    # Error out on invalid args.
    # shortUsage exits with status 0 itself, which would hide the failure from
    # the caller's shell; print the usage text directly and exit non-zero.
    if ($invalidArgs) {
        my $usage = &getTerseUsage();
        print $usage;
        exit 1;
    }

    my %globals = ("cluster" => \@cluster,
                   "workScript" => $args{workScript},
                   "taskFile" => $args{taskFile},
                   "resultFile" => $args{resultFile},
                   "unusedCores" => $args{unusedCores},
                   "copy_src" => $copySrc,
                   "copy_dest" => $copyDest,
                   "PATHprefix" => $args{PATHprefix},
                   "verbose" => $args{verbose},
                  );
    return %globals;
}
###
# Task management.
###
# Read the task file and turn each non-blank line into a task.
#
# input: (%args) keys: taskFile
# output: (@tasks)
#   Each elt contains a task from &task_create,
#   where the 'task' is a json-decoded version of one of the lines from the taskFile.
#   Task ids are 1-indexed and count only non-blank lines.
#   Returns an empty list if tasks were already loaded.
#
# Dies if the file cannot be opened or a line is not valid JSON.
sub loadTasks {
    my %args = @_;
    &assertUsage("Usage: taskFile", $args{taskFile});

    lock($TASK_LOCK);
    return if ($tasksLoaded);
    $tasksLoaded = 1;

    # Read the file directly rather than through a shell 'cat'.
    open(my $fh, '<', $args{taskFile})
        or die "loadTasks: Error, could not open taskFile <$args{taskFile}>: $!\n";

    my @tasks;
    my $id = 1; # 1-indexed for sanity
    while (my $line = <$fh>) {
        my $lineNum = $.;
        chomp $line;

        # Blank lines (e.g. a trailing newline at end of file) are not tasks.
        next if ($line =~ m/^\s*$/);

        &log("Task line: <$line>");
        my $task = eval { decode_json($line) };
        if ($@) {
            die "loadTasks: Error, line $lineNum of taskFile <$args{taskFile}> is not valid JSON: $@\n";
        }

        push @tasks, &task_create("id"=>$id, "task"=>$task);
        $id++;
    }
    close($fh);

    return @tasks;
}
# Remove and return the task at the front of $TASKS, under $TASK_LOCK.
#
# input: ()
# output: ($task) or $NO_TASKS_LEFT
#   $task as created by &task_create.
sub getNextTask {
    lock($TASK_LOCK);
    &assert(($tasksLoaded), "getNextTask: Tasks never loaded");

    # Nothing queued: hand back the sentinel.
    return $NO_TASKS_LEFT unless (@{$TASKS});

    my $nextTask = shift @{$TASKS};
    my $numLeft = scalar(@{$TASKS});
    &log("getNextTask: got <" . &task_toString($nextTask) . ">, $numLeft tasks remaining");

    return $nextTask;
}
# Bundle an id and a decoded task into a task record.
#
# input: (%args) keys: id task
# output: ($task) a ref with keys: id task
sub task_create {
    my (%args) = @_;
    &assertUsage("createTask: usage: id task", $args{id}, $args{task});

    my %task = (
        "id" => $args{id},
        "task" => $args{task},
    );
    return \%task;
}
# Render a task for logging as "<id>: <JSON of the task payload>".
#
# input: ($task) as created by &task_create
# output: ($string)
sub task_toString {
    my ($task) = @_;
    my $payload = encode_json($task->{task});
    return sprintf("%s: %s", $task->{id}, $payload);
}
###
# Result management.
###
# Open the result file for writing, once. Later calls return the handle
# that is already open.
#
# input: ($resultFile)
# output: ($FH)
sub openResultFile {
    my ($resultFile) = @_;
    lock($RESULT_LOCK);

    # Already open: nothing more to do.
    return $RESULT_FH if ($resultFHOpened);

    &log("emitResult: Opening $resultFile for results");
    open($RESULT_FH, ">", $resultFile) or confess "Error, could not open resultFile: $!\n";
    $resultFHOpened = 1;

    return $RESULT_FH;
}
# Append one JSON-encoded result line to the result file, under $RESULT_LOCK.
#
# input: ($result) result object with keys: task worker output
#   task: from &getNextTask
#   worker: "host:id" string naming the worker slot
#   output: command-line output
# output: ()
#
# Dies (via confess) if openResultFile has not been called.
sub emitResult {
    my ($result) = @_;
    lock($RESULT_LOCK);

    if (not $resultFHOpened) {
        confess "Error, must call openResultFile first\n";
    }

    my $encodedResult = encode_json($result);
    &log("emitResult: Emitting: <$encodedResult>");

    # Turn on autoflush for the result handle so each line is written to the
    # file while the lock is still held, instead of sitting in a buffer until
    # it fills or the thread ends. This keeps result lines whole and makes
    # progress visible while the run is in flight.
    # NOTE(review): assumes each thread buffers its own copy of the handle - confirm.
    my $previousFH = select($RESULT_FH);
    $| = 1;
    select($previousFH);

    print {$RESULT_FH} "$encodedResult\n"
        or &log("emitResult: Error, writing result failed: $!");

    return;
}
# Close the result file handle if there is one; a failed close is logged.
#
# input: ()
sub noMoreResults {
    return unless ($RESULT_FH);

    unless (close($RESULT_FH)) {
        &log("Error, closing result_fh failed: $!");
    }
}
###
# Handle copy
###
# Copy src to dest on every destination in parallel, one helper thread each.
#
# input: (%args) keys: src dest destAccessCreds
#   src/dest: files or dirs, as accepted by &remoteCopy
#   destAccessCreds: array ref of hashrefs of destAccessCreds for use with remoteCopy
# output: ()
#
# Dies (via confess) if any copy returns a non-zero status or its helper
# thread ends without returning a status.
sub propagateCopyDir {
    my %args = @_;
    &assertUsage("propagateCopyDir: usage: src dest destAccessCreds", $args{src}, $args{dest}, $args{destAccessCreds});

    # Parallel propagation. Use the caller's src/dest rather than %globals.
    my @helpers;
    for my $cred (@{$args{destAccessCreds}}) {
        my $thr = threads->create(sub {
            my ($src, $dest, $creds) = @_;
            return &remoteCopy("src"=>$src, "dest"=>$dest, "destAccessCreds"=>$creds);
        }, $args{src}, $args{dest}, $cred);
        push @helpers, $thr;
    }

    # Confirm that all propagation succeeded.
    my @results;
    for my $helper (@helpers) {
        # Scalar join: a helper that died yields undef, which must count as a failure.
        my $rc = $helper->join();
        push @results, (defined($rc) ? $rc : "died");
    }

    if (grep { $_ ne "0" } @results) {
        confess "Error, at least one copy failure: <@results>\n";
    }

    return;
}
###
# Utility
###
# String-equality membership test.
#
# input: (\@list, $e)
# output: 1 if $e is in @list, else 0
sub listContains {
    my ($list, $e) = @_;
    my $matches = grep { $_ eq $e } @$list;
    return $matches ? 1 : 0;
}
# Numeric minimum of a list.
#
# input: (@nums)
# output: ($min) smallest element, or undef for an empty list
sub min {
    my ($smallest, @others) = @_;
    for my $candidate (@others) {
        $smallest = $candidate if ($candidate < $smallest);
    }
    return $smallest;
}
# Numeric maximum of a list.
#
# input: (@nums)
# output: ($max) largest element, or undef for an empty list
sub max {
    my ($largest, @others) = @_;
    for my $candidate (@others) {
        $largest = $candidate if ($largest < $candidate);
    }
    return $largest;
}
# Terminate the program if a condition does not hold.
#
# input: ($cond, $msg)
# output: () returns only when $cond is true; otherwise prints
#   "ERROR: $msg" to stdout and exits with status 1.
sub assert {
    my ($cond, $msg) = @_;
    return if ($cond);

    print "ERROR: $msg\n";
    exit 1;
}
# Assert that every listed value is defined; on failure &assert reports $msg.
#
# input: ($msg, @varsThatShouldBeDefined)
# output: ()
sub assertUsage {
    my ($msg, @shouldBeDefined) = @_;
    my $numUndefined = grep { not defined $_ } @shouldBeDefined;
    &assert(($numUndefined == 0), $msg);
}
# Log a shell command, run it with stderr merged into stdout, and report
# its output and exit status.
#
# input: ($cmd)
# output: ($out, $rc)
sub cmd {
    my ($cmd) = @_;
    &log($cmd);

    my $out = qx{$cmd 2>&1};
    my $rc = $? >> 8; # high byte of the wait status is the exit code

    return ($out, $rc);
}
# Write a timestamped message to STDERR, holding $LOG_LOCK while printing.
#
# input: ($msg)
sub log {
    my ($msg) = @_;
    my $stamp = scalar(localtime());

    lock($LOG_LOCK);
    print STDERR join(": ", $stamp, $msg), "\n";
}
# Run a command on a remote host over ssh and return its output.
#
# input: (%args) keys: accessCreds command
#   accessCreds: hashref, keys: port user host
#     hint: a worker object can be used as accessCreds
#   command: string to execute over ssh
# output: ($out) output of the ssh invocation (stderr included, see &cmd)
#
# The exit status is not returned; callers only get the output.
sub remoteCommand {
    my %args = @_;
    &assertUsage("remoteCommand: Error, usage: hash with keys: accessCreds command", $args{accessCreds}, $args{command});

    my $creds = $args{accessCreds};

    # The command is passed to ssh inside single quotes, so any single quote
    # it contains must be written as '\'' (close quote, escaped quote, reopen).
    (my $quotedCommand = $args{command}) =~ s/'/'\\''/g;

    my $cmd = "ssh -p$creds->{port} $creds->{user}\@$creds->{host} '$quotedCommand'";
    my ($out, $rc) = &cmd($cmd);

    return ($out);
}
# Copy a local file to a remote host with scp.
# Directories are copied recursively (scp -r).
#
# input: (%args) keys: src dest destAccessCreds
#   src/dest: files or dirs
#   destAccessCreds: hashref, keys: port user host
# output: ($rc) exit status of scp: 0 success, non-zero failure
sub remoteCopy {
    my %args = @_;
    my $creds = $args{destAccessCreds};

    my $target = "$creds->{user}\@$creds->{host}:$args{dest}";
    my ($out, $rc) = &cmd("scp -r -P$creds->{port} $args{src} $target");

    return ($rc);
}
@davisjam

This comment has been minimized.

Copy link
Owner Author

@davisjam davisjam commented Jan 6, 2018

I've made a few extensions, haven't figured out how to push yet.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment