Skip to content

Instantly share code, notes, and snippets.

@hiratara
Created October 23, 2011 08:42
Show Gist options
  • Save hiratara/1307113 to your computer and use it in GitHub Desktop.
Save hiratara/1307113 to your computer and use it in GitHub Desktop.
use strict;
use warnings;
{
    # AsyncArrow - a composable asynchronous computation step.
    #
    # An arrow wraps a code ref (the `code` accessor) that is called as
    #     $code->($value, $progress, $cont)
    # and hands its result on by calling
    #     $cont->($result, $progress)
    # either immediately or later from an event-loop callback.
    package AsyncArrow;
    use Scalar::Util qw/weaken/;
    use AnyEvent;
    use Exporter qw/import/;
    use Class::Accessor::Lite new => 1, rw => ['code'];

    # `repeat` stays exported as before.  The previously listed `done` does
    # not exist in this package, so it is no longer exported; the two request
    # constructors are available on demand instead.
    our @EXPORT    = qw/repeat/;
    our @EXPORT_OK = qw/repeat_request done_request/;

    # Build the "run the arrow again" request understood by repeat().
    # The optional argument becomes the input value of the next iteration.
    sub repeat_request(;$) {
        my ($value) = @_;
        return { repeat => 1, value => $value };
    }

    # Build the "stop looping" request understood by repeat().
    # The optional argument becomes the value passed on after the loop.
    sub done_request(;$) {
        my ($value) = @_;
        return { done => 1, value => $value };
    }

    # Lift a plain function into an arrow: the arrow calls $f with the
    # incoming value and passes the result on right away.
    sub arr {
        my ($class, $f) = @_;
        return $class->new(code => sub {
            my ($v, $progress, $cont) = @_;

            # Force scalar context: a callback returning an empty or a
            # multi-element list must not shift $progress out of position.
            my $result = $f->($v);
            $cont->($result, $progress);
        });
    }

    # Sequential composition: run $self, then feed its result (and the
    # progress object it reports) into $other.
    sub compose {
        my ($self, $other) = @_;
        return (ref $self)->new(code => sub {
            my ($v, $progress, $cont) = @_;
            $self->code->($v, $progress, sub {
                my ($v, $progress) = @_;
                $other->code->($v, $progress, $cont);
            });
        });
    }

    # Start the arrow with input $v.  A fresh ProgressArrow is created when
    # no progress object is supplied.  The arrow's final result is discarded;
    # the progress object is returned.
    sub run {
        my ($self, $v, $progress) = @_;
        $progress ||= ProgressArrow->new_without_args;
        $self->code->($v, $progress, sub { return });
        return $progress;
    }

    # Turn an arrow that yields repeat_request/done_request hashes into a
    # loop.  A repeat request schedules the next iteration on a zero-delay
    # timer (registered with the progress object so it can be cancelled); a
    # done request passes its value on to the continuation.  Any other
    # result is a fatal error.
    sub repeat {
        my $self = shift;
        my $weaken_loop;
        my $loop = sub {
            my ($v, $progress, $cont) = @_;

            # Strong per-iteration reference to the loop.  The callbacks of a
            # pending iteration must keep the loop alive even after the arrow
            # object returned by repeat() has been dropped by its creator;
            # $weaken_loop alone would have become undef by then.
            my $loop_ref = $weaken_loop;

            $self->code->($v, $progress, sub {
                my ($v, $progress) = @_;
                if ($v->{repeat}) {
                    my $canceller;
                    my $t = AE::timer 0, 0, sub {
                        $progress->advance($canceller);

                        # The timer has fired: release the watcher so that
                        # the watcher/callback/canceller cycle is broken.
                        $canceller->();
                        $loop_ref->($v->{value}, $progress, $cont);
                    };
                    $progress->add_canceller($canceller = sub { undef $t });
                }
                elsif ($v->{done}) {
                    $cont->($v->{value}, $progress);
                }
                else {
                    die "AsyncArrow::repeat: arrow result is neither a repeat_request nor a done_request";
                }
            });
        };

        # The loop refers to itself only through a weak reference, so it
        # never forms a permanent reference cycle with itself.
        weaken($weaken_loop = $loop);
        return (ref $self)->new(code => $loop);
    }
}
{
    # ProgressArrow - an arrow that tracks the progress of a running
    # computation.
    #
    # It keeps two lists:
    #   cancellers - code refs that abort a pending asynchronous operation
    #   observers  - code refs to call the next time the computation advances
    # Used as an arrow, it suspends its continuation until the next advance().
    package ProgressArrow;
    use parent qw/-norequire AsyncArrow/;
    use Scalar::Util qw/weaken refaddr/;
    use Class::Accessor::Lite rw => ['cancellers', 'observers'];

    # Constructor taking no arguments: starts with empty canceller and
    # observer lists and installs the arrow code that registers an observer.
    sub new_without_args {
        my $class = shift;
        my $self = $class->new(
            cancellers => [],
            observers  => [],
        );

        # The code ref is stored inside $self, so it must refer to $self
        # weakly to avoid a reference cycle.
        weaken(my $weaken_self = $self);
        $self->code(sub {
            my ($v, $progress, $cont) = @_;
            push @{ $weaken_self->observers }, sub {
                my ($v) = @_;
                $cont->($v, $progress);
            };
        });
        return $self;
    }

    # Register one or more canceller code refs.
    sub add_canceller {
        my $self = shift;
        push @{ $self->cancellers }, @_;
    }

    # Report that the operation guarded by $canceller has completed: the
    # canceller is removed from the list (it is not called) and every
    # observer registered so far is fired once, without arguments.
    sub advance {
        my ($self, $canceller) = @_;

        # Compare by reference address; skip the filtering altogether when
        # no canceller reference was given, so nothing undefined is compared.
        my $done_addr = refaddr $canceller;
        if (defined $done_addr) {
            @{ $self->cancellers } =
                grep { refaddr($_) != $done_addr } @{ $self->cancellers };
        }

        # Take a snapshot first: an observer that registers a new observer
        # while being notified must wait for the next advance() instead of
        # being fired again within this one.
        my @pending = splice @{ $self->observers };
        for my $observer (@pending) {
            $observer->();
        }
    }

    # Abort everything that is still pending: each registered canceller is
    # removed from the list and called.  Arguments are ignored.
    sub cancel {
        my ($self) = @_;
        while (my $cancel = shift @{ $self->cancellers }) {
            $cancel->();
        }
    }
}
use AnyEvent;
use AnyEvent::Handle;
# Arrow that reads one line from STDIN.
#
# Each run opens a fresh AnyEvent::Handle on STDIN and registers a canceller
# for it with the progress object.  When a line arrives, the progress object
# is advanced, the handle is released and the line is passed on.  When the
# handle reports an error, undef is passed on instead.
#
# NOTE(review): the handle is torn down after the first line; if more input
# was already buffered in that handle it is presumably lost -- confirm
# against AnyEvent::Handle's buffering when input may arrive in bursts.
sub inputted_line() {
    AsyncArrow->new(code => sub {
        my ($v, $progress, $cont) = @_;
        my $canceller;
        my $hdl = AnyEvent::Handle->new(
            fh       => \*STDIN,
            on_error => sub {
                my ($hdl, $fatal, $message) = @_;
                $hdl->destroy;

                # Only report conditions that set $!; presumably a plain
                # end of input arrives here with $! unset -- TODO confirm.
                warn "$message(fatal=$fatal)" if $!;

                # Pass the progress object on, as the on_read path does, so
                # that later stages do not lose it.
                $cont->(undef, $progress);
            },
            on_read => sub {
                my ($hdl) = @_;
                $hdl->push_read(line => sub {
                    my ($hdl, $line, $eol) = @_;
                    $progress->advance($canceller);

                    # Release the handle: this read is complete.
                    $canceller->();
                    $cont->($line, $progress);
                });
            },
        );
        $progress->add_canceller($canceller = sub { undef $hdl });
    });
}
# Condition variable that is signalled once the input loop has finished.
my $cv = AE::cv;

# Sum of the lengths of all lines read so far.
my $total_length = 0;

# Loop body of the input pipeline: account for one line and ask for another
# iteration, or announce the end and stop once no line is available.
my $consume_line = sub {
    my ($line) = @_;
    unless (defined $line) {
        print "DONE\n";
        return AsyncArrow::done_request;
    }
    $total_length += length $line;
    print "INPUT: $line\n";
    return AsyncArrow::repeat_request;
};

# Final stage of the input pipeline: let the main program finish.
my $finish = sub { $cv->send };

# Observer stage: report the running total.
my $report_total = sub {
    print "total length: $total_length\n"
};

# Pipeline 1: read lines until none is left, then signal $cv.
# run() returns the progress object of this pipeline.
my $p = inputted_line()
    ->compose(AsyncArrow->arr($consume_line))
    ->repeat
    ->compose(AsyncArrow->arr($finish))
    ->run;

# Pipeline 2: whenever pipeline 1 advances, print the total and wait for
# the next advance.
$p->compose(AsyncArrow->arr($report_total))
    ->compose(AsyncArrow->arr(\&AsyncArrow::repeat_request))
    ->repeat
    ->run;

# Drive the event loop until pipeline 1 signals completion.
$cv->recv;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment