Distribute an embarrassingly parallel workload across a cluster of worker nodes. Dependency-free (uses only core Perl modules)!
#!/usr/bin/env perl
# Author: Jamie Davis <davisjam@vt.edu>
# Description:
# - Perform a bunch of operations, using a cluster of worker nodes.
# - To cleanly exit early, check the first few lines of stderr for a file to touch.
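# - Example invocation (a sketch; all file names, including the script's, are hypothetical):
#     ./distribute-work.pl --cluster cluster.json --workScript ./do-task.pl \
#       --taskFile tasks.txt --resultFile results.txt --unusedCores 1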
# Dependencies.
use strict;
use warnings;
use threads;
use threads::shared;
use JSON::PP;
use Getopt::Long;
use Time::HiRes qw( usleep );
use Carp;

# Globals.
my %globals;

my $LOG_LOCK : shared; # Logging.

my $WORKERS_DONE : shared; # Worker management.
$WORKERS_DONE = 0;

my $TASK_LOCK : shared; # Getting and delivering tasks.
my $tasksLoaded = 0;
my $TASKS : shared; # array ref

my $RESULT_LOCK : shared; # Emitting results.
my $RESULT_FH;
my $resultFHOpened : shared;
$resultFHOpened = 0;

my $exitEarlyFile = "/tmp/distributed-work-exit-early-pid$$"; # External signaling.
unlink $exitEarlyFile;
my $EXIT_EARLY : shared;
$EXIT_EARLY = 0;

my $NO_TASKS_LEFT = -1;
# Process args.
my $invocation = "$0 " . join(" ", @ARGV);
my %args;
GetOptions(\%args,
  "cluster=s",
  "workScript=s",
  "taskFile=s",
  "resultFile=s",
  "workers=s@",
  "notWorkers=s@",
  "unusedCores=i",
  "copy=s",
  "PATHprefix=s",
  "verbose",
  "help",
) or die "Error parsing args\n";
%globals = &processArgs(%args);
&log("Invocation: $invocation");

my @workerNames = map { $_->{host} } @{$globals{cluster}};
&log(scalar(@{$globals{cluster}}) . " workers: <@workerNames>");
# Load tasks.
&log("Loading tasks from $globals{taskFile}");
$TASKS = shared_clone([&loadTasks("taskFile"=>$globals{taskFile})]);
&log("Opening resultFile $globals{resultFile}");
$RESULT_FH = &openResultFile($globals{resultFile});

# Start and await workers.
&log("Starting workers");
&log("To cleanly exit early, run the following command:\n  touch $exitEarlyFile");
&runWorkers("cluster"=>$globals{cluster});
&log("See results in $globals{resultFile}");
&noMoreResults();

# Write out any remaining tasks.
if ($EXIT_EARLY) {
  &log("Exited early, writing out any remaining tasks");
  &writeOutRemainingTasks();
}

exit(0);
# Launch worker threads on all cluster nodes and await their completion.
#
# input: %args keys: cluster
sub runWorkers {
  my (%args) = @_;
  &assertUsage("runWorkers: Error, usage: hash with keys: cluster", $args{cluster});

  # Propagate copy
  if ($globals{copy_src} and $globals{copy_dest}) {
    &log("runWorkers: Propagating copy");
    &propagateCopyDir("src"=>$globals{copy_src}, "dest"=>$globals{copy_dest}, "destAccessCreds"=>$args{cluster});
  }

  # Start workers
  &log("runWorkers: Starting workers");
  my @my_threads;
  my $thread_exitEarly;
  for my $worker (@{$args{cluster}}) {
    my $unusedCores = &min($globals{unusedCores}, $worker->{logicalCores});
    if ($worker->{unusedCores}) { # Override global recommendation?
      $unusedCores = $worker->{unusedCores};
    }
    my $usedCores = $worker->{logicalCores} - $unusedCores;
    &log("runWorkers: Starting $usedCores processes on worker $worker->{host}, leaving $unusedCores unused cores");
    for my $core (1 .. $usedCores) {
      my %workerWithID = %{$worker};
      $workerWithID{id} = $core;
      &log("runWorkers: $workerWithID{host}:$workerWithID{id}");
      push @my_threads, threads->create(\&thread_worker, \%workerWithID);
    }
  }
  $thread_exitEarly = threads->create(\&thread_exitEarly, ($exitEarlyFile));

  &log("runWorkers: Waiting on my " . scalar(@my_threads) . " threads");
  for my $thr (@my_threads) {
    $thr->join();
    &log("runWorkers: thread finished!");
  }
  # If not already done, signal $thread_exitEarly.
  { lock($WORKERS_DONE);
    $WORKERS_DONE = 1;
  }
  &log("runWorkers: Waiting on thread_exitEarly");
  $thread_exitEarly->join();

  &log("runWorkers: Done");
  return;
}
sub getRemainingTasks {
  # Extract all remaining tasks.
  my @remainingTasks;
  while (1) {
    my $t = &getNextTask();
    last if ($t eq $NO_TASKS_LEFT);
    push @remainingTasks, $t;
  }

  my @strings = map { &task_toString($_) } @remainingTasks;
  &log("getRemainingTasks: Got " . scalar(@remainingTasks) . " remaining tasks: <@strings>");
  return @remainingTasks;
}
# input: ()
# output: ($anyRemainingTasks)
sub writeOutRemainingTasks {
  # A file to write to.
  my $remainingWorkFile = "/tmp/distributed-work-remainingWork-pid$$\.txt";
  unlink $remainingWorkFile;

  my @remainingTasks = &getRemainingTasks();
  if (@remainingTasks) {
    # Convert back to JSON and write out.
    my @lines = map { encode_json($_->{task}) } @remainingTasks;
    my $contents = join("\n", @lines);
    &writeToFile("file"=>$remainingWorkFile, "contents"=>$contents);
    &log(scalar(@remainingTasks) . " tasks remaining, see $remainingWorkFile");
  }

  return (0 < scalar(@remainingTasks));
}
# Thread.
# Get and do tasks until none remain, then return.
#
# input: ($worker) worker from &getClusterInfo, with extra key 'id'
# output: ()
sub thread_worker {
  my ($worker) = @_;

  my $tid = threads->tid();
  my $workerStr = "$worker->{host}:$worker->{id}";
  my $logPref = $workerStr;

  for (my $taskNum = 0; ; $taskNum++) {
    # Should we EXIT_EARLY?
    {
      lock($EXIT_EARLY);
      if ($EXIT_EARLY) {
        &log("$logPref: exiting early");
        return;
      }
    }

    # Get and complete a task.
    my $task = &getNextTask();
    if ($task eq $NO_TASKS_LEFT) {
      &log("$logPref: No tasks left");
      last;
    }

    &log("$logPref: task " . &task_toString($task));
    my $out = &work("worker"=>$worker, "task"=>$task);
    &log("$logPref: completed task $task->{id}. Output: $out");

    my $result = { "task"=>$task, "worker"=>$workerStr, "output"=>$out };
    &emitResult($result);
  }

  return;
}
# input: %args: keys: src dest worker
# output: $dest
#
# Transfer the specified src to dest on the given worker.
sub transferFile {
  my %args = @_;
  &assertUsage("transferFile: usage: src dest worker", $args{src}, $args{dest}, $args{worker});

  my ($out, $rc) = &cmd("scp -P$args{worker}->{port} $args{src} $args{worker}->{user}\@$args{worker}->{host}:$args{dest}");
  if ($rc) {
    confess "transferFile: Error, scp to $args{worker}->{host} failed: $out\n";
  }
  return $args{dest};
}
# input: %args: keys: file contents
# output: $file
sub writeToFile {
  my %args = @_;
  &assertUsage("writeToFile: usage: file contents", $args{file}, $args{contents});

  open(my $fh, '>', $args{file}) or confess "writeToFile: Error, could not open $args{file}: $!\n";
  print $fh $args{contents};
  close $fh;

  return $args{file};
}
# input: (%args) keys: worker task
#   worker: entry from &getClusterInfo
#   task: created by &task_create
# output: ($out)
sub work {
  my %args = @_;
  &assertUsage("work: usage: worker task", $args{worker}, $args{task});

  my $tid = threads->tid();

  # Create task file and ship it to the worker.
  my $localTaskFile = "/tmp/parallel-process-repo-$$\_$tid-LOCAL.json";
  my $remoteTaskFile = "/tmp/parallel-process-repo-$$\_$tid-REMOTE.json";
  &writeToFile("file"=>$localTaskFile, "contents"=>encode_json($args{task}->{task}));
  &transferFile("src"=>$localTaskFile, "dest"=>$remoteTaskFile, "worker"=>$args{worker});
  unlink $localTaskFile;

  # Process task remotely and log output. The trailing 'rm' cleans up the remote task file.
  my $PATHprefix = $globals{PATHprefix} ? "PATH=$globals{PATHprefix}:\$PATH" : "";
  my $remoteCmd = "$PATHprefix $globals{workScript} $remoteTaskFile 2>/dev/null; rm $remoteTaskFile";
  my $out = &remoteCommand("accessCreds"=>$args{worker}, "command"=>$remoteCmd);

  return $out;
}
# Thread.
# Forever: Check whether we should exit early.
#   If so, set $EXIT_EARLY and then return.
#   Otherwise, if workers have finished ($WORKERS_DONE), return.
#
# input: ($exitEarlyFile)
# output: ()
sub thread_exitEarly {
  my ($exitEarlyFile) = @_;

  while (1) {
    # Exit early?
    if (-f $exitEarlyFile) {
      { lock($EXIT_EARLY);
        $EXIT_EARLY = 1;
      }
      last;
    }

    # Workers done?
    my $done;
    { lock($WORKERS_DONE);
      $done = $WORKERS_DONE;
    }
    if ($done) {
      last;
    }

    usleep(100*1000); # 100 ms
  }
}
# input: ($clusterFile)
# output: @cluster: list of node objects with keys: host user port logicalCores
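# For illustration, a minimal clusterFile might look like this (hypothetical hosts;
# logicalCores and unusedCores are the optional keys described in --help):
#   [ {"host":"node1.example.com", "port":22, "user":"alice", "logicalCores":8},
#     {"host":"node2.example.com", "port":22, "user":"alice", "unusedCores":2} ]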
sub getClusterInfo {
  my ($clusterFile) = @_;
  &assertUsage("getClusterInfo: Error, usage: (clusterFile)", $clusterFile);

  if (not -f $clusterFile or $clusterFile !~ m/\.json$/i) {
    die "getClusterInfo: Error, invalid clusterFile <$clusterFile>\n";
  }

  my ($out, $rc) = &cmd("cat $clusterFile 2>/dev/null");
  if ($rc) {
    die "getClusterInfo: Error, could not read clusterFile <$clusterFile>\n";
  }

  my $cluster = eval {
    return decode_json($out);
  };
  if ($@) {
    die "getClusterInfo: Error parsing clusterFile <$clusterFile>: $@\n";
  }

  # Confirm nodes are valid.
  my @cluster = @$cluster;
  my $i = 0;
  for my $node (@cluster) {
    if (not &_isClusterNodeValid($node)) {
      die "getClusterInfo: Error, node $i is invalid (0-indexed)\n";
    }
    $i++;
  }

  # Augment with a "logicalCores" field if none provided.
  for my $node (@cluster) {
    if (not defined $node->{logicalCores}) {
      my $out = &remoteCommand("accessCreds"=>$node, "command"=>"nproc");
      if ($out =~ m/^(\d+)$/) {
        $node->{logicalCores} = int($1);
      }
      else {
        die "getClusterInfo: Error, could not get logical cores for node $node->{host}:\n$out\n";
      }
    }
    &log("$node->{host} has $node->{logicalCores} logical cores");
  }

  return @cluster;
}
# input: ($clusterNode)
# output: ($isValid)
sub _isClusterNodeValid {
  my ($node) = @_;

  my @keys = ("host", "port", "user");
  if (not $node) {
    return 0;
  }
  for my $key (@keys) {
    if (not $node->{$key}) {
      return 0;
    }
  }

  return 1;
}
###
# Usage message, arg parsing.
###

sub getTerseUsage {
  my $terseUsage = "Usage: $0 --cluster C.json --workScript W --taskFile F
    [--resultFile R] [--workers w1,...] [--notWorkers w1,...]
    [--unusedCores N]
    [--copy src:dest] [--PATHprefix dir1:...]
    [--verbose] [--help]
";
  return $terseUsage;
}

sub shortUsage {
  print &getTerseUsage();
  exit 0;
}
sub longUsage {
  my $terseUsage = &getTerseUsage();
  print "Description: Distribute tasks across workers
$terseUsage
  --cluster C.json         JSON-formatted cluster of workers
                           Should be an array of \"node\" objects with minimal keys: host port user [logicalCores] [unusedCores]
                             host, port, user: suitable for passwordless ssh
                             [logicalCores]: skip query of node for # logical cores
                             [unusedCores]: override global --unusedCores
  --workScript W           Script to execute against each task
                           **Must exist on every worker**
                           Argument is a filename, its stdout is saved in resultFile
  --taskFile F             One task per line, JSON-encoded.
  [--resultFile R]         One result per line, JSON-encoded; NOT guaranteed in the same order as taskFile.
  [--workers w1,... | --notWorkers w1,...]  Workers to use | workers not to use
  [--unusedCores N]        Cores to leave available on a worker
  [--copy src:dest]        Copy src to dest on every worker before running workScript
                           Must be that src != dest. Can be a file or a dir.
  [--PATHprefix dir1:...]  Prefix PATH with this string when executing workScript
  [--verbose]
  [--help]
";
}
# Process args after GetOptions, ensure validity, etc.
#
# input: (%args) from GetOptions
# output: (%globals) with keys:
#   cluster       listref of hashrefs representing nodes to use in the worker cluster
#   workScript    script to execute
#   taskFile      one task per line
#   resultFile    one result per line
#   unusedCores   cores to leave idle on each worker
#   [copy_src     dir on manager]
#   [copy_dest    dir on worker]
#   [PATHprefix   prefix for PATH when invoking workScript]
#   verbose       extra loud
sub processArgs {
  my %args = @_;
  my $invalidArgs = 0;

  # Bail out on no args or help.
  if (not scalar(keys %args)) {
    &shortUsage();
    exit 0;
  }
  if ($args{help}) {
    &longUsage();
    exit 0;
  }

  # cluster
  my @cluster;
  if ($args{cluster} and -f $args{cluster} and $args{cluster} =~ m/\.json$/i) {
    &log("Cluster file $args{cluster}");
    @cluster = &getClusterInfo($args{cluster});

    # workers
    if ($args{workers}) {
      $args{workers} = [split(",", join(",", @{$args{workers}}))]; # --workers x,y --workers z becomes x,y,z
    }
    else {
      $args{workers} = [map { $_->{host} } @cluster]; # Default to include all
    }

    # Filter in workers
    &log("Filtering in workers <@{$args{workers}}>");
    @cluster = grep { &listContains($args{workers}, $_->{host}) } @cluster;

    # notWorkers
    if ($args{notWorkers}) {
      $args{notWorkers} = [split(",", join(",", @{$args{notWorkers}}))]; # --notWorkers x,y --notWorkers z becomes x,y,z
    }
    else {
      $args{notWorkers} = []; # Default to exclude none
    }

    # Filter out notWorkers
    &log("Filtering out workers <@{$args{notWorkers}}>");
    @cluster = grep { not &listContains($args{notWorkers}, $_->{host}) } @cluster;

    my @survivingNames = map { $_->{host} } @cluster;
    if (@survivingNames) {
      &log("Using " . scalar(@cluster) . " workers <@survivingNames>");
    }
    else {
      &log("Error, no workers");
      $invalidArgs = 1;
    }
  }
  else {
    &log("Error, invalid cluster file");
    $invalidArgs = 1;
  }
  # workScript
  if ($args{workScript}) {
    &log("Using workScript $args{workScript}");
    # Can't check with -f here:
    #  - workScript might not exist *anywhere* yet, until we honor --copy.
    #  - this node might not be a worker, so -f could fail spuriously.
  }
  else {
    &log("Error, no workScript");
    $invalidArgs = 1;
  }
  # taskFile
  if ($args{taskFile} and -f $args{taskFile}) {
    &log("Using taskFile $args{taskFile}");
  }
  else {
    &log("Error, invalid taskFile");
    $invalidArgs = 1;
  }

  # resultFile
  if (not $args{resultFile}) {
    $args{resultFile} = "/tmp/parallel-work-pid$$-results";
  }
  unlink $args{resultFile};
  &log("Using resultFile $args{resultFile}");

  # unusedCores
  if (not defined $args{unusedCores}) {
    $args{unusedCores} = 0;
  }
  if (0 <= $args{unusedCores}) {
    &log("Using unusedCores $args{unusedCores}");
  }
  else {
    &log("Error, invalid unusedCores $args{unusedCores}");
    $invalidArgs = 1;
  }

  # copy
  my ($copySrc, $copyDest);
  if (defined $args{copy}) {
    my @spl = split(":", $args{copy});
    if (scalar(@spl) == 2) {
      my ($src, $dest) = @spl;
      if ($src ne $dest and -e $src) {
        &log("Using copySrc $src copyDest $dest");
        $copySrc = $src;
        $copyDest = $dest;
      }
      else {
        &log("Error, invalid copy src $src dest $dest. Must have src != dest, and src must exist");
        $invalidArgs = 1;
      }
    }
    else {
      &log("Error, malformed copy. Must be src:dest");
      $invalidArgs = 1;
    }
  }

  # PATHprefix
  if (not defined $args{PATHprefix}) {
    $args{PATHprefix} = "";
  }

  # verbose
  if (not defined $args{verbose}) {
    $args{verbose} = 0;
  }

  # Error out on invalid args.
  if ($invalidArgs) {
    &shortUsage();
    exit 1;
  }

  my %globals = ("cluster"     => \@cluster,
                 "workScript"  => $args{workScript},
                 "taskFile"    => $args{taskFile},
                 "resultFile"  => $args{resultFile},
                 "unusedCores" => $args{unusedCores},
                 "copy_src"    => $copySrc,
                 "copy_dest"   => $copyDest,
                 "PATHprefix"  => $args{PATHprefix},
                 "verbose"     => $args{verbose},
                );
  return %globals;
}
###
# Task management.
###

# input: (%args) keys: taskFile
# output: (@tasks)
#   Each elt contains a task from &task_create,
#   where the 'task' is a json-decoded version of one of the lines from the taskFile.
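# For illustration, a taskFile line can be any one-line JSON value the workScript
# understands, e.g. (hypothetical): {"repo":"https://github.com/foo/bar","args":["--fast"]}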
sub loadTasks {
  my %args = @_;
  &assertUsage("Usage: taskFile", $args{taskFile});

  lock($TASK_LOCK);
  return if ($tasksLoaded);
  $tasksLoaded = 1;

  my @taskLines = `cat $args{taskFile}`;
  chomp @taskLines;

  my $_id = 1; # 1-indexed for sanity
  my @tasks;
  for my $line (@taskLines) {
    &log("Task line: <$line>");
    my $task = decode_json($line);
    my $id = $_id;
    push @tasks, &task_create("id"=>$id, "task"=>$task);
    $_id++;
  }

  return @tasks;
}
# Get the next task from @$TASKS.
#
# input: ()
# output: ($task) or $NO_TASKS_LEFT
#   $task as created by &task_create.
sub getNextTask {
  lock($TASK_LOCK);
  &assert(($tasksLoaded), "getNextTask: Tasks never loaded");

  if (not @{$TASKS}) {
    return $NO_TASKS_LEFT;
  }

  my $nextTask = shift @{$TASKS};
  &log("getNextTask: got <" . &task_toString($nextTask) . ">, " . scalar(@{$TASKS}) . " tasks remaining");
  return $nextTask;
}
# input: (%args) keys: id task
# output: ($task) a ref with keys: id task
sub task_create {
  my (%args) = @_;
  &assertUsage("task_create: usage: id task", $args{id}, $args{task});
  return { "id"   => $args{id},
           "task" => $args{task},
         };
}

sub task_toString {
  my ($task) = @_;
  return "$task->{id}: " . encode_json($task->{task});
}
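# e.g. &task_toString({"id"=>3, "task"=>{"x"=>1}}) returns '3: {"x":1}'.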
###
# Result management.
###

# input: ($resultFile)
# output: ($FH)
sub openResultFile {
  my ($resultFile) = @_;

  lock($RESULT_LOCK);
  if (not $resultFHOpened) {
    &log("openResultFile: Opening $resultFile for results");
    open($RESULT_FH, ">", $resultFile) or confess "Error, could not open resultFile: $!\n";
    $resultFHOpened = 1;
  }

  return $RESULT_FH;
}
# input: ($result) result object with keys: task worker output
#   task: from &getNextTask
#   worker: "host:id" string identifying who did the work
#   output: command-line output
# output: ()
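# For illustration, an emitted line might look like (hypothetical values; JSON key
# order is not guaranteed):
#   {"task":{"id":1,"task":{"repo":"..."}},"worker":"node1.example.com:3","output":"ok\n"}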
sub emitResult {
  my ($result) = @_;

  lock($RESULT_LOCK);
  if (not $resultFHOpened) {
    confess "Error, must call openResultFile first\n";
  }

  my $encodedResult = encode_json($result);
  &log("emitResult: Emitting: <$encodedResult>");
  print $RESULT_FH "$encodedResult\n";

  return;
}

sub noMoreResults {
  if ($RESULT_FH) {
    close($RESULT_FH) or &log("Error, closing RESULT_FH failed: $!");
  }
}
###
# Handle copy
###

# input: (%args) keys: src dest destAccessCreds
#   destAccessCreds: array ref of hashrefs of destAccessCreds for use with remoteCopy
# output: ()
sub propagateCopyDir {
  my %args = @_;
  &assertUsage("propagateCopyDir: usage: src dest destAccessCreds", $args{src}, $args{dest}, $args{destAccessCreds});

  # Parallel propagation: one helper thread per destination node.
  my @helpers;
  for my $cred (@{$args{destAccessCreds}}) {
    my $thr = threads->create(sub {
      my ($src, $dest, $cred) = @_;
      return &remoteCopy("src"=>$src, "dest"=>$dest, "destAccessCreds"=>$cred);
    }, $args{src}, $args{dest}, $cred);
    push @helpers, $thr;
  }

  # Confirm that all propagation succeeded.
  my @results;
  for my $helper (@helpers) {
    push @results, $helper->join();
  }
  if (grep { $_ ne 0 } @results) {
    confess "Error, at least one copy failure: <@results>\n";
  }

  return;
}
###
# Utility
###

# input: (\@list, $e)
# output: true if $e is in @list, else false
sub listContains {
  my ($list, $e) = @_;
  for my $elt (@$list) {
    if ($elt eq $e) {
      return 1;
    }
  }
  return 0;
}

sub min {
  my (@nums) = @_;
  my $min = $nums[0];
  for my $n (@nums) {
    if ($n < $min) {
      $min = $n;
    }
  }
  return $min;
}

sub max {
  my (@nums) = @_;
  my $max = $nums[0];
  for my $n (@nums) {
    if ($max < $n) {
      $max = $n;
    }
  }
  return $max;
}
sub assert {
  my ($cond, $msg) = @_;
  if (not $cond) {
    print STDERR "ERROR: $msg\n";
    exit 1;
  }
}
# input: ($msg, @varsThatShouldBeDefined)
# output: ()
sub assertUsage {
  my ($msg, @shouldBeDefined) = @_;
  my @undefined = grep { not defined $_ } @shouldBeDefined;
  &assert((not @undefined), $msg);
}
# input: ($cmd)
# output: ($out, $rc)
sub cmd {
  my ($cmd) = @_;
  &log($cmd);
  my $out = `$cmd 2>&1`;
  return ($out, $? >> 8);
}

sub log {
  my ($msg) = @_;
  my $now = localtime;
  lock($LOG_LOCK);
  print STDERR "$now: $msg\n";
}
# input: (%args) keys: accessCreds command
#   accessCreds: hashref, keys: port user host
#     hint: a worker object can be used as accessCreds
#   command: string to execute over ssh
# output: ($out)
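# For illustration (hypothetical creds), this is how the script queries core counts:
#   my $out = &remoteCommand("accessCreds"=>{"host"=>"node1.example.com","port"=>22,"user"=>"alice"},
#                            "command"=>"nproc");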
sub remoteCommand {
  my %args = @_;
  &assertUsage("remoteCommand: Error, usage: hash with keys: accessCreds command", $args{accessCreds}, $args{command});

  my $cmd = "ssh -p$args{accessCreds}->{port} $args{accessCreds}->{user}\@$args{accessCreds}->{host} '$args{command}'";
  my ($out, $rc) = &cmd($cmd); # Callers only need the output; rc is not returned.
  return ($out);
}
# Copy local file to remote host.
# Recursively copies dirs.
#
# input: (%args) keys: src dest destAccessCreds
#   src/dest: files or dirs
#   destAccessCreds: hashref, keys: port user host
# output: ($rc) 0 success, non-zero failure
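# For illustration (hypothetical paths and creds):
#   my $rc = &remoteCopy("src"=>"/tmp/payload", "dest"=>"/data/payload",
#                        "destAccessCreds"=>{"host"=>"node1.example.com","port"=>22,"user"=>"alice"});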
sub remoteCopy {
  my %args = @_;
  &assertUsage("remoteCopy: usage: src dest destAccessCreds", $args{src}, $args{dest}, $args{destAccessCreds});

  my $cmd = "scp -r -P$args{destAccessCreds}->{port} $args{src} $args{destAccessCreds}->{user}\@$args{destAccessCreds}->{host}:$args{dest}";
  my ($out, $rc) = &cmd($cmd); # rc is scp's exit status: 0 on success.
  return ($rc);
}
I've made a few extensions, haven't figured out how to push yet.