Skip to content

Instantly share code, notes, and snippets.

@macros
Created August 12, 2009 05:47
Show Gist options
  • Save macros/166346 to your computer and use it in GitHub Desktop.
Save macros/166346 to your computer and use it in GitHub Desktop.
#!/us/bin/perl
use warnings;
use strict;
use Working::Daemon;
use IO::Socket;
use JSON::DWIW;
use LWP::UserAgent;
use threads;
use threads::shared;
use Data::Dumper;
use Sys::Hostname;
my $daemon = Working::Daemon->new();
$daemon->parse_options(
'debug' => 0 => "Debugging",
'name' => 'thread-test' => '',
'minthreads=i' => 5 => "Minimum workers",
'maxthreads=i' => 50 => "Maximum workers",
'stomp_server=s' => 'localhost' => "Stomp Server",
'stomp_destination=s' => "search5-urls" => "What queue to read",
'ganglia_host=s' => 'monitor1.sjc.wikia-inc.com' => "Ganglia http host",
'ganglia_user=s' => 'admin' => "Ganglia http user",
'ganglia_pass=s' => 'foo' => "Ganglia http pass"
);
$daemon->do_action;
my @workers :shared;
my $apache_load :shared;
my $queue_size :shared;
$apache_load = 100;
$queue_size = 0;
my $admin_thread = threads->create(\&admin);
my $apache_load_thread = threads->create(\&get_apache_load);
my $queue_size_thread = threads->create(\&get_queue_size);
maybe_add_worker() for 1..$daemon->options->{minthreads};
for(;;) {
if ($apache_load > 100 || $queue_size < 1000) {
remove_worker();
} elsif ( $queue_size > 1000 ) {
maybe_add_worker();
}
sleep 1;
}
sub admin {
my $sock = IO::Socket::INET->new
(Listen => 5,
LocalAddr => 'localhost',
LocalPort => 9000,
Proto => 'tcp',
Reuse => 1
);
while(my $connection = $sock->accept) {
$connection->print(Dumper({ workers => scalar(@workers),
apache_load => $apache_load,
queue_size => $queue_size }));
close($connection);
}
}
sub get_apache_load {
my $ua = LWP::UserAgent->new;
my $host = $daemon->options->{ganglia_host};
$ua->credentials(
"${host}:443",
'Ganglia',
$daemon->options->{ganglia_user} => $daemon->options->{ganglia_pass} );
$ua->timeout(10);
my $req = HTTP::Request->new(GET => "https://${host}/ganglia/?m=load_one&r=hour&s=descending&c=Apaches&h=&sh=1&hc=4&z=small");
for(;;) {
my $res = $ua->request($req);
if ( $res->content && $res->code == 200 ) {
if ($res->content =~ m{Avg Load \(15, 5, 1m\).*\d+%, \d+%, (\d+)%} ) {
$apache_load = $1;
print "Apache Load is: $apache_load\n";
}
}
sleep 1;
}
}
sub get_queue_size {
my $ua = LWP::UserAgent->new;
my $host = $daemon->options->{stomp_server};
my $queue_name = $daemon->options->{stomp_destination};
$ua->timeout(10);
my $req = HTTP::Request->new(GET => "http://${host}:9999/queues/root/name/messages");
for (;;) {
my $res = $ua->request($req);
if ( $res->content && $res->code == 200 ) {
if (my $j = JSON::DWIW::deserialize($res->content)) {
foreach my $q (@{$j->{"queues"}}) {
if ( $q->{name} eq $queue_name ) {
$queue_size = $q->{messages};
print "Queue Size: $queue_size\n"
}
}
}
}
sleep 1;
}
}
sub maybe_add_worker {
if ( @workers < $daemon->options->{maxthreads} ) {
print "Creating new thread\n";
lock(@workers);
my $thread = threads->new(\&do_work);
my $tid = $thread->tid();
push @workers, $tid;
}
}
sub remove_worker {
if (@workers > $daemon->options->{minthreads} ) {
print "Removing worker\n";
lock(@workers);
my $tid = shift @workers;
my $thread = threads->object($tid);
$thread->detach();
}
}
sub do_work {
my $count = 0;
while (!threads->is_detached()) {
sleep 1;
$count++;
}
}
1;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment