Last active
August 29, 2015 13:56
-
-
Save cho45/8858846 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env perl | |
use utf8; | |
use strict; | |
use warnings; | |
use lib lib => glob 'modules/*/lib'; | |
# Tunables for the worker pool, declared together as compile-time constants.
use constant {
    # Passed to Parallel::Prefork as max_workers.
    MAX_WORKER            => 2,
    # Upper bound on the per-child job counter before that child's loop ends.
    MAX_REQUEST_PER_CHILD => 100,
    # Sleep length (seconds) used by the workers; overridable through the
    # INTERVAL environment variable, falling back to 5 when unset or false.
    INTERVAL              => $ENV{INTERVAL} || 5,
    # Package namespace whose modules are loaded and registered as job workers.
    NAMESPACE             => 'MyApp::Worker',
};

# Database list handed to TheSchwartz->new(databases => ...).
# NOTE(review): the contents are elided ("...") in this listing and must be
# filled in before the script can run.
my $databases = [ ... ];
use TheSchwartz; | |
use Log::Minimal; | |
use Module::Find (); | |
use DBIx::DisconnectAll; | |
use Parallel::Prefork; | |
# Signals the master traps, mapped to the signal it relays to its children
# (per Parallel::Prefork's trap_signals option). HUP is relayed as INT, and
# USR1 is trapped with nothing relayed.
my %signal_map = (
    TERM => 'TERM',
    HUP  => 'INT',
    INT  => 'INT',
    USR1 => undef,
);

# Prefork manager that keeps up to MAX_WORKER child processes running.
my $pm = Parallel::Prefork->new({
    max_workers  => MAX_WORKER,
    trap_signals => \%signal_map,
});
infof("[%d] Worker starting...", $$);
$0 = "worker master";

# Body handed to $pm->start() for every worker process: it registers all
# worker classes under NAMESPACE with a TheSchwartz client and then processes
# jobs until it is told to stop or has handled MAX_REQUEST_PER_CHILD jobs.
my $worker_main = sub {
    infof("[%d] New worker started", $$);

    my $keep_running = 1;
    my $jobs_done    = 0;

    # INT/HUP: finish the current iteration, then leave the job loop.
    my $graceful_stop = sub {
        infof("[%d] Signal received will teminate", $$);
        $keep_running = 0;
    };
    $SIG{$_} = $graceful_stop for qw(HUP INT);

    # TERM: stop right away, even in the middle of a job.
    $SIG{TERM} = sub {
        infof("[%d] SIGTERM received. Exit immediately", $$);
        exit 1;
    };

    # Random delay of up to INTERVAL seconds before connecting
    # (presumably to stagger the workers -- TODO confirm intent).
    sleep(rand() * INTERVAL);

    my $client = TheSchwartz->new(databases => $databases);

    # Load every module under NAMESPACE and register it as an ability.
    for my $worker_class (Module::Find::useall(NAMESPACE)) {
        infof('[%d] Enable working for %s', $$, $worker_class);
        $client->can_do($worker_class);
    }

    while ($keep_running) {
        last if $jobs_done >= MAX_REQUEST_PER_CHILD;

        # A parent pid of 1 is taken to mean the master has gone away.
        if (getppid() == 1) {
            infof("I'm zombie...");
            exit 1;
        }

        $0 = sprintf("worker slave(%d)", $jobs_done);

        my $job = $client->find_job_for_workers;
        unless ($job) {
            # Nothing found: if the client's current ability list is shorter
            # than its full list (read straight from the client's hash
            # fields), restore the full list and look once more.
            my $abilities_reduced =
                @{ $client->{current_abilities} } < @{ $client->{all_abilities} };
            if ($abilities_reduced) {
                $client->restore_full_abilities;
                $job = $client->find_job_for_workers;
            }
        }

        if ($job) {
            infof('[%d] job:%d Work %s', $$, $job->jobid, $job->funcname);
            $0 = sprintf("worker slave(%d) >%s %s", $jobs_done, $job->jobid, $job->funcname);

            $client->work_once($job);
            $jobs_done++;

            my $exit_status = $job->exit_status;
            if ($exit_status) {
                # True exit status: log the failure log, then the summary.
                critf("[%d] job:%d %s", $$, $job->jobid, join("\n", $job->failure_log));
                critf('[%d] job:%d Failed %s', $$, $job->jobid, $job->funcname);
            }
            else {
                # A defined false status is reported as success; an undefined
                # one as done with unknown status.
                my $done;
                if (defined $exit_status) {
                    $done = 'Success';
                }
                else {
                    $done = 'Done(Status Unknown)';
                }
                infof('[%d] job:%d %s %s', $$, $job->jobid, $done, $job->funcname);
            }
        }

        # Disconnect all db handles for MHA -- done after every iteration,
        # and before the idle sleep when no job was found.
        dbi_disconnect_all();
        sleep(INTERVAL) unless $job;
    }

    infof("[%d] Worker has finished (worked:%d)", $$, $jobs_done);
};

# Keep (re)starting the pool until the master has received TERM, INT or HUP.
until ($pm->signal_received =~ 'TERM|INT|HUP') {
    $pm->start($worker_main);
}
# Master shutdown: ask the remaining children to stop, then wait for them.
infof("[%d] Worker exiting...", $$);
$0 = "worker master: exiting...";

# One INT per child is enough: the child's INT handler only clears its run
# flag, so sending the same signal twice back-to-back (as the original did)
# had no additional effect.
$pm->signal_all_children('INT');

# Block until every child process has exited.
$pm->wait_all_children();
infof("[%d] Worker exit", $$);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment