Skip to content

Instantly share code, notes, and snippets.

@kazeburo
Last active August 29, 2015 14:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kazeburo/4450032714d98f6a1c06 to your computer and use it in GitHub Desktop.
Save kazeburo/4450032714d98f6a1c06 to your computer and use it in GitHub Desktop.
#!/usr/bin/perl
package OneSteam::Agent::Collector;
use strict;
use warnings;
# Names of the collector methods that collect() invokes, in this order.
# NOTE(review): traffic() is defined in this package but is not listed here,
# so collect() never calls it -- confirm whether that is intentional.
my @func = qw/memory loadavg uptime sys_version processors cpu_usage tcp_established disk_usage disk_io/;
# Constructor: returns an empty hash blessed into the invoking class.
sub new {
    my $class = shift;
    my %self;
    return bless \%self, $class;
}
# Accessor for the "stats" slot of the object (set to a fresh hashref by
# collect()).
sub stats {
    my ($self) = @_;
    return $self->{stats};
}
# Accessor for the "meta" slot of the object (set to a fresh hashref by
# collect()).
sub meta {
    my ($self) = @_;
    return $self->{meta};
}
# Run every collector named in @func and render the results as text.
#
# The stats and meta hashes are reset first.  Each collector runs inside its
# own eval, so one failing collector does not stop the others; its error
# text is kept and reported instead.
#
# Returns a string of "<section>.<key>\t<value>\n" lines: all stats keys in
# sorted order, then all meta keys, then one "warn.<collector>" line per
# collector that died.
sub collect {
    my ($self) = @_;

    $self->{stats} = {};
    $self->{meta}  = {};

    my %warn;
    foreach my $collector (@func) {
        eval { $self->$collector(); };
        if ($@) {
            $warn{$collector} = $@;
        }
    }

    my $body;
    foreach my $key ( sort keys %{ $self->{stats} } ) {
        $body .= "stats.$key\t" . $self->{stats}->{$key} . "\n";
    }
    foreach my $key ( sort keys %{ $self->{meta} } ) {
        $body .= "meta.$key\t" . $self->{meta}->{$key} . "\n";
    }
    foreach my $key ( sort keys %warn ) {
        $body .= "warn.$key\t" . $warn{$key} . "\n";
    }
    return $body;
}
# Convert a size string such as "123 kB" into a number of bytes.
#
# Argument:
#   $s - a run of digits, optionally followed by a unit (kb, mb, gb or tb,
#        matched case-insensitively); surrounding whitespace is ignored.
#
# Returns the byte count.  A bare number is returned as-is.  An unknown unit
# yields the bare number plus a warning; input that cannot be parsed at all
# (including undef or an empty string) yields 0 plus a warning.
#
# Problems are reported with the core warn(): the warnf() this code used to
# call is not defined or imported anywhere in this file, so reaching it
# killed the caller with "Undefined subroutine".
sub to_byte {
    my $s = shift;

    # Trim without ever leaving $s undefined (empty/undef input included).
    $s = '' unless defined $s;
    $s =~ s/^\s+//;
    $s =~ s/\s+$//;

    if ( $s =~ /^[0-9]+$/ ) {
        return $s;
    }

    if ( $s =~ /^([0-9]+)\s*([a-zA-Z]+)$/ ) {
        my ( $bytes, $unit ) = ( $1, lc $2 );
        my %multiplier_for = (
            kb => 1024,
            mb => 1024 * 1024,
            gb => 1024 * 1024 * 1024,
            tb => 1024 * 1024 * 1024 * 1024,
        );
        if ( exists $multiplier_for{$unit} ) {
            return $bytes * $multiplier_for{$unit};
        }
        warn sprintf( "Unknown unit: %s", $unit );
        return $bytes;
    }

    warn sprintf( "Failed to convert into as byte: %s", $s );
    return 0;
}
# Collect memory figures from /proc/meminfo into the stats hash.
#
# Every "Name: value" line is converted with to_byte().  The selected fields
# are stored under their *.gauge keys (0 when a field is absent), followed by
# two derived gauges:
#   memory-used      = total - free - inactive
#   memory-swap-used = swap-total - swap-free
# Dies with the OS error text if /proc/meminfo cannot be opened.
sub memory {
    my ($self) = @_;

    # /proc/meminfo field name => stats key
    my @wanted = (
        [ 'MemTotal',  'memory-total.gauge' ],
        [ 'MemFree',   'memory-free.gauge' ],
        [ 'Buffers',   'memory-buffers.gauge' ],
        [ 'Cached',    'memory-cached.gauge' ],
        [ 'SwapTotal', 'memory-swap-total.gauge' ],
        [ 'SwapFree',  'memory-swap-free.gauge' ],
        [ 'Inactive',  'memory-inactive.gauge' ],
    );

    open my $fh, '<:utf8', '/proc/meminfo' or die "$!\n";
    my %bytes_for;
    while ( my $line = <$fh> ) {
        chomp $line;
        my ( $name, $value ) = split /[\s:]+/, $line, 2;
        next unless $name;
        $bytes_for{$name} = to_byte($value);
    }
    close $fh;

    my $stats = $self->stats;
    foreach my $pair (@wanted) {
        my ( $name, $stat_key ) = @{$pair};
        my $bytes = $bytes_for{$name};
        $stats->{$stat_key} = int( defined $bytes ? $bytes : 0 );
    }

    $stats->{'memory-used.gauge'} =
          $stats->{'memory-total.gauge'}
        - $stats->{'memory-free.gauge'}
        - $stats->{'memory-inactive.gauge'};
    $stats->{'memory-swap-used.gauge'} =
        $stats->{'memory-swap-total.gauge'} - $stats->{'memory-swap-free.gauge'};
}
# Store the first three whitespace-separated fields of /proc/loadavg as the
# loadavg-1, loadavg-5 and loadavg-15 gauges.
# Dies with the OS error text if the file cannot be opened.
sub loadavg {
    my ($self) = @_;
    open my $fh, '<', '/proc/loadavg' or die "$!\n";
    while ( my $line = <$fh> ) {
        my @fields = split /\s+/, $line;
        next unless @fields;
        my $stats = $self->stats;
        $stats->{'loadavg-1.gauge'}  = $fields[0];
        $stats->{'loadavg-5.gauge'}  = $fields[1];
        $stats->{'loadavg-15.gauge'} = $fields[2];
        last;
    }
    close $fh;
}
# Store the integer part of the first field of /proc/uptime as meta "uptime".
# Dies with the OS error text if the file cannot be opened.
sub uptime {
    my ($self) = @_;
    open my $fh, '<', '/proc/uptime' or die "$!\n";
    while ( my $line = <$fh> ) {
        my @fields = split /\s+/, $line;
        next unless @fields;
        $self->meta->{'uptime'} = int( $fields[0] );
        last;
    }
    close $fh;
}
# Store the first line of /proc/version, without its line ending, as meta
# "version".  Dies with the OS error text if the file cannot be opened.
sub sys_version {
    my ($self) = @_;
    open my $fh, '<', '/proc/version' or die "$!\n";
    my $meta = $self->meta;
    $meta->{'version'} = <$fh>;
    chomp $meta->{'version'};
    close $fh;
}
# Count the "processor :" lines of /proc/cpuinfo into the processors gauge
# (the gauge is incremented once per matching line).
# Dies with the OS error text if the file cannot be opened.
sub processors {
    my ($self) = @_;
    open my $fh, '<', '/proc/cpuinfo' or die "$!\n";
    while ( my $line = <$fh> ) {
        next unless $line =~ m!^processor\s*:!;
        $self->stats->{'processors.gauge'}++;
    }
    close $fh;
}
# Read the aggregate "cpu" line of /proc/stat and store its columns, in
# order, as the cpu-*.derive counters.  Columns missing from the line are
# stored as 0.  Dies with the OS error text if the file cannot be opened.
sub cpu_usage {
    my ($self) = @_;
    open my $fh, '<', '/proc/stat' or die "$!\n";
    my @keys = qw(cpu-user cpu-nice cpu-system cpu-idle cpu-iowait cpu-irq cpu-softirq cpu-steal cpu-guest cpu-guest-nice);
    while ( my $line = <$fh> ) {
        next unless $line =~ /^cpu\s+/;
        chomp $line;

        # The first field is the "cpu" label itself; the rest are counters.
        my ( undef, @ticks ) = split /\s+/, $line;
        foreach my $i ( 0 .. $#keys ) {
            my $value = $ticks[$i];
            $self->stats->{"$keys[$i].derive"} = int( defined $value ? $value : 0 );
        }
        last;
    }
    close $fh;
}
# Store the CurrEstab value from the "Tcp:" rows of /proc/net/snmp as the
# tcp-established gauge.
#
# The first "Tcp:" row holds the column labels and the following "Tcp:" row
# holds the values, so the column index of CurrEstab is found in the first
# row and used to pick the value out of the second.
#
# Dies (with a trailing newline, like the other collectors) if the file
# cannot be opened or if no CurrEstab value can be found; previously a
# missing label made the code index past the end of the value row.
sub tcp_established {
    my $self = shift;
    open my $fh, '<', '/proc/net/snmp' or die "$!\n";

    my $index;    # column of CurrEstab; undef until the label row is seen
    my $value;
    while ( my $line = <$fh> ) {
        next unless $line =~ /^Tcp:/;
        my @vals = split /\s+/, $line;

        if ( !defined $index ) {
            # Label row: locate the CurrEstab column.
            ($index) = grep { $vals[$_] eq 'CurrEstab' } 0 .. $#vals;
            last unless defined $index;
            next;
        }

        # Value row.
        $value = $vals[$index];
        last;
    }
    close $fh;

    die "CurrEstab not found in /proc/net/snmp\n" unless defined $value;
    $self->stats->{'tcp-established.gauge'} = $value;
    return;
}
# Run an external command and capture its standard output.
#
# Argument:
#   $cmdref - array reference: program name followed by its arguments.  The
#             program is executed directly, never through a shell.
#
# Returns ($output, $exit_code):
#   $output    - everything the command wrote to STDOUT ('' if nothing)
#   $exit_code - the command's exit status; 128 + signal number if it was
#                killed by a signal; 127 if it could not be started; -1 if
#                its status could not be collected (for example because a
#                SIGCHLD handler already reaped it)
#
# Dies if the pipe or the fork cannot be created.
sub cap_cmd {
    my ($cmdref) = @_;
    pipe my $logrh, my $logwh
        or die "Died: failed to create pipe:$!\n";
    my $pid = fork;
    if ( !defined $pid ) {
        die "Died: fork failed: $!\n";
    }

    if ( $pid == 0 ) {
        # Child.  It must never return into the caller's code: a die here
        # would be caught by the caller's eval and leave a second copy of the
        # caller running.  Report on STDERR and leave with _exit instead.
        require POSIX;
        close $logrh;
        if ( open STDOUT, '>&', $logwh ) {
            close $logwh;
            exec { $cmdref->[0] } @{$cmdref}
                or print STDERR "Died: exec failed: $!\n";
        }
        else {
            print STDERR "Died: failed to redirect STDOUT\n";
        }
        POSIX::_exit(127);
    }

    # Parent: read until the child closes its end of the pipe.
    close $logwh;
    my $result = '';
    while ( my $line = <$logrh> ) {
        $result .= $line;
    }
    close $logrh;

    # Reap exactly this child, retrying only when interrupted by a signal.
    my $reaped;
    do {
        $reaped = waitpid( $pid, 0 );
    } while ( $reaped == -1 && $!{EINTR} );

    my $exit_code = -1;
    if ( $reaped == $pid ) {
        my $status = $?;
        my $signal = $status & 127;
        $exit_code = $signal ? 128 + $signal : $status >> 8;
    }
    return ( $result, $exit_code );
}
# Record used/available space for every /dev-backed mount point.
#
# Mount points are read from /proc/mounts (entries whose device starts with
# /dev/, except /boot) and handed to df.  For each matching df row this
# stores, with <dev> being the device name with every character outside
# [A-Za-z0-9_-] replaced by "_":
#   stats disk-usage-<dev>-used.gauge       df "Used" column
#   stats disk-usage-<dev>-available.gauge  df "Available" column
#   meta  disk-usage-<dev>-mount            the mount point
#
# df is run as "df -Pk": -P keeps each filesystem on one line (plain df may
# wrap long device names onto a second line, which shifts the columns) and
# -k pins the unit to 1024-byte blocks.
#
# Dies if /proc/mounts cannot be opened or df does not exit with 0.
sub disk_usage {
    my $self = shift;
    open my $fh, '<', '/proc/mounts' or die "$!\n";
    my @mount_points;
    my %device_for;    # mount point => sanitized device name
    while ( my $line = <$fh> ) {
        next unless $line =~ m!^/dev/(.+?) (/.*?) !;
        my ( $device, $mount ) = ( $1, $2 );
        next if $mount eq '/boot';    # not required
        push @mount_points, $mount unless exists $device_for{$mount};
        $device =~ s![^A-Za-z0-9_-]!_!g;
        $device_for{$mount} = $device;
    }
    close $fh;

    # Nothing to ask df about.
    return unless @mount_points;

    my ( $result, $exit_code ) = cap_cmd( [ 'df', '-Pk', @mount_points ] );
    die "failed to exec df\n" if $exit_code != 0;

    for my $row ( split /\n/, ( defined $result ? $result : '' ) ) {
        my @d = split /\s+/, $row;

        # Columns: filesystem, blocks, used, available, capacity, mount point.
        next unless defined $d[5];
        next unless exists $device_for{ $d[5] };

        my $name = $device_for{ $d[5] };
        $self->stats->{ "disk-usage-" . $name . "-used.gauge" }      = $d[2];
        $self->stats->{ "disk-usage-" . $name . "-available.gauge" } = $d[3];
        $self->meta->{ "disk-usage-" . $name . "-mount" }            = $d[5];
    }
    return;
}
# Map a kernel device-mapper name such as "dm-0" to its /dev/mapper name.
#
# Every symlink under /dev/mapper is inspected; the first one whose target
# ends in the given dm-* name wins.
#
# Argument:
#   $device - kernel name, e.g. "dm-0"
# Returns the matching entry path relative to /dev, e.g. "mapper/vg-root".
# Dies if no entry points at $device.
sub translate_device_mapper {
    my $device = shift;
    for my $entry ( glob(q!/dev/mapper/*!) ) {
        my $target = readlink($entry);
        next unless $target;    # not a symlink (or unreadable)

        # Skip targets that carry no dm-* component rather than comparing an
        # undefined value below.
        my ($dm_name) = ( $target =~ m!(dm-.+)$! );
        next unless defined $dm_name;

        if ( $dm_name eq $device ) {
            ( my $name = $entry ) =~ s!^/dev/!!;
            return $name;
        }
    }
    die "cannot resolv $device\n";
}
# Record I/O counters for every block device listed under /sys/block.
#
# For each device with at least one completed read or write this stores
# (all as *.derive counters, <dev> being the sanitized device name):
#   disk-io-<dev>-read-ios, disk-io-<dev>-read-sectors,
#   disk-io-<dev>-write-ios, disk-io-<dev>-write-sectors
# dm-* devices are reported under the name translate_device_mapper() gives.
#
# Dies if a stat file cannot be opened or a dm-* name cannot be translated.
sub disk_io {
    my $self = shift;
    my @stats = glob '/sys/block/*/stat';
    for my $stat (@stats) {
        my ($device) = ( $stat =~ m!^/sys/block/(.+)/stat$! );
        open my $fh, '<', $stat or die "$!\n";
        my $dstat = <$fh>;
        close $fh;

        # An empty or truncated stat file carries nothing to report.
        next unless defined $dstat;
        $dstat =~ s!^\s+!!;
        my @dstats = split /\s+/, $dstat;
        next if @dstats < 7;

        if ( $device =~ m!^dm-! ) {
            $device = translate_device_mapper($device);
        }

        # read-ios  read-merges  read-sectors  read-ticks    0..3
        # write-ios write-merges write-sectors write-ticks   4..7
        # ios-in-prog tot-ticks rq-ticks                      8..10
        next if $dstats[0] == 0 && $dstats[4] == 0;

        $device =~ s![^A-Za-z0-9_-]!_!g;
        $self->stats->{ "disk-io-" . $device . "-read-ios.derive" }     = $dstats[0];
        $self->stats->{ "disk-io-" . $device . "-read-sectors.derive" } = $dstats[2];
        $self->stats->{ "disk-io-" . $device . "-write-ios.derive" }    = $dstats[4];

        # This key used to end in ".device", a typo for ".derive".
        $self->stats->{ "disk-io-" . $device . "-write-sectors.derive" } = $dstats[6];
    }
    return;
}
# Record received/transmitted byte counters for every network interface in
# /proc/net/dev except the loopback interface "lo".
#
# For each interface this stores traffic-<if>-rxbytes.derive (first counter
# after the colon) and traffic-<if>-txbytes.derive (ninth counter), <if>
# being the interface name with every character outside [A-Za-z0-9_-]
# replaced by "_".
#
# Two pattern defects are fixed here: a stray backtick after the closing "$"
# (Perl parses "$" followed by a backtick as the prematch variable, not as an
# end anchor), and a leading "\s+" that skipped interface names long enough
# to start in the first column.
#
# Dies with the OS error text if /proc/net/dev cannot be opened.
sub traffic {
    my $self = shift;
    open my $fh, '<', '/proc/net/dev' or die "$!\n";
    while ( my $line = <$fh> ) {
        # The two header lines contain no colon and so never match.
        next unless $line =~ m!^\s*([^:]+):\s*(.*)$!;
        my ( $interface, $stat ) = ( $1, $2 );
        next if $interface eq 'lo';    # skip loopback

        $interface =~ s![^A-Za-z0-9_-]!_!g;
        my @stats = split /\s+/, $stat;
        $self->stats->{"traffic-${interface}-rxbytes.derive"} = $stats[0];
        $self->stats->{"traffic-${interface}-txbytes.derive"} = $stats[8];
    }
    close $fh;
    return;
}
1;
package OneSteam::Agent::Server;
use strict;
use warnings;
use POSIX qw(EINTR EAGAIN EWOULDBLOCK :sys_wait_h);
use IO::Socket qw(:crlf IPPROTO_TCP TCP_NODELAY);
use IO::Socket::INET;
# Cap, in bytes, on buffered request data: handle_connection() never asks
# read_timeout() for more than this value minus what is already buffered.
our $MAX_REQUEST_SIZE = 131072;
# Constructor: a server object holding a 30 (passed to the I/O helpers as
# their timeout) and a fresh OneSteam::Agent::Collector instance.
sub new {
    my ($class) = @_;
    my %self = (
        timeout   => 30,
        collector => OneSteam::Agent::Collector->new,
    );
    return bless \%self, $class;
}
# Open a listening TCP socket on '0:20078' and hand it to server().
# ReuseAddr is requested everywhere except on MSWin32.
# Dies if the socket cannot be created.
sub run {
    my ($self) = @_;
    my $localaddr = '0:20078';

    my @reuse = $^O eq 'MSWin32' ? () : ( ReuseAddr => 1 );
    my $listener = IO::Socket::INET->new(
        Listen    => SOMAXCONN,
        LocalAddr => $localaddr,
        Proto     => 'tcp',
        @reuse,
    );
    die "failed to listen to port $localaddr: $!" unless $listener;

    $self->server($listener);
}
# Accept loop: fork one child per accepted connection.
#
# Argument:
#   $sock - listening socket (the caller in this file passes an
#           IO::Socket::INET object)
#
# Each accepted connection is switched to non-blocking mode with
# TCP_NODELAY set, then handled by handle_connection() in a forked child,
# which exits when done.  The parent closes its copy of the connection and
# goes back to accepting.  Never returns; dies if the socket options cannot
# be set or fork fails.
sub server {
    my $self = shift;
    my $sock = shift;

    # Reap every child that has already exited, then return at once.
    # waitpid(-1, WNOHANG) returns 0 while children exist but none has
    # exited, so the loop must stop on anything that is not a reaped pid;
    # looping "until -1" would spin until every child had finished.
    local $SIG{CHLD} = sub {
        local ( $!, $? );    # do not disturb the interrupted code
        1 while waitpid( -1, WNOHANG ) > 0;
    };
    local $SIG{PIPE} = 'IGNORE';

    while (1) {
        # accept can fail, e.g. when interrupted by SIGCHLD; just retry.
        my $conn = $sock->accept or next;

        $conn->blocking(0)
            or die "failed to set socket to nonblocking mode:$!";
        $conn->setsockopt( IPPROTO_TCP, TCP_NODELAY, 1 )
            or die "setsockopt(TCP_NODELAY) failed:$!";

        my $pid = fork();
        die "cannot fork: $!" unless defined $pid;
        if ( $pid == 0 ) {
            # Child: restore default SIGCHLD handling so the collectors can
            # wait for their own subprocesses.
            local $SIG{CHLD} = 'DEFAULT';
            $self->handle_connection($conn);
            $conn->close;
            exit;
        }
        $conn->close;
    }
}
# Serve one connection: read until the request headers are complete, then
# answer with a freshly collected report.
#
# Arguments:
#   $conn - the accepted client socket
#
# Data is read until parse_read_buffer() sees the blank line that ends the
# headers.  A read that fails, times out, or would exceed $MAX_REQUEST_SIZE
# ends the connection without a response.  Previously the loop stopped after
# the very first read, so a request whose headers arrived in more than one
# chunk was dropped without any response.
#
# The response is always "200 OK", text/plain, "Connection: close", with the
# output of the collector's collect() as body.  Returns nothing.
sub handle_connection {
    my ( $self, $conn ) = @_;
    my $buf = '';
    my $req = +{};
    while (1) {
        my $rlen = read_timeout(
            $conn, \$buf, $MAX_REQUEST_SIZE - length($buf), length($buf), $self->{timeout},
        ) or last;

        # Headers not complete yet: keep reading.
        next unless parse_read_buffer( $buf, $req );

        $buf = '';
        my $body    = $self->{collector}->collect;
        my @headers = ( 'HTTP/1.1 200 OK', 'Content-Type: text/plain', 'Connection: close' );
        push @headers, 'Date: ' . http_date();
        write_all( $conn, join( $CRLF, @headers, '' ) . $CRLF . $body, $self->{timeout} );
        last;
    }
    return;
}
# Format the current time in GMT as an HTTP date string, e.g.
# "Sun, 06 Nov 1994 08:49:37 GMT".
sub http_date {
    my ( $sec, $min, $hour, $mday, $mon, $year, $wday ) = gmtime;
    my $day_name   = ( qw(Sun Mon Tue Wed Thu Fri Sat) )[$wday];
    my $month_name = ( qw(Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec) )[$mon];
    return sprintf '%s, %02d %s %04d %02d:%02d:%02d GMT',
        $day_name, $mday, $month_name, $year + 1900, $hour, $min, $sec;
}
# Report whether the buffered request data contains the blank line that ends
# the headers (two line feeds in a row, each optionally preceded by a
# carriage return).
#
# Arguments:
#   $data    - the data read so far
#   $request - hash reference supplied by the caller; not used here
# Returns 1 when the blank line is present, otherwise nothing.
sub parse_read_buffer {
    my ( $data, $request ) = @_;
    return 1 if $data =~ /$CR?$LF$CR?$LF/;
    return;
}
# Read from $sock into the scalar referenced by $buf, waiting up to $timeout
# for data.  Thin wrapper selecting the read side of do_io().
# Returns the (positive) number of bytes read, or undef if the socket is to
# be closed.
sub read_timeout {
    my ( $sock, $buf, $len, $off, $timeout ) = @_;
    my $want_write = undef;
    return do_io( $want_write, $sock, $buf, $len, $off, $timeout );
}
# Write part of the string $buf to $sock, waiting up to $timeout for the
# socket to accept data.  Thin wrapper selecting the write side of do_io().
# Returns the (positive) number of bytes written, or undef if the socket is
# to be closed.
sub write_timeout {
    my ( $sock, $buf, $len, $off, $timeout ) = @_;
    my $want_write = 1;
    return do_io( $want_write, $sock, $buf, $len, $off, $timeout );
}
# Write the whole of $buf to $sock, calling write_timeout() repeatedly until
# every byte has been accepted.
# Returns the number of bytes written (the length of $buf), or undef as soon
# as one write fails.
sub write_all {
    my ( $sock, $buf, $timeout ) = @_;
    my $sent = 0;
    while (1) {
        my $remaining = length($buf) - $sent;
        last unless $remaining;
        my $written = write_timeout( $sock, $buf, $remaining, $sent, $timeout );
        return unless $written;
        $sent += $written;
    }
    return length $buf;
}
# Perform one read or write on a non-blocking socket, waiting with select()
# whenever the operation cannot proceed yet.
#
# Arguments:
#   $is_write - true to syswrite from the string $buf, false to sysread into
#               the scalar referenced by $buf
#   $sock     - the handle to operate on
#   $buf      - string (write) or scalar reference (read)
#   $len      - maximum number of bytes to transfer
#   $off      - offset into the buffer
#   $timeout  - seconds to wait; reduced by the time spent in each select()
#
# Returns the number of bytes transferred when it is non-zero.  Returns
# nothing on a zero-byte transfer, on any error other than EINTR, EAGAIN or
# EWOULDBLOCK, or once the timeout is used up.
sub do_io {
    my ( $is_write, $sock, $buf, $len, $off, $timeout ) = @_;

    while (1) {
        # Try the transfer first; most calls succeed right away.
        my $transferred =
            $is_write
            ? syswrite( $sock, $buf, $len, $off )
            : sysread( $sock, $$buf, $len, $off );
        return $transferred if $transferred;

        # Zero bytes (defined but false) means give up; so does any error
        # that is not "try again".
        return if defined $transferred;
        return unless $! == EINTR || $! == EAGAIN || $! == EWOULDBLOCK;

        # Wait until the handle is ready, then retry the transfer.
        while (1) {
            my $efd = '';
            vec( $efd, fileno($sock), 1 ) = 1;
            my ( $rfd, $wfd ) = $is_write ? ( '', $efd ) : ( $efd, '' );

            my $began  = time;
            my $nfound = select( $rfd, $wfd, $efd, $timeout );
            $timeout -= ( time - $began );

            last if $nfound;
            return if $timeout <= 0;
        }
    }
}
1;
package main;
# Script entry point: build the agent server and call its run() method.
OneSteam::Agent::Server->new->run;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment