Created
June 27, 2024 14:54
-
-
Save marcoarthur/6293eba2f2a4184340d08a389e73b2f9 to your computer and use it in GitHub Desktop.
experiments with perl rxperl
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use feature qw(signatures); | |
my $fact = sub ($n) { | |
return 1 if $n == 0; | |
return $n * __SUB__->($n - 1); | |
}; | |
my $paths = sub ($h) { | |
return '' unless ref $h; | |
my @keys = keys %$h; | |
for (@keys) { | |
$_ .= join '', __SUB__->($h->{$_}); | |
} | |
return @keys; | |
} | |
my $conf = { | |
postgresl => { service => 'car' }, | |
mysql => { dsn => 'user=root;password=123;dbname=root' }, | |
some => { long => { very => {long => 'idea'} } }, | |
}; | |
use feature qw(signatures say); | |
use RxPerl::Mojo ':all'; | |
use Mojo::IOLoop; | |
use DDP; | |
my $observer = { | |
next => sub ($x){ p $x }, | |
error => sub ($err){ warn "Oh mine: $err" }, | |
complete => sub { say 'complete' }, | |
}; | |
my $i = 0.1; | |
my $result = rx_of('A'..'F')->pipe( | |
op_delay_when( sub{ $i *= 2; rx_timer($i) } ) | |
); | |
#RxPerl | |
# my $result = rx_of(1,2,3) | |
# ->pipe( | |
# op_delay_when( | |
# sub($x,$idx) { | |
# rx_timer(1/$x); | |
# } | |
# ) | |
# ); | |
$result->subscribe( $observer ); | |
Mojo::IOLoop->start; | |
use feature qw(signatures); | |
use RxPerl::Mojo ':all'; | |
use Mojo::IOLoop; | |
use DDP; | |
my $observer = { | |
next => sub ($x){ p $x }, | |
error => sub ($err){ warn "Oh mine: $err" }, | |
complete => sub { warn 'complete' }, | |
}; | |
#RxPerl | |
my $first = rx_from([10,20,30,40]) | |
->pipe( | |
op_delay_when( | |
sub($x,$idx) { | |
rx_timer($x/10); | |
} | |
) | |
); | |
my $second = rx_from([15,25,35]) | |
->pipe( | |
op_delay_when( | |
sub($x,$idx) { | |
rx_timer($x/10) | |
} | |
) | |
); | |
#RxPerl | |
my $result = rx_combine_latest([$first, $second] ); | |
#my $result = rx_zip($first, $second); | |
$result->subscribe( $observer ); | |
Mojo::IOLoop->start; | |
use feature qw(signatures); | |
use RxPerl::Mojo ':all'; | |
use Mojo::IOLoop; | |
use DDP; | |
my $observer = { | |
next => sub ($x){ p $x }, | |
error => sub ($err){ warn "Oh mine: $err" }, | |
complete => sub { warn 'complete' }, | |
}; | |
#RxPerl | |
my $result = rx_from([10,20,30]) | |
->pipe( | |
op_delay_when( | |
sub($x,$idx) { | |
rx_timer($x/10); | |
} | |
) | |
); | |
$result->subscribe( $observer ); | |
Mojo::IOLoop->start; | |
use feature qw(signatures); | |
use RxPerl::Mojo ':all'; | |
use Mojo::IOLoop; | |
use DDP; | |
#RxPerl | |
# sub generate_doubles($seed) { | |
# my $i = $seed; | |
# return sub { return $i = $i*2; } | |
# } | |
my $success = 1; | |
my $promise = Mojo::Promise->new(sub ($resolve, $reject) { | |
Mojo::IOLoop->timer(1 => sub { | |
if ( $success ) { $resolve->('Lucky!') } | |
else { $reject->('Unlucky!') } | |
}); | |
})->then( sub { return "Cool" } );; | |
# my $it = generate_doubles(3); | |
my $result = rx_from($promise); | |
my $observer = { | |
next => sub ($x){ p $x }, | |
error => sub ($err){ warn "Oh mine: $err" }, | |
complete => sub { warn 'complete' }, | |
}; | |
$result->subscribe( $observer ); | |
Mojo::IOLoop->start; | |
use feature qw(signatures); | |
use RxPerl::Mojo ':all'; | |
use Mojo::IOLoop; | |
use DDP; | |
#RxPerl | |
my $letters = rx_of(qw(a b c)); | |
my $result = $letters->pipe( | |
op_merge_map( | |
sub($x, $idx){ rx_interval(1)->pipe( op_map( sub { return $x . $_; } ) ) } | |
), | |
); | |
my $observer = { | |
next => sub ($x){ p $x }, | |
error => sub { warn 'Oh mine' }, | |
complete => sub { warn 'complete' }, | |
}; | |
$result->subscribe( $observer ); | |
Mojo::IOLoop->start; | |
use feature qw(signatures); | |
use RxPerl::Mojo ':all'; | |
use Mojo::Pg; | |
use Mojo::IOLoop; | |
use DDP; | |
my $url = 'postgresql:///?service=demo'; | |
my $pg = Mojo::Pg->new($url); | |
my $query = 'SELECT a.n FROM generate_series(1,10) a(n)'; | |
my $observer = { | |
next => sub ($x){ p $x }, | |
error => sub ($err){ warn "Oh mine: $err" }, | |
complete => sub { warn 'complete' }, | |
}; | |
my $subject = rx_subject->new; | |
my $promise = $pg->db->query_p($query); | |
# use tap to signal to the subject the results of result set | |
my $o = rx_from($promise) | |
->pipe( op_tap( sub($rs) { $subject->next($_) for $rs->hashes->to_array->@* } ) ) | |
->subscribe($observer); | |
# just print out | |
$subject->pipe(op_take(8))->subscribe($observer); | |
Mojo::IOLoop->start; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment