Skip to content

Instantly share code, notes, and snippets.

@j-keck
Created January 12, 2016 20:29
Show Gist options
  • Save j-keck/c5a67c7ea0431c02499a to your computer and use it in GitHub Desktop.
Save j-keck/c5a67c7ea0431c02499a to your computer and use it in GitHub Desktop.
perl script to send systemd logs in a postgresql db
#!/usr/bin/env perl
#
# save systemd journal logs to a postgres instance
#
#
# * first run with '-init-db' to intialize the database
# * second run to load all logs
# (maybe with '-journalctl-args "-n 1000000"')
# * the following runs inserts only new log entries
#
#
#
# sample queries
# ==========================
# list in json keys: select json_object_keys(j::json) from logs_raw;
# ^^^^ why not with 'jsonb'?
#
# list journal ts, msg: select jts, j->>'MESSAGE' from logs;
#
#
#
#
use v5.10;
use strict;
use warnings;
#use diagnostics; # 3 seconds startup slowdown on pi
use Getopt::Long;
use autodie;
# =============================================================================
# default args
# =============================================================================
my $show_help;
my $init_db;
my $db_ip = undef; # optional
my $db_port = "5432";
my $db_name = &current_os_user();
my $db_user = &current_os_user();
my $db_table_name = "logs";
my $journalctl_args = "";
# =============================================================================
# parse / validate args
# =============================================================================
GetOptions("help" => \$show_help,
"init-db" => \$init_db,
"db-ip=s" => \$db_ip,
"db-port=i" => \$db_port,
"db-name=s" => \$db_name,
"db-user=s" => \$db_user,
"db-table-name=s" => \$db_table_name,
"journalctl-args=s" => \$journalctl_args,
) || exit 1;
# =============================================================================
# initialize vars
# =============================================================================
my $psql_cmd = &build_psql_cmd();
my $machine_id = &read_file("/etc/machine-id");
# =============================================================================
# action
# =============================================================================
if(defined($show_help)){
say qq[$0 : save systemd journal logs to a postgres instance
ARGS:
-h : show this help
-init-db : initialize the database
-db-ip : database ip
-db-port : database port
-db-name : database name (default: $db_name)
-db-user : database user (default: $db_user)
-db-table-name : log table name (default: $db_table_name)
-journalctl-args: extra args for journalctl - '-journalct-args "-n 1000000"'
];
}elsif(defined($init_db)){
&verify_db_encoding();
&init_db();
}else{
&verify_db_login();
&verify_db_is_initialized();
&update_db();
}
# =============================================================================
# action functions
# =============================================================================
sub init_db {
my $setup_sql = <<"EOF";
CREATE TABLE IF NOT EXISTS $db_table_name(
id SERIAL PRIMARY KEY
, ts TIMESTAMP
, jts TIMESTAMP
, hostname TEXT
, hostid TEXT
, priority INTEGER
, j JSONB
);
COMMENT ON COLUMN ${db_table_name}.ts IS 'timestamp: table insert';
COMMENT ON COLUMN ${db_table_name}.jts IS 'timestamp: from systemd journal / log timestamp';
COMMENT ON COLUMN ${db_table_name}.priority IS 'A priority value between 0 ("emerg") and 7 ("debug")';
COMMENT ON COLUMN ${db_table_name}.j IS 'journald log entry as json. see: http://www.freedesktop.org/software/systemd/man/systemd.journal-fields.html ';
CREATE INDEX idx_${db_table_name}_ts ON ${db_table_name} (ts);
CREATE INDEX idx_${db_table_name}_jts_hostid ON ${db_table_name} (jts, hostid);
CREATE OR REPLACE FUNCTION pr_${db_table_name}_set_meta() RETURNS trigger AS
\\\$BODY\\\$
DECLARE
rts VARCHAR;
rts_as_double DOUBLE PRECISION;
BEGIN
IF NEW.ts IS NULL THEN
NEW.ts := NOW();
END IF;
IF NEW.jts IS NULL THEN
rts := (SELECT NEW.j->>'__REALTIME_TIMESTAMP');
rts_as_double := CAST(FORMAT('%s.%s', left(rts, 10), substr(rts, 11)) AS DOUBLE PRECISION);
-- RAISE NOTICE 'realtime timestamp: %, as double: %', rts, rts_as_double;
NEW.jts := TO_TIMESTAMP(rts_as_double);
END IF;
IF NEW.hostname IS NULL THEN
NEW.hostname := (SELECT NEW.j->>'_HOSTNAME');
END IF;
IF NEW.hostid IS NULL THEN
NEW.hostid := (SELECT NEW.j->>'_MACHINE_ID');
END IF;
IF NEW.priority IS NULL THEN
NEW.priority := (SELECT NEW.j->>'PRIORITY');
END IF;
RETURN NEW;
END;
\\\$BODY\\\$ LANGUAGE plpgsql;
CREATE TRIGGER tr_bi_${db_table_name}_set_meta
BEFORE INSERT ON $db_table_name
FOR EACH ROW EXECUTE PROCEDURE pr_${db_table_name}_set_meta();
EOF
psql($setup_sql);
}
sub update_db {
# query the last journal cursor from the db
my $last_cursor_query = <<"QUERY";
SELECT j->>'__CURSOR'
FROM $db_table_name
WHERE jts = (SELECT MAX(jts)
FROM $db_table_name
WHERE hostid = '$machine_id')
AND hostid = '$machine_id';
QUERY
my $last_cursor = psql($last_cursor_query);
my $entries_counter = 0;
# start / configure psql session
open my $pgfh, "| cat | $psql_cmd ";
say $pgfh "BEGIN;";
# build / execute journalctl cmd
my $jstart = ($last_cursor ? " --after-cursor='$last_cursor' " : "");
my $journalctl_cmd = "journalctl -o json $jstart $journalctl_args";
say "EXEC: $journalctl_cmd";
open my $jfh, "$journalctl_cmd |" ;
while(my $line = <$jfh>){
chomp($line);
next if $line eq "-- Reboot --"; # nice job systemd!
my $sql = "INSERT INTO $db_table_name (j) VALUES (to_json(\$JOURNAL2PG\$$line\$JOURNAL2PG\$::jsonb)::jsonb);";
say $pgfh $sql;
$entries_counter += 1;
}
close($jfh);
# commit / close psql session
say $pgfh "COMMIT;";
close($pgfh);
say "done - $entries_counter new entries added";
}
#
#
#
sub verify_db_encoding {
my $db_encoding = psql("SHOW SERVER_ENCODING;");
if($db_encoding ne "UTF8"){
&abort("unsupported server encoding", "
expected: UTF8, actual: $db_encoding
HINT:
create a database with UTF8 encoding:
'pg_createcluster 9.4 <DB_NAME> --start-conf=auto --start -- -E UTF8 --data-checksums'
");
}
}
#
#
#
sub verify_db_login {
my $psql_out = `$psql_cmd -c "\\q" 2>&1`;
my $psql_exit_code = $?;
if($psql_exit_code != 0){
&abort("database login failed", "
PSQL Message:
$psql_out
");
}
}
#
#
#
sub verify_db_is_initialized{
if(psql("SELECT 'OK' FROM pg_class WHERE relname = '$db_table_name';") ne "OK"){
&abort("database not initzalized", "
HINT:
initialize the database with:
'$0 --init-db'
");
}
}
# =============================================================================
# helper functions
# =============================================================================
#
#
#
sub abort {
my $reason = shift;
my $details = shift;
say "\n! ABORT ! - $reason\n$details";
exit 1;
}
#
#
#
sub psql {
my $sql = shift;
my $cmd = <<"PERL_EOF";
$psql_cmd <<EOF
\\t
$sql;
EOF
PERL_EOF
# execute psql and catch output
my $res = `$cmd`;
# trim spaces
$res =~ s/^\s+|\s+$//g;
return $res;
}
#
#
#
sub build_psql_cmd {
my $remote_login_args = "";
if(defined($db_ip)){
$remote_login_args .= " -h $db_ip ";
}
if(defined($db_port)){
$remote_login_args .= " -p $db_port ";
}
return "psql -q $remote_login_args -U $db_user -d $db_name";
}
#
#
#
sub current_os_user {
my $user = getpwuid($<);
return $user;
}
#
#
#
sub read_file {
our $path = shift;
# local sub function to keep
# '$/' change local (hint: chomp)
sub slurp {
open my $fh, "<$path";
local $/ = undef;
my $content = <$fh>;
close($fh);
return $content;
}
chomp(my $content = &slurp());
return $content;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment