Created
June 27, 2024 14:55
-
-
Save marcoarthur/20c1049a3ccefa3ec69126b2e6334191 to your computer and use it in GitHub Desktop.
Continuing to embrace RxPerl
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# DbConnection | |
package DbConnection {
    # Wraps a Mojo::Pg handle and exposes its lifecycle as RxPerl observables.
    use Mojo::Base -base, -strict, -signatures;
    use RxPerl::Mojo ':all';
    use Mojo::Pg;
    use DDP;

    # Set to 1 by close().
    has is_closed => 0;

    # The Mojo::Pg handle; populated each time the observable returned by
    # setup() is subscribed to, cleared again by close().
    has connection => undef;

    # Either a connection string or an already-built handle. Reading it
    # before it has been supplied dies with 'need pg url'.
    has pg => sub { die 'need pg url' };

    # Normalise `pg` into a handle and return a deferred observable that
    # emits this object once per subscription.
    sub setup ($self) {
        my $pg = $self->pg;

        # Only a plain string needs converting. Anything that is already a
        # reference is assumed to be a ready-made Mojo::Pg object and is
        # reused, so `pg` and `connection` always name the same handle
        # instead of a second handle being created around the first.
        $pg = Mojo::Pg->new($pg) unless ref $pg;
        $self->pg($pg);

        return rx_defer(
            sub {
                $self->connection($pg);

                # A connection that was closed before this subscription is
                # released again straight away.
                $self->terminate if $self->is_closed;
                return rx_of($self);
            }
        );
    }

    # Mark the connection closed and release the handle.
    # Returns an observable emitting the return value of the disconnect
    # call, or rx_EMPTY when there is no handle left to release.
    sub close ($self) {
        $self->is_closed(1);

        my $pg = $self->pg or return rx_EMPTY;

        # Drop both references so the handle is not kept alive through
        # `connection` after the object has been closed.
        $self->pg(undef);
        $self->connection(undef);

        return rx_from( [ $pg->db->disconnect ] );
    }

    # Alias for close().
    sub terminate ($self) { $self->close; }

    # Close the connection unless that has already happened.
    sub unsubscribe ($self) { $self->terminate unless $self->is_closed; }
}
# DbManager | |
package DbManager {
    # Factory that hands out DbConnection observables.
    use Mojo::Base -base, -strict, -signatures;
    use RxPerl::Mojo ':all';

    # Hash reference passed unchanged to DbConnection->new.
    has params => sub { return {} };

    # Build a DbConnection from `params` and return the result of its
    # setup() call.
    sub get_connection ($self) {
        my $connection = DbConnection->new( $self->params );
        return $connection->setup;
    }
}
# Testing | |
package main; | |
use Mojo::Base -strict, -signatures; | |
use RxPerl::Mojo ':all'; | |
use DDP; | |
# SQL shared by the blocking and non-blocking examples below.
my $query = q{SELECT a.n FROM generate_series(1,10) a(n)};
# Run $query with the blocking API and return an observable that emits each
# row hash, followed by whatever the connection's close() observable emits.
sub blocking {
    my $manager = DbManager->new( params => { pg => 'postgresql:///?service=demo' } );

    # Invoked once per emitted connection: the query runs first, then
    # close() is called while the concat arguments are being built.
    my $run_query = sub ( $connection, $idx ) {
        my $rows = $connection->pg->db->query($query)->hashes->to_array;
        return rx_concat( rx_of( @$rows ), $connection->close );
    };

    return $manager->get_connection->pipe( op_merge_map($run_query) );
}
# Run $query with the promise-based API and return an observable that emits
# one row hash per result row.
#
# The earlier version discarded the promise returned by ->then and wrapped
# the original promise instead, so subscribers received a single results
# object rather than the rows. Here the promise is turned into an observable
# first and the rows are fanned out with a second op_merge_map.
sub non_blocking {
    my $manager = DbManager->new( params => { pg => 'postgresql:///?service=demo' } );

    return $manager->get_connection->pipe(
        op_merge_map(
            sub ( $connection, $idx ) {
                my $promise = $connection->pg->db->query_p($query);

                return rx_from($promise)->pipe(
                    op_merge_map(
                        # rx_from on an array reference emits its elements
                        # one at a time.
                        sub ( $res, $res_idx ) { rx_from( $res->hashes->to_array ) }
                    ),
                );

                # NOTE: closing the connection is deliberately not chained
                # here. `$connection->close` as a direct rx_concat argument
                # would be called while this callback is still building its
                # return value, i.e. before the promise has resolved.
            }
        )
    );
}
# Default observer: dump every value, warn on error, announce completion.
my $def_obs = {};
$def_obs->{next}     = sub ($value)   { p $value };
$def_obs->{error}    = sub ($failure) { warn "BIG ERROR: $failure" };
$def_obs->{complete} = sub            { print "Completed\n" };
# Blocking variant, kept for reference:
# my $bloc = blocking;
# $bloc->pipe( op_take(8) )->subscribe($def_obs);

# Non-blocking variant. The observable is subscribed to twice: once through
# last_value_from (to obtain a promise to wait on) and once with the default
# observer, limited to the first eight values.
my $non = non_blocking();

my $last_value = last_value_from($non);
my $p          = $last_value
    ->then( sub { print "Ok\n" } )
    ->catch( sub ($err) { warn "OOOOOOOHHHHHH" } );

my $first_eight = $non->pipe( op_take(8) );
$first_eight->subscribe($def_obs);

# Block until the promise built on last_value_from settles.
$p->wait;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment