Skip to content

Instantly share code, notes, and snippets.

@FGasper
Last active September 17, 2019 12:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save FGasper/1583e59368c5d1a98e8c858fbf9c4cf6 to your computer and use it in GitHub Desktop.
Save FGasper/1583e59368c5d1a98e8c858fbf9c4cf6 to your computer and use it in GitHub Desktop.
Demonstration of a promise-based wrapper around Net::Curl
#!/usr/bin/env perl
# A demonstration. An epoll implementation of Net::Curl::Promiser is below.
package main;
use strict;
use warnings;
use Net::Curl::Easy qw(:constants);
use Linux::Perl::epoll ();
# URLs that the demonstration fetches.
my @urls = qw(
    http://perl.com
    http://metacpan.org
);

my $epoll = Linux::Perl::epoll->new();

#----------------------------------------------------------------------
# Create one easy handle per URL and register it with the promiser.
# Each registration yields a promise; a success and a failure callback
# are attached to report the outcome of that URL.
my $promiser = My::Curl::Epoll->new($epoll);

for my $url (@urls) {
    my $easy = Net::Curl::Easy->new();
    $easy->setopt( CURLOPT_URL(),            $url );
    $easy->setopt( CURLOPT_FOLLOWLOCATION(), 1 );

    my $on_success = sub { print "$url completed.$/" };
    my $on_failure = sub { warn "$url: " . shift };

    $promiser->add_handle($easy)->then( $on_success, $on_failure );
}
#----------------------------------------------------------------------
# Event loop: keep going for as long as the promiser reports handles.
while ( $promiser->handles() ) {
    my $timeout_ms = $promiser->get_timeout();

    # A false (zero) timeout skips the epoll wait entirely, so the
    # time-out path below runs immediately.
    my @events;
    if ($timeout_ms) {
        @events = $epoll->wait(
            maxevents => 10,
            timeout   => $timeout_ms / 1000,
        );
    }

    if ( !@events ) {
        $promiser->time_out();
        next;
    }

    # wait() results are consumed as (fd, event-mask) pairs; sort each
    # descriptor into the readable and/or writable list.
    my ( @readable, @writable );
    while (@events) {
        my ( $fd, $mask ) = splice @events, 0, 2;

        push @readable, $fd if $mask & $epoll->EVENT_NUMBER()->{'IN'};
        push @writable, $fd if $mask & $epoll->EVENT_NUMBER()->{'OUT'};
    }

    $promiser->process( \@writable, \@readable );
}
#----------------------------------------------------------------------
package My::Curl::Epoll;
use parent 'Net::Curl::Promiser';
# Constructor.
#
# Builds the object via the parent class's new() and stores the given
# epoll object under the '_epoll' key for use by the polling methods.
#
# Arguments: $class, $epoll
# Returns:   the new object
sub new {
    my $class = shift;
    my ($epoll) = @_;

    my $self = $class->SUPER::new();
    $self->{'_epoll'} = $epoll;

    return $self;
}
# Registers interest in @events for $fd with the stored epoll object.
#
# A descriptor seen for the first time is add()ed and recorded as a key
# of $self->{'_fds'} (with an undef value); a descriptor that is already
# recorded is modify()ed instead.
#
# Arguments: $self, $fd, @events (event names such as 'IN' / 'OUT')
# Returns:   nothing
sub _set_epoll {
    my ( $self, $fd, @events ) = @_;

    if ( !exists $self->{'_fds'}{$fd} ) {
        $self->{'_epoll'}->add( $fd, events => \@events );
        $self->{'_fds'}{$fd} = undef;
        return;
    }

    $self->{'_epoll'}->modify( $fd, events => \@events );

    return;
}
# Asks for $fd to be polled for the 'IN' event only.
# Delegates to _set_epoll() and returns its result.
sub _SET_POLL_IN {
    my $self = shift;
    my ($fd) = @_;

    return $self->_set_epoll( $fd, qw(IN) );
}
# Asks for $fd to be polled for the 'OUT' event only.
# Delegates to _set_epoll() and returns its result.
sub _SET_POLL_OUT {
    my $self = shift;
    my ($fd) = @_;

    return $self->_set_epoll( $fd, qw(OUT) );
}
# Asks for $fd to be polled for both the 'IN' and 'OUT' events.
# Delegates to _set_epoll() and returns its result.
sub _SET_POLL_INOUT {
    my $self = shift;
    my ($fd) = @_;

    return $self->_set_epoll( $fd, qw(IN OUT) );
}
# Stops polling a file descriptor.
#
# If $fd is currently tracked in $self->{'_fds'}, its tracking entry is
# removed and the descriptor is deleted from the stored epoll object.
# A descriptor that is not tracked is ignored.
#
# Arguments: $self, $fd
# Returns:   nothing
sub _STOP_POLL {
    my ( $self, $fd ) = @_;

    # _set_epoll() records tracked descriptors with an undef value, so
    # the value returned by delete() is always false and cannot be used
    # to tell whether the descriptor was tracked. Test for the key with
    # exists() instead; otherwise the epoll deletion would never happen.
    if ( exists $self->{'_fds'}{$fd} ) {
        delete $self->{'_fds'}{$fd};
        $self->{'_epoll'}->delete($fd);
    }

    return;
}
1;
# The base class. A subclass extends it to support a particular event mechanism, e.g., epoll.
package Net::Curl::Promiser;
use strict;
use warnings;
use Promise::ES6 ();
use Net::Curl::Multi ();
use constant _DEFAULT_TIMEOUT => 1000;
# Constructor.
#
# Creates a Net::Curl::Multi object, stores it under the 'curl' key, and
# installs _socket_fn() as its socket callback with the new object as the
# callback data. If the class implements _ON_TIMEOUT_CHANGE(), that method
# is installed as the timer callback in the same way.
#
# The object also starts with empty 'callbacks' and 'to_fail' hashes.
#
# Arguments: $class
# Returns:   the new object
sub new {
    my ($class) = @_;

    my $self = bless {
        callbacks => {},
        to_fail   => {},
    }, $class;

    my $multi = Net::Curl::Multi->new();
    $self->{'curl'} = $multi;

    $multi->setopt( Net::Curl::Multi::CURLMOPT_SOCKETDATA(),     $self );
    $multi->setopt( Net::Curl::Multi::CURLMOPT_SOCKETFUNCTION(), \&_socket_fn );

    # The timer callback is optional: only subclasses that define
    # _ON_TIMEOUT_CHANGE get one.
    my $timer_fn = $class->can('_ON_TIMEOUT_CHANGE');
    if ($timer_fn) {
        $multi->setopt( Net::Curl::Multi::CURLMOPT_TIMERDATA(),     $self );
        $multi->setopt( Net::Curl::Multi::CURLMOPT_TIMERFUNCTION(), $timer_fn );
    }

    return $self;
}
# Tells the multi object that a timeout occurred, then settles whatever
# transfers have become pending.
#
# Arguments: $self
# Returns:   the value returned by the multi object's socket_action()
sub time_out {
    my ($self) = @_;

    my $multi         = $self->{'curl'};
    my $still_running = $multi->socket_action(
        Net::Curl::Multi::CURL_SOCKET_TIMEOUT(),
    );

    $self->_process_pending();

    return $still_running;
}
# Reports socket readiness to the multi object and then settles whatever
# transfers have become pending.
#
# Every descriptor in $send_fds_ar is reported as writable first, then
# every descriptor in $receive_fds_ar is reported as readable.
#
# Arguments: $self, $send_fds_ar (array ref), $receive_fds_ar (array ref)
# Returns:   $self
sub process {
    my ( $self, $send_fds_ar, $receive_fds_ar ) = @_;

    $self->{'curl'}->socket_action( $_, Net::Curl::Multi::CURL_CSELECT_OUT() )
      for @$send_fds_ar;

    $self->{'curl'}->socket_action( $_, Net::Curl::Multi::CURL_CSELECT_IN() )
      for @$receive_fds_ar;

    $self->_process_pending();

    return $self;
}
# Returns whatever the multi object's handles() method returns; the demo
# loop uses the result as its "still work to do" condition.
sub handles {
    my ($self) = @_;

    return $self->{'curl'}->handles();
}
# Returns the timeout reported by the multi object, substituting
# _DEFAULT_TIMEOUT when the reported value is negative.
#
# Arguments: $self
# Returns:   the timeout value (same unit as the multi object's timeout();
#            presumably milliseconds - confirm against Net::Curl::Multi)
sub get_timeout {
    my ($self) = @_;

    my $reported = $self->{'curl'}->timeout();

    return _DEFAULT_TIMEOUT() if $reported < 0;
    return $reported;
}
# Adds an easy handle to the multi object and returns a promise for it.
#
# The arguments that Promise::ES6 passes to the executor callback are
# stored in $self->{'callbacks'}, keyed by the (stringified) easy handle,
# so that _finish_handle() can invoke one of them later by index.
#
# Arguments: $self, $easy
# Returns:   a Promise::ES6 object
sub add_handle {
    my ( $self, $easy ) = @_;

    $self->{'curl'}->add_handle($easy);

    my $executor = sub {
        my @settlers = @_;
        $self->{'callbacks'}{$easy} = \@settlers;
    };

    my $promise = Promise::ES6->new($executor);

    return $promise;
}
# Marks an easy handle as failed with the given reason.
#
# Nothing is rejected immediately: the [ $easy, $reason ] pair is only
# recorded in $self->{'to_fail'}, keyed by the (stringified) handle, and
# is acted on the next time pending work is processed.
#
# Arguments: $self, $easy, $reason
# Returns:   nothing
sub fail_handle {
    my ( $self, $easy, $reason ) = @_;

    my @entry = ( $easy, $reason );
    $self->{'to_fail'}{$easy} = \@entry;

    return;
}
#----------------------------------------------------------------------
# Socket callback installed on the multi object by new().
#
# Translates the poll action requested for $fd into the matching
# implementation hook on $self (the object passed as callback data):
#
#   CURL_POLL_IN     -> _SET_POLL_IN
#   CURL_POLL_OUT    -> _SET_POLL_OUT
#   CURL_POLL_INOUT  -> _SET_POLL_INOUT
#   CURL_POLL_REMOVE -> _STOP_POLL
#
# Any other action is silently ignored.
#
# Arguments: $multi, $easy, $fd, $action, $assign, $self
# Returns:   0
sub _socket_fn {
    my ( $multi, $easy, $fd, $action, $assign, $self ) = @_;

    my @hook_for = (
        [ Net::Curl::Multi::CURL_POLL_IN(),     '_SET_POLL_IN' ],
        [ Net::Curl::Multi::CURL_POLL_OUT(),    '_SET_POLL_OUT' ],
        [ Net::Curl::Multi::CURL_POLL_INOUT(),  '_SET_POLL_INOUT' ],
        [ Net::Curl::Multi::CURL_POLL_REMOVE(), '_STOP_POLL' ],
    );

    for my $pair (@hook_for) {
        my ( $code, $hook ) = @$pair;
        next if $action != $code;

        # Only the first matching action is dispatched.
        $self->$hook($fd);
        last;
    }

    return 0;
}
# Reports activity in $direction on a single descriptor to the multi
# object, then settles whatever transfers have become pending.
#
# Arguments: $self, $fd, $direction
# Returns:   the value returned by the multi object's socket_action()
sub _socket_action {
    my ( $self, $fd, $direction ) = @_;

    my $multi         = $self->{'curl'};
    my $still_running = $multi->socket_action( $fd, $direction );

    $self->_process_pending();

    return $still_running;
}
# Finalizes one easy handle.
#
# Drops any pending failure recorded for the handle, removes the handle
# from the multi object, and - if callbacks were stored for it - removes
# them and invokes the one at index $cb_idx with $payload.
#
# Arguments: $self, $easy, $cb_idx (index into the stored callback list),
#            $payload (value handed to the callback)
# Returns:   nothing
sub _finish_handle {
    my ( $self, $easy, $cb_idx, $payload ) = @_;

    delete $self->{'to_fail'}{$easy};

    $self->{'curl'}->remove_handle($easy);

    # A handle without stored callbacks has nothing left to notify.
    my $callbacks = delete $self->{'callbacks'}{$easy}
      or return;

    $callbacks->[$cb_idx]->($payload);

    return;
}
# Settles every handle that fail_handle() has recorded.
#
# Each recorded [ $easy, $reason ] pair is removed from
# $self->{'to_fail'} and the handle is finished through callback index 1
# with the reason as payload.
#
# Arguments: $self
# Returns:   nothing
sub _clear_failed {
    my ($self) = @_;

    # keys() yields the full key list up front, so removing entries
    # inside the loop is safe.
    for my $key ( keys %{ $self->{'to_fail'} } ) {
        my ( $handle, $why ) = @{ delete $self->{'to_fail'}{$key} };

        $self->_finish_handle( $handle, 1, $why );
    }

    return;
}
# Settles every transfer that is ready to be finished.
#
# First finishes the handles recorded by fail_handle(), then drains the
# multi object's info_read() queue. For each message:
#
#   * a handle that has a recorded failure is finished through callback
#     index 1 with the recorded reason;
#   * a handle whose result equals 0 is finished through callback index 0
#     with the handle itself as payload;
#   * any other handle is finished through callback index 1 with the
#     result as payload.
#
# Arguments: $self
# Returns:   nothing
# Dies:      if info_read() yields a message other than CURLMSG_DONE
sub _process_pending {
    my ($self) = @_;

    $self->_clear_failed();

    while ( my @info = $self->{'curl'}->info_read() ) {
        my ( $msg, $easy, $result ) = @info;

        die "Unrecognized info_read() message: [$msg]"
          if $msg != Net::Curl::Multi::CURLMSG_DONE();

        my ( $cb_idx, $payload );

        # A failure can be recorded after _clear_failed() ran (e.g. while
        # an earlier handle was being finished), so check again per handle.
        if ( my $failure = delete $self->{'to_fail'}{$easy} ) {
            ( $easy, $payload ) = @$failure;
            $cb_idx = 1;
        }
        elsif ( $result == 0 ) {
            ( $cb_idx, $payload ) = ( 0, $easy );
        }
        else {
            ( $cb_idx, $payload ) = ( 1, $result );
        }

        $self->_finish_handle( $easy, $cb_idx, $payload );
    }

    return;
}
1;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment