Skip to content

Instantly share code, notes, and snippets.

@tobert
Created March 21, 2012 14:51
Show Gist options
  • Save tobert/2147824 to your computer and use it in GitHub Desktop.
Save tobert/2147824 to your computer and use it in GitHub Desktop.
A perl/ZeroMQ log forwarder
#!/usr/bin/perl
=head1 NAME
zeromq_log_forwarder.pl - read a log and forward over ZeroMQ
=head1 SYNOPSIS
zeromq_log_forwarder.pl --logfile /full/path/to/logfile
--connect <uri> # Required, ZeroMQ URI to connect to.
--logfile <logfile> # Required, Full path to the logfile to tail. Should be qualified.
--dbhome <directory> # Required, Full path to a suitable directory for a BerkeleyDB environment.
--linux # Optional, enables Linux-specific rotation detection, beware of symlinks
mkdir -p /var/lib/log-forwarder
nohup zeromq_log_forwarder.pl \
--connect tcp://logsink.mydomain.com:20000 \
--logfile /var/log/foobar/foo.log \
--linux \
--dbhome /var/lib/log-forwarder &
=head1 DESCRIPTION
This script tails a logfile and forwards contents over ZeroMQ.
The logfile is ID'ed by checksumming the first 64k of the file. This is used to positively detect
rotations. When a new file is detected, a new process is forked to tail that file and send its data
along. When the end of the file is reached, the tailer process exits normally.
The repeater process binds on port 20001 with ZMQ_PUSH for consumers to connect and receive messages using
ZeroMQ ZMQ_PULL. The repeater process is necessary because of the forking model of this script. When two
files overlap, they cannot bind to the same TCP port. Fortunately, ZeroMQ has facilities for handling this.
=head1 LINUX
If you enable --linux, this script will use readlink (what ls -l does) on /proc/$$/fd/<fileno>
to determine if a file has been rotated. This only works on Linux, and only when rotation occurs
on the same mountpoint.
=head1 BerkeleyDB
This program uses bdb to store file position markers across restarts. BerkeleyDB offers safe writes across
multiple process that is necessary here since it forks for every new file.
=head1 TODO
* add past rotated file detection / replay
* support DNS RR records
* directories of files w/ patterns?
* fork limit / timeout (so hung servers don't cause this script to leak processes)
* rate limiting
* centralized configuration (via a ZMQ REQ/REP or maybe just a PULL)
* TLS or HMAC?
=head1 AUTHOR
Al Tobey <tobert@gmail.com>
=cut
$| = 1;
use strict;
use warnings;
use Carp;
use IO::File;
use Pod::Usage;
use Getopt::Long;
use Digest::MD5 qw(md5_hex);
use POSIX qw/:sys_wait_h setsid/;
use Data::Dumper;
use Time::HiRes qw/usleep/;
use ZeroMQ qw/:all/;
use ZeroMQ::Raw qw/zmq_device/;
use Benchmark qw(timediff timestr);
use BerkeleyDB;
our $PROGRAM = "zeromq_log_forwarder";
our $IDBYTES = 2**9;
our $IPC_URI = "ipc:///var/tmp/$$-zmq-ipc";
our $MAX_EOF_SLEEPS = 10_000;
our($opt_bind, $opt_connect); # one and only
our($opt_logfile, $opt_db_home, $opt_logdir); # required
our($opt_linux, $opt_daemon, $opt_verbose); # options
GetOptions(
'b=s' => \$opt_bind, 'bind=s' => \$opt_bind,
'c=s' => \$opt_connect, 'connect=s' => \$opt_connect,
'f=s' => \$opt_logfile, 'logfile=s' => \$opt_logfile,
'x' => \$opt_linux, 'linux' => \$opt_linux,
'h=s' => \$opt_db_home, 'dbhome=s' => \$opt_db_home,
'd' => \$opt_daemon, 'daemon' => \$opt_daemon,
'l=s' => \$opt_logdir, 'logdir=s' => \$opt_logdir,
'v' => \$opt_verbose, 'verbose' => \$opt_verbose
);
$opt_logdir ||= "/var/log/$PROGRAM";
$opt_db_home ||= "/var/lib/$PROGRAM";
check_db_home($opt_db_home);
unless ($opt_logfile && -f $opt_logfile) {
die "--logfile argument is required to point a file that exists";
}
daemonize($opt_logdir) if ($opt_daemon);
my $repeater_pid = fork();
if ($repeater_pid) {
watch($opt_logfile);
}
else {
public_repeater();
}
exit 0;
=item verbose()
Verbose output to STDERR, switched on/off via --verbose / -v.
=cut
sub verbose ($) {
return unless $opt_verbose;
print @_, $/;
}
=item watch()
Watch a logfile and tail it. Handles rotation detection via checksum identification and
forking a process to tail the file.
=cut
sub watch {
my($logfile) = @_;
my $last_file_id = 'initial';
my %pids;
my $count = 0;
while (1) {
my $file_id = idfile($logfile, 1);
if ($last_file_id ne $file_id) {
verbose "[main] Log ID changed from $last_file_id to $file_id.";
$last_file_id = $file_id;
my $pid = fork();
if ($pid) {
verbose "[main] spawned child [$$] to tail file with id '$file_id'";
$pids{$pid} = time;
next;
}
else {
alarm(86400); # never live more than a day
$SIG{ALRM} = sub { die "[$$] process was alive for more than a day. Dying now." };
my $start = Benchmark->new;
logtail($logfile, $file_id);
my $done = Benchmark->new;
my $took = timestr(timediff($done, $start));
verbose "[$$] $took";
verbose "[$$] Child finished tailing file ID $file_id normally. Exiting.";
exit 0;
}
}
foreach my $pid (keys %pids) {
my $result = waitpid($pid, WNOHANG);
if ($result < 0) {
delete $pids{$pid};
}
}
sleep 1;
}
}
=item public_repeater()
Forked log tailers publish to this process, which then publishes to the remote ZMQ socket.
=cut
sub public_repeater {
verbose "[$$] starting public repeater listening on $IPC_URI ...";
my $cxt = ZeroMQ::Context->new;
my $in = $cxt->socket(ZMQ_PULL);
$in->setsockopt(ZMQ_HWM, 1_000);
$in->bind($IPC_URI);
verbose "[$$] listening on $IPC_URI.";
my $out = $cxt->socket(ZMQ_PUSH);
$out->setsockopt(ZMQ_HWM, 1_000);
if ($opt_connect) {
verbose "[$$] connecting to $opt_connect.";
$out->connect($opt_connect);
}
elsif ($opt_bind) {
$out->bind($opt_bind);
verbose "[$$] listening on $opt_bind.";
}
# works with zmq 2.x, not 3.x, possibly a little faster
#zmq_device(ZMQ_FORWARDER, $in->{_socket}, $out->{_socket});
while (my $data = $in->recv) {
$out->send($data);
}
verbose "[$$] public repeater process shutting down.";
}
=item logtail()
Normally you should use File::Tail to tail logfiles, but in testing this program,
a number of bugs were discovered around seek/tell (which is used for positioning) and
the sleeping between reads.
This custom implemenation tracks position in a BDB so a process restart can pick up
where it left off and sleeps for smaller increments of time to help with high-throughput
logs.
=cut
sub logtail {
my($logfile, $id) = @_;
my $count = 0;
my $pos = 0;
my $sleeps = 0;
my($db_env, $db) = setup_position_db($opt_db_home);
my $cxt = ZeroMQ::Context->new;
my $sock = $cxt->socket(ZMQ_PUSH);
$sock->setsockopt(ZMQ_HWM, 10_000);
verbose "[$$] connecting to public repeater process on $IPC_URI.";
$sock->connect($IPC_URI);
my $fh = IO::File->new($logfile, 'r');
# use old-style exists check to support Hardy machines
if ($db->db_get($id, my $tmp) == 0) {
$db->db_get($id, $pos);
verbose "[$$] file ID $id has been opened before. Seeking to last known position at $pos bytes.";
$fh->seek($pos, 0);
}
else {
$db->db_put($id, 0);
}
while (1) {
$fh->seek($pos, 0);
while (my $line = $fh->getline) {
$sleeps = 0; # reset the sleep timeout
$count++;
$sock->send( $line );
if ($count % 10_000 == 0) {
$db->db_put($id, $fh->tell);
$db->db_sync;
verbose "[$$] processed $count lines ... \n";
}
}
my $curpos = $fh->tell;
# still at the same position as before, sleep, reset EOF,
# and try again
if ($fh->eof and $curpos == $pos) {
usleep 2;
$sleeps++;
}
# save the current position
$db->db_put($id, $curpos);
# crazy linux-specific way to detect rotations
if ($opt_linux and $^O eq 'linux' and $fh->eof) {
my $fileno = fileno $fh;
my $curpath = readlink("/proc/$$/fd/$fileno");
# FIXME: $curpath can't be a symlink
if ($curpath ne $logfile) {
verbose "[$$] reached EOF and /proc/$$/fd/$fileno is now $curpath. Exiting.";
last;
}
}
# save the current position in the file
$pos = $curpos;
# if we end up at EOF for 20 seconds (10k * 2 microseconds),
# it's probably rotated and all done, exit normally
if ($sleeps > $MAX_EOF_SLEEPS) {
verbose "[$$] slept $MAX_EOF_SLEEPS times at EOF. Exiting.";
last;
}
}
$db->db_close;
}
=item idfile()
Read the first N bytes of the file and compute an MD5 checksum to use as its
identification. Generally 64k is way more than enough for logfiles or similar
time series data.
=cut
sub idfile {
my($logfile, $wait) = @_;
if ($wait) {
while ( not -f $logfile or -s $logfile < $IDBYTES ) {
if ( -f $logfile ) {
my $size = -s $logfile;
warn "[$$] Logfile is only $size bytes. Waiting for it to grow up.";
}
sleep 10;
}
}
open( my $fh, "< $logfile" );
sysread( $fh, my $buffer, $IDBYTES );
my $digest = md5_hex($buffer);
close $fh;
return $digest;
}
=item check_db_home()
Verify that the BerkeleyDB home exists and is configured.
=cut
sub check_db_home {
my $home = shift;
mkdir $home;
my($db_env, $db) = setup_position_db($home);
$db->status or die "BerkeleyDB setup seems to have failed.";
}
=item setup_position_db()
Create a BDB set up to allow concurrent access for keeping track of file
locations and ID's.
=cut
sub setup_position_db {
my $home = shift;
die "--dbhome is required to point at a directory!"
unless ($home && -d $home);
my $dbflags = 0;
my $envflags = 0;
if ( not -f "$home/positions.db" ) {
$dbflags |= DB_CREATE;
$envflags |= DB_CREATE | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOCK;
}
my $env = BerkeleyDB::Env->new(
-Flags => $envflags,
-Home => $home
) or die "Could not create BerkeleyDB::Env: $BerkeleyDB::Error";
my $db = BerkeleyDB::Hash->new(
-Flags => $dbflags,
-Env => $env,
-Filename => "positions.db"
) or die "Could not open positions.db: $BerkeleyDB::Error";
return($env, $db);
}
=item daemonize()
Detach from the terminal and move STDOUT/STDERR to logfiles.
=cut
sub daemonize {
my($logdir) = @_;
unless (defined $logdir and -d $logdir and -w $logdir) {
die "--logdir (a writeable directory) is required to run as a daemon."
}
my $pid = fork();
if ($pid) {
exit 0;
}
else {
POSIX::setsid();
open(STDIN, "< /dev/null");
open(STDOUT, ">> $logdir/forwarder.log");
open(STDERR, ">> $logdir/forwarder.errors");
}
}
# vim: et ts=4 sw=4 ai smarttab
@tobert
Copy link
Author

tobert commented Mar 21, 2012

This may have some bugs introduced from stripping out company-specific bits. If you use it or like it let me know and I can give it more attention.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment