Skip to content

Instantly share code, notes, and snippets.

@y-yu
Created January 17, 2012 16:49
Show Gist options
  • Save y-yu/1627445 to your computer and use it in GitHub Desktop.
Save y-yu/1627445 to your computer and use it in GitHub Desktop.
AnyEvent::Twitter::Stream改変
package AnyEvent::Twitter::Stream;
use strict;
use 5.008_001;
our $VERSION = '0.21';
use Data::Dumper;
use AnyEvent;
use AnyEvent::HTTP;
use AnyEvent::Util;
use MIME::Base64;
use URI;
use URI::Escape;
use Carp;
our $STREAMING_SERVER = 'stream.twitter.com';
our $USERSTREAM_SERVER = 'userstream.twitter.com';
our $PROTOCOL = $ENV{'ANYEVENT_TWITTER_STREAM_SSL'} ? 'https' : 'http';
our $US_PROTOCOL = 'https'; # for testing
my %methods = (
firehose => [],
links => [],
retweet => [],
sample => [],
userstream => [ qw(replies) ],
filter => [ qw(track follow locations) ],
);
sub new {
my $class = shift;
my %args = @_;
my $username = delete $args{username};
my $password = delete $args{password};
my $consumer_key = delete $args{consumer_key};
my $consumer_secret = delete $args{consumer_secret};
my $token = delete $args{token};
my $token_secret = delete $args{token_secret};
my $method = delete $args{method};
my $on_connect = delete $args{on_connect} || sub { };
my $on_tweet = delete $args{on_tweet};
my $on_error = delete $args{on_error} || sub { die @_ };
my $on_eof = delete $args{on_eof} || sub { };
my $on_keepalive = delete $args{on_keepalive} || sub { };
my $on_delete = delete $args{on_delete};
my $on_friends = delete $args{on_friends};
my $on_event = delete $args{on_event};
# add
my $on_dm = delete $args{on_dm};
my $timeout = delete $args{timeout};
my $decode_json;
unless (delete $args{no_decode_json}) {
require JSON;
$decode_json = 1;
}
unless ($methods{$method}) {
$on_error->("Method $method not available.");
return;
}
my %post_args;
for my $param ( @{ $methods{$method} } ) {
next if $method eq 'userstream' && $param eq 'replies';
if ( exists $args{$param} ) {
$post_args{$param} = delete $args{$param};
}
}
my $uri;
if ($method eq 'userstream') {
$uri = URI->new("$US_PROTOCOL://$USERSTREAM_SERVER/2/user.json");
}else{
$uri = URI->new("$PROTOCOL://$STREAMING_SERVER/1/statuses/$method.json");
}
$uri->query_form(%args);
my $request_method = 'GET';
my $request_body;
if ($method eq 'filter' || $method eq 'userstream') {
$request_method = 'POST';
$request_body = join '&', map "$_=" . URI::Escape::uri_escape($post_args{$_}), keys %post_args;
}
my $auth;
if ($consumer_key) {
eval {require Net::OAuth;};
die $@ if $@;
my $request = Net::OAuth->request('protected resource')->new(
version => '1.0',
consumer_key => $consumer_key,
consumer_secret => $consumer_secret,
token => $token,
token_secret => $token_secret,
request_method => $request_method,
signature_method => 'HMAC-SHA1',
timestamp => time,
nonce => MIME::Base64::encode( time . $$ . rand ),
request_url => $uri,
extra_params => \%post_args,
);
$request->sign;
$auth = $request->to_authorization_header;
}else{
$auth = "Basic ".MIME::Base64::encode("$username:$password", '');
}
my $self = bless {}, $class;
{
Scalar::Util::weaken(my $self = $self);
my $set_timeout = $timeout
? sub { $self->{timeout} = AE::timer($timeout, 0, sub { $on_error->('timeout') }) }
: sub {};
my $on_json_message = sub {
my ($json) = @_;
# Twitter stream returns "\x0a\x0d\x0a" if there's no matched tweets in ~30s.
$set_timeout->();
if ($json !~ /^\s*$/) {
my $tweet = $decode_json ? JSON::decode_json($json) : $json;
if ($on_delete && $tweet->{delete} && $tweet->{delete}->{status}) {
$on_delete->($tweet->{delete}->{status}->{id}, $tweet->{delete}->{status}->{user_id});
}elsif($on_friends && $tweet->{friends}) {
$on_friends->($tweet->{friends});
}elsif($on_event && $tweet->{event}) {
$on_event->($tweet);
# add
}elsif($on_dm && $tweet->{direct_message}) {
$on_dm->($tweet->{direct_message});
}
else{
$on_tweet->($tweet);
}
}
else {
$on_keepalive->();
}
};
$set_timeout->();
$self->{connection_guard} = http_request($request_method, $uri,
headers => {
Accept => '*/*',
Authorization => $auth,
($request_method eq 'POST'
? ('Content-Type' => 'application/x-www-form-urlencoded')
: ()
),
},
body => $request_body,
on_header => sub {
my($headers) = @_;
if ($headers->{Status} ne '200') {
$on_error->("$headers->{Status}: $headers->{Reason}");
return;
}
return 1;
},
want_body_handle => 1, # for some reason on_body => sub {} doesn't work :/
sub {
my ($handle, $headers) = @_;
return unless $handle;
my $chunk_reader = sub {
my ($handle, $line) = @_;
$line =~ /^([0-9a-fA-F]+)/ or die 'bad chunk (incorrect length)';
my $len = hex $1;
$handle->push_read(chunk => $len, sub {
my ($handle, $chunk) = @_;
$handle->push_read(line => sub {
length $_[1] and die 'bad chunk (missing last empty line)';
});
$on_json_message->($chunk);
});
};
my $line_reader = sub {
my ($handle, $line) = @_;
$on_json_message->($line);
};
$handle->on_error(sub {
undef $handle;
$on_error->($_[2]);
});
$handle->on_eof(sub {
undef $handle;
$on_eof->(@_);
});
if (($headers->{'transfer-encoding'} || '') =~ /\bchunked\b/i) {
$handle->on_read(sub {
my ($handle) = @_;
$handle->push_read(line => $chunk_reader);
});
} else {
$handle->on_read(sub {
my ($handle) = @_;
$handle->push_read(line => $line_reader);
});
}
$self->{guard} = AnyEvent::Util::guard {
$handle->destroy if $handle;
};
$on_connect->();
}
);
}
return $self;
}
1;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment