Skip to content

Instantly share code, notes, and snippets.

@renatocron
Created November 1, 2019 21:57
Show Gist options
  • Save renatocron/2de9cacd817b16e22eb137900b395550 to your computer and use it in GitHub Desktop.
Save renatocron/2de9cacd817b16e22eb137900b395550 to your computer and use it in GitHub Desktop.
package MojoX::InsistentPromise;
use strict;
use Carp;
use Mojo::IOLoop;
use Mojo::Promise;
my @retry_timing = (0, 0.5, 1, 3, 7, 13, 21, 34, 55, 60, 120);
use Scalar::Util 'weaken';
sub new {
my $class = shift;
my %opts = @_;
croak 'missing option init' unless $opts{init};
croak 'missing option check_sucess' unless $opts{check_sucess};
my $self = bless {
init => $opts{init},
check_sucess => $opts{check_sucess},
id => exists $opts{id} ? $opts{id} : undef,
retry => exists $opts{retry_timing} ? $opts{retry_timing} : \@retry_timing,
should_continue => $opts{should_continue} ? $opts{should_continue} : sub {1},
max_fail => exists $opts{max_fail} ? $opts{max_fail} : 100,
fail_count => 0,
};
my $ip = Mojo::Promise->new;
$self->{promise} = $ip;
weaken $self->{promise};
my $inner_p = $self->{init}->($self->{id});
$inner_p->then($self->_success_cb())->catch($self->_fail_cb());
return ($self, $self->{promise});
}
sub _exp_retry_fail {
my $self = shift;
my $value = $self->{retry}->[$self->{fail_count} - 1];
return $value if defined $value;
return $self->{retry}[-1] || 15;
}
sub _success_cb {
my ($self) = @_;
return sub {
my ($response) = @_;
if ($self->{check_sucess}->($response, $self->{id})) {
$self->{promise}->resolve($response);
}
else {
$self->{fail_count} = $self->{fail_count} + 1;
if ($self->{fail_count} < $self->{max_fail}) {
my $exp_retry = $self->_exp_retry_fail();
my $inner_p = $self->{init}->($self->{id});
Mojo::IOLoop->timer(
$exp_retry => sub {
$inner_p->then($self->_success_cb())->catch($self->_fail_cb());
}
);
}
else {
$self->{promise}->reject('max_fail reached', $self->{id}, $response);
}
}
return undef;
}
}
sub _fail_cb {
my ($self) = @_;
return sub {
my ($err) = @_;
if ( $self->{should_continue}( $err )) {
$self->{fail_count} = $self->{fail_count} + 1;
if ($self->{fail_count} < $self->{max_fail}) {
my $exp_retry = $self->_exp_retry_fail();
my $inner_p = $self->{init}->($self->{id});
Mojo::Promise->all($inner_p);
Mojo::IOLoop->timer(
$exp_retry => sub {
$inner_p->then($self->_success_cb())->catch($self->_fail_cb());
}
);
}else{
$self->{promise}->reject('toomanyexceptions', $self->{id}, $err);
}
}else{
$self->{promise}->reject('Aborted due to should_continue=false, id=' . $self->{id}, $err);
}
return undef;
};
}
1;
@renatocron
Copy link
Author

 my $promises_id= 0;
 my %current_promises;
 my $verbose = 1;
 my $fatal_errors = [];
 my $max_parallel_requests = 4;

    for my $contact_id (1..1000){
        $promises_id++;
        my ($ipo, $ip) = MojoX::InsistentPromise->new(
            check_sucess => sub {
                my ($tx, $contact_id) = @_;
                log_info(sprintf "check_sucess contact_id %s status-code=%s", $contact_id, $tx->res->to_string)
                  if $verbose && (!$tx->res->code || $tx->res->code != 200);
                return !$tx->res->is_error && $tx->res->code == 200;
            },
            init => sub {
                return $ua->get_p(
                    contact_id => $contact_id,
                )
            },
            should_continue => sub {
                my $errmsg = shift;
                log_info(sprintf "Promise failed %s for contact_id %s ", $errmsg, $contact_id) if $verbose;
                return 0 if $verbose && ASKED_TO_EXIT;
                return 1;
            },
            id => $contact_id
        );
        my $mine_id = $promises_id;

        $current_promises{$mine_id} = $ip;

        $ip->then(
            sub {
                my ($tx) = @_;
                $self->_process_event_response(contact_id => $contact_id, events => $tx->res->json);
            }
          )->catch(
            sub {
                my ($errmsg, $contact_id, $response) = @_;

                push @$fatal_errors, $errmsg, $contact_id;
                push @$fatal_errors, $response->res->to_string if $response;

                log_info(sprintf "Fail promise contact_id %s status-code=%s",
                    $contact_id, $response->res->to_string)
                  if $verbose;
            }
          )->finally(
            sub {
                log_info(sprintf "finished promise number %s", $mine_id) if $verbose;

                delete $current_promises{$mine_id};
            }
          );

        Mojo::Promise->race(values %current_promises)->wait
          if scalar keys %current_promises > $max_parallel_requests;
 }


if (scalar keys %current_promises) {
    $verbose++;
    log_info(sprintf 'waiting for %d promises to finish...', scalar keys %current_promises);
    Mojo::Promise->all(values %current_promises)->wait;
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment