Skip to content

Instantly share code, notes, and snippets.

@saillinux
Last active December 31, 2015 11:59
Show Gist options
  • Save saillinux/7982913 to your computer and use it in GitHub Desktop.
Save saillinux/7982913 to your computer and use it in GitHub Desktop.
Graph를 이용한 DAG 빌드 및 스케쥴링 타스크
use strict;
use warnings;
use Graph;
use Data::Dumper;
use Capture::Tiny ':all';
# Lifecycle states kept in each task's "state" field.
use constant WAITING => 0;    # initial state of every task
use constant RUNNING => 1;    # set just before the task's command is started
use constant DONE    => 2;    # the command's exit value was zero
use constant FAIL    => 3;    # the command's exit value was non-zero
# TODO: callback example

# Task table, keyed by task name.  Every task starts out WAITING with zeroed
# timestamps; the scheduler fills in the remaining fields when the task runs.
my %nodes = map {
    my ( $name, $command ) = @{$_};
    (
        $name => +{
            command    => $command,
            start_time => 0,
            end_time   => 0,
            state      => WAITING,
        }
    );
} (
    [ Task1 => 'uname -a' ],
    [ Task2 => 'perl -V' ],
    [ Task3 => 'python --version' ],
    [ Task4 => 'emacs --version' ],
);

# Dependency table: each key lists the tasks that may only run after it.
my %edges = (
    Task1 => [qw(Task2 Task3)],
    Task2 => ['Task4'],
    Task3 => ['Task4'],
    Task4 => [],
);
# Build the task graph: one vertex per entry in %nodes and one edge from each
# task in %edges to every task listed under it.
my $g0 = Graph->new;

# add each task to the graph as node
$g0->add_vertex($_) for keys %nodes;

# connect each task
for my $task ( keys %edges ) {
    $g0->add_edge( $task, $_ ) for @{ $edges{$task} };
}

print "The graph is $g0\n";

# Check the graph first, then run the tasks in dependency order.
validate($g0);
scheduler($g0);

exit(0);
# validate($DAG)
#
# Sanity-checks the task graph before anything is executed.  The graph must
#   * contain no cycle,
#   * be a directed acyclic graph, and
#   * have at most one "head" (a vertex with no incoming edge), so that
#     execution has a single starting point.
#
# Parameters:
#   $DAG - graph object providing is_cyclic, is_dag, vertices and in_degree.
#
# Returns a true value when every check passes.  On the first failed check
# the diagnostic is written to STDERR and the program exits with status 1,
# so that a calling shell or job runner can tell the run did not succeed.
sub validate {
    my $DAG = shift;

    # Report the problem and stop with a failing exit status.
    my $abort = sub {
        my ($message) = @_;
        print STDERR $message;
        exit 1;
    };

    $abort->("This graph contains a cycle which forms a loop in execution.\n")
        if $DAG->is_cyclic;

    $abort->("This graph is not a directed and acyclic graph so exiting...\n")
        unless $DAG->is_dag;

    # A head is a task nothing else points at.
    my @heads = grep { !$DAG->in_degree($_) } $DAG->vertices;

    $abort->("There is more than one execution start points\n")
        if @heads > 1;

    return 1;
}
# scheduler($DAG)
#
# Runs the command of every task in $DAG, one at a time, in topological
# order, and records the outcome in the file-level %nodes table.
#
# Before a task that has incoming edges is started, each of its predecessors
# must already be in the DONE state; if one is FAIL, or in any other state,
# the program exits with status 1.  A task with no usable entry in %nodes
# (missing, or with an empty command) also ends the run with status 1.
#
# For every task that runs, its %nodes entry receives:
#   start_time / end_time - values of time() around the command
#   stdout / stderr       - output captured while the command ran
#   exit                  - the value returned by system()
#   state                 - DONE when "exit" is zero, FAIL otherwise
#
# Parameters:
#   $DAG - graph object providing topological_sort, in_degree and
#          predecessors; its vertex names are used as keys into %nodes.
#
# NOTE(review): a task that FAILs but has no successors is only recorded in
# %nodes; nothing in this routine reports it or changes the exit status.
sub scheduler {
    my $DAG = shift;

    # TODO: state check and callback function run
    foreach my $task ( $DAG->topological_sort ) {
        if ( $DAG->in_degree($task) ) {
            print "check whether predecessors of [$task] were executed successfully\n";
            foreach my $predecessor ( $DAG->predecessors($task) ) {
                my $state = $nodes{$predecessor}->{'state'};
                if ( $state == FAIL ) {
                    print STDERR "The predecessor [$predecessor] of $task was failed so exiting...\n";
                    exit 1;
                }
                elsif ( $state == DONE ) {
                    print "The predecessor [$predecessor] of $task ran successful so continuing...\n";
                }
                else {
                    # Topological order means every predecessor should have
                    # finished already; any other state is unexpected.
                    print STDERR "something went wrong exiting...\n";
                    exit 1;
                }
            }
        }
        else {
            print "$task is a head, starting now\n";
        }

        # A vertex can exist in the graph without a matching %nodes entry
        # (for example when it is only named as a dependency).  Stop here
        # rather than run an empty command and lose the result.
        my $node = $nodes{$task};
        unless ( $node
            && defined $node->{'command'}
            && $node->{'command'} =~ /\S/ )
        {
            print STDERR "no command is defined for task [$task] so exiting...\n";
            exit 1;
        }

        print "running task [$task]\n";
        $node->{'state'}      = RUNNING;
        $node->{'start_time'} = time();

        # split ' ' ignores leading whitespace and treats any run of
        # whitespace as one separator, so no empty arguments are produced.
        my @cmd = split ' ', $node->{'command'};

        @$node{ 'stdout', 'stderr', 'exit' } = capture {
            system @cmd;
        };
        $node->{'end_time'} = time();

        $node->{'state'} = $node->{'exit'} ? FAIL : DONE;
    }

    return;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment