# Skip to content
#
# Instantly share code, notes, and snippets.
#
# @wakaba wakaba/future.pl
# Last active Dec 16, 2015
#
# Embed
# What would you like to do?
## DOM futures in Perl (experimental)
##
## Spec: <http://dom.spec.whatwg.org/#futures>
use strict;
use warnings;
package Future;
# "queue a task"
use AnyEvent;
# Queue a task: instead of calling the code reference right away, arm
# an AnyEvent timer (delay 0, interval 0) whose callback runs it.
#   $_[0] - invocant (unused)
#   $_[1] - code reference to run
sub _queue ($$) {
  my (undef, $task) = @_;
  my $watcher;
  # The watcher variable is captured by its own callback, which clears
  # it before running the task.
  $watcher = AE::timer 0, 0, sub {
    undef $watcher;
    $task->();
  };
} # _queue
# $future->{}
# "future" properties:
# |accept_cbs| isa arrayref
# |reject_cbs| isa arrayref
# |state| = {undef : pending | 1 : accepted | -1 : rejected}
# |result|
# "resolver" properties:
# |resolved| isa boolean
# Drain the callback list stored under $key (|accept_cbs| or
# |reject_cbs|): callbacks are removed from the front one at a time and
# invoked as a method on the future, i.e. with ($self, $self->{result}).
#   $self - the future
#   $key  - name of the callback list to drain
# An exception thrown by a callback is caught and reported with |warn|,
# so that the remaining callbacks still run and the exception does not
# escape into the caller (which may be an event loop callback).
sub _process ($$) {
  my ($self, $key) = @_;
  # The list is re-read on every iteration because a callback may
  # append further callbacks to this same future.
  while (@{$self->{$key} or []}) {
    my $code = shift @{$self->{$key}};
    my ($ok, $error);
    {
      local $@;
      $ok = eval { $self->$code ($self->{result}); 1 };
      $error = $@;
    }
    warn "Exception in future callback: $error" unless $ok;
  }
} # _process
# Register callbacks on a future.
#   $_[1] - accept callback (optional)
#   $_[2] - reject callback (optional)
# While the future is pending (|state| undefined) both callbacks are
# stored.  Once it is settled, only the callback matching the outcome
# is stored and a task is queued to process that list; the callback
# for the other outcome could never run, so it is not retained.
sub _append ($$$) {
  my ($self, $accept_cb, $reject_cb) = @_;
  my $state = $self->{state};
  if (not defined $state) { # pending
    push @{$self->{accept_cbs} ||= []}, $accept_cb if defined $accept_cb;
    push @{$self->{reject_cbs} ||= []}, $reject_cb if defined $reject_cb;
  } elsif ($state == 1) { # accepted
    push @{$self->{accept_cbs} ||= []}, $accept_cb if defined $accept_cb;
    $self->_queue (sub { $self->_process ('accept_cbs') });
  } elsif ($state == -1) { # rejected
    push @{$self->{reject_cbs} ||= []}, $reject_cb if defined $reject_cb;
    $self->_queue (sub { $self->_process ('reject_cbs') });
  }
} # _append
# Build a callback that forwards its value to a method of $future.
#   $_[0] - invocant (unused)
#   $_[1] - the future to call the method on
#   $_[2] - the method to invoke
# The returned code reference calls $future->$method ($value, 'sync')
# with its own second argument as $value, and returns undef.
sub _future_callback ($$$) {
  my $target = $_[1];
  my $method_name = $_[2];
  my $callback = sub {
    my $value = $_[1];
    $target->$method_name ($value, 'sync');
    return undef;
  };
  return $callback;
} # _future_callback
# Build a callback that runs user code and settles $future with the
# outcome.
#   $_[0] - invocant (unused)
#   $_[1] - the future to settle
#   $_[2] - the user code, invoked as a method: ($future, $value)
# The returned code reference calls the user code in scalar context
# with its own second argument.  If the code returns normally the
# future is resolved (synchronously) with the return value; if it
# throws, the future is rejected (synchronously) with the exception.
# Failure is detected from the outcome of |eval| itself rather than
# from the truthiness of $@, so an exception value that is false in
# boolean context still rejects the future.
sub _future_wrapper_callback ($$$) {
  my (undef, $future, $code) = @_;
  return sub {
    my ($value, $error, $ok);
    {
      local $@;
      $ok = eval { $value = $future->$code ($_[1]); 1 };
      $error = $@;
    }
    if ($ok) {
      $future->_resolver_resolve ($value, 'sync');
    } else {
      $future->_resolver_reject ($error, 'sync');
    }
    return undef;
  };
} # _future_wrapper_callback
# Low-level constructor: returns an empty hash blessed into the given
# class.  No init code is run.
sub _new ($) {
  my ($class) = @_;
  return bless {}, $class;
} # _new
# Future->new ($init_code)
# Create a future and run its init code.
#   $_[0] - class
#   $_[1] - init code, called as ($future, $resolver) where $resolver
#           is a |Resolver| object wrapping a reference to the future
# If the init code throws, the future is rejected with the exception.
# Failure is detected from the outcome of |eval| itself rather than
# from the truthiness of $@, so an exception value that is false in
# boolean context still rejects the future.
# Returns the new future.
sub new ($$) {
  my ($class, $init) = @_;
  my $self = $class->_new;
  my $resolver = bless \$self, 'Resolver';
  my ($ok, $error);
  {
    local $@;
    $ok = eval { $init->($self, $resolver); 1 };
    $error = $@;
  }
  $self->_resolver_reject ($error) unless $ok;
  return $self;
} # new
# $future->then ($accept_code, $reject_code)
# Register continuations and return a new, derived future of the same
# class as the invocant.
#   $_[1] - accept code (optional)
#   $_[2] - reject code (optional)
# A supplied code reference is wrapped with |_future_wrapper_callback|,
# so it runs as a method of the derived future and its outcome settles
# that future.  A missing one is replaced by a |_future_callback| that
# forwards the value to the derived future's |_resolver_accept| or
# |_resolver_reject| respectively.
sub then ($;$$) {
  my ($source, $on_accept, $on_reject) = @_;
  my $derived = (ref $source)->_new;

  my $accept_cb;
  if (defined $on_accept) {
    $accept_cb = $derived->_future_wrapper_callback ($derived, $on_accept);
  } else {
    $accept_cb = $derived->_future_callback ($derived, '_resolver_accept');
  }

  my $reject_cb;
  if (defined $on_reject) {
    $reject_cb = $derived->_future_wrapper_callback ($derived, $on_reject);
  } else {
    $reject_cb = $derived->_future_callback ($derived, '_resolver_reject');
  }

  $source->_append ($accept_cb, $reject_cb);
  return $derived;
} # then
# $future->catch ($reject_code)
# Register a rejection handler and return a new, derived future of the
# same class as the invocant.
#   $_[1] - reject code (optional)
# Acceptance is always forwarded to the derived future's
# |_resolver_accept|.  A supplied reject code is wrapped with
# |_future_wrapper_callback|; without one, rejection is forwarded to
# the derived future's |_resolver_reject|.
sub catch ($;$) {
  my ($source, $on_reject) = @_;
  my $derived = (ref $source)->_new;
  my $accept_cb = $derived->_future_callback ($derived, '_resolver_accept');

  my $reject_cb;
  if (defined $on_reject) {
    $reject_cb = $derived->_future_wrapper_callback ($derived, $on_reject);
  } else {
    $reject_cb = $derived->_future_callback ($derived, '_resolver_reject');
  }

  $source->_append ($accept_cb, $reject_cb);
  return $derived;
} # catch
# $future->done ($accept_code, $reject_code)
# Register callbacks directly on this future, without creating a
# derived future; the callbacks are passed to |_append| unwrapped.
# Always returns undef.
sub done ($;$$) {
  my ($self, $on_accept, $on_reject) = @_;
  $self->_append ($on_accept, $on_reject);
  return undef;
} # done
# Block on an AnyEvent condition variable until this future settles.
# On acceptance the value is handed to the condvar's |send|; on
# rejection it is handed to the condvar's |croak|.  Returns whatever
# the condvar's |recv| returns.
sub XXX_wait ($) {
  my ($self) = @_;
  my $condvar = AE::cv;
  my $on_accept = sub { $condvar->send ($_[1]) };
  my $on_reject = sub { $condvar->croak ($_[1]) };
  $self->done ($on_accept, $on_reject);
  return $condvar->recv;
} # XXX_wait
# Future->accept ($value)
# Class method: create a future whose init code immediately calls the
# resolver's |accept| with $value.
sub accept ($$) {
  my ($class, $value) = @_;
  my $init = sub {
    my $resolver = $_[1];
    $resolver->accept ($value);
  };
  return $class->new ($init);
} # accept
# Future->resolve ($value)
# Class method: create a future whose init code immediately calls the
# resolver's |resolve| with $value.
sub resolve ($$) {
  my ($class, $value) = @_;
  my $init = sub {
    my $resolver = $_[1];
    $resolver->resolve ($value);
  };
  return $class->new ($init);
} # resolve
# Future->reject ($value)
# Class method: create a future whose init code immediately calls the
# resolver's |reject| with $value.
sub reject ($$) {
  my ($class, $value) = @_;
  my $init = sub {
    my $resolver = $_[1];
    $resolver->reject ($value);
  };
  return $class->new ($init);
} # reject
# Future->any ($value, ...)
# Class method: return a future settled by whichever of the given
# values settles first.
# Each value is turned into a future by resolving a fresh future with
# it; that future's acceptance is forwarded to the result's
# |_resolver_resolve| and its rejection to the result's
# |_resolver_reject|.  With no values, the result is resolved with
# undef.
sub any ($@) {
  my ($class, @values) = @_;
  my $self = $class->_new;
  if (@values) {
    my $resolve_cb = $self->_future_callback ($self, '_resolver_resolve');
    my $reject_cb = $self->_future_callback ($self, '_resolver_reject');
    foreach my $value (@values) {
      my $init = sub { $_[1]->resolve ($value) };
      my $future = $class->new ($init);
      $future->_append ($resolve_cb, $reject_cb);
    }
  } else {
    $self->_resolver_resolve (undef);
  }
  return $self;
} # any
# Future->every ($value, ...)
# Class method: return a future that is resolved once all of the given
# values have been accepted, or rejected as soon as one is rejected.
# Each value is turned into a future by resolving a fresh future with
# it.  Accepted results are collected in an array reference at the
# position of the corresponding argument; when none remain outstanding
# the result future is resolved with that array reference.  With no
# values, the result is resolved with undef.
sub every ($@) {
  my ($class, @values) = @_;
  my $self = $class->_new;
  if (@values) {
    my $remaining = scalar @values;
    my $results = [(undef) x $remaining];
    my $reject_cb = $self->_future_callback ($self, '_resolver_reject');
    foreach my $index (0 .. $#values) {
      my $value = $values[$index];
      my $resolve_cb = sub {
        $results->[$index] = $_[1];
        $remaining--;
        if ($remaining == 0) {
          $self->_resolver_resolve ($results, 'sync');
        }
        return undef;
      };
      my $init = sub { $_[1]->resolve ($value) };
      my $future = $class->new ($init);
      $future->_append ($resolve_cb, $reject_cb);
    }
  } else {
    $self->_resolver_resolve (undef);
  }
  return $self;
} # every
# Future->some ($value, ...)
# Class method: return a future that is resolved as soon as one of the
# given values is accepted, or rejected once all of them have been
# rejected.
# Each value is turned into a future by resolving a fresh future with
# it.  Rejection values are collected in an array reference at the
# position of the corresponding argument; when none remain outstanding
# the result future is rejected with that array reference.  With no
# values, the result is resolved with undef.
sub some ($@) {
  my ($class, @values) = @_;
  my $self = $class->_new;
  if (@values) {
    my $remaining = scalar @values;
    my $errors = [(undef) x $remaining];
    my $resolve_cb = $self->_future_callback ($self, '_resolver_resolve');
    foreach my $index (0 .. $#values) {
      my $value = $values[$index];
      my $reject_cb = sub {
        $errors->[$index] = $_[1];
        $remaining--;
        if ($remaining == 0) {
          $self->_resolver_reject ($errors, 'sync');
        }
        return undef;
      };
      my $init = sub { $_[1]->resolve ($value) };
      my $future = $class->new ($init);
      $future->_append ($resolve_cb, $reject_cb);
    }
  } else {
    $self->_resolver_resolve (undef);
  }
  return $self;
} # some
# $future->_resolver_accept ($value, $sync);
# Accept the future with $value.
#   $_[1] - the value to store as |result|
#   $_[2] - true to process the accept callbacks immediately, false to
#           queue a task that processes them
# Does nothing if the future is already resolved.  Otherwise marks it
# accepted (|state| = 1) and resolved.  The reject callbacks can never
# run from here on, so that list is dropped to release the closures it
# holds.
sub _resolver_accept ($$$) {
  my ($future, $value, $sync) = @_;
  return if $future->{resolved};
  $future->{state} = 1; # accepted
  $future->{result} = $value;
  $future->{resolved} = 1;
  delete $future->{reject_cbs};
  if ($sync) {
    $future->_process ('accept_cbs');
  } else {
    $future->_queue (sub { $future->_process ('accept_cbs') });
  }
} # _resolver_accept
# $future->_resolver_resolve ($value, $sync)
# Resolve the future with $value.
#   $_[1] - the value; may be a "thenable" object
#   $_[2] - sync flag, passed on to |_resolver_accept|
# Does nothing if the future is already resolved.
# If $value is a blessed object whose |can ('then')| yields a method,
# that method is called with callbacks forwarding to this future's
# |_resolver_resolve| and |_resolver_reject|.  An exception from the
# |can| lookup or from the |then| call rejects this future
# (synchronously).  Any other value is accepted as is.
# Only blessed references are probed: a plain string that happens to
# name a loaded package with a |then| method is an ordinary value, not
# a thenable.  Failures are detected from the outcome of |eval| rather
# than from the truthiness of $@.
sub _resolver_resolve ($$$) {
  return if $_[0]->{resolved};
  my ($future, $value, $sync) = @_;
  require Scalar::Util;

  my $then;
  my $lookup_ok = 1;
  my $lookup_error;
  if (defined Scalar::Util::blessed ($value)) {
    local $@;
    $lookup_ok = eval { $then = $value->can ('then'); 1 };
    $lookup_error = $@;
  }

  if (not $lookup_ok) {
    $future->_resolver_reject ($lookup_error, 'sync');
  } elsif ($then) {
    my $accept_cb = $future->_future_callback ($future, '_resolver_resolve');
    my $reject_cb = $future->_future_callback ($future, '_resolver_reject');
    my ($then_ok, $then_error);
    {
      local $@;
      $then_ok = eval { $value->$then ($accept_cb, $reject_cb); 1 };
      $then_error = $@;
    }
    $future->_resolver_reject ($then_error, 'sync') unless $then_ok;
  } else {
    $future->_resolver_accept ($value, $sync);
  }
} # _resolver_resolve
# $future->_resolver_reject ($value, $sync)
# Reject the future with $value.
#   $_[1] - the value to store as |result|
#   $_[2] - true to process the reject callbacks immediately, false to
#           queue a task that processes them
# Does nothing if the future is already resolved.  Otherwise marks it
# rejected (|state| = -1) and resolved.  The accept callbacks can never
# run from here on, so that list is dropped to release the closures it
# holds.
sub _resolver_reject ($$$) {
  my ($future, $value, $sync) = @_;
  return if $future->{resolved};
  $future->{state} = -1; # rejected
  $future->{result} = $value;
  $future->{resolved} = 1;
  delete $future->{accept_cbs};
  if ($sync) {
    $future->_process ('reject_cbs');
  } else {
    $future->_queue (sub { $future->_process ('reject_cbs') });
  }
} # _resolver_reject
package Resolver;
# $resolver->accept ($value)
# The resolver is a blessed reference to its future; dereference it
# and accept that future with $value.  Returns nothing.
sub accept ($;$) {
  my ($resolver, $value) = @_;
  my $future = $$resolver;
  $future->_resolver_accept ($value);
  return;
} # accept
# $resolver->resolve ($value)
# The resolver is a blessed reference to its future; dereference it
# and resolve that future with $value.  Returns nothing.
sub resolve ($;$) {
  my ($resolver, $value) = @_;
  my $future = $$resolver;
  $future->_resolver_resolve ($value);
  return;
} # resolve
# $resolver->reject ($value)
# The resolver is a blessed reference to its future; dereference it
# and reject that future with $value.  Returns nothing.
sub reject ($;$) {
  my ($resolver, $value) = @_;
  my $future = $$resolver;
  $future->_resolver_reject ($value);
  return;
} # reject
package main;
# Block-syntax sugar: |new_future { ... }| hands the block to
# Future->new as the init code.
sub new_future (&) {
  my ($init) = @_;
  return Future->new ($init);
}
# Block-syntax sugar: |every_future { ... }| runs the block in list
# context and passes the values it returns to Future->every.
sub every_future (&) {
  my ($generator) = @_;
  my @values = $generator->();
  return Future->every (@values);
}
# Block-syntax sugar: |any_future { ... }| runs the block in list
# context and passes the values it returns to Future->any.
sub any_future (&) {
  my ($generator) = @_;
  my @values = $generator->();
  return Future->any (@values);
}
# Block-syntax sugar: |some_future { ... }| runs the block in list
# context and passes the values it returns to Future->some.
sub some_future (&) {
  my ($generator) = @_;
  my @values = $generator->();
  return Future->some (@values);
}
use AnyEvent::HTTP;
use Data::Dumper;
# Demo script.  For every URL given in @ARGV: GET it, pull up to four
# http(s) URLs out of the response, then GET those and take whichever
# of them settles first (Future->any).  The per-argument results are
# combined with |every_future|, dumped with Data::Dumper, and the
# script blocks in |XXX_wait| until that final future settles.
every_future { map {
my $url = $_;
# Stage 1: a future that is accepted with an array reference of URLs
# extracted from the response, or rejected with an error string.
new_future {
my $resolver = $_[1];
http_request 'GET', $url => sub {
my $headers = $_[1];
if ($headers->{Status} < 400) {
# Match http(s) URLs made of printable ASCII other than space and
# |"| in the first callback argument (presumably the response body --
# confirm against the AnyEvent::HTTP documentation).
my @url = ($_[0] =~ m{(https?://[\x21\x23-\x7E]+)}g);
# Keep at most the first four matches.
$#url = 3 if $#url > 3;
$resolver->accept (\@url);
} else {
$resolver->reject
('Error: ' . $headers->{Status} . ' ' . $headers->{Reason});
}
};
}->then (sub {
# Stage 2: $_[1] is the array reference accepted above.  Build one
# future per extracted URL and combine them with |any|.
# NOTE(review): |any Future LIST| and |new Future sub {...}| are
# indirect object syntax, presumably equivalent to Future->any (LIST)
# and Future->new (sub {...}).
any Future map {
my $url = $_;
warn "Extracted URL: <$url>\n";
new Future sub {
my $resolver = $_[1];
http_request 'GET', $url => sub {
my $headers = $_[1];
if ($headers->{Status} < 400) {
# Accepted with the header hash of the successful response.
$resolver->accept ($headers);
} else {
$resolver->reject
("Error: <$url>: $headers->{Status} $headers->{Reason}");
}
};
};
} @{$_[1]};
});
} @ARGV }->then (sub {
# Despite its name, $headers here is the value |every| resolves with:
# an array reference holding one result per @ARGV entry.
my $headers = $_[1];
warn Dumper $headers;
})->XXX_wait;
1;
# Copyright 2013 Wakaba <wakaba@suikawiki.org>.
#
# This library is free software; you can redistribute it and/or modify
# it under the same terms as Perl itself.
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
# You can’t perform that action at this time.