Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
#!/usr/bin/perl
use 5.010;
use warnings;
use strict;
use Geo::IP;
use Parse::Syslog::Line qw( parse_syslog_line );
use Socket qw( getnameinfo NI_NUMERICHOST NI_NUMERICSERV );
use POSIX qw( strftime );
use AnyEvent;
use AnyEvent::Handle::UDP;
use Search::Elasticsearch::Async;
use EV;
$Parse::Syslog::Line::DateTimeCreate = 0;
$Parse::Syslog::Line::EpochCreate = 0;
$Parse::Syslog::Line::ExtractProgram = 0;
$Parse::Syslog::Line::PruneRaw = 1;
$Parse::Syslog::Line::PruneEmpty = 1;
@Parse::Syslog::Line::PruneFields =
qw(priority facility_int priority_int host facility domain content preamble);
my $gi = Geo::IP->open( '/opt/logstash-1.4.1/vendor/geoip/GeoLiteCity.dat',
GEOIP_STANDARD );
my $es = Search::Elasticsearch::Async->new->bulk_helper(
index => 'logstash-accesslog-' . strftime( "%Y.%m.%d", localtime ),
type => 'logs',
max_count => 5000,
);
sub _grok {
my ($msg) = @_;
my $message = delete $msg->{'message'};
if ( $message =~ m/^\w+\[\d+\]: (.+)$/o ) {
my @values = split( /\|/, $1 );
$msg->{'client'} = $values[0];
$msg->{'servername'} = $values[1];
$msg->{'url'} = $values[2];
$msg->{'status'} = $values[3];
$msg->{'time'} = $values[4] + 0;
$msg->{'size'} = $values[5] + 0;
$msg->{'upstream'} = $values[6];
$msg->{'upstreamstatus'} = $values[7];
$msg->{'upstreamtime'} = $values[8] eq '-' ? 0 : $values[8] + 0;
$msg->{'referer'} = $values[9];
$msg->{'xff'} = $values[10];
$msg->{'useragent'} = $values[11];
}
}
sub _geoip {
my ($msg) = @_;
my $record = $gi->record_by_addr( $msg->{'client'} );
$msg->{'geoip'} = defined $record
? {
country_name => $record->country_name,
region_name => $record->region,
city_name => $record->city,
real_region_name => $record->region_name,
location => [ $record->latitude + 0, $record->longitude + 0 ],
}
: undef;
}
sub _send_data {
my ( $message, $from ) = @_;
my $msg = parse_syslog_line($message);
my ( $err, $ipaddr ) = getnameinfo( $from, NI_NUMERICHOST, NI_NUMERICSERV );
$msg->{'host'} = $ipaddr unless $err;
$msg->{'@timestamp'} = strftime( '%FT%T%z', localtime );
_grok($msg);
_geoip($msg);
$es->index( { source => $msg } );
}
my $hostname = '10.4.16.68';
my $port = 5140;
my $cv = AnyEvent->condvar;
AnyEvent::Handle::UDP->new(
bind => [ $hostname, $port ],
on_recv => sub {
my ( $data, $h, $from_addr ) = @_;
_send_data( $data, $from_addr );
},
on_error => sub {
say "Error";
},
);
$cv->recv;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment