Skip to content

Instantly share code, notes, and snippets.

@saillinux
Created December 20, 2013 11:57
Show Gist options
  • Save saillinux/8053779 to your computer and use it in GitHub Desktop.
Save saillinux/8053779 to your computer and use it in GitHub Desktop.
DAG Scheduler in Perl
use strict;
use warnings;
use Data::Dumper;
use Scalar::Util qw(looks_like_number);
use Capture::Tiny ':all';
use Graph;
use JSON;
# Lifecycle states of a task, kept in each task record's 'state' field.
use constant WAITING => 0;    # initial state, action not started yet
use constant RUNNING => 1;    # set just before the action is invoked
use constant DONE    => 2;    # action finished without a failure status
use constant FAIL    => 3;    # action finished with a failure status
# Task table: name => { action, params, start_time, end_time, state }.
# 'action' is either an external command name (run via system with
# 'params' as its arguments) or a code reference called in-process.
# Every task starts WAITING with zeroed timestamps.
my %nodes;
for my $spec (
    [ "Task1", "curl", ["http://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20yahoo.finance.quote%20where%20symbol%20in%20(%22TWTR%22%2C%22FB%22%2C%22TSLA%22%2C%22XOM%22)&format=json&diagnostics=true&env=store%3A%2F%2Fdatatables.org%2Falltableswithkeys&callback="] ],
    [ "Task2", \&retrieve_stock, ["TWTR", "Change"] ],
    [ "Task3", \&retrieve_stock, ["FB", "Change"] ],
    [ "Task4", \&aggregator,     [] ],
) {
    my ($name, $action, $params) = @$spec;
    $nodes{$name} = {
        action     => $action,
        params     => $params,
        start_time => 0,
        end_time   => 0,
        state      => WAITING,
    };
}

# Dependency table: task => [tasks that may only run after it].
my %edges = (
    Task1 => [qw(Task2 Task3)],
    Task2 => [qw(Task4)],
    Task3 => [qw(Task4)],
    Task4 => [],
);
# Build the task graph: one vertex per task in %nodes and one edge per
# dependency in %edges (edge A -> B means B may only run after A).
my $g0 = Graph->new; # there is a song called zero g love in Macross
$g0->add_vertices( keys %nodes );
foreach my $task ( keys %edges ) {
    foreach my $dep ( @{ $edges{$task} } ) {
        # An edge naming an undeclared task would add a vertex with no
        # action/params, which the scheduler would trip over at run time.
        foreach my $end ( $task, $dep ) {
            next if exists $nodes{$end};
            print "FATAL: The edge [$task -> $dep] refers to unknown task [$end]\n";
            exit 1;
        }
        $g0->add_edge( $task, $dep );
    }
}
print "INFO: The graph is $g0\n";
validate($g0);
scheduler($g0);
exit(0);
# Check that the task graph can be scheduled: it must be a directed acyclic
# graph with exactly one entry point (a task with no predecessors).
# Prints a FATAL message and exits with status 1 on the first violation;
# returns nothing when the graph is valid.
sub validate {
    my $DAG = shift;

    # is_dag is false for cyclic graphs too, so a separate is_cyclic test
    # placed after it could never fire; tell the two cases apart here.
    unless ( $DAG->is_dag ) {
        if ( $DAG->is_directed && $DAG->is_cyclic ) {
            print "FATAL: This graph contains a cycle which forms a loop in execution.\n";
        }
        else {
            print "FATAL: This graph is not a directed and acyclic graph so exiting...\n";
        }
        exit 1;
    }

    my @heads = grep { !$DAG->in_degree($_) } $DAG->vertices;
    unless (@heads) {
        # Only possible for an empty graph: there is nothing to run.
        print "FATAL: There is no execution start point\n";
        exit 1;
    }
    if ( @heads > 1 ) {
        print "FATAL: There is more than one execution start points\n";
        exit 1;
    }
    return;
}
# Run every task of the DAG in topological order.
#
# Before a task runs, each of its predecessors must have reached DONE;
# a FAIL (or any other state) aborts the run with exit status 1. Each
# task's record in %nodes is updated in place with state, start/end
# timestamps and, for external commands, captured stdout/stderr and the
# system() wait status in 'exit'. A CODE action is called as
#   $action->($task, { preds => [...], params => [...] })
# and reports failure by setting $nodes{$task}{exit} to a true value; an
# exception thrown by the action is also treated as a failure.
# Exits with status 1 if any task ended in FAIL.
sub scheduler {
    my $DAG = shift;
    my @ts = $DAG->topological_sort;

    foreach my $task (@ts) {
        if ( $DAG->in_degree($task) ) {
            print "INFO: check whether predecessors of [$task] were executed successfully\n";
            _check_predecessors( $DAG, $task );
        }
        else {
            print "INFO: $task is the head, starting this task now\n";
        }
        _run_task( $DAG, $task );
    }

    # A failing leaf task has no successor to notice it, so report it here.
    my @failed = grep { $nodes{$_}->{'state'} == FAIL } @ts;
    if (@failed) {
        print "FATAL: task(s) [@failed] failed so exiting...\n";
        exit 1;
    }
    return;
}

# Exit with status 1 unless every predecessor of $task finished as DONE.
sub _check_predecessors {
    my ( $DAG, $task ) = @_;
    foreach my $predecessor ( $DAG->predecessors($task) ) {
        my $state = $nodes{$predecessor}->{'state'};
        if ( $state == FAIL ) {
            print "FATAL: The predecessor [$predecessor] of $task was failed so exiting...\n";
            exit 1;
        }
        elsif ( $state == DONE ) {
            print "INFO: The predecessor [$predecessor] of $task ran successful so continuing...\n";
        }
        else {
            # Topological order runs predecessors first, so any other state
            # means the bookkeeping is inconsistent.
            print "FATAL: something went wrong exiting...\n";
            exit 1;
        }
    }
    return;
}

# Execute one task and record its timestamps, output and final state.
sub _run_task {
    my ( $DAG, $task ) = @_;
    my $node = $nodes{$task};

    print "INFO: running task [$task]\n";
    $node->{'state'}      = RUNNING;
    $node->{'start_time'} = time();

    my $action = $node->{'action'};
    my @params = @{ $node->{'params'} };

    if ( ref $action eq 'CODE' ) {
        my @predecessors = $DAG->predecessors($task);
        my $ok = eval {
            $action->( $task, {
                "preds"  => \@predecessors,
                "params" => \@params,
            });
            1;
        };
        unless ($ok) {
            # A dying action fails this task instead of killing the scheduler.
            my $error = $@ || 'unknown error';
            print "ERROR: task [$task] died: $error\n";
            $node->{'stderr'} = $error;
            $node->{'exit'}   = 1;
        }
    }
    else {
        # capture returns (stdout, stderr, block result); the block result
        # is system()'s wait status, which is 0 only on success. The
        # indirect-object form never hands the command to a shell.
        @$node{ 'stdout', 'stderr', 'exit' } = capture {
            system { $action } $action, @params;
        };
    }

    $node->{'end_time'} = time();
    $node->{'state'}    = $node->{'exit'} ? FAIL : DONE;
    return;
}
# Task action: extract one field of one stock quote from the YQL JSON that
# the first predecessor task captured on its stdout.
#
# $self - name of the running task; its %nodes entry receives the result.
# $args - { preds => [predecessor task names], params => [$stock, $field] }
#
# On success stores the field value in $nodes{$self}{stdout} and sets
# {exit} to 0. On any failure (no predecessor, unparsable JSON, unexpected
# structure, symbol or field missing) sets {exit} to 1 and puts the reason
# in {stderr}, so the scheduler marks the task FAIL instead of dying.
sub retrieve_stock {
    my ($self, $args) = @_;
    my $node            = $nodes{$self};
    my ($source)        = @{ $args->{"preds"} };
    my ($stock, $field) = @{ $args->{"params"} };

    my $fail = sub {
        $node->{'stderr'} = shift;
        $node->{'exit'}   = 1;
        return;
    };

    return $fail->("no predecessor to read quotes from") unless defined $source;

    my $raw = $nodes{$source}->{'stdout'};
    $raw = '' unless defined $raw;

    # decode_json and the nested dereferences die on malformed or unexpected
    # input; trap that so the task fails cleanly.
    my @quotes = eval {
        my $quote = decode_json($raw)->{'query'}{'results'}{'quote'};
        # A one-symbol query yields a single object instead of a list.
        return @$quote  if ref $quote eq 'ARRAY';
        return ($quote) if ref $quote eq 'HASH';
        return;
    };
    my $error = $@;
    return $fail->("cannot parse output of [$source]: $error") if $error;

    foreach my $entry (@quotes) {
        next unless ref $entry eq 'HASH';
        next unless defined $entry->{'symbol'} && $entry->{'symbol'} eq $stock;
        next unless defined $entry->{$field};
        $node->{'stdout'} = $entry->{$field};
        $node->{'exit'}   = 0;
        return;
    }
    return $fail->("no [$field] found for [$stock]");
}
# Task action: report the predecessor stock with the largest change.
#
# Each predecessor's params[0] is taken as the stock symbol and its stdout
# as that stock's change value. Stores the winning symbol in
# $nodes{$self}{stdout}, sets {exit} to 0 and prints the result. When no
# predecessor produced a numeric change, sets {exit} to 1 (reason in
# {stderr}) so the scheduler marks the task FAIL.
sub aggregator {
    my ($self, $args) = @_;
    my %changes = ();
    foreach my $task ( @{ $args->{"preds"} } ) {
        my $change = $nodes{$task}->{'stdout'};
        # Skip missing or non-numeric values so <=> never sees undef/text.
        next unless defined $change && looks_like_number($change);
        $changes{ $nodes{$task}->{'params'}[0] } = $change;
    }

    unless (%changes) {
        $nodes{$self}->{'stderr'} = "no numeric change values from predecessors";
        $nodes{$self}->{'exit'}   = 1;
        return;
    }

    # Break ties by symbol so the winner does not depend on hash order.
    my ($winner) = sort { $changes{$b} <=> $changes{$a} || $a cmp $b } keys %changes;
    $nodes{$self}->{'stdout'} = $winner;
    $nodes{$self}->{'exit'}   = 0;
    print "OUTPUT: The winner is $winner by change $changes{$winner}\n";
    return;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment