Skip to content

Instantly share code, notes, and snippets.

@kazeburo
Created June 13, 2011 07:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kazeburo/1022403 to your computer and use it in GitHub Desktop.
Save kazeburo/1022403 to your computer and use it in GitHub Desktop.
#!/usr/bin/env perl
use strict;
use warnings;
use Time::HiRes;
use Email::MIME;
use Email::Address;
use Email::Address::Loose -override;
use Parallel::Prefork;
use Path::Class qw(dir);
use Fcntl qw/:DEFAULT :seek :flock/;
use Log::Minimal;
use Getopt::Long;
use List::Util qw/reduce/;
use FileHandle;
use Try::Tiny;
use Filesys::Notify::Simple;
# Command-line handling.
#   -d / --dir <path>   Maildir to consume; its "new" subdirectory must exist.
my $maildir;
GetOptions(
    "d|dir=s" => \$maildir,
) or die "usage: $0 -d maildir";

# Reject a missing or empty value before handing it to Path::Class:
# dir('') would be interpreted as the filesystem root rather than as
# "no directory given".
$maildir = ( defined $maildir && length $maildir ) ? dir($maildir) : undef;
die "no maildir: $0 -d maildir" if ( !$maildir || !-d "$maildir" );

# Incoming messages are picked up from <maildir>/new.
my $newdir = $maildir->subdir('new');
die "no maildir/new: $0 -d maildir" if ( !-d "$newdir" );
# Signals trapped by the prefork manager, keyed by the signal the manager
# receives; the value is what gets forwarded to the workers (undef meaning
# nothing is forwarded -- see Parallel::Prefork's trap_signals option).
my %signal_map = (
    TERM => 'TERM',
    HUP  => 'TERM',
    USR1 => undef,
);

# Prefork manager running a fixed pool of five worker processes.
my $pm = Parallel::Prefork->new({
    max_workers  => 5,
    trap_signals => \%signal_map,
});
# Supervisor loop: keep (re)starting workers until the manager has been told
# to terminate.  Each worker runs the callback below; it returns after a
# bounded number of messages so that long-lived worker state cannot build up.
while ( $pm->signal_received ne 'TERM' ) {
    $pm->start(sub {
        # Number of messages this worker handles before it returns.
        my $loop = 100;

        MAIL:
        while ($loop) {
            # List the regular files in new/, oldest first.  A file may be
            # unlinked by a sibling worker between the directory read and the
            # stat call; such entries are dropped instead of failing the scan.
            my @candidates = eval {
                map  { $_->[1] }
                sort { $a->[0] <=> $b->[0] }
                map  {
                    my $st = $_->stat;
                    $st ? [ $st->mtime, $_ ] : ();
                }
                grep { !$_->is_dir } $newdir->children( no_hidden => 1 );
            };
            my $scan_error = $@;    # capture before anything can clobber $@

            if ($scan_error) {
                warnf( "[%d] cannot scan %s: %s", $$, $newdir, $scan_error );
                Time::HiRes::sleep(1);    # back off instead of spinning
                next MAIL;
            }

            if ( !@candidates ) {
                # Nothing queued: block until something changes in new/.
                my $watcher = Filesys::Notify::Simple->new( ["$newdir"] );
                $watcher->wait( sub { } );
                next MAIL;
            }

            # Claim the oldest file that is not already locked by a sibling.
            # Trying only the single oldest file would make every idle worker
            # spin on the one message that is currently being processed.
            my ( $path, $fh );
            CANDIDATE:
            for my $candidate (@candidates) {
                open( my $try_fh, '+<', "$candidate" ) or next CANDIDATE;
                flock( $try_fh, LOCK_EX | LOCK_NB )     or next CANDIDATE;

                # A zero-length file has already been consumed (truncated
                # just before its unlink) by another worker.
                -s $try_fh or next CANDIDATE;

                ( $path, $fh ) = ( $candidate, $try_fh );
                last CANDIDATE;
            }

            if ( !$path ) {
                # Everything visible is claimed elsewhere; retry shortly.
                Time::HiRes::sleep(0.1);
                next MAIL;
            }

            $fh->autoflush(1);
            $loop--;
            debugf( "[%d] %s start", $$, $path );

            my $src  = do { local $/; <$fh> };
            my $mail = Email::MIME->new($src);

            my ($deliverd_to) = $mail->header('Delivered-To');
            my ($sender)      = $mail->header('Return-Path');

            # The header may be absent, or may hold the null sender "<>";
            # in both cases $sender ends up undefined.
            ($sender) = defined $sender ? Email::Address->parse($sender) : ();

            # Printable forms for logging only, so that missing headers do
            # not raise "uninitialized value" warnings.
            my $to_label   = defined $deliverd_to ? $deliverd_to : '';
            my $from_label = defined $sender      ? "$sender"    : '';

            infof( "[%d] %s from %s", $$, $to_label, $from_label );

            # The try/catch blocks are anonymous subs, so loop control must
            # not be issued from inside them; they only report success or
            # failure and the loop acts on the result.
            my $processed = try {
                # ...
                1;
            }
            catch {
                warnf "[%d] %s from %s error: %s", $$, $to_label, $from_label, $_;
                0;
            };

            if ( !$processed ) {
                Time::HiRes::sleep(1);

                # Bump the mtime so the failed message moves to the back of
                # the oldest-first queue instead of being retried at once.
                utime( undef, undef, "$path" );
                next MAIL;
            }

            debugf( "[%d] %s from %s / %s done", $$, $to_label, $from_label, $path );

            # Empty the file while still holding the lock, then remove it: a
            # sibling that opened it before the unlink will find it has zero
            # length and skip it.
            seek( $fh, 0, SEEK_SET );
            truncate( $fh, 0 )
                or warnf( "[%d] cannot truncate %s: %s", $$, $path, $! );
            unlink("$path")
                or warnf( "[%d] cannot unlink %s: %s", $$, $path, $! );
        }
    });
}

# Termination was requested: wait for the workers to finish before the
# manager itself exits.
$pm->wait_all_children;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment