Last active
December 28, 2023 15:55
-
-
Save marcoarthur/3fac5a60180c4924a3433264bef0f9e9 to your computer and use it in GitHub Desktop.
reactive for event handling
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use strictures 2;
# TailEvent - follows a file and re-emits every complete line appended to it
# as an 'on_arrival' event whose payload is { data => $line, epoch => time }.
#
# Attributes:
#   ioloop        - loop object the file stream is added to (required;
#                   reading it while unset dies with "needs io loop")
#   file          - name of the file to follow (required; reading it while
#                   unset dies with "needs a file")
#   poll_interval - handed to IO::Async::FileStream as its 'interval' option
#                   (default 0.5)
package TailEvent {
    use Mojo::Base 'Mojo::EventEmitter', -strict, -signatures;
    use IO::Async::FileStream;

    has ioloop        => sub { die "needs io loop" };
    has file          => sub { die "needs a file" };
    has poll_interval => 0.5;

    # Publish one line to every 'on_arrival' subscriber, stamped with the
    # current epoch time. $line is passed through untouched (the caller in
    # setup() hands it over with its trailing newline).
    sub new_line($self, $line) {
        $self->emit( on_arrival => { data => $line, epoch => time } );
    }

    # Build the IO::Async::FileStream that watches $self->file and register
    # it with $self->ioloop.
    sub setup($self) {
        my $stream = IO::Async::FileStream->new(
            filename => $self->file,
            interval => $self->poll_interval,

            # Invoked once when the stream starts: position the read pointer
            # via seek_to_last("\n") so existing content is not replayed.
            on_initial => sub {
                my ( $stream ) = @_;
                $stream->seek_to_last( "\n" );
            },

            # Invoked with a reference to the pending read buffer.
            on_read => sub {
                my ( undef, $buffref ) = @_;

                # Peel off one newline-terminated line at a time; an
                # unterminated tail stays in the buffer for the next call.
                while ( $$buffref =~ s/^(.*\n)// ) {
                    my $line = $1;    # copy: $1 is clobbered by any later match
                    $self->new_line($line);
                }

                # NOTE(review): 0 is understood to mean "wait for more data
                # before calling on_read again" - see IO::Async::Stream.
                return 0;
            },
        );

        $self->ioloop->add($stream);
    }

    # Constructor: builds the object through Mojo::Base and immediately
    # starts watching, so both 'file' and 'ioloop' must be supplied here.
    sub new($class, @args) {
        my $self = $class->SUPER::new(@args);
        $self->setup;
        return $self;
    }
}
use Mojo::Base -strict, -signatures; | |
use RxPerl::IOAsync qw(:all); | |
use IO::Async::Loop; | |
use List::Util qw(first); | |
use IO::All; | |
use Mojo::Exception qw(raise); | |
use DDP; | |
# Default observer passed to subscribe(): dumps each emitted value, raises an
# Observer::Error when the stream reports an error, and announces completion.
our $DEF_OBSERVER = {
    # dump every emitted item with DDP's p()
    next => sub ($x) { p $x },

    # Keep whatever the stream reported in the exception message instead of
    # throwing it away, so the failure can actually be diagnosed. Extra
    # arguments (if any) are accepted and ignored.
    error => sub ($err = undef, @) {
        raise 'Observer::Error', defined $err ? "error ! $err" : "error !";
    },

    complete => sub { say "complete" },
};

# Default directory searched by recent_modified().
our $MNT = "$ENV{HOME}/mnt";
# Find the most recently modified file under $path whose name matches
# $pattern and return its name as a plain string.
#
#   $pattern - regex (or string used as one) tested against each file name;
#              defaults to qr/perl\.log/
#   $path    - directory to search (All_Files is used to list it);
#              defaults to $MNT
#
# Dies when no file matches.
sub recent_modified($pattern=qr/perl\.log/, $path=$MNT) {
    # Signature defaults only apply to *missing* arguments. main() forwards
    # an absent $ARGV[0] as an explicit undef, so fall back here as well.
    $pattern //= qr/perl\.log/;
    $path    //= $MNT;

    my @candidates =
        io->dir($path)->filter(sub { $_->name =~ $pattern })->All_Files;

    # Read each mtime once, then sort newest-first and keep the head.
    my ($recent) =
        map  { $_->[1] }
        sort { $b->[0] <=> $a->[0] }
        map  { [ $_->mtime // 0, $_ ] } @candidates;

    die "no file matching $pattern found under $path\n"
        unless defined $recent;

    return "$recent";
}
# Script entry point: follow the newest matching file and hand every
# 'on_arrival' event to $DEF_OBSERVER.
#
#   $fpattern - file-name pattern forwarded to recent_modified(); defaults to
#               the first command-line argument
#
# Blocks in the event loop's run().
sub main($fpattern=$ARGV[0]) {
    # one IO::Async loop is shared by RxPerl and the file watcher
    my $event_loop = IO::Async::Loop->new;
    RxPerl::IOAsync::set_loop($event_loop);

    my $newest_file = recent_modified($fpattern);
    my $tailer      = TailEvent->new(
        file   => $newest_file,
        ioloop => $event_loop,
    );

    # expose the emitter's event as an observable and attach the observer
    my $arrivals = rx_from_event($tailer, 'on_arrival');
    $arrivals->subscribe($DEF_OBSERVER);

    $event_loop->run;
}

main();
=pod | |
=head1 EXAMPLE | |
This example shows how to build a "tail reader" that watches a file
asynchronously. Noteworthy is the use of an event-based paradigm: the event
is defined separately from the code that processes it.
The solution pattern is as follows:
=over 4 | |
=item
C<TailEvent> uses C<IO::Async::FileStream> to watch for arriving data; for
each new line it emits the 'on_arrival' event and passes the event's data to
the observers of that event.
=item
C<RxPerl> (C<rx_from_event>) is used to grab the event data, and observers
react to it (saving, printing, resending, ...); in this example they only
print it.
=back | |
=cut
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment