Skip to content

Instantly share code, notes, and snippets.

@saillinux
Created December 20, 2013 11:56
Show Gist options
  • Save saillinux/8053771 to your computer and use it in GitHub Desktop.
Save saillinux/8053771 to your computer and use it in GitHub Desktop.
DAG 기반의 타스크 스케쥴러 구현

Perl이 제공 하는 이득과 혜택 덕분에 작성한 툴과 서비스를 헤아리면 끝이 없을 정도로 많은 일을 헤쳐 나갔습니다. 뒤돌아 보면 Perl 없이 어떻게 처리 했을고 하고 아찔 해 하곤 하네요.

주로 Perl을 이용하여 자동화를 구현 해온 저에게는 Perl이 주는 가능성은 정말로 끝이 없답니다. 덕분에 허세와 욕심을 부리게 되고 더욱 많은것을 할라고 추구 하게 되는거죠.

지금까지 우리는 자동화를 할때면 스크립트를 작성하거나 모듈을 작성 하여 여러군데에 적용 하여 일을 처리 하곤 하였습니다. 허나 더욱 고난이도의 일을 처리 하다 보니 타스크 의존성을 요구하는 일들이 많이 생기고 있습니다. 이를 스크립트를 작성해서 처리 하다 보면 다른 곳에서도 같은 코드를 작성하게 되고 이런 패턴을 일일히 모듈화 해서 새로운 스크립트에 적용하여 작성하는것도 번거롭습니다.

근래 관심을 두게 된 방법론이 Flow Based Programming입니다만 쉽게 적용 할수 없는 이론이라서 그중에 몇가지 아이디어 만을 적용 하여 프로그램을 작성 해 보도록 하겠습니다.

  • Unix Phiolosophy를 숙지 하여 하나의 기능 만을 완벽히 잘하는 Component를 구현 합니다. (e.g. ls, grep, awk, sed, xargs, and more)
  • 이렇게 해서 구현한 Component를 서로 연결 시켜 프로그램을 작성하게 끔 도와 주는 툴을 만듭니다. 이를 통해 Component 재 사용 및 Availability를 개선해 줍니다.
  • 프로그램 규모가 커지면 소수의 인원 만으로 기능 구현이 힘들어집니다. 간단하고 직관적인 Framework를 제공하여 여러 인원들이 쉽게 Component를 작성하게 하여 수고를 덥니다.

아래와 같이 타스크 의존성을 존중하여 프로그램을 실행 해야 하는 서비스를 만들어 보도록 하겠습니다. 물론 스크립트 하나로 작성하면 편하겠지만 지속 적으로 추가 되는 기능 사항을 소화 하기 위해서 새로운 시도를 해 보도록 하겠습니다.

아래의 서비스는 야후 파이낸스에서 에서 지정한 주식들 중에 어떤 것이 가장 이득을 많이 냈는지 비교 하는 간단한 서비스 입니다.

Figure 1. Stock winner test

첫번째 타스크는 야후 파이낸스 API 서비스를 이용하여 요즘 관심 가져 보고 있는 주식인 Twitter, Facebook, Tesla등의 데이터를 가져 옵니다.

데이터를 무사히 가져 오면 Task2,3은 retrieve_stock component를 실행하여 twitter와 facebook이 Change 값을 참고 하여 가져 옵니다.

retrieve_stock 함수를 참조.

이렇게 해서 얻은 Change값을 비교화여 어떤 주식이 오늘 이득을 많이 볼수 있었는지 aggregator component를 통해 출력 합니다.

aggregator 함수를 참조.

이를 구현 하기 위해서는 각 Task가 사용 하는 component 구현과 스케쥴러를 디자인 해 보도록 합니다.

첫째로 위의 타스크 위존성을 저장해 줄수 있는 자료구조가 필요합니다. 여기서는 비유향 방향 그래프 (Directed Acyclic Graph, 줄여서 DAG)라는 그래프 구조체를 이용하여 각 타스크(node)와 의존선(edge)을 저장해 보도록 합니다. DAG는…

CPAN에서 제공하는 Graph 모듈은 모든 언어를 통틀어 현존하는 Graph library중에서는 최고라고 자부 할수 있을 정도로 모든 필요한 기능을 제공합니다. cpanm Graph를 이용하여 먼저 설치 하도록 합니다.

첫째로 해야 할일은 Graph를 구축하는 겁니다. 이를 위해 $g0라는 그래프 구조체를 생성 하였고 여기에 node 구조체로 정의된 타스크를 vertex로 그래프에 저장합니다.

my $g0 = Graph->new;

각 노드는 해쉬 키로 지정하여 가져 올수 있으면 각 노드 구조체는 다음과 같은 Property를 가집니다.

  • action: 실제로 타스크가 수행해야 하는 command tool 혹은 callback 함수를 정의 합니다.
  • params: action을 실행하기 위해 필요한 인자를 정의 합니다.
  • start, end time: 타스크가 실행 시작 했던 시점과 끝난 시점을 저장합니다.
  • state: 현재 타스크의 상태를 저장합니다. e.g. WAITING, RUNNING, DONE, FAIL
  • stdout, stderr, error: action의 수행 결과물들을 나중에 명령 수행 후 저장합니다.

my %nodes = ( “Task1” => { “action” > "curl", "params" => ["http://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20yahoo.finance.quote%20where%20symbol%20in%20(%22TWTR%22%2C%22FB%22%2C%22TSLA%22%2C%22XOM%22)&format=json&diagnostics=true&env=store%3A%2F%2Fdatatables.org%2Falltableswithkeys&callback”], “start_time” => 0, “end_time” => 0, “state” => WAITING, }, “Task2” => { “action” => \&retrieve_stock, “params” => [“TWTR”, “Change”], “start_time” => 0, “end_time” => 0, “state” => WAITING, }, “Task3” => { “action” => \&retrieve_stock, “params” => [“FB”, “Change”], “start_time” => 0, “end_time” => 0, “state” => WAITING, }, “Task4” => { “action” => \&aggregator, “params” => [], “start_time” => 0, “end_time” => 0, “state” => WAITING, }, );

foreach my $task (keys %nodes) { $g0->add_vertex($task); }

그리고 의존성을 edge로 표현하여 아래와 같이 표현하고 더하도록 하겠습니다. 아래의 구조체를 간단히 설명 하자면 %edges의 키중 하나인 Task1은 Task2, Task3와 연결 한다고 정의 해보았습니다. 이를 add_edge함수를 이용하여 더해 보았습니다.

my %edges = ( “Task1” => [ “Task2”, “Task3” ], “Task2” => [ “Task4” ], “Task3” => [ “Task4” ], “Task4” => [ ], );

o# connect each task foreach my $task (keys %edges) { foreach my $dep (@{$edges{$task}}) { $g0->add_edge($task, $dep); } }

그래프 객체인 $g0를 그대로 아래와같이 출력 하면 그래프 구조를 바로 아래와 같이 보여 줍니다.

print “INFO: The graph is $g0\n”;

출력물: INFO: The graph is Task1-Task2,Task1-Task3,Task2-Task4,Task3-Task4

타스크들을 실행하기 앞서 몇개의 validation을 거치도록 하겠습니다.

첫째로 graph ($DAG) 자체가 directed 그래프인지 확인 하도록 합니다. 즉 node와 node 사이를 연결해 주는 모든 edge가 방향성을 가지고 있는지 아래를 통해 확인 하도록 합니다.

unless ($DAG->is_dag) { print “FATAL: This graph is not a directed and acyclic graph so exiting…\n”; exit; }

둘째로 그래프 자체에 cycle을 가지는 의존성이 있는지 확인 합니다. 만약 node들 사이 중에 loop가 존재한다면 무한적으로 타스크를 실행하게 되는 문제가 발생됨으로 아래의 테스트를 미연에 방지를 합니다.

if ($DAG->is_cyclic) { print “FATAL: This graph contains a cycle which forms a loop in execution.\n”; exit; }

마지막으로 타스크를 시작 하는 지점이 여러군데가 존재하는지 확인 합니다. 물론 여러 시작 지점이 있는것에 문제가 있는 것은 아니지만 간단한 구현을 위해 일단 확인 하도록 하겠습니다. 가장 간단한 방법은 노드들 중에 in_degree가 없는 즉 의존성이 없는 타스크가 하나 이상 존재 한다면 시작 지점이 여러군데인걸로 체크하는 예제입니다.

my @heads = (); my @tasks = $DAG->vertices; foreach my $task ( @tasks ) { my $in_degree = $DAG->in_degree($task); unless ($in_degree) { push @heads, $task; } }

if (@heads > 1) { print “FATAL: There is more than one execution start points\n”; exit; }

모든 validation이 통과 했으면 실제로 타스크를 실행 시켜 주는 스케쥴러를 구현 하도록 합니다.

첫째로 Graph ($DAG)에 위상정렬(topological sorting)을 실행 하여 수행해야 하는 타스크의 순서를 가져 오도록 합니다.

my @ts = $DAG->topological_sort;

위상정렬에 대한 상세한 내용은 여기에서 http://ko.wikipedia.org/wiki/%EC%9C%84%EC%83%81%EC%A0%95%EB%A0%AC

순서가 정해진 타스크를 @ts에 저장하여 이를 하나 하나 실행 하도록 합니다.

foreach my $task ( @ts ) { if ( $DAG->in_degree($task) ) { print “INFO: check whether predecessors of [$task] were executed successfully\n”; foreach my $predecessor ( $DAG->predecessors($task) ) { if ( $nodes{$predecessor}->{‘state’} == FAIL ) { print “FATAL: The predecessor [$predecessor] of $task was failed so exiting…\n”; exit; } elsif ( $nodes{$predecessor}->{‘state’} == DONE ) { print “INFO: The predecessor [$predecessor] of $task ran successful so continuing…\n”; } else { print “FATAL: something went wrong exiting…\n”; exit; } } } else { print “INFO: $task is the head, starting this task now\n”; }

우리는 시작이 먼저 되야 하는 타스크가 무엇인지 확인 하기 위해 in_degree 체크를 통해 먼저 실행 되어야 하는 타스크가 있는지 확인합니다. 즉 in_degree가 0이면 시작 지점으로 간주 하여 바로 실행 합니다.

의존성을 지니고 있는 타스크가 있다면 predecessors 함수를 사용하여 현재 타스크가 실행 되기 전에 필요한 타스크를 리스트 하여 해당 타스크의 상태를 확인 합니다. 그중에 하나라도 FAIL한 타스크가 있다면 동작을 중단 합니다.

현재 실행 해야 하는 타스크의 노드 구조체를 가져온후 상태값과 시작 시간을 업데이트 해줍니다.

my $node = $nodes{$task}; print “INFO: running task [$task]\n”;

$node->{‘state’} = RUNNING; $node->{‘start_time’} = time();

타스크가 수행해야 하는 작업과 이를 위해 필요한 인자값및 이전에 실행 되었던 타스크 리스트를 준비합니다. my $action = $nodes{$task}->{‘action’}; my @params = @{$nodes{$task}->{‘params’}}; my @predecessors = $DAG->predecessors($task);

action이 만약 CODE 리퍼런스이면 인자값을 전달하여 함수로 실행 하도록 합니다. 여기서 predecessors를 인자로 전달하는 이유는 이전 타스크들의 결과물들을 참조 하기 위해서라고 이해 하시면 됩니다. 만약 CODE가 아니면 command tool로 간주하면 바로 실행 하도록 합니다. 여기서는 John Kang님이 소개 해 주셨던 Capture::Tiny툴을 이용하여 쉽게 stdout, stderr, exit값을 가져 왔습니다.

if ( ref $action eq ‘CODE’ ) { $action->($task, { “preds” => \@predecessors, “params” => \@params, }); } else { @$node{‘stdout’, ‘stderr’, ‘exit’} = capture { system $action, @params; }; }

수행 결과가 완료 되었다면 완료 시간과 수행 결과 상태를 업데이트 해 줍니다.

$node->{‘end_time’} = time();

unless ($node->{‘exit’}) { $node->{‘state’} = DONE; } else { $node->{‘state’} = FAIL; }

결과물: INFO: The graph is Task1-Task2,Task1-Task3,Task2-Task4,Task3-Task4 INFO: Task1 is the head, starting this task now INFO: running task [Task1] INFO: check whether predecessors of [Task3] were executed successfully INFO: The predecessor [Task1] of Task3 ran successful so continuing… INFO: running task [Task3] INFO: check whether predecessors of [Task2] were executed successfully INFO: The predecessor [Task1] of Task2 ran successful so continuing… INFO: running task [Task2] INFO: check whether predecessors of [Task4] were executed successfully INFO: The predecessor [Task2] of Task4 ran successful so continuing… INFO: The predecessor [Task3] of Task4 ran successful so continuing… INFO: running task [Task4] OUTPUT: The winner is TWTR by change +1.98

마지막으로 정리하며:

여러분이 타스크 의존도를 존중하여 구현 하여야 하는 서비스가 있다면 아마도 지금까지 구현 한 방법과 크게 다를 바 없이 구현 할것이라 생각 됩니다.

좀더 욕심을 내서 구현 하자면 아래와 같은 것을 적용 하여 실제로 사용 할수 있는 프레임워크로 구현 가능 할수 있을 것입니다.

  • http://noflojs.org/ 에서 제공 하는 UI를 이용하여 타스크 생성및 의존성을 정의 해 주는 프론트 엔드 구현. 이를 통해 node및 edge를 가져와 DAG 자료 구조를 생성을 도와줌.
  • 각 타스크를 process혹은 thread로 구현 하여 지속 적으로 작동 하도록 구현.
  • Zookeeper나 Redis 같은 백엔드를 이용하여 타스크간의 상태 혹은 결과물을 공유 하도록 구현.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment