Load Twitter user_timeline repeatedly using AnyEvent, Async::Defer and Promises
=head1 NAME
This script demonstrates how we can use the Async::Defer or Promises
module to send requests repeatedly in an asynchronous
way. The goal is to load a complete user_timeline that spans
multiple pages. To do that, we have to send multiple requests, and the
number of requests cannot be determined in advance. This is where
Async::Defer comes in pretty handy.
=head1 BUGS
Sorry, this example uses Twitter API v1.0, so it won't work with the
newer API.
If you are willing to use the newer API, try to modify asyncTwitter().
use strict;
use warnings;
use AnyEvent;
use AnyEvent::Util;
use JSON;
use Encode;
use Carp;
use Promises 0.02;
use Async::Defer 0.9.3;
use DateTime;
use DateTime::Format::Strptime;
# Parser for status timestamps (the created_at field); used by timeFilter().
my $twitter_parser = DateTime::Format::Strptime->new(pattern => '%a %b %d %T %z %Y');
# Parser for the threshold string handed to promisePages() and deferPages().
my $plain_parser = DateTime::Format::Strptime->new(pattern => '%Y-%m-%d %H:%M:%S %z');
USER_SCREEN_NAME: screen_name of the user whose timeline is to be loaded.
THRESHOLD_DATETIME_STR: A string representing the "threshold" time of loading. The script will
load statuses whose timestamps (created_at fields) are newer or equal
to the threshold. This can be C<undef>.
MAX_PAGES: Maximum pages to be loaded. The script never loads beyond this page.
# Script-wide settings.
# NOTE(review): MAX_PAGES is referenced by promisePages() and deferPages(),
# but no definition is visible in this list -- confirm it is defined.
use constant {
# Presumably the screen_name whose timeline is loaded -- verify against main().
USER_SCREEN_NAME => 'itmedia',
# Threshold timestamp, written in the '%Y-%m-%d %H:%M:%S %z' layout that
# $plain_parser accepts.
THRESHOLD_DATETIME_STR => '2012-11-30 12:00:00 +0000',
=head2 main ()
The entry point. It calls C<promisePages()> and C<deferPages()> to load
the user_timeline.
# Entry point. Prints a banner for each of the two loading strategies
# (Promises first, then Async::Defer) and warns "Load error" on failure.
# NOTE(review): the calls that start each load and the code that consumes
# $statuses are not visible in this block -- confirm against the full script.
sub main {
my $cv;
# Condition variable created before the Promises-based run.
$cv = AnyEvent->condvar;
print "-------------- Load by Promises\n";
my $statuses = shift;
}, sub {
# Presumably the failure handler of the Promises run: it warns and then
# returns an already-resolved promise so the following ->then() still runs.
warn "Load error";
return Promises::Deferred->new()->resolve()->promise;
})->then(sub {
# A fresh condition variable is created before the Async::Defer-based run.
$cv = AnyEvent->condvar;
print "-------------- Load by Async::Defer\n";
my $statuses = shift;
# An undefined $statuses is reported as a load failure.
if(defined $statuses) {
}else {
warn "Load error";
=head2 asyncTwitter ($user_screen_name, $max_id, $callback)
Basic function to load a page of the user_timeline from Twitter.
C<$max_id> can be C<undef>, in which case the first page is loaded.
In success, C<$callback> is called with an ARRAY-ref of statuses. In
failure, C<$callback> is called with C<undef>.
# Fetch one page of the timeline with wget and decode the output as JSON.
#   $user_screen_name - used to build the request URL
#   $max_id           - optional; appended as a max_id query parameter when defined
#   $callback         - NOTE(review): not invoked in the visible lines;
#                       presumably receives the decoded result -- confirm
sub asyncTwitter {
my ($user_screen_name, $max_id, $callback) = @_;
# NOTE(review): only the screen name is placed in $url here; the endpoint
# part of the URL appears to be missing -- confirm.
my $url = "$user_screen_name";
$url .= "&max_id=$max_id" if defined $max_id;
# fork_call takes two blocks: the first runs wget, the second receives its output.
# NOTE(review): $url is interpolated into a shell command line; a value
# containing a single quote would break out of the quoting.
fork_call {
return `wget -q '$url' -O-`;
} sub {
my $result = shift;
my $result_obj = undef;
# Only non-empty output is decoded; otherwise $result_obj stays undef.
if(defined($result) && $result ne "") {
eval {
$result_obj = decode_json($result);
# A decoding error is reported as a warning.
if($@) {
warn "$@";
=head2 printStatuses ( $statuses )
Print an ARRAY-ref of statuses.
# Print every status as one line of the form
#   ID:<id> [<created_at>] (<screen_name>) <text>
#
#   $statuses - ARRAY-ref of status HASH-refs providing the id, created_at,
#               user->{screen_name} and text fields.
#
# The text is encoded to UTF-8 bytes before printing. Output goes to the
# currently selected filehandle (STDOUT unless changed with select()).
sub printStatuses {
    my ($statuses) = @_;
    foreach my $status (@$statuses) {
        printf(
            "ID:%s [%s] (%s) %s\n",
            $status->{id}, $status->{created_at}, $status->{user}{screen_name},
            Encode::encode('utf8', $status->{text})
        );
    }
    return;
}
=head2 promiseTwitter ( $user_screen_name, $max_id )
Promise-wrapper for C<asyncTwitter()>. It returns L<Promises::Promise>
object representing the loaded statuses in the future.
# Promise wrapper around asyncTwitter().
#   $user_screen_name - passed through to asyncTwitter()
#   $max_id           - passed through to asyncTwitter(); may be undef
#
# Returns the promise of a Promises::Deferred that is resolved with the
# result handed to the asyncTwitter() callback, or rejected with
# "cannot load timeline" when that result is undef.
sub promiseTwitter {
    my ($user_screen_name, $max_id) = @_;
    my $d = Promises::Deferred->new;
    asyncTwitter($user_screen_name, $max_id, sub {
        my $result = shift;
        if (defined $result) {
            # Success: without this the promise would stay pending forever.
            $d->resolve($result);
        }
        else {
            $d->reject("cannot load timeline");
        }
    });
    return $d->promise;
}
=head2 timeFilter ( $statuses, $threshold_datetime )
Filter the ARRAY-ref of statuses with C<$threshold_datetime>, which is
a L<DateTime> object.
Returns a possibly new ARRAY-ref of statuses whose timestamps are
newer or equal to C<$threshold_datetime>.
# Keep only the statuses whose created_at timestamp is newer than or equal
# to the threshold.
#   $statuses           - ARRAY-ref of status HASH-refs (created_at is parsed
#                         with $twitter_parser)
#   $threshold_datetime - DateTime object, or undef to disable filtering
#
# Returns $statuses itself when no threshold is given or the list is empty;
# otherwise returns a new ARRAY-ref holding the statuses that passed, in
# their original order.
sub timeFilter {
    my ($statuses, $threshold_datetime) = @_;
    return $statuses if not defined $threshold_datetime;
    return $statuses if !@$statuses;

    my @result = ();
    foreach my $status (@$statuses) {
        my $dt = $twitter_parser->parse_datetime($status->{created_at});
        # DateTime->compare returns -1, 0 or 1; >= 0 means "not older".
        if (DateTime->compare($dt, $threshold_datetime) >= 0) {
            push(@result, $status);
        }
    }
    return \@result;
}
=head2 uniqStatuses ( $statuses )
Returns a de-duplicated ARRAY-ref of statuses: only the first status for each ID is kept.
# Remove duplicate statuses, identified by their id field.
#   $statuses - ARRAY-ref of status HASH-refs
#
# Returns a new ARRAY-ref containing the first occurrence of each id, in
# the original order. The input array is not modified.
sub uniqStatuses {
    my ($statuses) = @_;
    my %seen;
    # Post-increment: the test is false only the first time an id is met.
    my @result = grep { !$seen{ $_->{id} }++ } @$statuses;
    return \@result;
}
=head2 promisePages ( $user_screen_name, $threshold_datetime_str )
Load user_timeline repeatedly using C<promiseTwitter()> function.
Returns a L<Promises::Promise> object that is resolved once enough
pages have been loaded.
Note that this function uses recursive calls to repeat requests.
# Load timeline pages one after another through promiseTwitter() and return
# a promise for the accumulated statuses.
#   $user_screen_name       - passed through to promiseTwitter()
#   $threshold_datetime_str - optional; parsed with $plain_parser and given
#                             to timeFilter() to drop older statuses
sub promisePages {
my ($user_screen_name, $threshold_datetime_str) = @_;
my @result_statuses = ();
my $loaded_pages = 0;
# Stays undef (timeFilter() then keeps everything) when no threshold is given.
my $threshold_datetime = undef;
if(defined $threshold_datetime_str) {
$threshold_datetime = $plain_parser->parse_datetime($threshold_datetime_str);
## Use recursive execution of $loader to repeat loading.
# $loader is declared before it is assigned so the closure can refer to itself.
my $loader; $loader = sub {
my $statuses = shift;
# Keep the unfiltered page so its size can be compared after filtering.
my $orig_statuses = $statuses;
$statuses = timeFilter($statuses, $threshold_datetime);
push(@result_statuses, @$statuses);
# NOTE(review): no increment of $loaded_pages is visible in this block -- confirm.
print STDERR "Loaded page $loaded_pages\n";
# Stop when the page limit is reached or when the filter removed at least
# one status from this page.
if($loaded_pages >= MAX_PAGES || @$statuses != @$orig_statuses) {
return uniqStatuses \@result_statuses;
# Otherwise continue from the id of the last status kept on this page.
# NOTE(review): an empty page has no last element to take the id from --
# confirm how that case is handled.
my $next_max_id = $statuses->[-1]{id};
return promiseTwitter($user_screen_name, $next_max_id)->then($loader);
# Start with the first page (no max_id).
return promiseTwitter($user_screen_name, undef)->then($loader);
=head2 deferPages ( $user_screen_name, $threshold_datetime, $callback )
Load user_timeline repeatedly using L<Async::Defer> object.
On success, C<$callback> is called with the loaded ARRAY-ref of
statuses. On failure, C<$callback> is called with C<undef>.
Note that repeated requests are written using C<while()> and
C<break()> methods of L<Async::Defer>.
# Load timeline pages repeatedly with an Async::Defer while-loop wrapped
# around asyncTwitter().
#   $user_screen_name       - passed through to asyncTwitter()
#   $threshold_datetime_str - optional; parsed with $plain_parser and given
#                             to timeFilter() to drop older statuses
#   $callback               - NOTE(review): not invoked in the visible lines;
#                             confirm where the result is delivered
sub deferPages {
my ($user_screen_name, $threshold_datetime_str, $callback) = @_;
my @result_statuses = ();
my $loaded_pages = 0;
# undef requests the first page; updated after each page that was loaded.
my $next_max_id = undef;
# Stays undef (timeFilter() then keeps everything) when no threshold is given.
my $threshold_datetime = undef;
if(defined $threshold_datetime_str) {
$threshold_datetime = $plain_parser->parse_datetime($threshold_datetime_str);
# The loop condition is always true.
# NOTE(review): the statement that leaves the loop is not visible here -- confirm.
Async::Defer->new->try->while(sub { 1 })->do(sub {
my ($d) = @_;
asyncTwitter($user_screen_name, $next_max_id, sub {
my $statuses = shift;
# asyncTwitter() reports failure with undef; turn that into a thrown error.
if(not defined $statuses) {
$d->throw('cannot load');
# Keep the unfiltered page so its size can be compared after filtering.
my $orig_statuses = $statuses;
$statuses = timeFilter($statuses, $threshold_datetime);
push(@result_statuses, @$statuses);
# NOTE(review): no increment of $loaded_pages is visible in this block -- confirm.
print STDERR "Loaded page $loaded_pages\n";
# Final page: the page limit is reached or the filter removed at least one
# status, so the accumulated list is de-duplicated.
if($loaded_pages >= MAX_PAGES || @$statuses != @$orig_statuses) {
@result_statuses = @{ uniqStatuses \@result_statuses };
# Continue from the id of the last status kept on this page.
$next_max_id = $statuses->[-1]{id};
})->end_while->do(sub {
my ($d) = @_;
# Handler for thrown errors; qr(.) matches any non-empty error message.
})->catch(qr(.) => sub {
my ($d, $e) = @_;
warn $e;
=head1 AUTHOR
Toshio Ito <toshioito [at]>
