Skip to content

Instantly share code, notes, and snippets.

@marcoarthur
Created June 27, 2024 16:45
Show Gist options
  • Save marcoarthur/d47a96be11391e9aa9e89b46cb4aed16 to your computer and use it in GitHub Desktop.
Save marcoarthur/d47a96be11391e9aa9e89b46cb4aed16 to your computer and use it in GitHub Desktop.
continue rx_perl experiments in a while
==> own_operator.pl <==
use strict;
use warnings;
use feature qw(signatures say);
use RxPerl::IOAsync ':all';
use Mojo::Util qw(trim);
use IO::Async::Loop;
use DDP;
my $loop = IO::Async::Loop->new;
RxPerl::IOAsync::set_loop($loop);
my $o = {
next => sub ($x){ p $x },
error => sub { p $_[0] },
complete => sub { say "complete"; $loop->stop },
};
sub draw($char) {
return my $stream = rx_interval(0.2)->pipe(
op_take(25),
op_map(sub { rand }),
op_map(sub{ "$char" x int($_*65) }),
);
}
draw("-")->subscribe($o);
# A pipeable operator is a function that returns a function that accepts a $source observable as input, and outputs another observable.
# sub op_draw($char) {
# return sub($source) {
# return rx_observable->new(
# sub ($subscriber) {
# my $own = {
# %$subscriber,
# next => sub { "$char" x int($_*65) },
# }
# $source->subscribe($own);
# return;
# }
# );
# }
# }
$loop->run;
==> read_the_keyboard.pl <==
use strict;
use warnings;
package Keyboard {
use Mojo::Base 'Mojo::EventEmitter', -strict, -signatures;
sub pressed($self, $key) {
$self->emit( 'keyup' => $key );
}
}
package main;
use feature qw(signatures say);
use RxPerl::IOAsync ':all';
use Term::TermKey::Async qw( FORMAT_VIM KEYMOD_CTRL );
use Mojo::Util qw(trim);
use IO::Async::Loop;
use DDP;
my $loop = IO::Async::Loop->new;
RxPerl::IOAsync::set_loop($loop);
sub make_observer ($i) {
return {
next => sub {say "next #$i: "; p $_[0];},
error => sub {say "error #$i: "; p $_[0]},
complete => sub {say "complete #$i"},
};
}
# create event source and attach to real source tka
my $keyboard = Keyboard->new;
my $tka = Term::TermKey::Async->new(
term => \*STDIN,
on_key => sub ($self, $key){
$keyboard->pressed($self->format_key($key, FORMAT_VIM));
},
);
# Saves all typing keyword and reads the keyboard events
my $typed = ''; #saved typing
#
#RxPerl
#RxPerl::Guides::CreatingPipeableOperators
#
rx_from_event_array($keyboard, 'keyup')->pipe( # on keyup event
op_map ( sub { $typed .= $_->[0]; return { text => trim $typed } } ), # save any typed char, ignoring leading or ending whitespace
op_filter( sub ($txt, $idx) { length($txt->{text}) > 2 } ), # ignores text with less than 2 chars
op_debounce_time(0.75), # only after long pausing typing
op_distinct_until_key_changed( 'text' ), # pass a new value if after pause typing it changed
#op_switch_map( sub ($txt) {} ),
)->subscribe(
{
next => sub { p $_[0]; $typed = ''}, # clear typed again
error => sub { p $_[0]},
complete => sub {say "complete"},
}
);
$loop->add( $tka );
$loop->run;
==> read-async-a-file.pl <==
use Mojo::Base -strict, -signatures;
use IO::Async::Loop;
use IO::Async::FileStream;
use IO::All;
use DDP;
use IO::Prompt;
my $loop = IO::Async::Loop->new;
my $MNT = "$ENV{HOME}/mnt";
sub watch_for($file_pattern = qr/\.log$/, $path=$MNT, $only_recent = 1) {
my @files = map { my $f = io->file($_); }
sort {$b->mtime <=> $a->mtime} # sort the most recent modified
io->dir($path)->filter(sub { $_->name =~ $file_pattern })->All_Files;
return unless @files;
my $watcher = sub($filename) {
say "will be watching $filename";
return IO::Async::FileStream->new(
filename => "$filename",
interval => 0.5,
on_initial => sub {
my ( $self ) = @_;
$self->seek_to_last( "\n" );
},
on_read => sub {
my ( $self, $buffref ) = @_;
while( $$buffref =~ s/^(.*\n)// ) {
print "Received a line $1";
}
return 0;
},
);
};
my $streams;
if( $only_recent ) {
$streams = $files[0] ? [$watcher->($files[0])]: [];
} else {
$streams = [ map { $watcher->($_) } @files ];
}
return $streams;
}
sub main($search_pattern) {
my $filestreams = watch_for(qr/$search_pattern/);
INPUT:
while ( ! $filestreams )
{
my $msg = "could not found file by this pattern $search_pattern, give another";
$search_pattern = prompt -d => $search_pattern, "$msg: ";
last unless $search_pattern;
$filestreams = watch_for(qr/$search_pattern/);
}
exit 0 unless $search_pattern;
$loop->add($_) for @$filestreams;
$loop->run;
}
main($ARGV[0] || 'perl');
__END__
=pod
=head1 EXAMPLE
This shows basic structure to watch for a file change in async fashion, so it won't
burn cpu time in a busy while. This uses the IO::Async framework.
=cut
==> experiments.pl <==
use Mojo::Base -strict, -signatures;
use RxPerl::Mojo ':all';
use Mojo::IOLoop;
# RxPerl
# rx_interval(1.4)->pipe(
# op_take(5),
# )->subscribe( sub ($arg){say "next: ", $arg} );
# rx_of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1)->pipe(
# op_distinct(),
# )->subscribe(sub ($arg) { say "received: $arg" });
# rx_interval(1)->subscribe(
# sub ($arg){
# say 'tick';
# }
# );
# my $x = rand();
# my $observer = {
# next => sub ($arg) { return rand() },
# error => sub ($arg) { say "error: ", $arg},
# complete => sub { say "complete"},
# };
# my $cold = rx_observable->new(
# sub ($subscriber) {
# $subscriber->next(rand());
# }
# );
# my $hot = $cold->publish();
# $cold->subscribe( sub ($arg) { say "Subscriber A: $arg" } );
# $cold->subscribe( sub ($arg) { say "Subscriber B: $arg" } );
#RxPerl::Guides::CreatingObservables
#my $mashup = rx_of( 'A'..'Z' );
#RxPerl::Subscription
use Data::Fake qw/Core Names Text Dates/;
use Mojo::Exception qw(raise);
use DDP;
my $fdata = fake_hash {
name => fake_name,
friends => fake_array( fake_int(2,10), fake_name() ),
gender => fake_pick(qw/Male Female Other/),
};
my $observer = {
next => sub ($x) { p $x },
error => sub ($err) { raise 'Oberser::Error', "Error! $err" },
complete => sub { say "complete" },
};
my $users = rx_subject->new;
$users->subscribe($observer);
rx_interval(0.4)
->pipe( op_map( sub{ rand() < 0.3 ? die "Oops" : $fdata->()} ) )
->subscribe($observer);
Mojo::IOLoop->start;
#$users->next($next->());
# my $data = $generator->();
# my $users = rx_of(@$data);
# $users
# ->pipe( op_map( sub { uc $_->{name} } ) )
# ->subscribe( sub { p @_ } )
# ;
#p $data->[0];
# my $lced = $mashup
# ->pipe(op_map(sub { lc }))
# ->subscribe( sub ($arg) { say $arg } );
#Mojo::IOLoop->start;
==> pipe.pl <==
use Mojo::Base -strict, -signatures;
use RxPerl::Mojo ':all';
use Mojo::IOLoop;
use Mojo::Reactor::Epoll;
use Data::Fake qw/Core Names Text Dates/;
use Mojo::Exception qw(raise);
use DDP;
use Data::Dumper;
our $DEFAULT_OBSERVABLE = {
next => sub ($x) { p $x },
error => sub { raise 'Observer::Error', "error !" },
complete => sub { say "complete" },
};
my $observer = $DEFAULT_OBSERVABLE;
my $ex = 'rx_zip'; #default example to run
my $EXAMPLES = {
rx_zip => {
code => sub {
# [0, 0, 0], [1, 1, 1], [2, 2, 2], complete
rx_zip(
rx_interval(0.7)->pipe( op_take(3) ),
rx_interval(1),
rx_interval(2),
)->subscribe($observer);
},
output => qq(
# [0, 0, 0], [1, 1, 1], [2, 2, 2], complete
),
},
rx_timer => {
code => sub {
# (pause 10 seconds) 0, complete
rx_timer(10)->subscribe($observer);
# (pause 10 seconds) 0, 1, 2, 3, ... (every 1 second)
rx_timer(10, 1)->subscribe($observer);
},
output => qq(
# (pause 10 seconds) 0, complete
# (pause 10 seconds) 0, 1, 2, 3, ... (every 1 second)
),
},
rx_throw_error => {
code => sub {
# 0, 1, 2, 3, error: foo
rx_concat(
rx_interval(1)->pipe( op_take(4) ),
rx_throw_error('foo'),
)->subscribe($observer);
},
output => qq(
# 0, 1, 2, 3, error: foo
),
},
rx_subject => {
code => sub {
# 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, complete
my $subject = rx_subject->new;
$subject->subscribe($observer);
# elsewhere...
$subject->next($_) for 1 .. 10;
$subject->complete;
},
output => qq(
# 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, complete
),
},
rx_observable => {
code => sub {
# 0.578, 0.234, 0.678, ... (every 1 second)
my $o = rx_observable->new
(
sub ($subscriber)
{
Mojo::IOLoop->recurring(1, sub { $subscriber->next(rand) });
}
);
$o->subscribe($observer);
},
output => qq{
# 0.578, 0.234, 0.678, ... (every 1 second)
},
},
rx_NEVER => {
code => sub {
# 10, 20, 30 (and no complete)
rx_concat(
rx_of(10, 20, 30),
rx_NEVER,
rx_of(40, 50, 60),
)->subscribe($observer);
},
output => qq(
# 10, 20, 30 (and no complete)
),
},
rx_EMPTY => {
code => sub {
# complete
rx_EMPTY->subscribe($observer);
# 10, 20, 30, 40, 50, 60, complete
rx_concat(
rx_of(10, 20, 30),
rx_EMPTY,
rx_EMPTY,
rx_EMPTY,
rx_of(40, 50, 60),
)->subscribe($observer);
},
output => qq(
# complete
# 10, 20, 30, 40, 50, 60, complete
),
},
rx_replay_subject => {
code => sub {
# 20, 30, 40, 50, complete
my $rs = rx_replay_subject(2);
$rs->next(10);
$rs->next(20);
$rs->next(30);
$rs->subscribe($observer);
$rs->next(40);
$rs->next(50);
$rs->complete;
# my $rs = rx_replay_subject(2, 3); # params: buffer_size, window_time
},
output => qq(
# 20, 30, 40, 50, complete
),
},
rx_on_error_resume_next => {
code => sub {
# 1, 2, 3, 10, 20, 30, complete
rx_on_error_resume_next(
rx_of(1, 2, 3)->pipe( op_concat_with(rx_throw_error('foo')) ),
rx_throw_error('bar'),
rx_of(10, 20, 30),
rx_throw_error('baz'),
)->subscribe($observer);
},
output => qq(
# 1, 2, 3, 10, 20, 30, complete
),
},
rx_NEVER => {
code => sub {
# 10, 20, 30 (and no complete)
rx_concat(
rx_of(10, 20, 30),
rx_NEVER,
rx_of(40, 50, 60),
)->subscribe($observer);
},
output => qq(
# 10, 20, 30 (and no complete)
),
},
rx_merge => {
code => sub {
# 0, 0, 1, 1, 2, 3, 2, 4, 3, ...
rx_merge(
rx_interval(0.7),
rx_interval(1),
)->subscribe($observer);
},
output => qq(
# 0, 0, 1, 1, 2, 3, 2, 4, 3, ...
),
},
rx_iif => {
code => sub {
my $i;
my $o = rx_iif(
sub { $i > 5 },
rx_of(1, 2, 3),
rx_of(10, 20, 30),
);
$i = 4;
# 10, 20, 30, complete
$o->subscribe($observer);
$i = 6;
# 1, 2, 3, complete
$o->subscribe($observer);
},
output => qq(
# 10, 20, 30, complete
# 1, 2, 3, complete
),
},
rx_generate => {
code => sub {
# 2, 5, 10, 17, 26
rx_generate(
1, # initializer
sub ($x) { $x <= 5 }, # check, and can also use $_ here
sub ($x) { $x + 1 }, # iterate, and can also use $_ here
sub ($x) { $x ** 2 + 1 }, # result selector (optional), and can also use $_ here
)->subscribe($observer);
},
output => qq(
# 2, 5, 10, 17, 26
),
},
rx_from => {
code => sub {
# 10, 20, 30, complete
rx_from([10, 20, 30])->subscribe($observer);
},
output => qq(
# 10, 20, 30, complete
),
},
rx_concat => {
code => sub {
# 10, 20, 30, 10, 20, 30, 40, complete
rx_concat(
rx_of(10, 20, 30),
rx_of(10, 20, 30, 40),
)->subscribe($observer);
},
output => qq(
# 10, 20, 30, 10, 20, 30, 40, complete
),
},
rx_partition =>
{
code => sub {
my $source = rx_interval(1)->pipe( op_take(10) );
my ($o1, $o2) = rx_partition(
$source,
sub ($value, $index) { $value % 2 == 1 },
);
rx_concat($o1, $o2)->subscribe($observer);
},
output => qq{
# 1, 3, 5, 7, 9, 0, 2, 4, 6, 8, complete
},
},
rx_race => {
code => sub {
rx_race(
rx_interval(1)->pipe( op_map(sub {$_[0] * 100})),
rx_interval(0.7)->pipe( op_map(sub {$_[0] * 10})),
)->subscribe($observer);
},
output => qq{
# 0, 10, 20, 30, ... (every 0.7 seconds)
},
},
rx_range => {
code => sub {
rx_range(10, 7)->subscribe($observer);
},
output => qq{
# 10, 11, 12, 13, 14, 15, 16, complete
},
},
rx_fork_join => {
code => sub {
# [30, 3, 'c'], complete
rx_fork_join([
rx_of(10, 20, 30),
rx_of(1, 2, 3),
rx_of('a', 'b', 'c'),
])->subscribe($observer);
# {x => 30, y => 3, z => 'c'}, complete
rx_fork_join({
x => rx_of(10, 20, 30),
y => rx_of(1, 2, 3),
z => rx_of('a', 'b', 'c'),
})->subscribe($observer);
},
output => qq|
# [30, 3, 'c'], complete
# {x => 30, y => 3, z => 'c'}, complete
|,
},
rx_defer => {
code => sub {
my $special_var;
my $o = rx_defer(sub {
return $special_var ? rx_of(10, 20, 30) : rx_of(40, 50, 60)
});
# 10, 20, 30, complete
$special_var = 1;
$o->subscribe($observer);
# 40, 50, 60, complete
$special_var = 0;
$o->subscribe($observer);
},
output => qq(
# 10, 20, 30, complete
# 40, 50, 60, complete
),
},
rx_combine_latest => {
code => sub {
# [0, 0], [0, 1], [1, 1], [1, 2], [1, 3], ...
rx_combine_latest([
rx_interval(1),
rx_interval(0.7),
])->subscribe($observer);
},
output => qq{
# [0, 0], [0, 1], [1, 1], [1, 2], [1, 3], ...
},
},
rx_behavior_subject => {
code => sub {
# 10,20, 30, complete
my $b_s = rx_behavior_subject->new(10);
$b_s->subscribe($observer);
$b_s->next(20);
$b_s->next(30);
$b_s->complete;
},
output => qq{
# 10,20, 30, complete
},
},
};
sub run_all {
run_this($_) for keys %$EXAMPLES;
}
sub run_this($example) {
my ($code, $output) = @{ $EXAMPLES->{$example} }{qw(code output)};
$Data::Dumper::Deparse = 1; # help deparse subref
say "Example: $example";
my $sub = Dumper $code;
$sub =~ s/\$VAR1 =//g; # delete the $VAR1 =
$sub =~ s/.*use .*//xmg; # delete any use statement
say $sub; # show the code
say "Output ", $output;
# execute the code;
$code->();
}
sub run_random_if_not {
# choose random if not set an example
my @all = keys %$EXAMPLES;
$ex //= $all[ int(rand($#all)) ];
run_this($ex);
}
run_random_if_not;
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
__END__
=pod
=head1 CONCEPTS
RxPerl is based on RxJS that is an attractive concept that deals with the stream
of values, and its processing by functional tools. Pining the central idea in
the Observables. It states:
"Think of RxJS as Lodash for events."
ReactiveX combines the **Observer pattern** with the **Iterator pattern** and
**functional programming** with collections to fill the need for an ideal way of
managing sequences of events.
=over
=item Observable:
represents the idea of an invokable collection of future values or events.
=item Observer:
is a collection of callbacks that knows how to listen to values delivered by
the Observable.
=item Subscription:
represents the execution of an Observable, is primarily useful for cancelling
the execution.
=item Operators:
are pure functions that enable a functional programming style of dealing with
collections with operations like map, filter, concat, reduce, etc.
=item Subject:
is equivalent to an EventEmitter, and the only way of multicasting a value or
event to multiple Observers.
=item Schedulers:
are centralized dispatchers to control concurrency, allowing us to coordinate
when computation happens on e.g. setTimeout or requestAnimationFrame or others.
=back
=cut
==> rx-stream-from-future-event.pl <==
use strictures 2;
package TailEvent {
use Mojo::Base 'Mojo::EventEmitter', -strict, -signatures;
use IO::Async::FileStream;
has ioloop => sub { die "needs io loop" };
has file => sub { die "needs a file" };
has poll_interval => 0.5;
sub new_line($self, $line) {
$self->emit( on_arrival => { data => $line, epoch => time } );
}
sub setup($self) {
my $stream = IO::Async::FileStream->new
(
filename => $self->file,
interval => $self->poll_interval,
on_initial => sub {
my ( $stream ) = @_;
$stream->seek_to_last( "\n" );
},
on_read => sub {
my ( undef, $buffref ) = @_;
while( $$buffref =~ s/^(.*\n)// ) {
$self->new_line($1);
}
return 0;
},
);
$self->ioloop->add($stream);
}
sub new($class, @args) {
my $self = $class->SUPER::new(@args);
$self->setup;
return $self;
}
1;
}
use Mojo::Base -strict, -signatures;
use RxPerl::IOAsync qw(:all);
use IO::Async::Loop;
use List::Util qw(first);
use IO::All;
use Mojo::Exception qw(raise);
use DDP;
our $DEF_OBSERVER = {
next => sub ($x) { p $x },
error => sub { raise 'Observer::Error', "error !" },
complete => sub { say "complete" },
};
our $MNT = "$ENV{HOME}/mnt";
# find the most recent modified file matching $pattern on $path
sub recent_modified($pattern=qr/perl\.log/, $path=$MNT) {
my $recent =
first { defined $_ }
sort {$b->mtime <=> $a->mtime} # sort by modified time
io->dir($path)->filter(sub { $_->name =~ $pattern })->All_Files;
return "$recent";
}
sub main($fpattern=$ARGV[0]) {
my $loop = IO::Async::Loop->new;
RxPerl::IOAsync::set_loop($loop);
my $tail = TailEvent->new(
file => recent_modified($fpattern),
ioloop => $loop
);
my $source = rx_from_event($tail, 'on_arrival');
$source->subscribe($DEF_OBSERVER);
$loop->run;
}
main;
=pod
=head1 EXAMPLE
This exemplifies the construction of a "tail reading" for watching files in an
asynchronously manner. Noteworthy is the use of an event-based paradigm,
defining the event, separating it from the processing of the event.
The solution pattern is depicted as following:
=over 4
=item
C<TailEvent> uses C<IO::Async::FileStream> in order to watch data arriving,
which thus emit the event 'on_arrival' and pass the event's data for the
observers of this event.
=item
The use of C<RxPerl> (C<rx_from_event>) to grab the event data, and observers
to react to it (saving, printing, resending, ...), in this example only
printing it.
=back
=cut
==> open-file-for-streaming.pl <==
use Mojo::Base -strict, -signatures;
use RxPerl::IOAsync ':all';
use IO::Async::Loop;
use IO::All;
use Mojo::Exception qw(raise);
use DateTime::Format::Strptime;
use DDP;
our $MNT = "$ENV{HOME}/mnt/";
our $DEFAULT = {
next => sub ($x) { p $x },
#next => sub ($x) { print $x }, # why this don't work ?
error => sub { raise 'Observer::Error', "error !" },
complete => sub { say "complete" },
};
sub stream_from_files(
$file_pattern = qr/.*\.log/,
$path = $MNT,
)
{
my @streams =
map { rx_of( $_->getlines )}
map { io->file($_) }
io->dir($path)->filter(sub { $_->name =~ $file_pattern })->All_Files;
return rx_concat(@streams);
}
sub parse_msg($msg) {
my $parts = [ $msg =~ m/^(.{19}) 0 <([^>]+)>(.+)$/ ];
my $dt_parser = DateTime::Format::Strptime->new(
pattern => '%Y-%m-%dT%H:%M:%S',
on_error => 'croak',
);
if ( $parts->[0] ) {
$parts->[0] = $dt_parser->parse_datetime($parts->[0]);
}
return $parts;
}
sub main {
my $stream = stream_from_files(qr/#io-async\.log/);
$stream->pipe(
op_map( sub { parse_msg($_) }) )
->subscribe($DEFAULT);
}
my $loop = IO::Async::Loop->new;
RxPerl::IOAsync::set_loop($loop);
main();
$loop->run;
__END__
=pod
=head1 EXAMPLE
Open files for streaming its contents (lines) as a stream of line data.
This examplifies the creation of a sequence of lines get from files, but this
ends and not waits for more lines to arrive on these files.
=cut
==> infinite-stream.pl <==
use Mojo::Base -strict, -signatures;
use RxPerl::Mojo ':all';
use Mojo::IOLoop;
use Data::Fake qw/Core Names Text Dates/;
use Mojo::Exception qw(raise);
use DDP;
our $DEFAULT = {
next => sub ($x) { p $x },
#next => sub ($x) { print $x }, # why this don't work ?
error => sub { raise 'Observer::Error', "error !" },
complete => sub { say "complete" },
};
sub fake_char($from = ['a'..'z','A'..'Z'])
{
my $max = scalar(@$from) - 1;
my $index = fake_int(0,$max);
return sub{ $from->[$index->()] };
}
sub stream_genetic( $rate = rand(), $observer = $DEFAULT )
{
say "rate: $rate/s";
my $chain = fake_char([qw(A G C T)]);
my $o = rx_interval($rate);
$o->pipe( op_map( sub { $chain->() } ) )->subscribe($observer);
return $o;
}
my $stream = stream_genetic;
$stream->subscribe( sub{ state $count = 0; $count++; say $count } );
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
__END__
=pod
=head1 EXAMPLE
this examplifies the creation of an infinite sequence of letters, the important
call is C<rx_interval($rate)> that creates an observable sequence of events at
$rate (in seconds).
=cut
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment