Skip to content

Instantly share code, notes, and snippets.

@marcoarthur
Last active December 28, 2023 15:55
Show Gist options
  • Save marcoarthur/3fac5a60180c4924a3433264bef0f9e9 to your computer and use it in GitHub Desktop.
Save marcoarthur/3fac5a60180c4924a3433264bef0f9e9 to your computer and use it in GitHub Desktop.
reactive for event handling
use strictures 2;
package TailEvent {
use Mojo::Base 'Mojo::EventEmitter', -strict, -signatures;
use IO::Async::FileStream;
has ioloop => sub { die "needs io loop" };
has file => sub { die "needs a file" };
has poll_interval => 0.5;
sub new_line($self, $line) {
$self->emit( on_arrival => { data => $line, epoch => time } );
}
sub setup($self) {
my $stream = IO::Async::FileStream->new(
filename => $self->file,
interval => $self->poll_interval,
on_initial => sub {
my ( $stream ) = @_;
$stream->seek_to_last( "\n" );
},
on_read => sub {
my ( undef, $buffref ) = @_;
while( $$buffref =~ s/^(.*\n)// ) {
$self->new_line($1);
}
return 0;
},
);
$self->ioloop->add($stream);
}
sub new($class, @args) {
my $self = $class->SUPER::new(@args);
$self->setup;
return $self;
}
1;
}
use Mojo::Base -strict, -signatures;
use RxPerl::IOAsync qw(:all);
use IO::Async::Loop;
use List::Util qw(first);
use IO::All;
use Mojo::Exception qw(raise);
use DDP;
our $DEF_OBSERVER = {
next => sub ($x) { p $x },
error => sub { raise 'Observer::Error', "error !" },
complete => sub { say "complete" },
};
our $MNT = "$ENV{HOME}/mnt";
# find the most recent modified file matching $pattern on $path
sub recent_modified($pattern=qr/perl\.log/, $path=$MNT) {
my $recent =
first { defined $_ }
sort {$b->mtime <=> $a->mtime} # sort by modified time
io->dir($path)->filter(sub { $_->name =~ $pattern })->All_Files;
return "$recent";
}
sub main($fpattern=$ARGV[0]) {
my $loop = IO::Async::Loop->new;
RxPerl::IOAsync::set_loop($loop);
my $tail = TailEvent->new(
file => recent_modified($fpattern),
ioloop => $loop
);
my $source = rx_from_event($tail, 'on_arrival');
$source->subscribe($DEF_OBSERVER);
$loop->run;
}
main;
=pod
=head1 EXAMPLE
This exemplifies the construction of a "tail reading" for watching files in an
asynchronously manner. Noteworthy is the use of an event-based paradigm,
defining the event, separating it from the processing of the event.
The solution pattern is depicted as following:
=over 4
=item
C<TailEvent> uses C<IO::Async::FileStream> in order to watch data arriving,
which thus emit the event 'on_arrival' and pass the event's data for the
observers of this event.
=item
The use of C<RxPerl> (C<rx_from_event>) to grab the event data, and observers
to react to it (saving, printing, resending, ...), in this example only
printing it.
=back
=cut
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment