Skip to content

Instantly share code, notes, and snippets.

@marcoarthur
Created June 27, 2024 14:55
Show Gist options
  • Save marcoarthur/20c1049a3ccefa3ec69126b2e6334191 to your computer and use it in GitHub Desktop.
Save marcoarthur/20c1049a3ccefa3ec69126b2e6334191 to your computer and use it in GitHub Desktop.
Continuing to embrace RxPerl
# DbConnection
package DbConnection {
    use Mojo::Base -base, -strict, -signatures;
    use RxPerl::Mojo ':all';
    use Mojo::Pg;
    use DDP;

    # Flag set to 1 by close().
    has is_closed => 0;

    # Receives the Mojo::Pg object built by setup() each time the observable
    # returned from setup() is subscribed to.
    has connection => undef;

    # Connection source handed to Mojo::Pg->new; there is no usable default,
    # so reading it without having supplied one dies.
    has pg => sub { die 'need pg url' };

    # Build the Mojo::Pg object right away and return a deferred observable
    # that emits this DbConnection on subscription.
    sub setup($self) {
        my $source = $self->pg;
        my $pg     = Mojo::Pg->new($source);

        # Only a non-reference source (e.g. a URL string) is replaced by the
        # freshly built object; a reference supplied by the caller is kept.
        $self->pg($pg) if !ref $source;

        my $emit_self = sub {
            $self->connection($pg);

            # If close() already ran, go through terminate() again before
            # emitting; the object is still emitted afterwards.
            $self->is_closed and $self->terminate;
            return rx_of($self);
        };
        return rx_defer($emit_self);
    }

    # Mark the connection closed and disconnect. All of this happens when
    # close() is CALLED, not when the returned observable is subscribed to.
    # Returns an observable of whatever disconnect returned, or rx_EMPTY when
    # no pg is set any more.
    sub close($self) {
        $self->is_closed(1);

        my $pg = $self->pg or return rx_EMPTY;
        $self->pg(undef);

        # Alternative that was tried and left disabled:
        #   rx_from( Mojo::Promise->new->resolve( $pg->db->disconnect ) )
        return rx_from( [ $pg->db->disconnect ] );
    }

    # Alias for close().
    sub terminate($self) { return $self->close }

    # Close the connection unless that has already been done.
    sub unsubscribe($self) { $self->is_closed or $self->terminate }
}
# DbManager
package DbManager {
    use Mojo::Base -base, -strict, -signatures;
    use RxPerl::Mojo ':all';

    # Hashref of constructor arguments passed to DbConnection->new,
    # e.g. { pg => 'postgresql://...' }.
    has params => sub { {} };

    # Return an observable that emits a DbConnection.
    #
    # The connection is created inside rx_defer, so every subscription gets
    # its own DbConnection. Previously one instance was created up front and
    # shared by all subscribers: as soon as one of them called close() (which
    # clears the connection's pg attribute), any later subscriber received an
    # object whose pg was undef.
    #
    # Consequence: an unusable configuration (e.g. a missing pg entry) now
    # dies when the observable is subscribed to rather than when
    # get_connection is called.
    sub get_connection ($self) {
        return rx_defer( sub { DbConnection->new( $self->params )->setup } );
    }
}
# Testing
package main;
use Mojo::Base -strict, -signatures;
use RxPerl::Mojo ':all';
use DDP;
# SQL shared by blocking() and non_blocking().
my $query = q{SELECT a.n FROM generate_series(1,10) a(n)};
# Build an observable that runs $query with the blocking Mojo::Pg API and
# emits each result hash, followed by whatever the connection's close()
# observable emits.
sub blocking {
    my $manager = DbManager->new( params => {pg => 'postgresql:///?service=demo'} );

    my $run_query = sub ($connection, $idx) {
        # The query runs here, synchronously, as soon as the connection arrives.
        my $rows = rx_of( $connection->pg->db->query($query)->hashes->to_array->@* );

        # close() is called right away as well; only its result observable is
        # replayed after the rows.
        my $closing = $connection->close;

        return rx_concat( $rows, $closing );
    };

    my $o = $manager->get_connection->pipe( op_merge_map($run_query) );
    return $o;
}
# Non-blocking counterpart of blocking(): build an observable that runs
# $query through query_p and emits each result hash, followed by whatever the
# connection's close() observable emits.
sub non_blocking {
    my $manager = DbManager->new( params => {pg => 'postgresql:///?service=demo'} );

    my $o = $manager->get_connection->pipe(
        op_merge_map(
            sub ($connection, $) {
                # then() returns a NEW promise. The earlier version threw it
                # away and wrapped the original promise, so the raw results
                # object was emitted instead of the rows. Resolve with one
                # array reference so exactly one value travels through the
                # promise.
                my $rows_p = $connection->pg->db->query_p($query)
                    ->then( sub ($res) { $res->hashes->to_array } );

                # Turn the single array reference into one emission per row.
                my $rx_rows = rx_from($rows_p)->pipe(
                    op_merge_map( sub ($rows, $) { rx_of(@$rows) } ),
                );

                # close() does its work when it is called, so it is wrapped
                # in rx_defer: it must not run before the rows have been
                # delivered and rx_concat moves on to this second source.
                return rx_concat(
                    $rx_rows,
                    rx_defer( sub { $connection->close } ),
                );
            }
        )
    );
    return $o;
}
# Observer used by the demo: dump every item, warn on error, announce
# completion.
my $def_obs = {
    next     => sub ($x)   { p $x },
    error    => sub ($err) { warn "BIG ERROR: $err" },
    complete => sub        { print "Completed\n" },
};

# Blocking variant, currently disabled:
# my $bloc = blocking;
# $bloc->pipe( op_take(8) )->subscribe($def_obs);

my $non = non_blocking;

# Promise for the final value of the stream; used below to keep the script
# alive until the stream has finished.
my $on_done = sub { print "Ok\n" };
my $on_fail = sub ($err) { warn "OOOOOOOHHHHHH" };
my $p       = last_value_from($non)->then($on_done)->catch($on_fail);

# Separate subscription that prints at most the first eight items.
my $first_eight = $non->pipe( op_take(8) );
$first_eight->subscribe($def_obs);

$p->wait;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment