Skip to content

Instantly share code, notes, and snippets.

@stemid
Last active February 4, 2016 16:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stemid/9cd72cee374d0090d4bb to your computer and use it in GitHub Desktop.
Save stemid/9cd72cee374d0090d4bb to your computer and use it in GitHub Desktop.
Greylisting daemon written by Oskar Liljeblad
# /etc/dgrey/config: Configuration file for dgrey
#
# Use this in your postfix main.cf
# check_policy_service inet:127.0.0.1:10000
# debug (yes/no): Verbose logging? (default no)
debug = yes
# log-file (path): File to log to, standard out if '-', or nowhere if set to
# the empty string (default '-').
log-file = /var/log/dgrey.log
# log-syslog (yes/no): Log to syslog (default no)
log-syslog = no
# exim (yes/no): Exim mode - close connection after each request
# (default no)
exim = no
# database-dir (path): Directory where database files are stored
database-dir = /var/lib/dgrey
# local-listen (ip:port): Accept connections without password
local-listen = localhost:10000
# public-listen (ip:port): Accept connections requiring password
public-listen = 0.0.0.0:10001
# auth-key: Authentication key for public connections
auth-key = zkvbJBB56Kn1mvALXDx5
# poll-host: Comma-separated list of hosts to poll (ip:port)
poll-hosts = 10.220.252.102:10001
# reconnect-time: Poll host reconnect time (default 1m)
reconnect-time = 1m
# greylist-min-time: Lower bound of greylist window (default 5m)
greylist-min-time = 5m
# greylist-max-time: Upper bound of greylist window (default 2d)
greylist-max-time = 2d
# greylist-purge-time: Delete entries in greylist database older than this
# (default 30d)
greylist-purge-time = 30d
# awl-count: Auto-whitelist after this many succesful deliveries
# (0 to disable, default 5)
awl-count = 5
# awl-min-time: Minimal time between successful deliveries before the
# auto-whitelist counter is increased (default 1h)
awl-min-time = 1h
# awl-purge-time: Delete auto-whitelist entries older than this
# (default 30d)
awl-purge-time = 30d
# whitelist-client-files: Comma-separated list of files to read client
# whitelist from
whitelist-client-files = /etc/dgrey/whitelist_clients
# whitelist-recipient-files: Comma-separated list of files to read
# recipient whitelist from
whitelist-recipient-files = /etc/dgrey/whitelist_recipients
# hostname: The host name to put in the X-Greylist header (can be specified
# as a file to read, default is to determine automatically)
hostname = /etc/mailname
# prepend-header (yes/no): If yes, then add header X-Greylist to delayed
# e-mails (default yes)
#prepend-header = yes
# listen-queue-size: Listen queue size for incoming connections (default
# system specific)
#listen-queue-size = 5
# lookup-by-host (yes/no): Store greylist tuples with full IP address
# rather than the C-class subnet (default no)
#lookup-by-host = no
# greylist-action: Action to send to MTA when a mail is greylisted (default
# 'DEFER_IF_PERMIT')
#greylist-action = "DEFER_IF_PERMIT"
# greylist-message: Message to reply with when a mail has been greylisted
#greylist-message = "You are being greylisted for %s seconds"
#!/usr/bin/perl -w
#
# dgrey - Greylisting synchronized between multiple MX servers
#
# Copyright (C) 2008 Oskar Liljeblad
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Please see the dgrey(1) manual page for usage details.
# Please note that this file is automatically updated by make.
#
use strict;
use Data::Dumper;
use IO::Muxer;
use IO::Socket;
use IO::Socket::INET;
use IO::Handle;
use Getopt::Long;
use File::Spec;
use File::Basename;
use POSIX qw(strftime);
use Socket qw(TCP_NODELAY);
use BerkeleyDB;
use Fcntl;
use Sys::Hostname;
use Sys::Syslog qw(:standard :macros);
use List::Util qw(min max);
$::PACKAGE = 'dgrey'; # This line is automatically updated by make
$::VERSION = '0.1.0'; # This line is automatically updated by make
$::BUG_EMAIL = 'oskar@osk.mine.nu'; # This line is automatically updated by make
$::PROGRAM = $::PACKAGE;
my %opt; # current command line options
my %cfg; # current configuration
my $log_fh; # log file handle
my $mux; # the muxer object
my $loc_socket; # local listen socket
my $pub_socket; # public listen socket
my $hostname; # hostname for X-Greylist header
my %in_client_fh; # hash of clients that connected to us
my %out_client_fh; # hash of clients that we connected to
my @out_clients; # list of poll host clients (connected or not)
my $msgi_socket; # signal socket, read
my $msgo_socket; # signal socket, write
my $database_env; # database environment
my $grey_db; # database greylist hash
my $grey_db_obj; # database greylist object
my $awl_db; # database awl hash
my $awl_db_obj; # database awl object
my $first_offset; # actionlog first offset
my $last_offset; # actionlog last offset
my $actionlog_fh; # actionlog file handle
my %offsets; # poll host offsets
my $timer_install_time = -1; # time when reconnect timer is to trigger (-1 means not installed)
my @whitelist_client_hostname; # whitelist regexps for client hostname
my @whitelist_client_ip; # whitelist regexps for client ip
my @whitelist_recipient_email; # whitelist regexps for recipient e-mail address
my @instances = ( 0 ) x 10; # previously connected postfix instances
#
# main: Main loop of the program. Called once.
#
sub main() {
$SIG{__WARN__} = sub { print STDERR $0, ': ', @_; };
$SIG{__DIE__} = sub { print STDERR $0, ': ', @_; exit 1; };
# Handle command line options
my $default_cfg_file = File::Spec->catfile('/etc', $::PACKAGE, 'config');
%opt = ( 'config-file' => $default_cfg_file );
Getopt::Long::GetOptions(\%opt, 'config-file|f=s', 'help', 'version') || exit 1;
if ($opt{'version'}) {
print $::PROGRAM, ($::PROGRAM eq $::PACKAGE ? ' ' : ' ('.$::PACKAGE.') '), $::VERSION, "\n";
print "Copyright (C) 2008 Oskar Liljeblad\n";
print "This is free software. You may redistribute copies of it under the terms of\n";
print "the GNU General Public License <http://www.gnu.org/licenses/gpl.html>.\n";
print "There is NO WARRANTY, to the extent permitted by law.\n\n";
print "Written by Oskar Liljeblad.\n";
exit;
}
if ($opt{'help'}) {
print "Usage: $0 [OPTS]..\n";
print "Start the distributed greylisting daemon.\n\n";
print " -f, --config-file=PATH specific configuration file path\n";
print " (default: $default_cfg_file)\n";
print " --help display this help and exit\n";
print " --version output version information and exit\n";
print "\nReport bugs to <", $::BUG_EMAIL, ">\n";
exit;
}
# Read and check config file, white lists and host name file
%cfg = read_config_file($opt{'config-file'}) or exit 1;
reload_client_whitelists(@{$cfg{'whitelist-client-files'}}) || exit 1;
reload_recipient_whitelists(@{$cfg{'whitelist-recipient-files'}}) || exit 1;
refresh_hostname_variable($cfg{'hostname'}) || exit 1;
# Initialize muxer (main loop I/O management) and set up signal management
$mux = IO::Muxer->new(__PACKAGE__);
($msgi_socket,$msgo_socket) = IO::Socket->socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC);
$mux->add($msgi_socket);
$SIG{$_} = \&handle_signal foreach ('HUP', 'USR1', 'TERM', 'INT', 'ALRM');
# Set up listen sockets
reopen_listen_socket($cfg{'local-listen'}, \$loc_socket, 'local') || exit 1;
reopen_listen_socket($cfg{'public-listen'}, \$pub_socket, 'public') || exit 1;
# Read poll host offsets, prepare action log file handle, open databases
reopen_database($cfg{'database-dir'}) || exit 1;
# Initialize poll host list
reinitialize_poll_hosts(@{$cfg{'poll-hosts'}}) || exit 1;
# Initialize logging
reopen_log_file($cfg{'log-file'}, $cfg{'log-syslog'}) || exit 1;
$SIG{__WARN__} = sub { log_msg('warn', @_); };
$SIG{__DIE__} = sub { log_msg('error', @_); exit 1; };
# Main loop
log_msg('info', "Daemon started (version $::VERSION)\n");
reconnect_poll_hosts();
$mux->loop();
write_poll_host_offsets();
log_msg('info', "Daemon ended\n");
log_msg('info', "Log file closing\n") if defined $log_fh;
close($log_fh) if defined $log_fh; # Ignore error (probably nowhere to print error message anyway)
closelog() if $cfg{'log-syslog'};
}
#
# parse_config_directive: Check a config directive
#
sub parse_config_directive($$$) {
my ($key,$value,$type) = @_;
if ($type eq 'bool') {
return undef if $value !~ /^(yes|no|1|0|true|false)$/;
return ($value =~ /^(yes|1|true)$/ ? 1 : 0);
} elsif ($type eq 'str') {
if ($value =~ /^\"(\\.|[^\\"])*\"$/) {
$value = $1;
$value =~ s/\\(.)/$1/g;
}
return $value;
} elsif ($type eq 'uint') {
return undef if $value !~ /^(0|[1-9][0-9]*)$/;
return $value;
} elsif ($type eq 'dur') {
my $dur = 0;
$value =~ s/\s*//g;
while ($value =~ /^(\d+)([dhms])(.*)$/) {
my ($amount,$factor) = ($1,$2);
$dur += $amount * 3600 * 24 if $factor eq 'd';
$dur += $amount * 3600 if $factor eq 'h';
$dur += $amount * 60 if $factor eq 'm';
$dur += $amount if $factor eq 's';
$value = $3;
}
return $value eq '' ? $dur : undef;
} elsif ($type eq 'strlist') {
my @values = ();
while ($value ne '') {
if ($value =~ /^\"(\\.|[^\\"])*\"\s*(,.*)/) {
$value = $2;
push @values, $1;
$values[$#values] =~ s/\\(.)/$1/g;
} else {
$value =~ /^\s*([^,]*?)\s*(,.*|$)/;
$value = $2;
push @values, $1;
}
$value =~ s/,\s*//;
}
return \@values;
}
return undef;
}
#
# cmp_undef: Compare if two strings are the same, taking into account
# undefined values.
#
sub cmp_undef($$) {
my ($a,$b) = @_;
return $a cmp $b if defined $a && defined $b;
return (defined $a ? 1 : 0) - (defined $b ? 1 : 0);
}
#
# intcmp_undef: Compare if two integers are the same, taking into account
# undefined values.
#
sub intcmp_undef($$) {
my ($a,$b) = @_;
return $a <=> $b if defined $a && defined $b;
return (defined $a ? 1 : 0) - (defined $b ? 1 : 0);
}
#
# list_cmp: Compare two lists for different string values.
#
sub list_cmp($$) {
my ($a,$b) = @_;
return @$a - @$b if @$a != @$b;
for (my $c = 0; $c < @$a; $c++) {
return $a->[$c] cmp $b->[$c] if $a->[$c] ne $b->[$c];
}
return 0;
}
#
# refresh_hostname_variable: Refresh the hostname variable
#
sub refresh_hostname_variable($) {
my ($hn) = @_;
eval {
if (defined $hn && substr($hn, 0, 1) eq '/') {
open(my $fh, '<', $hn) || die "cannot open `$hn': $!\n";
my $str = <$fh>;
close $fh;
chomp $str if defined $str;
die "invalid hostname file `$hn'\n" if !defined $str || $str eq '';
$hn = $str;
}
$hostname = $hn;
};
return 1 if !$@;
warn $@;
return 0;
}
#
# reload_config_file:
#
sub reload_config_file() {
my %newcfg = read_config_file($opt{'config-file'}) or return 0;
my $disconnect = 0; # disconnect all that connected to us
my $reconnect = 0; # reinitialize poll hosts and reconnect
# Nothing need to be done when the following variables are updated:
# debug
# exim
# greylist-min-time
# greylist-max-time
# greylist-purge-time
# awl-count
# awl-min-time
# awl-purge-time
# lookup-by-host
# prepend-header
# greylist-message
# greylist-action
if (cmp_undef($newcfg{'log-file'}, $cfg{'log-file'}) != 0 || $newcfg{'log-syslog'} != $cfg{'log-syslog'}) {
reopen_log_file($newcfg{'log-file'}, $newcfg{'log-syslog'});
}
if (intcmp_undef($newcfg{'listen-queue-size'}, $cfg{'listen-queue-size'}) != 0 || cmp_undef($newcfg{'local-listen'}, $cfg{'local-listen'}) != 0) {
reopen_listen_socket($newcfg{'local-listen'}, \$loc_socket, 'local');
}
if (intcmp_undef($newcfg{'listen-queue-size'}, $cfg{'listen-queue-size'}) != 0 || cmp_undef($newcfg{'public-listen'}, $cfg{'public-listen'}) != 0) {
reopen_listen_socket($newcfg{'public-listen'}, \$pub_socket, 'public');
}
# newcfg{'database-dir'} must be non-null at this point
if ($newcfg{'database-dir'} ne $cfg{'database-dir'}) {
reopen_database($newcfg{'database-dir'});
$disconnect = 1; # our database has changed, they need to reconnect
$reconnect = 1; # offsets changed, we must reconnect and fetch actions anew
}
if (list_cmp($newcfg{'poll-hosts'}, $cfg{'poll-hosts'}) != 0) {
$reconnect = 1;
}
if (cmp_undef($newcfg{'auth-key'}, $cfg{'auth-key'})) {
$disconnect = 1; # clients may now be locked out
}
# Always reload these files (ignore errors as those are logged)
reload_client_whitelists(@{$newcfg{'whitelist-client-files'}});
reload_recipient_whitelists(@{$newcfg{'whitelist-recipient-files'}});
refresh_hostname_variable($newcfg{'hostname'});
if ($newcfg{'reconnect-time'} != $cfg{'reconnect-time'}) {
if ($timer_install_time != -1 && !$reconnect) {
my $timediff = ($timer_install_time + $cfg{'reconnect-time'}) - time();
if ($timediff > 0) {
alarm($timediff);
} else {
$reconnect = 1;
}
}
}
if ($newcfg{'keep-alive-time'} != $cfg{'keep-alive-time'}) {
# I'm too lazy to code some intelligence here, but basicly we could go
# through all outgoing client connections and check keep-alive-time, and
# take the appropriate actions etc.
$reconnect = 1;
}
if ($newcfg{'keep-alive-max-lost'} != $cfg{'keep-alive-max-lost'} && !$reconnect) {
# See comment about laziness above. :)
$reconnect = 1;
}
if ($disconnect) {
$mux->close_when_flushed($_) foreach (values %in_client_fh);
%in_client_fh = ();
}
if ($reconnect) {
$timer_install_time = -1;
$mux->close_when_flushed($_) foreach (values %out_client_fh);
%out_client_fh = ();
reinitialize_poll_hosts(@{$newcfg{'poll-hosts'}});
reconnect_poll_hosts();
}
%cfg = %newcfg;
return 1;
}
#
# reopen_listen_socket: Set up a listen socket, either local or public
#
sub reopen_listen_socket($$$) {
my ($new_listen, $socket_ptr, $listen_type) = @_;
eval {
local $SIG{__DIE__} = 'DEFAULT';
my $new_socket;
if (defined $new_listen) {
$new_socket = new IO::Socket::INET('LocalAddr' => $new_listen, 'Proto' => 'TCP', 'Listen' => $cfg{'listen-queue-size'}, 'Blocking' => 0, 'ReuseAddr' => 1);
die "cannot set up listen socket on `$new_listen': $!\n" if !defined $new_socket;
$mux->listen($new_socket);
log_msg('info', "Listening on $new_listen for ", $listen_type, " connections\n");
} else {
log_msg('info', "Not listening for ", $listen_type, " connections\n");
}
$mux->close(${$socket_ptr}) if defined ${$socket_ptr};
${$socket_ptr} = $new_socket;
};
return 1 if !$@;
warn $@;
return 0;
}
#
# reopen_log_file: Close and open the log file
#
sub reopen_log_file($$) {
my ($new_log_file, $new_log_syslog) = @_;
my $new_log_fh;
if ($new_log_syslog) {
openlog($::PROGRAM, 'ndelay,pid', LOG_DAEMON);
}
if ($new_log_file eq '-') {
if (!open($new_log_fh, '>&', \*STDOUT)) {
warn "cannot open standard out: $!\n";
return 0;
}
} elsif ($new_log_file ne '') {
if (!open($new_log_fh, '+>>', $new_log_file)) {
warn "cannot open `$new_log_file' for appending: $!\n";
return 0;
}
}
$new_log_fh->autoflush(1) if defined $new_log_fh;
my $error;
if (defined $log_fh) {
log_msg('info', "Log file closing\n");
$error = $! if !close($log_fh);
}
closelog() if $cfg{'log-syslog'} && !$new_log_syslog;
$log_fh = $new_log_fh;
log_msg('info', "Log file opened\n") if defined $log_fh;
log_msg('warn', "Could not close old log file - $error\n") if defined $error;
return 1;
}
#
# read_config_file: Read specified config file and return its contents as
# an associative array.
#
sub read_config_file($) {
my ($cfg_file) = @_;
my $cfg_path = dirname($cfg_file);
my %config_template = (
'debug' => [ 'bool', 0 ],
'exim' => [ 'bool', 0 ],
'database-dir' => [ 'str', undef ],
'local-listen' => [ 'str', undef ],
'public-listen' => [ 'str', undef ],
'log-file' => [ 'str', '-' ],
'log-syslog' => [ 'bool', 0 ],
'auth-key' => [ 'str', undef ],
'poll-hosts' => [ 'strlist', [] ],
'reconnect-time' => [ 'dur', 60 ],
'keep-alive-time' => [ 'dur', 60 ],
'keep-alive-max-lost' => [ 'uint', 3 ],
'greylist-min-time' => [ 'dur', 60*5 ],
'greylist-max-time' => [ 'dur', 3600*24*2 ],
'greylist-purge-time' => [ 'dur', 3600*24*30 ],
'awl-count' => [ 'uint', 5 ],
'awl-min-time' => [ 'dur', 3600 ],
'awl-purge-time' => [ 'dur', 3600*24*30 ],
'hostname' => [ 'str', hostname() ],
'lookup-by-host' => [ 'bool', 0 ],
'listen-queue-size' => [ 'uint', undef ],
'prepend-header' => [ 'bool', 1 ],
'whitelist-client-files' => [ 'strlist', [ File::Spec->catfile($cfg_path, 'whitelist_clients'), File::Spec->catfile($cfg_path, 'whitelist_clients.local') ] ],
'whitelist-recipient-files' => [ 'strlist', [ File::Spec->catfile($cfg_path, 'whitelist_recipients'), File::Spec->catfile($cfg_path, 'whitelist_recipients.local') ] ],
'greylist-message' => [ 'str', 'You are being greylisted for %s seconds' ],
'greylist-action' => [ 'str', 'DEFER_IF_PERMIT', ],
);
my %config = map { $_ => $config_template{$_}->[1] } keys %config_template;
eval {
local $SIG{__DIE__} = 'DEFAULT';
open(my $fh, '<', $cfg_file) || die "cannot open `$cfg_file': $!\n";
while (<$fh>) {
s/^\s*(.*?)\s*$/$1/;
if (/^([a-z-]+)\s*=\s*(.*?)\s*$/) {
my ($key,$value) = ($1,$2);
die "$cfg_file:$.: unknown directive `$key'\n" if !exists $config_template{$key};
$config{$key} = parse_config_directive($key, $value, $config_template{$key}->[0]);
die "$cfg_file:$.: invalid value for `$key'\n", if !defined $config{$key};
} elsif (!/^$/ && !/^#/) {
die "$cfg_file:$.: unparsable line\n";
}
}
close($fh); # Ignore errors (reading only)
die "$cfg_file: public-listen and local-listen not specified\n" if !exists $config{'public-listen'} && !exists $config{'local-listen'};
die "$cfg_file: public-listen directive requires auth-key\n" if exists $config{'public-listen'} && !exists $config{'auth-key'};
die "$cfg_file: no database-dir directive specified\n" if !exists $config{'database-dir'};
die "$cfg_file: log-file and log-syslog are mutually exclusive\n" if $config{'log-file'} ne '' && $config{'log-syslog'};
};
return %config if !$@;
warn $@;
return ();
}
#
# write_poll_host_offsets: Write poll host offsets to file on disk.
#
sub write_poll_host_offsets() {
eval {
local $SIG{__DIE__} = 'DEFAULT';
if (exists $cfg{'poll-hosts'}) {
my $file = File::Spec->catfile($cfg{'database-dir'}, 'poll-host-offsets');
open(my $fh, '>', "$file.tmp") || die "cannot open `$file.tmp': $!\n";
foreach my $client (@out_clients) {
print { $fh } $client->{'ip'}.':'.$client->{'port'}."\t".$client->{'poll_offset'}."\n";
}
close($fh) || die "cannot close `$file.tmp': $!\n";
rename("$file.tmp", $file) || die "cannot rename `$file.tmp' to `$file': $!\n";
}
};
return 1 if !$@;
warn $@;
return 0;
}
#
# reconnect_poll_hosts: Schedule reconnect to poll hosts.
#
sub reconnect_poll_hosts() {
log_msg('info', 'Attempting to reconnect to unconnected poll hosts (if any)', "\n");
foreach my $client (@out_clients) {
connect_poll_host($client) if !defined $client->{'fh'};
}
}
#
# mux_timeout: Called when a manually installed timer on a file handle expires
#
sub mux_timeout($$$) {
my ($pkg,$mux,$fh) = @_;
my $client = $out_client_fh{$fh};
if (!exists $client->{'keep-alive-trigger'}) { # Socket not connected yet
log_msg('warn', 'Timed out connecting to '.$client->{'desc'}, "\n");
close_fh($fh, 0);
connect_poll_host($client);
} else {
if ($client->{'keep-alive-count'} > $cfg{'keep-alive-max-lost'}) {
log_msg('debug', 'Too many keep alives lost for '.$client->{'desc'}, "\n");
close_fh($fh, 0);
connect_poll_host($client);
} else {
log_msg('debug', 'Sending keep alive to '.$client->{'desc'}, "\n");
$mux->write($fh, "KEEPALIVE\n");
$mux->set_timeout($fh, $cfg{'keep-alive-time'});
$client->{'keep-alive-trigger'} = time() + $cfg{'keep-alive-time'};
$client->{'keep-alive-count'}++;
}
}
}
#
# mux_input: Called on I/O input waiting
#
sub mux_input($$$$) {
my ($pkg,$mux,$fh,$data) = @_;
if ($fh == $msgi_socket) {
while ($$data =~ s/^(.)//) {
my $signal = $1;
if ($signal eq 'I' || $signal eq 'T') {
log_msg('info', 'Received ', ($signal eq 'I' ? 'INT' : 'TERM'), ' signal, shutting down', "\n");
$mux->end();
} elsif ($signal eq 'H' || $signal eq 'U') {
log_msg('info', 'Received ', ($signal eq 'H' ? 'HUP' : 'USR1'), ' signal, reloading', "\n");
my ($old_log_file, $old_log_syslog) = ($cfg{'log-file'}, $cfg{'log-syslog'});
reload_config_file();
reopen_log_file($cfg{'log-file'}, $cfg{'log-syslog'}) if $old_log_file eq $cfg{'log-file'} && $old_log_syslog == $cfg{'log-syslog'};
write_poll_host_offsets(); # We may do it a second time here (first in reload_config_file), but it should be done at least once
} elsif ($signal eq 'A') {
if ($timer_install_time != -1) {
$timer_install_time = -1;
reconnect_poll_hosts();
}
}
}
}
elsif (exists $out_client_fh{$fh}) {
while ($$data =~ s/^([^\r\n]*)\r?\n//m) {
my $line = $1;
if ($line =~ /^WARN (.*)$/) {
log_msg('warn', "Warning received from polled client, closing: $1\n");
close_fh($fh, 0);
add_poll_host_reconnect_timer();
last;
}
if (exists $out_client_fh{$fh}{'expect_challenge'}) {
if ($line !~ /^CHALLENGERESPONSE (.*)$/) {
log_msg('warn', 'Invalid data from ', $out_client_fh{$fh}{'desc'}, ', expecting challenge', "\n");
close_fh($fh, 0);
add_poll_host_reconnect_timer();
last;
}
if ($1 ne $out_client_fh{$fh}{'expect_challenge'}) {
log_msg('warn', 'Invalid challenge from ', $out_client_fh{$fh}{'desc'}, ', closing', "\n");
close_fh($fh, 0);
add_poll_host_reconnect_timer();
last;
}
log_msg('debug', 'Valid challenge response from ', $out_client_fh{$fh}{'desc'}, "\n");
delete $out_client_fh{$fh}{'expect_challenge'};
next;
}
if ($line eq 'KEEPALIVE-ACK') {
log_msg('debug', 'Received keep alive acknowledge from ', $out_client_fh{$fh}{'desc'}, "\n");
$out_client_fh{$fh}{'keep-alive-count'}--;
next;
}
my ($offset,$type,$key,$value) = ($line =~ /^(\d+):([^:]+):([^:]+):(.+)$/);
if (!defined $value) {
log_msg('warn', 'Invalid data from ', $out_client_fh{$fh}{'desc'}, ', skipping', "\n");
log_msg('debug', 'Invalid data from ', $out_client_fh{$fh}{'desc'}, ': ', quote_string($line), "\n") if $cfg{'debug'};
} else {
my $expect_offset = $out_client_fh{$fh}{'poll_offset'} + length($line) + 1 - length("$offset:");
if ($expect_offset != $offset) {
log_msg('warn', 'Unexpected poll offset from ', $out_client_fh{$fh}{'desc'}, ' - expected ', $expect_offset, ', got ', $offset, "\n");
}
log_msg('debug', 'Apply record from ', $out_client_fh{$fh}{'desc'}, ': ', quote_string($line), "\n") if $cfg{'debug'};
apply_record_from_poll_host($type, $key, $value);
$out_client_fh{$fh}{'poll_offset'} = $offset;
}
}
}
elsif (exists $in_client_fh{$fh}) {
while ($$data =~ s/^([^\r\n]*)\r?\n//) {
my $line = $1;
if (defined $in_client_fh{$fh}{'mode'} && $in_client_fh{$fh}{'mode'} eq 'poll') {
log_msg('debug', 'Received keep alive from ', $in_client_fh{$fh}{'desc'}, "\n");
$mux->write($fh, "KEEPALIVE-ACK\n");
} elsif (defined $in_client_fh{$fh}{'mode'}) {
my ($key,$val) = ($line =~ /^(.*)\t(.*)$/);
if (!defined $key) {
log_msg('warn', "Received invalid line from restore database connection\n");
next;
}
if ($in_client_fh{$fh}{'mode'} eq 'restore-greylist-database') {
$grey_db->{$key} = $val;
} elsif ($in_client_fh{$fh}{'mode'} eq 'restore-awl-database') {
$awl_db->{$key} = $val;
}
} elsif ($line =~ /([^=]+)=(.*)/) {
log_msg('warn', 'Received too long attributes from ',$in_client_fh{$fh}{'desc'},', truncating',"\n") if (length($1) > 512 || length($2) > 512);
$in_client_fh{$fh}{'attrs'}{substr($1,0,512)} = substr($2,0,512);
} elsif ($line eq '') {
if (!$in_client_fh{$fh}{'authed'}) {
my $auth_digest = $in_client_fh{$fh}{'attrs'}{'_'.$::PACKAGE.'_auth_digest'};
if (crypt($cfg{'auth-key'}, $auth_digest) ne $auth_digest) {
log_msg('warn', 'Client ',$in_client_fh{$fh}{'desc'},' failed authentication',"\n");
$mux->write($fh, "WARN authentication failed\n");
close_fh($fh, 1);
return;
}
$in_client_fh{$fh}{'authed'} = 1;
}
handle_client_command($fh, $in_client_fh{$fh});
} else {
log_msg('warn', 'Invalid data received from ',$in_client_fh{$fh}{'desc'},"\n");
log_msg('debug', 'Invalid data received from : ', quote_string($line), "\n") if $cfg{'debug'};
}
}
}
}
#
# quote_string:
#
sub quote_string($) {
my ($s) = @_;
$s =~ s/([^[:print:]])/'\\'.sprintf('x%02x',ord($1))/ge;
return $s;
}
#
# mux_connected: Called when a socket has been connected.
#
sub mux_connected($$$) {
my ($pkg,$mux,$fh) = @_;
my $client = $out_client_fh{$fh};
my ($ip, $port) = ($client->{'ip'}, $client->{'port'});
log_msg('info', 'Connected to ',$client->{'desc'}," ($ip:$port)\n");
my $start_offset = $client->{'poll_offset'};
my $data = '_'.$::PACKAGE."_request=poll\n_".$::PACKAGE."_start_offset=$start_offset\n";
if (defined $cfg{'auth-key'}) {
my $salt0 = join('', ('.', '/', 0..9, 'A'..'Z', 'a'..'z')[rand 64, rand 64]);
my $salt1;
do {
$salt1 = join('', ('.', '/', 0..9, 'A'..'Z', 'a'..'z')[rand 64, rand 64]);
} while ($salt0 eq $salt1);
$client->{'expect_challenge'} = crypt($cfg{'auth-key'}, $salt1);
$data .= '_'.$::PACKAGE.'_auth_digest='.crypt($cfg{'auth-key'}, $salt0)."\n_".$::PACKAGE."_challenge=".$salt1."\n";
}
$mux->write($fh, $data."\n");
$mux->set_timeout($fh, $cfg{'keep-alive-time'});
$client->{'keep-alive-trigger'} = time() + $cfg{'keep-alive-time'};
}
#
# mux_connection: Called when there is a connection on a
# listen socket.
#
sub mux_connection($$$$) {
my ($pkg,$mux,$server_fh,$fh) = @_;
if (defined $loc_socket && $loc_socket == $server_fh) {
$in_client_fh{$fh} = { 'ip' => $fh->peerhost(), 'port' => $fh->peerport(), 'authed' => 1, 'fh' => $fh, 'desc' => $fh->peerhost().':'.$fh->peerport().' (incoming)' };
log_msg('info', 'Local connection from ',$in_client_fh{$fh}{'desc'},"\n");
}
elsif (defined $pub_socket && $pub_socket == $server_fh) {
$in_client_fh{$fh} = { 'ip' => $fh->peerhost(), 'port' => $fh->peerport(), 'authed' => 0, 'fh' => $fh, 'desc' => $fh->peerhost().':'.$fh->peerport().' (incoming)' };
log_msg('info', 'Public connection from ',$in_client_fh{$fh}{'desc'},"\n");
}
else {
log_msg('warn', 'Connection from unknown socket');
return;
}
$fh->sockopt(TCP_NODELAY, 1);
$mux->add($fh);
}
#
# mux_eof: Called when an EOF event is received on a socket
#
sub mux_eof($$$) {
my ($pkg,$mux,$fh) = @_;
if (exists $out_client_fh{$fh}) {
log_msg('warn', 'Disconnected from ',$out_client_fh{$fh}{'desc'},"\n");
close_fh($fh, 0);
add_poll_host_reconnect_timer();
}
elsif (exists $in_client_fh{$fh}) {
log_msg('info', 'Disconnected from ',$in_client_fh{$fh}{'desc'},"\n");
close_fh($fh, 0);
}
else {
log_msg('warn', 'Disconnected from an unknown socket', "\n");
$mux->remove($fh);
}
}
#
# mux_error: Called on I/O error
#
sub mux_error($$$$) {
my ($pkg,$mux,$fh,$cmd) = @_;
if (defined $fh) {
if (exists $out_client_fh{$fh}) {
log_msg('warn', $cmd,' failed on ',$out_client_fh{$fh}{'desc'},': ', $!,"\n");
close_fh($fh, 0);
add_poll_host_reconnect_timer();
}
elsif (exists $in_client_fh{$fh}) {
log_msg('warn', $cmd,' failed on ',$in_client_fh{$fh}{'desc'},': ',$!,"\n");
close_fh($fh, 0);
}
else {
log_msg('warn', $cmd,' failed on an unknown socket: ',$!,"\n");
$mux->remove($fh);
}
} else {
log_msg('warn', 'Muxer error during ',$cmd,': ',$!,"\n");
$mux->end();
}
}
#
# handle_client_command: Handle a complete command from a client
#
sub handle_client_command {
my ($fh,$client) = @_;
if (exists $client->{'attrs'}{'request'}) {
my $attrs = $client->{'attrs'};
my $request = $attrs->{'request'};
if ($request eq 'smtpd_access_policy') {
my ($action,$reason) = smtpd_access_policy($attrs);
my ($actionword) = ($action =~ /^([^ ]+)( |$)/);
log_msg('info', 'Access policy '.$actionword.' - '.$reason."\n");
if ($cfg{'debug'}) {
log_msg('debug', 'Received request, result action='.$action."\n");
foreach my $key (sort keys %{$attrs}) {
log_msg('debug', " $key=$attrs->{$key}\n");
}
}
$mux->write($fh, "action=$action\n\n");
close_fh($fh, 1) if $cfg{'exim'};
} else {
log_msg('warn', "Unsupported request '$request'\n");
}
}
elsif (exists $client->{'attrs'}{'_'.$::PACKAGE.'_request'}) {
my $request = $client->{'attrs'}{'_'.$::PACKAGE.'_request'};
if ($request eq 'poll') {
$client->{'mode'} = 'poll';
my $salt = $client->{'attrs'}{'_'.$::PACKAGE.'_challenge'};
$mux->write($fh, 'CHALLENGERESPONSE '.crypt($cfg{'auth-key'}, $salt)."\n");
my $so = $client->{'attrs'}{'_'.$::PACKAGE.'_start_offset'};
if ($so > $last_offset) {
log_msg('warn', $client->{'desc'},' attempted to seek beyond log end, internal error?', "\n");
} elsif ($so < $last_offset) {
log_msg('warn', $client->{'desc'},' attempted to seek beyond log start, host down too long?',"\n") if $so < $first_offset;
my $offset = ($so < $first_offset ? 0 : $so - $first_offset);
if (!seek($actionlog_fh, length("$first_offset\n") + $offset - 1, Fcntl::SEEK_SET)) {
log_msg('warn', "action-log: cannot seek in file: $!\n");
} else {
my $skip = length <$actionlog_fh>; # Skip previous or partial line
log_msg('warn', $client->{'desc'},' sought to incorrect offset (off by ',$skip,' bytes)',"\n") if $skip != 1;
$offset += $skip - 1;
while (<$actionlog_fh>) {
$offset += length $_;
$mux->write($fh, ($offset+$first_offset).':'.$_);
}
}
}
} elsif ($request eq 'rotate-action-log') {
my $retain = (exists $client->{'attrs'}{'retain'} ? $client->{'attrs'}{'retain'} : 10000);
log_msg('info', "Received command '$request'\n");
rotate_action_log($retain);
$mux->write($fh, "rc=0\n");
close_fh($fh, 1);
} elsif ($request eq 'restore-greylist-database') {
$client->{'mode'} = $request;
} elsif ($request eq 'restore-awl-database') {
$client->{'mode'} = $request;
} elsif ($request eq 'dump-greylist-database') {
$mux->write($fh, "rc=0\n");
while (my ($key, $value) = each %{$grey_db}) {
$mux->write($fh, $key."\t".$value."\n");
}
close_fh($fh, 1);
} elsif ($request eq 'dump-awl-database') {
$mux->write($fh, "rc=0\n");
while (my ($key, $value) = each %{$awl_db}) {
$mux->write($fh, $key."\t".$value."\n");
}
close_fh($fh, 1);
} elsif ($request eq 'database-maintenance') {
log_msg('info', "Received command '$request'\n");
my $now = $client->{'_'.$::PACKAGE.'_emulate_time'} || time();
database_maintenance($now);
$mux->write($fh, "rc=0\n");
close_fh($fh, 1);
} elsif ($request eq 'reload-whitelist-files') {
log_msg('info', "Received command '$request'\n");
my $result = reload_client_whitelists(@{$cfg{'whitelist-client-files'}});
$result &= reload_recipient_whitelists(@{$cfg{'whitelist-recipient-files'}});
$mux->write($fh, $result ? "rc=0\n" : "rc=1\nError: Whitelist files have errors, check log\n");
close_fh($fh, 1);
} elsif ($request eq 'write-poll-host-offsets') {
log_msg('info', "Received command '$request'\n");
write_poll_host_offsets();
$mux->write($fh, "rc=0\n");
close_fh($fh, 1);
} elsif ($request eq 'reconnect-poll-hosts') {
log_msg('info', "Received command '$request'\n");
if ($timer_install_time != -1) {
$timer_install_time = -1;
reconnect_poll_hosts();
}
$mux->write($fh, "rc=0\n");
close_fh($fh, 1);
} elsif ($request eq 'terminate') {
log_msg('info', "Received command '$request'\n");
$mux->write($fh, "rc=0\n");
$mux->force_flush($fh);
$mux->end();
} elsif ($request eq 'check-config-file') {
log_msg('info', "Received command '$request'\n");
my $result = read_config_file($opt{'config-file'});
$mux->write($fh, $result ? "rc=0\n" : "rc=1\nError: Config file has errors, check log\n");
close_fh($fh, 1);
} elsif ($request eq 'reload-config-file') {
log_msg('info', "Received command '$request'\n");
my $result = reload_config_file();
$mux->write($fh, $result ? "rc=0\n" : "rc=1\nError: Config file has errors, check log\n");
close_fh($fh, 1);
} elsif ($request eq 'reopen-log-file') {
log_msg('info', "Received command '$request'\n");
my $result = reopen_log_file($cfg{'log-file'}, $cfg{'log-syslog'});
$mux->write($fh, $result ? "rc=0\n" : "rc=1\nError: Command failed, check old log\n");
close_fh($fh, 1);
} elsif ($request eq 'ping') {
log_msg('info', "Received command '$request'\n");
$mux->write($fh, "rc=0\n");
close_fh($fh, 1);
} elsif ($request eq 'debug-status') {
log_msg('info', "Received command '$request'\n");
$mux->write($fh, "rc=0\n");
$mux->write($fh, "\%in_client_fh:\n".Dumper(\%in_client_fh));
$mux->write($fh, "\@out_clients:\n".Dumper(\@out_clients));
$mux->write($fh, "\%cfg:\n".Dumper(\%cfg));
$mux->write($fh, "\@instances:\n".Dumper(\@instances));
$mux->write($fh, "\$first_offset = $first_offset\n");
$mux->write($fh, "\$last_offset = $last_offset\n");
$mux->write($fh, "\$timer_install_time = $timer_install_time\n");
close_fh($fh, 1);
} else {
log_msg('warn', "Unsupported request '$request'\n");
$mux->write($fh, "rc=1\nError: Unsupported request\n");
close_fh($fh, 1);
}
}
else {
log_msg('warn', "No request type specified, closing\n");
$mux->write($fh, "WARN No request specified\n\n");
close_fh($fh, 1);
}
}
#
# handle_signal: Called when a signal is received
#
sub handle_signal($) {
my ($signal) = @_;
$msgo_socket->syswrite(substr($signal, 0, 1), 1); # Ignore errors
}
#
# log_msg: Called to write a message to the log
#
sub log_msg($@) {
my ($severity, @msg) = @_;
print { $log_fh } strftime('%Y-%m-%d %H:%M:%S ', localtime()), $::PROGRAM,'[',$$,']: ', uc $severity, ': ', @msg if (defined $log_fh);
if ($cfg{'log-syslog'}) {
my $syslog_severity = $severity;
$syslog_severity = 'warning' if $severity eq 'warn';
$syslog_severity = 'err' if $severity eq 'error';
syslog($syslog_severity, ($severity eq 'info' ? '' : uc($severity).': ').join('', @msg));
}
}
#
# connect_poll_host: Set up a new connection to a poll host.
#
sub connect_poll_host($) {
my ($client) = @_;
my ($ip, $port) = ($client->{'ip'}, $client->{'port'});
log_msg('info', 'Connecting to ', $client->{'desc'}, " ($ip:$port)\n");
$client->{'fh'} = new IO::Socket::INET('PeerAddr' => $ip, 'PeerPort' => $port, 'Proto' => 'TCP', 'Blocking' => 0);
if (!defined $client->{'fh'}) {
log_msg('warn', 'Cannot create socket for ', $client->{'desc'}, ": $!\n");
add_poll_host_reconnect_timer();
return;
}
$out_client_fh{$client->{'fh'}} = $client;
$client->{'keep-alive-count'} = 0;
delete $client->{'keep-alive-trigger'};
$mux->add_unconnected($client->{'fh'});
$mux->set_timeout($client->{'fh'}, ($cfg{'keep-alive-max-lost'}+1) * $cfg{'keep-alive-time'});
}
#
# add_poll_host_reconnect_timer: Add reconnect timer for a poll host
#
sub add_poll_host_reconnect_timer() {
if ($timer_install_time == -1) {
log_msg('info', "Installing reconnect timer, triggered in $cfg{'reconnect-time'} seconds\n");
$timer_install_time = time();
alarm($cfg{'reconnect-time'});
}
}
#
# in_greylist_time_window: Determine if two delivery attempts are inside the
# greylist time window.
#
sub in_grey_time_window($$) {
my ($t0,$t1) = @_;
return ($t0 <= $t1 && $t1 - $t0 >= $cfg{'greylist-min-time'} && $t1 - $t0 <= $cfg{'greylist-max-time'});
}
#
# apply_record_from_poll_host: Apply a greylist or awl record from another poll host
#
sub apply_record_from_poll_host($$$) {
my ($type, $key, $value0) = @_;
if ($type eq 'grey') {
my ($first0,$last0,@other0) = split(/,/, $value0);
if (!defined $first0 || !defined $last0 || $first0 !~ /^\d+$/ || $last0 !~ /^\d+/) {
log_msg('warn', 'Attempting to apply invalid data in greylist database: ', quote_string($key),' -> ',quote_string($value0),"\n");
return;
}
my $value1 = $grey_db->{$key};
if (!defined $value1) {
$grey_db->{$key} = $value0;
} else {
my ($first1,$last1,@other1) = split(/,/, $value1);
# $last0 is likely to be >= $last1, but servers should be time-synchronized
if (in_grey_time_window(min($first0,$first1), max($last0,$last1))) {
$grey_db->{$key} = join(',', min($first0,$first1), max($last0,$last1), @other0);
} elsif (in_grey_time_window(max($first0,$first1), max($last0,$last1))) {
$grey_db->{$key} = join(',', max($first0,$first1), max($last0,$last1), @other0);
} elsif (in_grey_time_window(min($first0,$first1), min($last0,$last1))) {
$grey_db->{$key} = join(',', min($first0,$first1), min($last0,$last1), @other0);
} elsif (in_grey_time_window(max($first0,$first1), min($last0,$last1))) {
$grey_db->{$key} = join(',', max($first0,$first1), min($last0,$last1), @other0);
}
}
} elsif ($type eq 'awl') {
my ($count0,$time0) = split(/,/, $value0);
if (!defined $count0 || !defined $time0 || $count0 !~ /^\d+$/ || $time0 !~ /^\d+/) {
log_msg('warn', 'Attempting to apply invalid data in auto-whitelist database: ', quote_string($key),' -> ',quote_string($value0),"\n");
return;
}
my $value1 = $awl_db->{$key};
if (defined $value1) {
my ($count1,$time1) = split(/,/, $value1);
$awl_db->{$key} = $value0 if $count0 > $count1;
} else {
$awl_db->{$key} = $value0;
}
}
}
#
# apply_record: Apply a greylist or awl record into the database
#
sub apply_record($$$) {
my ($type, $key, $value) = @_;
if ($type eq 'grey') {
$grey_db->{$key} = $value;
} elsif ($type eq 'awl') {
$awl_db->{$key} = $value;
}
my $msg = "$type:$key:$value\n";
print { $actionlog_fh } $msg;
$last_offset += length($msg);
foreach my $fh (keys %in_client_fh) {
my $client = $in_client_fh{$fh};
if (exists $client->{'mode'} && $client->{'mode'} eq 'poll') {
$mux->write($client->{'fh'}, "$last_offset:$msg");
}
}
}
#
# smtpd_access_policy: Act on an access policy request.
# `DUNNO' means positive answer.
#
sub smtpd_access_policy($) {
my ($attr) = @_;
foreach my $re (@whitelist_client_hostname) {
return ('DUNNO', "client hostname $attr->{'client_name'} in static whitelist") if $attr->{'client_name'} =~ $re;
}
foreach my $re (@whitelist_client_ip) {
return ('DUNNO', "client IP address $attr->{'client_address'} in static whitelist") if $attr->{'client_address'} =~ $re;
}
foreach my $re (@whitelist_recipient_email) {
return ('DUNNO', "recipient e-mail address $attr->{'recipient'} in static whitelist") if $attr->{'recipient'} =~ $re;
}
my $now = $attr->{'_'.$::PACKAGE.'_emulate_time'} || time();
my ($awl_key, $awl_value, $awl_count, $awl_last);
if ($cfg{'awl-count'} != 0) {
$awl_key = $attr->{'client_address'};
$awl_value = $awl_db->{$awl_key};
($awl_count, $awl_last) = split(/,/, $awl_value) if defined $awl_value;
if (defined $awl_count && $awl_count >= $cfg{'awl-count'}) {
if ($now >= $awl_last + $cfg{'awl-min-time'}) {
$awl_count++; # for statistics
apply_record('awl', $awl_key, $awl_count.','.$now);
}
return ('DUNNO', "client IP address $attr->{'client_address'} in auto-whitelist");
}
}
my $sender = do_sender_email_substitutions($attr->{'sender'});
my ($client_net, $client_host) = do_client_address_substitutions($attr->{'client_address'}, $attr->{'client_name'});
my $key = lc ($client_net.'/'.$sender.'/'.$attr->{'recipient'});
my $value = $grey_db->{$key};
my $first;
my $last_was_successful = 0;
if (defined $value) {
my $last;
($first, $last) = split(/,/, $value);
if ($last - $first >= $cfg{'greylist-min-time'}) {
$last_was_successful = 1;
} else {
$first = $now if $now - $first > $cfg{'greylist-max-time'}; # outside window
}
} else {
$first = $now;
}
apply_record('grey', $key, $first.','.$now.(defined $client_host ? ','.$client_host: ''));
my $timediff = $cfg{'greylist-min-time'} - ($now - $first);
if ($timediff <= 0 && !$last_was_successful) {
# We are inside the greylist window
if (!defined $awl_last || $now >= $awl_last + $cfg{'awl-min-time'}) {
# We are inside the "awl increase time window"
$awl_count++;
apply_record('awl', $awl_key, $awl_count.','.$now);
log_msg('debug', "Increased auto-whitelist count to $awl_count for $attr->{'client_address'}\n") if $cfg{'debug'};
log_msg('info', 'Auto-whitelisted ', $attr->{'client_address'}, "\n") if $awl_count == $cfg{'awl-count'};
}
}
if ($timediff > 0) {
# We are before the greylist window
my $msg = $cfg{'greylist-message'};
$msg =~ s/\%s/$timediff/;
my $recipient_domain = $attr->{'recipient'};
$recipient_domain =~ s/.*\@//;
$msg =~ s/\%r/$recipient_domain/;
return ($cfg{'greylist-action'}.' '.$msg, '<'.$key.'> not in greylist window');
}
if (!$last_was_successful && $cfg{'prepend-header'} && is_new_instance($attr->{'instance'})) {
#my $client = $attr->{'client_name'} eq 'unknown' ? $attr->{'client_address'} : $attr->{'client_name'};
#log_msg('info', 'Delayed ', ($now-$first), " seconds: client=$client, from=$sender, to=", $attr->{'recipient'}, "\n");
my $date = strftime("%a, %d %b %Y %T %Z", localtime($now));
return ('PREPEND X-Greylist: delayed '.($now-$first).' seconds by '.$::PROGRAM.' '.$::VERSION.' on '.$hostname.'; '.$date, '<'.$key.'> found in greylist database');
}
return ('DUNNO', '<'.$key.'> found in greylist database');
}
#
# is_new_instance: Determine if this request was seen before.
# We will be called multiple times by postfix
#
sub is_new_instance($) {
my ($instance) = @_;
return 1 if !defined $instance; # Exim
return 0 if (grep { $_ eq $instance } @instances) != 0;
unshift @instances, $instance;
pop @instances;
return 1;
}
#
# do_sender_email_substitutions: Fixup sender e-mail address
#
sub do_sender_email_substitutions($) {
my ($addr) = @_;
my ($user, $domain) = split(/@/, $addr, 2);
return $addr if !defined $domain;
# strip extension, used sometimes for mailing-list VERP
$user =~ s/\+.*//;
# replace numbers in VERP addresses with '#' so that
# we don't create a new key for each mail
$user =~ s/\b\d+\b/#/g;
return $user.'@'.$domain;
}
#
# do_client_address_substitutions: Fixup ip and hostname of client
#
sub do_client_address_substitutions($$) {
my ($ip, $hostname) = @_;
return ($ip, undef) if $cfg{'lookup-by-host'};
my @ip = split(/\./, $ip);
return ($ip, undef) if !defined $ip[3];
# skip if it contains the last two IP numbers in the hostname
# (we assume it is a pool of dialup addresses of a provider)
return ($ip, undef) if $hostname =~ /$ip[2]/ && $hostname =~ /$ip[3]/;
return (join('.', @ip[0..2], '0'), $ip[3]);
}
#
# database_maintenance: Clean up and compact database
#
sub database_maintenance($) {
my ($now) = @_;
log_msg('info', "(database-maintenance) Started\n");
# Remove old database log files
$database_env->txn_checkpoint(0, 0) == 0 || log_msg('warn', 'Cannot checkpoint database: '.$BerkeleyDB::Error."\n");
foreach my $file ($database_env->log_archive(DB_ARCH_ABS)) {
log_msg('info', "Removing database log `$file'\n");
unlink($file) || log_msg('warn', "Cannot remove database log `$file': $!\n");
}
# Clean up greylist database
eval {
my $kept_keys = 0;
my @old_keys = ();
while (my ($key, $value) = each %{$grey_db}) {
my ($first, $last) = split(/,/, $value);
if (!defined $first || !defined $last || $first !~ /^\d+$/ || $last !~ /^\d+/) {
log_msg('warn', 'Removing invalid data in greylist database: ', quote_string($key),' -> ',quote_string($value),"\n");
next;
}
if (!defined $last) { # shouldn't happen
push @old_keys, $key;
} elsif($now - $last > $cfg{'greylist-purge-time'}) { # last-seen passed max-age
push @old_keys, $key;
} elsif($last-$first < $cfg{'greylist-min-time'} && $now-$last > $cfg{'greylist-max-time'}) { # no successful entry yet and last seen passed retry-window
push @old_keys, $key;
} else {
$kept_keys++;
}
}
my $txn = $database_env->txn_begin();
die 'Cannot begin transaction: ', $BerkeleyDB::Error, "\n" if $txn == 0;
$grey_db_obj->Txn($txn);
delete $grey_db->{$_} foreach (@old_keys);
log_msg('info', 'Deleted ',scalar(@old_keys),' greylist record(s), kept ',$kept_keys,"\n");
$txn->txn_commit() == 0 || die 'Cannot commit transaction: ', $BerkeleyDB::Error, "\n";
};
warn $@ if $@;
# Cleanup clients auto-whitelist database
eval {
my $kept_keys = 0;
my @old_keys_awl = ();
while (my ($key, $value) = each %{$awl_db}) {
my $awl_last_seen = (split(/,/, $value))[1];
if (!defined $awl_last_seen || $awl_last_seen !~ /^\d+$/) {
log_msg('warn', 'Removing invalid data in auto-whitelist database: ', quote_string($key),' -> ',quote_string($value),"\n");
next;
}
if ($now - $awl_last_seen > $cfg{'awl-purge-time'}) {
push @old_keys_awl, $key;
} else {
$kept_keys++;
}
}
my $txn = $database_env->txn_begin();
die 'Cannot begin transaction: ', $BerkeleyDB::Error, "\n" if $txn == 0;
$awl_db_obj->Txn($txn);
delete $awl_db->{$_} foreach (@old_keys_awl);
log_msg('info', 'Deleted ',scalar(@old_keys_awl),' auto-whitelist record(s), kept ',$kept_keys,"\n");
$txn->txn_commit() == 0 || die 'Cannot commit transaction: ', $BerkeleyDB::Error, "\n";
};
warn $@ if $@;
log_msg('info', "(database-maintenance) Completed\n");
}
#
# rotate_action_log: Rotate logs on disk to free up space
#
sub rotate_action_log {
my ($retain) = @_;
my $actionlog = File::Spec->catfile($cfg{'database-dir'}, 'action-log');
if ($last_offset - $first_offset <= $retain) {
log_msg('info', "(rotate-log) Nothing to rotate, log too small\n");
return 1;
}
eval {
local $SIG{__DIE__} = 'DEFAULT';
die "$actionlog.tmp: cannot remove file: $!\n" if -e "$actionlog.tmp" && !unlink("$actionlog.tmp");
open(my $old_actionlog_fh, '<', $actionlog) || die "$actionlog: cannot open file: $!\n";
open(my $new_actionlog_fh, '+>>', "$actionlog.tmp") || die "$actionlog.tmp: cannot create file: $!\n";
$new_actionlog_fh->autoflush(1);
seek($old_actionlog_fh, -$retain-1, Fcntl::SEEK_END) || die "$actionlog: cannot seek in file: $!\n";
<$old_actionlog_fh>; # Go to first complete line (also skip the first line if we are that close to the beginning of the file)
my $new_first_offset = tell($old_actionlog_fh);
die "$actionlog: cannot get file position: $!\n" if $new_first_offset < 0;
$new_first_offset = $first_offset + $new_first_offset - length("$first_offset\n");
print { $new_actionlog_fh } "$new_first_offset\n";
while (defined (my $line = <$old_actionlog_fh>)) {
print { $new_actionlog_fh } $line;
}
seek($new_actionlog_fh, 0, Fcntl::SEEK_END) || die "$actionlog.tmp: cannot seek in file: $!\n";
rename("$actionlog.tmp", $actionlog) || die "cannot rename `$actionlog.tmp' to `$actionlog': $!\n";
close($old_actionlog_fh); # Ignore errors (not writing)
log_msg('info', "(rotate-log) Removed ".($new_first_offset - $first_offset)." byte(s)\n");
$actionlog_fh = $new_actionlog_fh;
$first_offset = $new_first_offset;
};
return 1 if !$@;
warn $@;
return 0;
}
#
# reload_client_whitelists: Read all client whitelist files
#
sub reload_client_whitelists(@) {
my (@files) = @_;
eval {
local $SIG{__DIE__} = 'DEFAULT';
my @wl_hostname;
my @wl_ip;
foreach my $file (@files) {
next if !-e $file;
open(my $fh, '<', $file) || die "cannot open `$file': $!\n";
while(<$fh>) {
s/#.*$//;
s/^\s+//;
s/\s+$//;
next if $_ eq '';
if (/^\/(\S+)\/$/) { # regular expression
push @wl_hostname, qr{$1}i;
} elsif (/^\d{1,3}(?:\.\d{1,3}){0,3}$/) { # IP address or part of it
push @wl_ip, qr{^$_};
} elsif (/^.*\:.*\:.*$/) { # IPv6?
push @wl_ip, qr{^$_};
} elsif (/^\S+$/) {
push @wl_hostname, qr{\Q$_\E$}i;
} else {
die "$file:$.: invalid line\n";
}
}
close($fh);
}
@whitelist_client_hostname = @wl_hostname;
@whitelist_client_ip = @wl_ip;
};
return 1 if !$@;
warn $@;
return 0;
}
#
# reload_recipient_whitelists: Read all receipient whitelist files
#
sub reload_recipient_whitelists(@) {
my (@files) = @_;
eval {
local $SIG{__DIE__} = 'DEFAULT';
my @wl_email;
foreach my $file (@files) {
next if !-e $file;
open(my $fh, '<', $file) || die "cannot open `$file': $!\n";
while(<$fh>) {
s/#.*$//;
s/^\s+//;
s/\s+$//;
next if $_ eq '';
my ($user, $domain) = split(/\@/, $_, 2);
if (/^\/(\S+)\/$/) { # regular expression
push @wl_email, qr{$1}i;
} elsif (!/^\S+$/) {
die "$file:$.: invalid line\n";
} elsif (defined $domain && $domain ne '') { # user@domain (match also user+extension@domain)
push @wl_email, qr{^\Q$user\E(?:\+[^@]+)?\@\Q$domain\E$}i;
} elsif (defined $domain) { # user@
push @wl_email, qr{^\Q$user\E(?:\+[^@]+)?\@}i;
} else { # domain ($user is the domain)
push @wl_email, qr{\Q$user\E$}i;
}
}
close($fh);
}
@whitelist_recipient_email = @wl_email;
};
return 1 if !$@;
warn $@;
return 0;
}
#
# reinitialize_poll_hosts: Reinitialize out_clients structure
#
sub reinitialize_poll_hosts(@) {
my (@poll_hosts) = @_;
eval {
my @new_out_clients = ();
foreach my $hostport (@poll_hosts) {
my ($host,$port) = ($hostport =~ /^(.*):(.*)$/);
die "$hostport: invalid host specificaton\n" if !defined $host;
my $ip = gethostbyname($host);
die "$host: cannot resolve host address: $!\n" if !defined $ip;
my $offset = $offsets{inet_ntoa($ip).':'.$port} || 0;
push @new_out_clients, {
'ip' => inet_ntoa($ip),
'port' => $port,
'fh' => undef,
'poll_offset' => $offset,
'keep-alive-count' => 0, # Set in connect_poll_host
'desc' => $hostport.' (outgoing)',
};
}
@out_clients = @new_out_clients;
};
return 1 if !$@;
warn $@;
return 0;
}
#
# reopen_database: Close the database if open and open it again
#
sub reopen_database($) {
my ($database_dir) = @_;
eval {
local $SIG{__DIE__} = 'DEFAULT';
# Initialize action log
my $new_actionlog = File::Spec->catfile($database_dir, 'action-log');
open(my $new_actionlog_fh, '+>>', $new_actionlog) || die "cannot open `$new_actionlog': $!\n";
$new_actionlog_fh->autoflush(1); # Not strictly necessary, but does not hurt
my $size = -s $new_actionlog;
die "$new_actionlog: cannot get file size: $!\n" if !defined $size;
my ($new_first_offset, $new_last_offset);
if ($size == 0) {
print { $new_actionlog_fh } "0\n";
$new_first_offset = 0;
$new_last_offset = 0;
} else {
seek($new_actionlog_fh, 0, Fcntl::SEEK_SET) || die "$new_actionlog: cannot seek: $!\n";
$new_first_offset = <$new_actionlog_fh>;
chop $new_first_offset;
my $p0 = tell($new_actionlog_fh);
die "$new_actionlog: cannot get file position: $!\n" if $p0 < 0;
seek($new_actionlog_fh, 0, Fcntl::SEEK_END) || die "$new_actionlog: cannot seek: $!\n";
$new_last_offset = $new_first_offset + $size - $p0;
}
# Initialize poll host offsets
if (keys %offsets != 0) {
write_poll_host_offsets() || die "cannot save old poll host offsets\n";
}
my %new_offsets = ();
my $file = File::Spec->catfile($database_dir, 'poll-host-offsets');
if (-e $file) {
open (my $fh, '<', $file) || die "cannot open `$file': $!\n";
while (<$fh>) {
chomp;
my ($ipport,$offset) = /^(.*)\t(\d+)$/;
die "$file:$.: invalid line\n" if !defined $offset;
$new_offsets{$ipport} = $offset;
}
close($fh);
}
# Initialize database
my $new_database_env = BerkeleyDB::Env->new(
-Home => $database_dir,
-Flags => DB_CREATE|DB_RECOVER|DB_INIT_TXN|DB_INIT_MPOOL|DB_INIT_LOG,
-SetFlags => DB_AUTO_COMMIT,
) or die "cannot create database environment: $!\n";
my $new_grey_db;
my $new_grey_db_obj = tie(%{$new_grey_db}, 'BerkeleyDB::Btree',
-Filename => 'grey.db',
-Flags => DB_CREATE,
-Env => $new_database_env
) or die "cannot create or open greylist database: $!\n";
my $new_awl_db;
my $new_awl_db_obj = tie(%{$new_awl_db}, 'BerkeleyDB::Btree',
-Filename => 'awl.db',
-Flags => DB_CREATE,
-Env => $new_database_env
) or die "cannot create or open auto-whiteliste database: $!\n";
# Clean up old action log variables
close($actionlog_fh) if defined $actionlog_fh;
# Clean up old database variables
$grey_db_obj->close() if defined $grey_db_obj;
$awl_db_obj->close() if defined $awl_db_obj;
# Set action log variables
$actionlog_fh = $new_actionlog_fh;
$first_offset = $new_first_offset;
$last_offset = $new_last_offset;
# Set poll host offset variables
%offsets = %new_offsets;
# Set database variables
$grey_db_obj = $new_grey_db_obj;
$grey_db = $new_grey_db;
$awl_db_obj = $new_awl_db_obj;
$awl_db = $new_awl_db;
$database_env = $new_database_env;
log_msg('info', "Databases in `", $database_dir, "' opened\n");
};
return 1 if !$@;
warn $@;
return 0;
}
#
# Close an outgoing file handle, possibly gracefully
#
sub close_fh($$) {
my ($fh, $gracefully) = @_;
my $client;
if (exists $in_client_fh{$fh}) {
$client = $in_client_fh{$fh};
delete $in_client_fh{$fh};
} else {
$client = $out_client_fh{$fh};
$out_client_fh{$fh}{'fh'} = undef;
delete $out_client_fh{$fh};
}
if ($gracefully) {
log_msg('warn', 'Closing connection with ',$client->{'desc'},' after flushing data',"\n");
$mux->close_when_flushed($fh);
} else {
log_msg('warn', 'Closing connection with ',$client->{'desc'},' immediately',"\n");
$mux->close($fh); # XXX: does this generate errors?
}
}
main();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment