Skip to content

Instantly share code, notes, and snippets.

@akahan
Created December 9, 2015 23:54
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save akahan/8ceae6c5589d4c702a8e to your computer and use it in GitHub Desktop.
Save akahan/8ceae6c5589d4c702a8e to your computer and use it in GitHub Desktop.
# Run the master process.
#
# Spawns one slave per key of $self->{_respawn_ctl}, installs SIGTERM and
# SIGINT watchers, starts the TCP server on the configured host/port and
# then blocks on the condvar stored in $self->{_cv}.
#
# Wire protocol (one JSON request per connection):
#   { command => 'job', job_id => ID }
#       -> { job_id, result, finished_time, created_time } when
#          job_result() returns a job, otherwise { job_id, in_process => 1 }
#   { command => ANYTHING_ELSE }
#       -> the command is handed to job_create(), the resulting job module
#          line is written to the next slave and { job_id } is returned.
#   Malformed requests get { error => MESSAGE } instead of terminating the
#   master.
#
# When the condvar yields 'RESPAWN' the watchers and the server guard are
# dropped and run() is re-entered; otherwise the slaves are terminated via
# term_slaves(). Every normal path ends in exit(1), so this sub does not
# return to its caller.
sub run {
    my ($self) = @_;

    $0 = $self->{procname};

    # Spawn the slave processes
    for my $slave_id ( sort { $a <=> $b } keys %{ $self->{_respawn_ctl} } ) {
        debug "Spawning slave: $slave_id";
        my $slave = AsyncJobsServer::Slave->new(
            slave_id => $slave_id,
            procname => $self->{_init_ops}->{procname},
        );
        $self->fork_and_run($slave);
        $self->add_slave($slave);
    }
    debug "All slaves spawned";
    $self->{_respawn_ctl} = {};

    $self->{_cv} = AnyEvent->condvar();

    # Watch for SIGTERM
    $self->{_sigterm} = AnyEvent->signal(
        signal => 'TERM',
        cb     => sub {
            debug "SIGTERM received";
            $self->process_signal('TERM');
            exit 1;
        }
    );

    # Watch for SIGINT
    $self->{_sigint} = AnyEvent->signal(
        signal => 'INT',
        cb     => sub {
            debug "SIGINT received";
            $self->process_signal('INT');
            exit 1;
        }
    );

    my $jc = AsyncJobsServer::JobsController->new;

    # Start the AnyEvent tcp_server
    $self->{_tcp_server_guard} = tcp_server $self->{_init_ops}->{host}, $self->{_init_ops}->{port}, sub {
        my $fh = shift or die $!;

        my $client_handle;
        $client_handle = AnyEvent::Handle->new(
            fh     => $fh,
            on_eof => sub {
                $client_handle->destroy();
            },
            # Without on_error AnyEvent::Handle croaks on any I/O or JSON
            # framing error, which would propagate out of the event loop.
            # A broken client connection must only cost that connection.
            on_error => sub {
                my ( $h, $fatal, $message ) = @_;
                debug "Client connection error: $message";
                $h->destroy();
            },
        );

        $client_handle->push_read( json => sub {
            my ( $h, $request ) = @_;

            # Send a single JSON response and half-close the connection.
            my $reply = sub {
                my ($response) = @_;
                $h->push_write( json => $response );
                $h->push_shutdown();
                return;
            };

            # The request comes from the network: reject anything that is
            # not a JSON object carrying a command rather than dying on the
            # dereference below.
            unless ( ref($request) eq 'HASH' && defined $request->{command} ) {
                debug "Malformed request received";
                return $reply->( { error => 'command required' } );
            }

            debug "Requested command: $request->{command}";

            if ( $request->{command} eq 'job' ) {
                $request->{job_id}
                    or return $reply->( { error => 'job_id required' } );

                my $response = {
                    job_id => $request->{job_id},
                };
                if ( my $job = $jc->job_result( $request->{job_id} ) ) {
                    $response->{result}        = $job->{_result};
                    $response->{finished_time} = $job->{_finished_time};
                    $response->{created_time}  = $job->{_created_time};
                }
                else {
                    $response->{in_process} = 1;
                    debug "Job $request->{job_id} in process";
                }
                return $reply->($response);
            }

            # An empty list from job_create() means the job was not created.
            my @created = $jc->job_create( $request->{command} );
            @created
                or return $reply->( { error => "Can't create a job" } );
            my ( $job_id, $job_mod ) = @created;

            # Pick slaves in round-robin order
            my $slave = $self->next_slave();
            debug "Slave choosed: $self->{_next_slave_id}";

            # The handle must stay referenced until the result has been
            # read: an AnyEvent::Handle that goes out of scope is destroyed
            # and its on_read callback never fires. The callbacks below
            # capture $slave_handle on purpose and release it when done.
            # NOTE(review): every job creates its own handle on the shared
            # slave socket, so two jobs in flight on one slave may have
            # their results read by the wrong handle - confirm that jobs
            # per slave are serialized or move to one handle per slave.
            my $slave_handle;
            $slave_handle = AnyEvent::Handle->new(
                fh       => $slave->{_socket},
                on_error => sub {
                    my ( $sh, $fatal, $message ) = @_;
                    debug "Slave connection error for job $job_id: $message";
                    $sh->destroy();
                    undef $slave_handle;
                },
                on_read => sub {
                    shift->push_read( json => sub {
                        my ( $sh, $job_result ) = @_;
                        $jc->job_done( $job_id, $job_result );
                        # Only the socket reference held by the handle is
                        # dropped; $slave->{_socket} itself stays open.
                        $sh->destroy();
                        undef $slave_handle;
                    });
                },
            );

            # Submit the job
            $slave_handle->push_write( $job_mod . "\n" );

            # Return the job_id to the client
            return $reply->( { job_id => $job_id } );
        });
    };

    my $command = $self->{_cv}->recv();

    if ( defined $command && $command eq 'RESPAWN' ) {
        # Drop the watchers and the listening socket before starting over.
        $self->{_cv}               = undef;
        $self->{_tcp_server_guard} = undef;
        $self->{_sigterm}          = undef;
        $self->{_sigint}           = undef;
        $self->run();
    }

    $self->term_slaves();
    exit 1;
}
# Fork a child process for $slave and connect it to the master with a
# non-blocking socket pair.
#
# Parameters:
#   $slave - slave object; its {_pid} and {_socket} entries are set here.
#
# In the child: {_pid} is the child's own pid, {_socket} is the slave end of
# the pair, and $slave->run is invoked; the child exits afterwards.
# In the parent: {_pid} is the child's pid and {_socket} is the master end.
#
# Returns the master end of the socket pair (parent only).
# Dies if the socket pair cannot be created or if fork() fails.
sub fork_and_run {
    my ( $self, $slave ) = @_;

    # portable_socketpair() yields an empty list on failure.
    my ( $mh, $sh ) = portable_socketpair()
        or die "Can't create socketpair: $!";
    fh_nonblocking $mh, 1;
    fh_nonblocking $sh, 1;

    my $pid = fork();

    # fork() returns undef on failure; a plain truth test would mistake
    # that for the child branch and run the slave inside the master.
    defined $pid
        or die "Can't fork: $!";

    if ( $pid == 0 ) {
        # Child process
        $slave->{_pid} = $$;
        $mh->close();
        $slave->{_socket} = $sh;
        $slave->run;

        # Never let the child fall through into the parent's code path.
        exit 0;
    }

    # Parent process
    $sh->close();
    $slave->{_pid}    = $pid;
    $slave->{_socket} = $mh;
    return $mh;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment