Created
December 2, 2012 01:39
-
-
Save debug-ito/4186470 to your computer and use it in GitHub Desktop.
Load Twitter user_timeline repeatedly using AnyEvent, Async::Defer and Promises
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
=pod | |
=head1 NAME | |
Load Twitter user_timeline repeatedly using AnyEvent, Async::Defer and Promises | |
=head1 SYNOPSIS | |
This script demonstrates how to use the Async::Defer or Promises | |
module to send requests to twitter.com repeatedly and asynchronously. | |
The goal is to load a complete user_timeline that spans multiple | |
pages. To do that, we have to send multiple requests, and the number | |
of requests cannot be determined in advance. This is where | |
Async::Defer comes in handy. | |
=head1 BUGS | |
Sorry, this example uses Twitter API v1.0, so it does not work with | |
newer versions of the API. | |
To use a newer API, modify asyncTwitter() accordingly. | |
=cut | |
use strict; | |
use warnings; | |
use AnyEvent; | |
use AnyEvent::Util; | |
use JSON; | |
use Encode; | |
use Carp; | |
use Promises 0.02; | |
use Async::Defer 0.9.3; | |
use DateTime; | |
use DateTime::Format::Strptime; | |
# Parser for the "created_at" field of a status; used by timeFilter().
my $twitter_parser = DateTime::Format::Strptime->new(
    pattern => '%a %b %d %T %z %Y',
);

# Parser for threshold strings such as THRESHOLD_DATETIME_STR; used by
# promisePages() and deferPages().
my $plain_parser = DateTime::Format::Strptime->new(
    pattern => '%Y-%m-%d %H:%M:%S %z',
);
=pod

=head1 CONSTANTS

=over

=item USER_SCREEN_NAME

The screen_name of the user whose timeline is loaded.

=item THRESHOLD_DATETIME_STR

A string giving the "threshold" time of loading. Only statuses whose
timestamp (the C<created_at> field) is at or after the threshold are
kept. It may be C<undef>, in which case no time filtering is done.

=item MAX_PAGES

The maximum number of pages to load. The script never loads beyond
this many pages.

=back

=cut

use constant USER_SCREEN_NAME       => 'itmedia';
use constant THRESHOLD_DATETIME_STR => '2012-11-30 12:00:00 +0000';
use constant MAX_PAGES              => 5;
=pod

=head1 FUNCTIONS

=head2 main ()

The entry point. It loads the user_timeline twice, first with
C<promisePages()> and then with C<deferPages()>, and prints the
statuses obtained by each run. Each run blocks on its own condition
variable until the load has either finished or failed.

=cut

sub main {
    # --- First run: the Promises-based loader.
    my $promises_done = AnyEvent->condvar;
    print "-------------- Load by Promises\n";

    my $on_loaded = sub {
        my ($loaded) = @_;
        printStatuses($loaded);
    };
    my $on_failed = sub {
        warn "Load error";
        # Return an already-resolved promise; presumably this lets the
        # chain continue to the final then() after a failure.
        return Promises::Deferred->new()->resolve()->promise;
    };

    promisePages(USER_SCREEN_NAME, THRESHOLD_DATETIME_STR)
        ->then($on_loaded, $on_failed)
        ->then(sub { $promises_done->send; });
    $promises_done->recv;

    # --- Second run: the Async::Defer-based loader.
    my $defer_done = AnyEvent->condvar;
    print "-------------- Load by Async::Defer\n";

    deferPages(USER_SCREEN_NAME, THRESHOLD_DATETIME_STR, sub {
        my ($loaded) = @_;
        if (!defined $loaded) {
            warn "Load error";
        }
        else {
            printStatuses($loaded);
        }
        $defer_done->send;
    });
    $defer_done->recv;
}
=pod

=head2 asyncTwitter ($user_screen_name, $max_id, $callback)

Basic function to load one page of the user_timeline from twitter.com.
The page is fetched by running C<wget> in a child process started with
C<fork_call>.

C<$max_id> can be C<undef>, in which case the first page is loaded.

On success, C<$callback> is called with an ARRAY-ref of statuses. On
failure, C<$callback> is called with C<undef>. Failure means any of:
C<wget> could not be run, it produced no output, the output is not
valid JSON, or the decoded JSON is not an array (for example an error
object).

=cut

sub asyncTwitter {
    my ($user_screen_name, $max_id, $callback) = @_;

    # Percent-encode every byte outside the URL "unreserved" set so that
    # the values cannot alter the structure of the query string.
    my $escape = sub {
        my ($value) = @_;
        my $bytes = Encode::encode_utf8("$value");
        $bytes =~ s/([^A-Za-z0-9\-._~])/sprintf('%%%02X', ord($1))/ge;
        return $bytes;
    };

    my $url = 'https://api.twitter.com/1/statuses/user_timeline.json?screen_name='
            . $escape->($user_screen_name);
    $url .= '&max_id=' . $escape->($max_id) if defined $max_id;

    fork_call {
        # List-form pipe open runs wget directly, without a shell, so no
        # character in $url can be interpreted as shell syntax.
        open(my $wget, '-|', 'wget', '-q', $url, '-O-')
            or die "cannot run wget: $!";

        # Slurp the whole response into ONE scalar. The values returned
        # here are handed to the callback below as a list, and the
        # callback only looks at the first one.
        my $body = do { local $/; <$wget> };
        close($wget);
        return $body;
    } sub {
        my ($body) = @_;

        # fork_call reports an exception raised in the child through $@.
        warn "fork_call failed: $@" if !defined($body) && $@;

        my $statuses;
        if (defined($body) && $body ne "") {
            my $decoded;
            if (eval { $decoded = decode_json($body); 1 }) {
                if (ref($decoded) eq 'ARRAY') {
                    $statuses = $decoded;
                }
                else {
                    # Callers dereference the result as an array, so any
                    # other JSON value is reported as a failure.
                    warn "unexpected response from twitter.com: not a JSON array";
                }
            }
            else {
                warn "$@";
            }
        }
        $callback->($statuses);
    };
}
=pod

=head2 printStatuses ( $statuses )

Print every status in the ARRAY-ref C<$statuses>, one per line, to the
currently selected output handle. Each line shows the status ID, its
C<created_at> field, the screen_name of its user, and its text encoded
as UTF-8.

=cut

sub printStatuses {
    my ($statuses) = @_;
    my $line_format = "ID:%s [%s] (%s) %s\n";
    for my $entry (@$statuses) {
        my $author     = $entry->{user}{screen_name};
        my $text_bytes = Encode::encode('utf8', $entry->{text});
        printf($line_format, $entry->{id}, $entry->{created_at}, $author, $text_bytes);
    }
}
=pod

=head2 promiseTwitter ( $user_screen_name, $max_id )

Promise-wrapper for C<asyncTwitter()>. It returns a
L<Promises::Promise> object that is resolved with the loaded statuses,
or rejected with the message C<"cannot load timeline"> when
C<asyncTwitter()> reports a failure.

=cut

sub promiseTwitter {
    my ($user_screen_name, $max_id) = @_;
    my $deferred = Promises::Deferred->new;

    # asyncTwitter() passes undef on failure and the statuses otherwise.
    my $settle = sub {
        my ($page) = @_;
        return defined $page
            ? $deferred->resolve($page)
            : $deferred->reject("cannot load timeline");
    };

    asyncTwitter($user_screen_name, $max_id, $settle);
    return $deferred->promise;
}
=pod

=head2 timeFilter ( $statuses, $threshold_datetime )

Filter the ARRAY-ref C<$statuses> by C<$threshold_datetime>, which is
a L<DateTime> object.

Returns an ARRAY-ref of the statuses whose C<created_at> timestamp is
at or after C<$threshold_datetime>. When C<$threshold_datetime> is
C<undef>, or C<$statuses> is empty, C<$statuses> itself is returned;
otherwise a new ARRAY-ref is returned.

=cut

sub timeFilter {
    my ($statuses, $threshold_datetime) = @_;

    # Nothing to filter by, or nothing to filter: return the input as is.
    return $statuses unless defined($threshold_datetime) && @$statuses;

    my $is_recent_enough = sub {
        my ($status) = @_;
        my $created = $twitter_parser->parse_datetime($status->{created_at});
        return DateTime->compare($created, $threshold_datetime) >= 0;
    };
    return [ grep { $is_recent_enough->($_) } @$statuses ];
}
=pod

=head2 uniqStatuses ( $statuses )

Returns a new ARRAY-ref holding the statuses of C<$statuses> with
duplicates removed. Two statuses are duplicates when their C<id>
fields are equal; the first occurrence is kept and the original order
is preserved.

=cut

sub uniqStatuses {
    my ($statuses) = @_;
    my (%seen_id, @unique);
    for my $status (@$statuses) {
        my $id = $status->{id};
        unless (exists $seen_id{$id}) {
            $seen_id{$id} = 1;
            push @unique, $status;
        }
    }
    return \@unique;
}
=pod

=head2 promisePages ( $user_screen_name, $threshold_datetime_str )

Load the user_timeline repeatedly using the C<promiseTwitter()>
function. C<$threshold_datetime_str> is a string such as
C<THRESHOLD_DATETIME_STR>, or C<undef> for no time filtering.

Returns a L<Promises::Promise> object that is resolved with the
uniqified ARRAY-ref of statuses once enough pages have been loaded.
Loading stops when any of the following holds:

=over

=item *

C<MAX_PAGES> pages have been loaded.

=item *

A page is empty.

=item *

The time filter dropped at least one status of the page.

=item *

The oldest status of the page is the very C<max_id> that was
requested. C<max_id> is inclusive, so such a page contains nothing
new and asking again would return the same page.

=back

Note that this function uses recursive calls to repeat requests.

=cut

sub promisePages {
    my ($user_screen_name, $threshold_datetime_str) = @_;
    my @result_statuses = ();
    my $loaded_pages = 0;
    my $requested_max_id = undef;    # max_id used for the page in flight
    my $threshold_datetime = undef;
    if(defined $threshold_datetime_str) {
        $threshold_datetime = $plain_parser->parse_datetime($threshold_datetime_str);
    }

    ## Use recursive execution of $loader to repeat loading.
    my $loader; $loader = sub {
        my $orig_statuses = shift;
        my $statuses = timeFilter($orig_statuses, $threshold_datetime);
        push(@result_statuses, @$statuses);
        print STDERR "Loaded page $loaded_pages\n";
        $loaded_pages++;

        # Only look at the last element when there is one: indexing an
        # empty array with [-1]{id} is a fatal error.
        my $next_max_id = @$statuses ? $statuses->[-1]{id} : undef;
        my $no_progress = defined($requested_max_id)
                       && defined($next_max_id)
                       && $next_max_id eq $requested_max_id;

        if($loaded_pages >= MAX_PAGES
           || !@$orig_statuses
           || @$statuses != @$orig_statuses
           || $no_progress) {
            return uniqStatuses(\@result_statuses);
        }

        $requested_max_id = $next_max_id;
        return promiseTwitter($user_screen_name, $next_max_id)->then($loader);
    };
    return promiseTwitter($user_screen_name, undef)->then($loader);
}
=pod

=head2 deferPages ( $user_screen_name, $threshold_datetime_str, $callback )

Load the user_timeline repeatedly using an L<Async::Defer> object.
C<$threshold_datetime_str> is a string such as
C<THRESHOLD_DATETIME_STR>, or C<undef> for no time filtering.

On success, C<$callback> is called with the loaded, uniqified ARRAY-ref
of statuses. On failure, C<$callback> is called with C<undef>.

Loading stops when any of the following holds:

=over

=item *

C<MAX_PAGES> pages have been loaded.

=item *

A page is empty.

=item *

The time filter dropped at least one status of the page.

=item *

The oldest status of the page is the very C<max_id> that was
requested. C<max_id> is inclusive, so such a page contains nothing
new and asking again would return the same page.

=back

Note that repeated requests are written using the C<while()> and
C<break()> methods of L<Async::Defer>.

=cut

sub deferPages {
    my ($user_screen_name, $threshold_datetime_str, $callback) = @_;
    my @result_statuses = ();
    my $loaded_pages = 0;
    my $next_max_id = undef;    # max_id for the next request; undef = first page
    my $threshold_datetime = undef;
    if(defined $threshold_datetime_str) {
        $threshold_datetime = $plain_parser->parse_datetime($threshold_datetime_str);
    }

    Async::Defer->new->try->while(sub { 1 })->do(sub {
        my ($d) = @_;
        asyncTwitter($user_screen_name, $next_max_id, sub {
            my $orig_statuses = shift;
            if(not defined $orig_statuses) {
                $d->throw('cannot load');
                return;
            }
            my $statuses = timeFilter($orig_statuses, $threshold_datetime);
            push(@result_statuses, @$statuses);
            print STDERR "Loaded page $loaded_pages\n";
            $loaded_pages++;

            # Only look at the last element when there is one: indexing
            # an empty array with [-1]{id} is a fatal error.
            my $oldest_id = @$statuses ? $statuses->[-1]{id} : undef;

            # At this point $next_max_id still holds the max_id that was
            # used to request the page being processed.
            my $no_progress = defined($next_max_id)
                           && defined($oldest_id)
                           && $oldest_id eq $next_max_id;

            if($loaded_pages >= MAX_PAGES
               || !@$orig_statuses
               || @$statuses != @$orig_statuses
               || $no_progress) {
                @result_statuses = @{ uniqStatuses(\@result_statuses) };
                $d->break();
                return;
            }
            $next_max_id = $oldest_id;
            $d->done();
        });
    })->end_while->do(sub {
        my ($d) = @_;
        $callback->(\@result_statuses);
        $d->done;
    })->catch(qr(.) => sub {
        my ($d, $e) = @_;
        warn $e;
        $callback->(undef);
        $d->done;
    })->run;
}
# Run the demonstration.
main();
=pod | |
=head1 AUTHOR | |
Toshio Ito <toshioito [at] cpan.org> | |
=cut | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment