Skip to content

Instantly share code, notes, and snippets.

@marcoarthur
Created June 27, 2024 14:54
Show Gist options
  • Save marcoarthur/6293eba2f2a4184340d08a389e73b2f9 to your computer and use it in GitHub Desktop.
Save marcoarthur/6293eba2f2a4184340d08a389e73b2f9 to your computer and use it in GitHub Desktop.
Experiments with Perl and RxPerl
use feature qw(signatures);
# Recursive factorial: returns $n! for a non-negative integer $n.
# Dies for negative or fractional input, because the recursion would
# otherwise step past the $n == 0 base case and never terminate.
my $fact = sub ($n) {
    # __SUB__ only exists when the 'current_sub' feature is enabled; turn it
    # on lexically so the recursion does not depend on file-level pragmas.
    use feature 'current_sub';

    die "factorial is only defined for non-negative integers, got: $n\n"
        if $n < 0 || $n != int $n;

    return 1 if $n == 0;
    return $n * __SUB__->($n - 1);
};
# Flatten a nested hash into its key paths.
#
# Returns one string per leaf, formed by concatenating (with no separator)
# the keys on the way down to that leaf. Top-level and nested keys are
# visited in sorted order so the output is deterministic.
#
# Anything that is not a plain hash reference (scalars, array refs, ...)
# is treated as a leaf and yields the empty string, which terminates a path.
my $paths = sub ($h) {
    # __SUB__ requires the 'current_sub' feature; enable it lexically.
    use feature 'current_sub';

    return '' unless ref($h) eq 'HASH';

    my @paths;
    for my $key (sort keys %$h) {
        my @suffixes = __SUB__->($h->{$key});

        # An empty nested hash has no leaves below it; the key alone is
        # then the complete path.
        @suffixes = ('') unless @suffixes;

        # One path per leaf: siblings must not be fused into one string.
        push @paths, map { $key . $_ } @suffixes;
    }
    return @paths;
};
# Sample nested hash: three top-level keys, nested up to four levels deep.
# NOTE(review): 'postgresl' looks like a typo for 'postgresql' - confirm
# before renaming, since the key is runtime data.
my $conf = {
    'postgresl' => {
        'service' => 'car',
    },
    'mysql' => {
        'dsn' => 'user=root;password=123;dbname=root',
    },
    'some' => {
        'long' => {
            'very' => {
                'long' => 'idea',
            },
        },
    },
};
use feature qw(signatures say);
use RxPerl::Mojo ':all';
use Mojo::IOLoop;
use DDP;
# Observer for this experiment: dump each emitted value with DDP's p,
# warn on error, and say 'complete' when the stream finishes.
my $observer = {
    complete => sub { say 'complete' },
    error    => sub ($err) { warn "Oh mine: $err" },
    next     => sub ($x) { p $x },
};

# Delay passed to rx_timer; doubled before each timer is created.
my $i = 0.1;

# Selector handed to op_delay_when: double $i, then return a timer for it.
my $next_delay = sub {
    $i *= 2;
    return rx_timer($i);
};

my $result = rx_of( 'A' .. 'F' )->pipe( op_delay_when($next_delay) );

#RxPerl
# Disabled alternative: derive each delay from the value itself (1/$x).
# my $result = rx_of(1, 2, 3)->pipe(
#     op_delay_when(
#         sub ($x, $idx) {
#             rx_timer(1 / $x);
#         }
#     )
# );

$result->subscribe($observer);

# Start the Mojo event loop so the scheduled timers can run.
Mojo::IOLoop->start;
use feature qw(signatures);
use RxPerl::Mojo ':all';
use Mojo::IOLoop;
use DDP;
# Observer for this experiment: dump each emitted value with DDP's p and
# report errors and completion through warn.
my $observer = {
    complete => sub { warn 'complete' },
    error    => sub ($err) { warn "Oh mine: $err" },
    next     => sub ($x) { p $x },
};

#RxPerl
# Build an observable over the given array ref in which every value is
# delayed through op_delay_when by rx_timer(value / 10).
my $delayed_source = sub ($values) {
    my $selector = sub ($x, $idx) {
        return rx_timer( $x / 10 );
    };
    return rx_from($values)->pipe( op_delay_when($selector) );
};

my $first  = $delayed_source->( [ 10, 20, 30, 40 ] );
my $second = $delayed_source->( [ 15, 25, 35 ] );

#RxPerl
# Combine both delayed sources; the rx_zip variant is kept for comparison.
my $result = rx_combine_latest( [ $first, $second ] );
#my $result = rx_zip($first, $second);

$result->subscribe($observer);

# Start the Mojo event loop so the scheduled timers can run.
Mojo::IOLoop->start;
use feature qw(signatures);
use RxPerl::Mojo ':all';
use Mojo::IOLoop;
use DDP;
# Observer for this experiment: dump each emitted value with DDP's p and
# report errors and completion through warn.
my $observer = {
    complete => sub { warn 'complete' },
    error    => sub ($err) { warn "Oh mine: $err" },
    next     => sub ($x) { p $x },
};

#RxPerl
# Selector for op_delay_when: each value $x gets a timer of $x / 10.
my $delay_selector = sub ($x, $idx) {
    return rx_timer( $x / 10 );
};

my $result = rx_from( [ 10, 20, 30 ] )->pipe( op_delay_when($delay_selector) );

$result->subscribe($observer);

# Start the Mojo event loop so the scheduled timers can run.
Mojo::IOLoop->start;
use feature qw(signatures);
use RxPerl::Mojo ':all';
use Mojo::IOLoop;
use DDP;
#RxPerl
# sub generate_doubles($seed) {
# my $i = $seed;
# return sub { return $i = $i*2; }
# }
# Selects which branch the timer callback takes: true resolves the
# promise, false rejects it.
my $success = 1;

# Promise settled from a Mojo::IOLoop timer registered with a delay of 1.
# The chained then() maps a fulfilled value to the string "Cool".
my $promise = Mojo::Promise->new(
    sub ($resolve, $reject) {
        Mojo::IOLoop->timer(
            1 => sub {
                return $resolve->('Lucky!') if $success;
                $reject->('Unlucky!');
            }
        );
    }
)->then( sub { return "Cool" } );

# my $it = generate_doubles(3);

# Wrap the promise in an observable so it can be consumed with an observer.
my $result = rx_from($promise);

# Observer: dump each emitted value with DDP's p and report errors and
# completion through warn.
my $observer = {
    complete => sub { warn 'complete' },
    error    => sub ($err) { warn "Oh mine: $err" },
    next     => sub ($x) { p $x },
};

$result->subscribe($observer);

# Start the Mojo event loop so the timer can fire.
Mojo::IOLoop->start;
use feature qw(signatures);
use RxPerl::Mojo ':all';
use Mojo::IOLoop;
use DDP;
#RxPerl
# Source of three letters.
my $letters = rx_of(qw(a b c));

# For every letter, start an rx_interval(1) stream and map each of its
# values to "<letter><value>"; op_merge_map merges the inner streams.
# NOTE(review): rx_interval is presumably unbounded, so 'complete' is not
# expected to fire and the loop runs until interrupted - confirm against
# the RxPerl documentation.
my $result = $letters->pipe(
    op_merge_map(
        sub ($x, $idx) {
            rx_interval(1)->pipe( op_map( sub { return $x . $_; } ) );
        }
    ),
);

# Observer: dump each emitted value with DDP's p and report errors and
# completion through warn.
my $observer = {
    next => sub ($x) { p $x },

    # Include the error value in the message, as the other observers in
    # this file do; previously it was silently discarded.
    error    => sub ($err) { warn "Oh mine: $err" },
    complete => sub { warn 'complete' },
};

$result->subscribe($observer);

# Start the Mojo event loop so the intervals can fire.
Mojo::IOLoop->start;
use feature qw(signatures);
use RxPerl::Mojo ':all';
use Mojo::Pg;
use Mojo::IOLoop;
use DDP;
# Connection URL (service=demo) and the query that yields the numbers 1..10.
my $url   = 'postgresql:///?service=demo';
my $query = 'SELECT a.n FROM generate_series(1,10) a(n)';
my $pg    = Mojo::Pg->new($url);

# Observer shared by both subscriptions below: dump each emitted value
# with DDP's p and report errors and completion through warn.
my $observer = {
    complete => sub { warn 'complete' },
    error    => sub ($err) { warn "Oh mine: $err" },
    next     => sub ($x) { p $x },
};

my $subject = rx_subject->new;
my $promise = $pg->db->query_p($query);

# Tap callback: forward every row of the result set (as a hash) to the
# subject, one next() call per row.
my $feed_subject = sub ($rs) {
    for my $row ( @{ $rs->hashes->to_array } ) {
        $subject->next($row);
    }
};

# Observe the query promise itself; the tap feeds the subject as a side
# effect while the result set is passed on to $observer.
my $o = rx_from($promise)->pipe( op_tap($feed_subject) )->subscribe($observer);

# Print what the subject emits, limited by op_take(8).
$subject->pipe( op_take(8) )->subscribe($observer);

# Start the Mojo event loop so the query can complete.
Mojo::IOLoop->start;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment