Skip to content

Instantly share code, notes, and snippets.

@debug-ito
Created December 2, 2012 01:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save debug-ito/4186470 to your computer and use it in GitHub Desktop.
Save debug-ito/4186470 to your computer and use it in GitHub Desktop.
Load Twitter user_timeline repeatedly using AnyEvent, Async::Defer and Promises
=pod
=head1 NAME
Load Twitter user_timeline repeatedly using AnyEvent, Async::Defer and Promises
=head1 SYNOPSIS
This script demonstrates how to use the Async::Defer or Promises
module to send requests to twitter.com repeatedly and asynchronously.
The goal is to load a complete user_timeline that spans multiple
pages. To do that, we have to send multiple requests, and the number
of requests cannot be determined in advance. This is where
Async::Defer (or Promises) comes in handy.
=head1 BUGS
Note that this example uses Twitter API v1.0, so it does not work with
newer versions of the API.
To use a newer API version, modify asyncTwitter().
=cut
use strict;
use warnings;
use AnyEvent;
use AnyEvent::Util;
use JSON;
use Encode;
use Carp;
use Promises 0.02;
use Async::Defer 0.9.3;
use DateTime;
use DateTime::Format::Strptime;
## Parses the created_at field of a status returned by the API,
## e.g. "Fri Nov 30 12:00:00 +0000 2012"; used by timeFilter().
my $twitter_parser = DateTime::Format::Strptime->new(
    pattern => '%a %b %d %T %z %Y',
);
## Parses strings in the THRESHOLD_DATETIME_STR format,
## e.g. "2012-11-30 12:00:00 +0000".
my $plain_parser = DateTime::Format::Strptime->new(
    pattern => '%Y-%m-%d %H:%M:%S %z',
);
=pod
=head1 CONSTANTS
=over
=item USER_SCREEN_NAME
screen_name of the user whose timeline is to be loaded.
=item THRESHOLD_DATETIME_STR
A string representing the "threshold" time of loading. The script will
load statuses whose timestamps (created_at fields) are newer than or
equal to the threshold. This can be C<undef>, in which case no time
filtering is done.
=item MAX_PAGES
Maximum number of pages to load. The script never loads more pages than this.
=back
=cut
## Script configuration (see the CONSTANTS section of the POD above).
use constant USER_SCREEN_NAME       => 'itmedia';
use constant THRESHOLD_DATETIME_STR => '2012-11-30 12:00:00 +0000';
use constant MAX_PAGES              => 5;
=pod
=head1 FUNCTIONS
=head2 main ()
The entry point. It calls C<promisePages()> and C<deferPages()> to load
the user_timeline, and prints the loaded statuses.
=cut
sub main {
    ## Load and print the timeline twice: once with Promises, once with
    ## Async::Defer. Each run blocks on its own condvar until it finishes.
    my $cv = AnyEvent->condvar;
    print "-------------- Load by Promises\n";
    promisePages(USER_SCREEN_NAME, THRESHOLD_DATETIME_STR)->then(sub {
        my ($statuses) = @_;
        printStatuses($statuses);
        ## Return nothing explicitly, so the next promise in the chain is not
        ## resolved with whatever value printStatuses() happens to leave behind.
        return;
    }, sub {
        my ($reason) = @_;
        $reason = 'unknown reason' if not defined $reason;
        ## Report why loading failed instead of discarding the rejection reason.
        warn "Load error: $reason";
        ## Hand back an already-resolved promise, so that the final then()
        ## below still signals the condvar after a failure.
        return Promises::Deferred->new()->resolve()->promise;
    })->then(sub {
        $cv->send;
    });
    $cv->recv;

    $cv = AnyEvent->condvar;
    print "-------------- Load by Async::Defer\n";
    deferPages(USER_SCREEN_NAME, THRESHOLD_DATETIME_STR, sub {
        my ($statuses) = @_;
        if(defined $statuses) {
            printStatuses($statuses);
        }else {
            warn "Load error";
        }
        $cv->send;
    });
    $cv->recv;
}
=pod
=head2 asyncTwitter ($user_screen_name, $max_id, $callback)
Basic function to load a page of the user_timeline from twitter.com.
C<$max_id> can be C<undef>, in which case the first page is loaded.
On success, C<$callback> is called with an ARRAY-ref of statuses. On
failure, C<$callback> is called with C<undef>.
=cut
sub asyncTwitter {
    my ($user_screen_name, $max_id, $callback) = @_;

    ## Percent-encode a query-string value (RFC 3986 unreserved characters are
    ## kept as-is), so an arbitrary screen name cannot alter the URL.
    my $escape = sub {
        my ($value) = @_;
        my $bytes = Encode::encode('UTF-8', "$value");
        $bytes =~ s/([^A-Za-z0-9\-._~])/sprintf('%%%02X', ord($1))/ge;
        return $bytes;
    };

    my $url = "https://api.twitter.com/1/statuses/user_timeline.json?screen_name="
        . $escape->($user_screen_name);
    $url .= "&max_id=" . $escape->($max_id) if defined $max_id;

    fork_call {
        ## The list-form pipe open runs wget directly, without a shell.
        ## The URL is passed as a single argument and never re-parsed,
        ## unlike interpolating it into backticks.
        open(my $wget, '-|', 'wget', '-q', $url, '-O-') or return;
        binmode($wget);
        my $body = do { local $/; <$wget> };
        ## wget's exit status is deliberately ignored, as before. An
        ## empty or missing body is treated as a failure by the callback below.
        close($wget);
        return $body;
    } sub {
        my ($result) = @_;
        my $result_obj = undef;
        if(defined($result) && $result ne "") {
            $result_obj = eval { decode_json($result) };
            warn "$@" if $@;
        }
        ## The API may answer with a JSON object (e.g. an error message)
        ## instead of an array of statuses. Callers expect an ARRAY-ref or undef.
        if(defined($result_obj) && ref($result_obj) ne 'ARRAY') {
            warn "unexpected (non-array) response from twitter.com";
            $result_obj = undef;
        }
        $callback->($result_obj);
    };
}
=pod
=head2 printStatuses ( $statuses )
Print an ARRAY-ref of statuses.
=cut
sub printStatuses {
    my ($statuses) = @_;
    ## One line per status, written to the currently selected output handle.
    ## The status text is encoded to UTF-8 bytes before printing.
    for my $entry (@$statuses) {
        my $text_bytes = Encode::encode('utf8', $entry->{text});
        printf("ID:%s [%s] (%s) %s\n",
               $entry->{id},
               $entry->{created_at},
               $entry->{user}{screen_name},
               $text_bytes);
    }
}
=pod
=head2 promiseTwitter ( $user_screen_name, $max_id )
Promise-wrapper for C<asyncTwitter()>. It returns a L<Promises::Promise>
object that is resolved with the loaded statuses, or rejected if loading fails.
=cut
sub promiseTwitter {
    my ($user_screen_name, $max_id) = @_;
    my $deferred = Promises::Deferred->new;
    ## asyncTwitter() reports failure with undef. Map that to a rejection,
    ## and anything else to a resolution carrying the statuses.
    my $settle = sub {
        my ($timeline) = @_;
        if(!defined $timeline) {
            $deferred->reject("cannot load timeline");
        }else {
            $deferred->resolve($timeline);
        }
    };
    asyncTwitter($user_screen_name, $max_id, $settle);
    return $deferred->promise;
}
=pod
=head2 timeFilter ( $statuses, $threshold_datetime )
Filter the ARRAY-ref of statuses with C<$threshold_datetime>, which is
a L<DateTime> object.
Returns a possibly new ARRAY-ref of statuses whose timestamps are
newer than or equal to C<$threshold_datetime>.
=cut
sub timeFilter {
    my ($statuses, $threshold_datetime) = @_;
    ## Nothing to filter: hand back the very same ARRAY-ref.
    return $statuses unless defined($threshold_datetime) && @$statuses;

    my @kept;
    for my $status (@$statuses) {
        my $created = $twitter_parser->parse_datetime($status->{created_at});
        ## Drop statuses strictly older than the threshold.
        next if DateTime->compare($created, $threshold_datetime) < 0;
        push @kept, $status;
    }
    return \@kept;
}
=pod
=head2 uniqStatuses ( $statuses )
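Returns a new ARRAY-ref of the statuses with duplicate IDs removed. The first occurrence of each ID is kept, and the original order is preserved.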
=cut
sub uniqStatuses {
    my ($statuses) = @_;
    ## Keep the first status seen for each id, preserving the input order.
    my %seen;
    my @unique = grep { !$seen{ $_->{id} }++ } @$statuses;
    return \@unique;
}
=pod
=head2 promisePages ( $user_screen_name, $threshold_datetime_str )
Load user_timeline repeatedly using C<promiseTwitter()> function.
Returns a L<Promises::Promise> object that is resolved with the loaded
ARRAY-ref of statuses once enough pages have been loaded.
Note that this function uses recursive calls to repeat requests.
=cut
sub promisePages {
    my ($user_screen_name, $threshold_datetime_str) = @_;
    my $threshold_datetime = undef;
    if(defined $threshold_datetime_str) {
        $threshold_datetime = $plain_parser->parse_datetime($threshold_datetime_str);
    }
    ## Loading state shared by every page request of this call.
    my %state = (
        user_screen_name   => $user_screen_name,
        threshold_datetime => $threshold_datetime,
        result_statuses    => [],
        loaded_pages       => 0,
    );
    return _promisePagesFrom(\%state, undef);
}

## Request the page that ends at $max_id (undef = first page). Then either
## resolve with the collected statuses or recurse for the next page.
## A named sub recurses by name, so (unlike a self-referencing closure)
## it creates no reference cycle that would leak.
sub _promisePagesFrom {
    my ($state, $max_id) = @_;
    return promiseTwitter($state->{user_screen_name}, $max_id)->then(sub {
        my ($orig_statuses) = @_;
        my $statuses = timeFilter($orig_statuses, $state->{threshold_datetime});
        push(@{ $state->{result_statuses} }, @$statuses);
        print STDERR "Loaded page $state->{loaded_pages}\n";
        $state->{loaded_pages}++;

        ## An empty page has no last id. Indexing [-1] on it would die.
        my $next_max_id = @$statuses ? $statuses->[-1]{id} : undef;
        ## Stop when:
        ## - the page limit is reached;
        ## - statuses older than the threshold appeared;
        ## - the page was empty;
        ## - no progress was made. max_id is inclusive, so at the end of the
        ##   timeline the last status equals the requested max_id.
        if($state->{loaded_pages} >= MAX_PAGES
           || @$statuses != @$orig_statuses
           || !defined($next_max_id)
           || (defined($max_id) && $next_max_id eq $max_id)) {
            return uniqStatuses($state->{result_statuses});
        }
        return _promisePagesFrom($state, $next_max_id);
    });
}
=pod
=head2 deferPages ( $user_screen_name, $threshold_datetime_str, $callback )
Load user_timeline repeatedly using L<Async::Defer> object.
On success, C<$callback> is called with the loaded ARRAY-ref of
statuses. On failure, C<$callback> is called with C<undef>.
Note that repeated requests are written using the C<while()> and
C<break()> methods of L<Async::Defer>.
=cut
sub deferPages {
    my ($user_screen_name, $threshold_datetime_str, $callback) = @_;
    my @result_statuses = ();
    my $loaded_pages = 0;
    my $next_max_id = undef;
    my $threshold_datetime = undef;
    if(defined $threshold_datetime_str) {
        $threshold_datetime = $plain_parser->parse_datetime($threshold_datetime_str);
    }
    ## Loop forever and leave via $d->break() once enough pages are loaded.
    ## A failed request throws and lands in catch() below.
    Async::Defer->new->try->while(sub { 1 })->do(sub {
        my ($d) = @_;
        ## Remember which max_id this iteration requested, to detect no progress.
        my $max_id = $next_max_id;
        asyncTwitter($user_screen_name, $max_id, sub {
            my $statuses = shift;
            if(not defined $statuses) {
                $d->throw('cannot load');
                return;
            }
            my $orig_statuses = $statuses;
            $statuses = timeFilter($statuses, $threshold_datetime);
            push(@result_statuses, @$statuses);
            print STDERR "Loaded page $loaded_pages\n";
            $loaded_pages++;

            ## An empty page has no last id. Indexing [-1] on it would die.
            my $last_id = @$statuses ? $statuses->[-1]{id} : undef;
            ## Stop when:
            ## - the page limit is reached;
            ## - statuses older than the threshold appeared;
            ## - the page was empty;
            ## - no progress was made. max_id is inclusive, so at the end of
            ##   the timeline the last status equals the requested max_id.
            if($loaded_pages >= MAX_PAGES
               || @$statuses != @$orig_statuses
               || !defined($last_id)
               || (defined($max_id) && $last_id eq $max_id)) {
                @result_statuses = @{ uniqStatuses(\@result_statuses) };
                $d->break();
                return;
            }
            $next_max_id = $last_id;
            $d->done();
        });
    })->end_while->do(sub {
        my ($d) = @_;
        $callback->(\@result_statuses);
        $d->done;
    })->catch(qr(.) => sub {
        my ($d, $e) = @_;
        warn $e;
        $callback->(undef);
        $d->done;
    })->run;
}
main();
=pod
=head1 AUTHOR
Toshio Ito <toshioito [at] cpan.org>
=cut
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment