Created
June 27, 2024 16:45
-
-
Save marcoarthur/d47a96be11391e9aa9e89b46cb4aed16 to your computer and use it in GitHub Desktop.
continue rx_perl experiments in a while
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
==> own_operator.pl <== | |
use strict; | |
use warnings; | |
use feature qw(signatures say); | |
use RxPerl::IOAsync ':all'; | |
use Mojo::Util qw(trim); | |
use IO::Async::Loop; | |
use DDP; | |
my $loop = IO::Async::Loop->new; | |
RxPerl::IOAsync::set_loop($loop); | |
my $o = { | |
next => sub ($x){ p $x }, | |
error => sub { p $_[0] }, | |
complete => sub { say "complete"; $loop->stop }, | |
}; | |
sub draw($char) { | |
return my $stream = rx_interval(0.2)->pipe( | |
op_take(25), | |
op_map(sub { rand }), | |
op_map(sub{ "$char" x int($_*65) }), | |
); | |
} | |
draw("-")->subscribe($o); | |
# A pipeable operator is a function that returns a function that accepts a $source observable as input, and outputs another observable. | |
# sub op_draw($char) { | |
# return sub($source) { | |
# return rx_observable->new( | |
# sub ($subscriber) { | |
# my $own = { | |
# %$subscriber, | |
# next => sub { "$char" x int($_*65) }, | |
# } | |
# $source->subscribe($own); | |
# return; | |
# } | |
# ); | |
# } | |
# } | |
$loop->run; | |
==> read_the_keyboard.pl <== | |
use strict; | |
use warnings; | |
package Keyboard { | |
use Mojo::Base 'Mojo::EventEmitter', -strict, -signatures; | |
sub pressed($self, $key) { | |
$self->emit( 'keyup' => $key ); | |
} | |
} | |
package main; | |
use feature qw(signatures say); | |
use RxPerl::IOAsync ':all'; | |
use Term::TermKey::Async qw( FORMAT_VIM KEYMOD_CTRL ); | |
use Mojo::Util qw(trim); | |
use IO::Async::Loop; | |
use DDP; | |
my $loop = IO::Async::Loop->new; | |
RxPerl::IOAsync::set_loop($loop); | |
sub make_observer ($i) { | |
return { | |
next => sub {say "next #$i: "; p $_[0];}, | |
error => sub {say "error #$i: "; p $_[0]}, | |
complete => sub {say "complete #$i"}, | |
}; | |
} | |
# create event source and attach to real source tka | |
my $keyboard = Keyboard->new; | |
my $tka = Term::TermKey::Async->new( | |
term => \*STDIN, | |
on_key => sub ($self, $key){ | |
$keyboard->pressed($self->format_key($key, FORMAT_VIM)); | |
}, | |
); | |
# Saves all typing keyword and reads the keyboard events | |
my $typed = ''; #saved typing | |
# | |
#RxPerl | |
#RxPerl::Guides::CreatingPipeableOperators | |
# | |
rx_from_event_array($keyboard, 'keyup')->pipe( # on keyup event | |
op_map ( sub { $typed .= $_->[0]; return { text => trim $typed } } ), # save any typed char, ignoring leading or ending whitespace | |
op_filter( sub ($txt, $idx) { length($txt->{text}) > 2 } ), # ignores text with less than 2 chars | |
op_debounce_time(0.75), # only after long pausing typing | |
op_distinct_until_key_changed( 'text' ), # pass a new value if after pause typing it changed | |
#op_switch_map( sub ($txt) {} ), | |
)->subscribe( | |
{ | |
next => sub { p $_[0]; $typed = ''}, # clear typed again | |
error => sub { p $_[0]}, | |
complete => sub {say "complete"}, | |
} | |
); | |
$loop->add( $tka ); | |
$loop->run; | |
==> read-async-a-file.pl <== | |
use Mojo::Base -strict, -signatures; | |
use IO::Async::Loop; | |
use IO::Async::FileStream; | |
use IO::All; | |
use DDP; | |
use IO::Prompt; | |
my $loop = IO::Async::Loop->new; | |
my $MNT = "$ENV{HOME}/mnt"; | |
sub watch_for($file_pattern = qr/\.log$/, $path=$MNT, $only_recent = 1) { | |
my @files = map { my $f = io->file($_); } | |
sort {$b->mtime <=> $a->mtime} # sort the most recent modified | |
io->dir($path)->filter(sub { $_->name =~ $file_pattern })->All_Files; | |
return unless @files; | |
my $watcher = sub($filename) { | |
say "will be watching $filename"; | |
return IO::Async::FileStream->new( | |
filename => "$filename", | |
interval => 0.5, | |
on_initial => sub { | |
my ( $self ) = @_; | |
$self->seek_to_last( "\n" ); | |
}, | |
on_read => sub { | |
my ( $self, $buffref ) = @_; | |
while( $$buffref =~ s/^(.*\n)// ) { | |
print "Received a line $1"; | |
} | |
return 0; | |
}, | |
); | |
}; | |
my $streams; | |
if( $only_recent ) { | |
$streams = $files[0] ? [$watcher->($files[0])]: []; | |
} else { | |
$streams = [ map { $watcher->($_) } @files ]; | |
} | |
return $streams; | |
} | |
sub main($search_pattern) { | |
my $filestreams = watch_for(qr/$search_pattern/); | |
INPUT: | |
while ( ! $filestreams ) | |
{ | |
my $msg = "could not found file by this pattern $search_pattern, give another"; | |
$search_pattern = prompt -d => $search_pattern, "$msg: "; | |
last unless $search_pattern; | |
$filestreams = watch_for(qr/$search_pattern/); | |
} | |
exit 0 unless $search_pattern; | |
$loop->add($_) for @$filestreams; | |
$loop->run; | |
} | |
main($ARGV[0] || 'perl'); | |
__END__ | |
=pod | |
=head1 EXAMPLE | |
This shows basic structure to watch for a file change in async fashion, so it won't | |
burn cpu time in a busy while. This uses the IO::Async framework. | |
=cut | |
==> experiments.pl <== | |
use Mojo::Base -strict, -signatures; | |
use RxPerl::Mojo ':all'; | |
use Mojo::IOLoop; | |
# RxPerl | |
# rx_interval(1.4)->pipe( | |
# op_take(5), | |
# )->subscribe( sub ($arg){say "next: ", $arg} ); | |
# rx_of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1)->pipe( | |
# op_distinct(), | |
# )->subscribe(sub ($arg) { say "received: $arg" }); | |
# rx_interval(1)->subscribe( | |
# sub ($arg){ | |
# say 'tick'; | |
# } | |
# ); | |
# my $x = rand(); | |
# my $observer = { | |
# next => sub ($arg) { return rand() }, | |
# error => sub ($arg) { say "error: ", $arg}, | |
# complete => sub { say "complete"}, | |
# }; | |
# my $cold = rx_observable->new( | |
# sub ($subscriber) { | |
# $subscriber->next(rand()); | |
# } | |
# ); | |
# my $hot = $cold->publish(); | |
# $cold->subscribe( sub ($arg) { say "Subscriber A: $arg" } ); | |
# $cold->subscribe( sub ($arg) { say "Subscriber B: $arg" } ); | |
#RxPerl::Guides::CreatingObservables | |
#my $mashup = rx_of( 'A'..'Z' ); | |
#RxPerl::Subscription | |
use Data::Fake qw/Core Names Text Dates/; | |
use Mojo::Exception qw(raise); | |
use DDP; | |
my $fdata = fake_hash { | |
name => fake_name, | |
friends => fake_array( fake_int(2,10), fake_name() ), | |
gender => fake_pick(qw/Male Female Other/), | |
}; | |
my $observer = { | |
next => sub ($x) { p $x }, | |
error => sub ($err) { raise 'Oberser::Error', "Error! $err" }, | |
complete => sub { say "complete" }, | |
}; | |
my $users = rx_subject->new; | |
$users->subscribe($observer); | |
rx_interval(0.4) | |
->pipe( op_map( sub{ rand() < 0.3 ? die "Oops" : $fdata->()} ) ) | |
->subscribe($observer); | |
Mojo::IOLoop->start; | |
#$users->next($next->()); | |
# my $data = $generator->(); | |
# my $users = rx_of(@$data); | |
# $users | |
# ->pipe( op_map( sub { uc $_->{name} } ) ) | |
# ->subscribe( sub { p @_ } ) | |
# ; | |
#p $data->[0]; | |
# my $lced = $mashup | |
# ->pipe(op_map(sub { lc })) | |
# ->subscribe( sub ($arg) { say $arg } ); | |
#Mojo::IOLoop->start; | |
==> pipe.pl <== | |
use Mojo::Base -strict, -signatures; | |
use RxPerl::Mojo ':all'; | |
use Mojo::IOLoop; | |
use Mojo::Reactor::Epoll; | |
use Data::Fake qw/Core Names Text Dates/; | |
use Mojo::Exception qw(raise); | |
use DDP; | |
use Data::Dumper; | |
our $DEFAULT_OBSERVABLE = { | |
next => sub ($x) { p $x }, | |
error => sub { raise 'Observer::Error', "error !" }, | |
complete => sub { say "complete" }, | |
}; | |
my $observer = $DEFAULT_OBSERVABLE; | |
my $ex = 'rx_zip'; #default example to run | |
my $EXAMPLES = { | |
rx_zip => { | |
code => sub { | |
# [0, 0, 0], [1, 1, 1], [2, 2, 2], complete | |
rx_zip( | |
rx_interval(0.7)->pipe( op_take(3) ), | |
rx_interval(1), | |
rx_interval(2), | |
)->subscribe($observer); | |
}, | |
output => qq( | |
# [0, 0, 0], [1, 1, 1], [2, 2, 2], complete | |
), | |
}, | |
rx_timer => { | |
code => sub { | |
# (pause 10 seconds) 0, complete | |
rx_timer(10)->subscribe($observer); | |
# (pause 10 seconds) 0, 1, 2, 3, ... (every 1 second) | |
rx_timer(10, 1)->subscribe($observer); | |
}, | |
output => qq( | |
# (pause 10 seconds) 0, complete | |
# (pause 10 seconds) 0, 1, 2, 3, ... (every 1 second) | |
), | |
}, | |
rx_throw_error => { | |
code => sub { | |
# 0, 1, 2, 3, error: foo | |
rx_concat( | |
rx_interval(1)->pipe( op_take(4) ), | |
rx_throw_error('foo'), | |
)->subscribe($observer); | |
}, | |
output => qq( | |
# 0, 1, 2, 3, error: foo | |
), | |
}, | |
rx_subject => { | |
code => sub { | |
# 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, complete | |
my $subject = rx_subject->new; | |
$subject->subscribe($observer); | |
# elsewhere... | |
$subject->next($_) for 1 .. 10; | |
$subject->complete; | |
}, | |
output => qq( | |
# 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, complete | |
), | |
}, | |
rx_observable => { | |
code => sub { | |
# 0.578, 0.234, 0.678, ... (every 1 second) | |
my $o = rx_observable->new | |
( | |
sub ($subscriber) | |
{ | |
Mojo::IOLoop->recurring(1, sub { $subscriber->next(rand) }); | |
} | |
); | |
$o->subscribe($observer); | |
}, | |
output => qq{ | |
# 0.578, 0.234, 0.678, ... (every 1 second) | |
}, | |
}, | |
rx_NEVER => { | |
code => sub { | |
# 10, 20, 30 (and no complete) | |
rx_concat( | |
rx_of(10, 20, 30), | |
rx_NEVER, | |
rx_of(40, 50, 60), | |
)->subscribe($observer); | |
}, | |
output => qq( | |
# 10, 20, 30 (and no complete) | |
), | |
}, | |
rx_EMPTY => { | |
code => sub { | |
# complete | |
rx_EMPTY->subscribe($observer); | |
# 10, 20, 30, 40, 50, 60, complete | |
rx_concat( | |
rx_of(10, 20, 30), | |
rx_EMPTY, | |
rx_EMPTY, | |
rx_EMPTY, | |
rx_of(40, 50, 60), | |
)->subscribe($observer); | |
}, | |
output => qq( | |
# complete | |
# 10, 20, 30, 40, 50, 60, complete | |
), | |
}, | |
rx_replay_subject => { | |
code => sub { | |
# 20, 30, 40, 50, complete | |
my $rs = rx_replay_subject(2); | |
$rs->next(10); | |
$rs->next(20); | |
$rs->next(30); | |
$rs->subscribe($observer); | |
$rs->next(40); | |
$rs->next(50); | |
$rs->complete; | |
# my $rs = rx_replay_subject(2, 3); # params: buffer_size, window_time | |
}, | |
output => qq( | |
# 20, 30, 40, 50, complete | |
), | |
}, | |
rx_on_error_resume_next => { | |
code => sub { | |
# 1, 2, 3, 10, 20, 30, complete | |
rx_on_error_resume_next( | |
rx_of(1, 2, 3)->pipe( op_concat_with(rx_throw_error('foo')) ), | |
rx_throw_error('bar'), | |
rx_of(10, 20, 30), | |
rx_throw_error('baz'), | |
)->subscribe($observer); | |
}, | |
output => qq( | |
# 1, 2, 3, 10, 20, 30, complete | |
), | |
}, | |
rx_NEVER => { | |
code => sub { | |
# 10, 20, 30 (and no complete) | |
rx_concat( | |
rx_of(10, 20, 30), | |
rx_NEVER, | |
rx_of(40, 50, 60), | |
)->subscribe($observer); | |
}, | |
output => qq( | |
# 10, 20, 30 (and no complete) | |
), | |
}, | |
rx_merge => { | |
code => sub { | |
# 0, 0, 1, 1, 2, 3, 2, 4, 3, ... | |
rx_merge( | |
rx_interval(0.7), | |
rx_interval(1), | |
)->subscribe($observer); | |
}, | |
output => qq( | |
# 0, 0, 1, 1, 2, 3, 2, 4, 3, ... | |
), | |
}, | |
rx_iif => { | |
code => sub { | |
my $i; | |
my $o = rx_iif( | |
sub { $i > 5 }, | |
rx_of(1, 2, 3), | |
rx_of(10, 20, 30), | |
); | |
$i = 4; | |
# 10, 20, 30, complete | |
$o->subscribe($observer); | |
$i = 6; | |
# 1, 2, 3, complete | |
$o->subscribe($observer); | |
}, | |
output => qq( | |
# 10, 20, 30, complete | |
# 1, 2, 3, complete | |
), | |
}, | |
rx_generate => { | |
code => sub { | |
# 2, 5, 10, 17, 26 | |
rx_generate( | |
1, # initializer | |
sub ($x) { $x <= 5 }, # check, and can also use $_ here | |
sub ($x) { $x + 1 }, # iterate, and can also use $_ here | |
sub ($x) { $x ** 2 + 1 }, # result selector (optional), and can also use $_ here | |
)->subscribe($observer); | |
}, | |
output => qq( | |
# 2, 5, 10, 17, 26 | |
), | |
}, | |
rx_from => { | |
code => sub { | |
# 10, 20, 30, complete | |
rx_from([10, 20, 30])->subscribe($observer); | |
}, | |
output => qq( | |
# 10, 20, 30, complete | |
), | |
}, | |
rx_concat => { | |
code => sub { | |
# 10, 20, 30, 10, 20, 30, 40, complete | |
rx_concat( | |
rx_of(10, 20, 30), | |
rx_of(10, 20, 30, 40), | |
)->subscribe($observer); | |
}, | |
output => qq( | |
# 10, 20, 30, 10, 20, 30, 40, complete | |
), | |
}, | |
rx_partition => | |
{ | |
code => sub { | |
my $source = rx_interval(1)->pipe( op_take(10) ); | |
my ($o1, $o2) = rx_partition( | |
$source, | |
sub ($value, $index) { $value % 2 == 1 }, | |
); | |
rx_concat($o1, $o2)->subscribe($observer); | |
}, | |
output => qq{ | |
# 1, 3, 5, 7, 9, 0, 2, 4, 6, 8, complete | |
}, | |
}, | |
rx_race => { | |
code => sub { | |
rx_race( | |
rx_interval(1)->pipe( op_map(sub {$_[0] * 100})), | |
rx_interval(0.7)->pipe( op_map(sub {$_[0] * 10})), | |
)->subscribe($observer); | |
}, | |
output => qq{ | |
# 0, 10, 20, 30, ... (every 0.7 seconds) | |
}, | |
}, | |
rx_range => { | |
code => sub { | |
rx_range(10, 7)->subscribe($observer); | |
}, | |
output => qq{ | |
# 10, 11, 12, 13, 14, 15, 16, complete | |
}, | |
}, | |
rx_fork_join => { | |
code => sub { | |
# [30, 3, 'c'], complete | |
rx_fork_join([ | |
rx_of(10, 20, 30), | |
rx_of(1, 2, 3), | |
rx_of('a', 'b', 'c'), | |
])->subscribe($observer); | |
# {x => 30, y => 3, z => 'c'}, complete | |
rx_fork_join({ | |
x => rx_of(10, 20, 30), | |
y => rx_of(1, 2, 3), | |
z => rx_of('a', 'b', 'c'), | |
})->subscribe($observer); | |
}, | |
output => qq| | |
# [30, 3, 'c'], complete | |
# {x => 30, y => 3, z => 'c'}, complete | |
|, | |
}, | |
rx_defer => { | |
code => sub { | |
my $special_var; | |
my $o = rx_defer(sub { | |
return $special_var ? rx_of(10, 20, 30) : rx_of(40, 50, 60) | |
}); | |
# 10, 20, 30, complete | |
$special_var = 1; | |
$o->subscribe($observer); | |
# 40, 50, 60, complete | |
$special_var = 0; | |
$o->subscribe($observer); | |
}, | |
output => qq( | |
# 10, 20, 30, complete | |
# 40, 50, 60, complete | |
), | |
}, | |
rx_combine_latest => { | |
code => sub { | |
# [0, 0], [0, 1], [1, 1], [1, 2], [1, 3], ... | |
rx_combine_latest([ | |
rx_interval(1), | |
rx_interval(0.7), | |
])->subscribe($observer); | |
}, | |
output => qq{ | |
# [0, 0], [0, 1], [1, 1], [1, 2], [1, 3], ... | |
}, | |
}, | |
rx_behavior_subject => { | |
code => sub { | |
# 10,20, 30, complete | |
my $b_s = rx_behavior_subject->new(10); | |
$b_s->subscribe($observer); | |
$b_s->next(20); | |
$b_s->next(30); | |
$b_s->complete; | |
}, | |
output => qq{ | |
# 10,20, 30, complete | |
}, | |
}, | |
}; | |
sub run_all { | |
run_this($_) for keys %$EXAMPLES; | |
} | |
sub run_this($example) { | |
my ($code, $output) = @{ $EXAMPLES->{$example} }{qw(code output)}; | |
$Data::Dumper::Deparse = 1; # help deparse subref | |
say "Example: $example"; | |
my $sub = Dumper $code; | |
$sub =~ s/\$VAR1 =//g; # delete the $VAR1 = | |
$sub =~ s/.*use .*//xmg; # delete any use statement | |
say $sub; # show the code | |
say "Output ", $output; | |
# execute the code; | |
$code->(); | |
} | |
sub run_random_if_not { | |
# choose random if not set an example | |
my @all = keys %$EXAMPLES; | |
$ex //= $all[ int(rand($#all)) ]; | |
run_this($ex); | |
} | |
run_random_if_not; | |
Mojo::IOLoop->start unless Mojo::IOLoop->is_running; | |
__END__ | |
=pod | |
=head1 CONCEPTS | |
RxPerl is based on RxJS that is an attractive concept that deals with the stream | |
of values, and its processing by functional tools. Pining the central idea in | |
the Observables. It states: | |
"Think of RxJS as Lodash for events." | |
ReactiveX combines the **Observer pattern** with the **Iterator pattern** and | |
**functional programming** with collections to fill the need for an ideal way of | |
managing sequences of events. | |
=over | |
=item Observable: | |
represents the idea of an invokable collection of future values or events. | |
=item Observer: | |
is a collection of callbacks that knows how to listen to values delivered by | |
the Observable. | |
=item Subscription: | |
represents the execution of an Observable, is primarily useful for cancelling | |
the execution. | |
=item Operators: | |
are pure functions that enable a functional programming style of dealing with | |
collections with operations like map, filter, concat, reduce, etc. | |
=item Subject: | |
is equivalent to an EventEmitter, and the only way of multicasting a value or | |
event to multiple Observers. | |
=item Schedulers: | |
are centralized dispatchers to control concurrency, allowing us to coordinate | |
when computation happens on e.g. setTimeout or requestAnimationFrame or others. | |
=back | |
=cut | |
==> rx-stream-from-future-event.pl <== | |
use strictures 2; | |
package TailEvent { | |
use Mojo::Base 'Mojo::EventEmitter', -strict, -signatures; | |
use IO::Async::FileStream; | |
has ioloop => sub { die "needs io loop" }; | |
has file => sub { die "needs a file" }; | |
has poll_interval => 0.5; | |
sub new_line($self, $line) { | |
$self->emit( on_arrival => { data => $line, epoch => time } ); | |
} | |
sub setup($self) { | |
my $stream = IO::Async::FileStream->new | |
( | |
filename => $self->file, | |
interval => $self->poll_interval, | |
on_initial => sub { | |
my ( $stream ) = @_; | |
$stream->seek_to_last( "\n" ); | |
}, | |
on_read => sub { | |
my ( undef, $buffref ) = @_; | |
while( $$buffref =~ s/^(.*\n)// ) { | |
$self->new_line($1); | |
} | |
return 0; | |
}, | |
); | |
$self->ioloop->add($stream); | |
} | |
sub new($class, @args) { | |
my $self = $class->SUPER::new(@args); | |
$self->setup; | |
return $self; | |
} | |
1; | |
} | |
use Mojo::Base -strict, -signatures; | |
use RxPerl::IOAsync qw(:all); | |
use IO::Async::Loop; | |
use List::Util qw(first); | |
use IO::All; | |
use Mojo::Exception qw(raise); | |
use DDP; | |
our $DEF_OBSERVER = { | |
next => sub ($x) { p $x }, | |
error => sub { raise 'Observer::Error', "error !" }, | |
complete => sub { say "complete" }, | |
}; | |
our $MNT = "$ENV{HOME}/mnt"; | |
# find the most recent modified file matching $pattern on $path | |
sub recent_modified($pattern=qr/perl\.log/, $path=$MNT) { | |
my $recent = | |
first { defined $_ } | |
sort {$b->mtime <=> $a->mtime} # sort by modified time | |
io->dir($path)->filter(sub { $_->name =~ $pattern })->All_Files; | |
return "$recent"; | |
} | |
sub main($fpattern=$ARGV[0]) { | |
my $loop = IO::Async::Loop->new; | |
RxPerl::IOAsync::set_loop($loop); | |
my $tail = TailEvent->new( | |
file => recent_modified($fpattern), | |
ioloop => $loop | |
); | |
my $source = rx_from_event($tail, 'on_arrival'); | |
$source->subscribe($DEF_OBSERVER); | |
$loop->run; | |
} | |
main; | |
=pod | |
=head1 EXAMPLE | |
This exemplifies the construction of a "tail reading" for watching files in an | |
asynchronously manner. Noteworthy is the use of an event-based paradigm, | |
defining the event, separating it from the processing of the event. | |
The solution pattern is depicted as following: | |
=over 4 | |
=item | |
C<TailEvent> uses C<IO::Async::FileStream> in order to watch data arriving, | |
which thus emit the event 'on_arrival' and pass the event's data for the | |
observers of this event. | |
=item | |
The use of C<RxPerl> (C<rx_from_event>) to grab the event data, and observers | |
to react to it (saving, printing, resending, ...), in this example only | |
printing it. | |
=back | |
=cut | |
==> open-file-for-streaming.pl <== | |
use Mojo::Base -strict, -signatures; | |
use RxPerl::IOAsync ':all'; | |
use IO::Async::Loop; | |
use IO::All; | |
use Mojo::Exception qw(raise); | |
use DateTime::Format::Strptime; | |
use DDP; | |
our $MNT = "$ENV{HOME}/mnt/"; | |
our $DEFAULT = { | |
next => sub ($x) { p $x }, | |
#next => sub ($x) { print $x }, # why this don't work ? | |
error => sub { raise 'Observer::Error', "error !" }, | |
complete => sub { say "complete" }, | |
}; | |
sub stream_from_files( | |
$file_pattern = qr/.*\.log/, | |
$path = $MNT, | |
) | |
{ | |
my @streams = | |
map { rx_of( $_->getlines )} | |
map { io->file($_) } | |
io->dir($path)->filter(sub { $_->name =~ $file_pattern })->All_Files; | |
return rx_concat(@streams); | |
} | |
sub parse_msg($msg) { | |
my $parts = [ $msg =~ m/^(.{19}) 0 <([^>]+)>(.+)$/ ]; | |
my $dt_parser = DateTime::Format::Strptime->new( | |
pattern => '%Y-%m-%dT%H:%M:%S', | |
on_error => 'croak', | |
); | |
if ( $parts->[0] ) { | |
$parts->[0] = $dt_parser->parse_datetime($parts->[0]); | |
} | |
return $parts; | |
} | |
sub main { | |
my $stream = stream_from_files(qr/#io-async\.log/); | |
$stream->pipe( | |
op_map( sub { parse_msg($_) }) ) | |
->subscribe($DEFAULT); | |
} | |
my $loop = IO::Async::Loop->new; | |
RxPerl::IOAsync::set_loop($loop); | |
main(); | |
$loop->run; | |
__END__ | |
=pod | |
=head1 EXAMPLE | |
Open files for streaming its contents (lines) as a stream of line data. | |
This examplifies the creation of a sequence of lines get from files, but this | |
ends and not waits for more lines to arrive on these files. | |
=cut | |
==> infinite-stream.pl <== | |
use Mojo::Base -strict, -signatures; | |
use RxPerl::Mojo ':all'; | |
use Mojo::IOLoop; | |
use Data::Fake qw/Core Names Text Dates/; | |
use Mojo::Exception qw(raise); | |
use DDP; | |
our $DEFAULT = { | |
next => sub ($x) { p $x }, | |
#next => sub ($x) { print $x }, # why this don't work ? | |
error => sub { raise 'Observer::Error', "error !" }, | |
complete => sub { say "complete" }, | |
}; | |
sub fake_char($from = ['a'..'z','A'..'Z']) | |
{ | |
my $max = scalar(@$from) - 1; | |
my $index = fake_int(0,$max); | |
return sub{ $from->[$index->()] }; | |
} | |
sub stream_genetic( $rate = rand(), $observer = $DEFAULT ) | |
{ | |
say "rate: $rate/s"; | |
my $chain = fake_char([qw(A G C T)]); | |
my $o = rx_interval($rate); | |
$o->pipe( op_map( sub { $chain->() } ) )->subscribe($observer); | |
return $o; | |
} | |
my $stream = stream_genetic; | |
$stream->subscribe( sub{ state $count = 0; $count++; say $count } ); | |
Mojo::IOLoop->start unless Mojo::IOLoop->is_running; | |
__END__ | |
=pod | |
=head1 EXAMPLE | |
this examplifies the creation of an infinite sequence of letters, the important | |
call is C<rx_interval($rate)> that creates an observable sequence of events at | |
$rate (in seconds). | |
=cut |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment