-
-
Save dragon3/394491 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env perl | |
use feature ":5.10"; | |
use strict; | |
use utf8; | |
use Tatsumaki::Error; | |
use Tatsumaki::Application; | |
use Tatsumaki::HTTPClient; | |
use Tatsumaki::Server; | |
use Tatsumaki::MessageQueue; | |
# Process-wide message queue: esprintf() publishes formatted lines to it and
# StreamWriter::get polls it. Fully qualified so every package in this file
# can reach the same instance.
$main::MQ = Tatsumaki::MessageQueue->instance("main"); | |
package StreamCrowler; | |
use JSON; | |
use Data::Dumper; | |
use Encode; | |
use Try::Tiny; | |
use AnyEvent::HTTP; | |
use Time::Piece; | |
use LWP::UserAgent; | |
use TokyoCabinet; | |
use Storable qw/ nfreeze thaw /; | |
use Config::Pit; | |
use MIME::Base64; | |
use Term::ANSIColor; | |
# Flush the currently selected output handle after every write, so the
# header dump printed by the stream connection appears immediately.
$| = 1;

# Twitter credentials, looked up through Config::Pit under "www.twitter.com".
my $Config = pit_get(
    "www.twitter.com",
    require => {
        username => "username",
        password => "password",
    },
);

# On-disk cache of users and tweets (Tokyo Cabinet hash database),
# created on first run.
my $Tc = TokyoCabinet::HDB->new;
unless ( $Tc->open( "stream.hdb", $Tc->OWRITER | $Tc->OCREAT ) ) {
    die $Tc->errmsg( $Tc->ecode );
}

# File-level state shared by the REST helpers below.
my $Flap      = 1;
my $Wait      = time - 1;              # REST calls are skipped while time < $Wait
my $Ua        = LWP::UserAgent->new;
my $Remaining = "?";                   # last X-RateLimit-Remaining value seen
$Ua->env_proxy;

# Close the database cleanly on HUP, TERM and INT; one shared handler.
@SIG{qw(HUP TERM INT)} = (
    sub {
        info("exiting...");
        $Tc->close;
        info("done");
        exit;
    }
) x 3;
# Open the long-lived user-stream connection and hand every JSON object that
# arrives on it to parse_json().
http_get "http://chirpstream.twitter.com/2b/user.json",
    headers => {
        # HTTP Basic credentials need the "Basic " scheme prefix, and the
        # base64 text must not carry the trailing newline that
        # encode_base64() appends unless given an empty end-of-line string.
        Authorization => "Basic "
            . encode_base64(
                $Config->{username} . ":" . $Config->{password}, ""
            ),
    },
    on_header => sub {
        my ($headers) = @_;
        print Dumper $headers;
        return 1;    # a true value tells AnyEvent::HTTP to keep going
    },
    want_body_handle => 1, # for some reason on_body => sub {} doesn't work :/
    sub {
        my ( $handle, $headers ) = @_;

        # With want_body_handle the handle is undef when the request failed;
        # report it instead of dying on an undefined invocant.
        unless ($handle) {
            error(
                sprintf "stream connection failed: %s %s",
                $headers->{Status} // "?",
                $headers->{Reason} // ""
            );
            return;
        }

        $handle->push_read( json => \&parse_json );
    };
# Prefix a message with "[YYYY-MM-DD HH:MM:SS] " and publish it on the
# shared message queue.
#
# Called with exactly one argument, the text is used verbatim (so a literal
# '%' in it is safe); otherwise the first argument is a sprintf format for
# the remaining ones.
sub esprintf {
    my ( $format, @args ) = @_;

    my $now   = localtime;
    my $stamp = "[" . $now->ymd . " " . $now->hms . "] ";
    my $body  = @_ == 1 ? $format : sprintf( $format, @args );

    return $main::MQ->publish( message => $stamp . $body );
}
# Turn bare URLs and @screen_name mentions in a tweet into HTML links.
#
# Both patterns are applied in ONE pass over the text. Running them as two
# consecutive substitutions re-scans the markup produced for URLs, so a URL
# containing "@name" ends up with an <a> tag injected inside its own href.
#
# Takes the text (undef is treated as the empty string) and returns the
# linked text. No HTML escaping is performed.
sub to_html {
    my $str = shift // '';

    my $url     = qr{https?://[-_.!~*'()a-zA-Z0-9;/?:\@&=+\$,%#]+};
    my $mention = qr{\@([0-9a-zA-Z_-]+)};

    # $1 is set when the URL alternative matched, $2 when a mention did.
    $str =~ s{($url)|$mention}{
        defined $1
            ? qq{<a href="$1" target="_blank">$1</a>}
            : qq{<a href="http://twitter.com/$2">\@$2</a>}
    }ge;

    return $str;
}
# Format and publish one user-stream "event" object (follow/unfollow,
# retweet, favorite/unfavorite). Anything else is logged as unknown.
#
# Takes the decoded event hash; reads its event, source, target and
# target_object fields.
#
# Plain if/elsif is used instead of given/when, which relies on the
# deprecated smartmatch feature. The event name is passed to sprintf as an
# argument rather than interpolated into the format string, so a '%' in it
# cannot corrupt the format.
sub do_event {
    my $data  = shift;
    my $event = $data->{event};

    # Screen names of the acting user and the user acted upon, looked up in
    # that order (each lookup may hit the REST API).
    my $names = sub {
        return (
            get_user( $data->{source}->{id} )->{screen_name},
            get_user( $data->{target}->{id} )->{screen_name},
        );
    };

    # NOTE(review): this pattern is unanchored, as in the original, so any
    # event name containing "follow" lands here - confirm that is intended.
    if ( $event =~ m{(?:un)?follow} ) {
        esprintf(
            qq{<font color="green">\@%s が \@%s を %s</font>\n},
            $names->(),
            $event,
        );
    }
    elsif ( $event eq "retweet" ) {
        my $tweet = get_tweet( $data->{target_object}->{id} );
        esprintf(
            qq{<font color="cyan">RT \@%s が \@%s を %s: %s\n</font>},
            $names->(),
            $event,
            to_html( $tweet->{text} ),
        );
    }
    elsif ( $event =~ m{^(?:un)?favorite} ) {
        my $tweet = get_tweet( $data->{target_object}->{id} );
        esprintf(
            qq{<font color="magenta">★ \@%s が \@%s を %s: %s\n</font>},
            $names->(),
            $event,
            to_html( $tweet->{text} ),
        );
    }
    else {
        error( "unknown: " . JSON->new->pretty->encode($data) );
    }

    return;
}
# Emit the arguments (joined with spaces) on STDERR via warn, in red.
sub error {
    my $line = colored( ["red"], "@_" );
    warn $line . "\n";
}
# Emit the arguments (joined with spaces) on STDERR via warn, in blue.
sub info {
    my $line = colored( ["blue"], "@_" );
    warn $line . "\n";
}
# Log a REST API error and, for rate-limit errors, postpone further REST
# calls until the advertised reset time.
#
#   $msg - decoded error body; its "error" field is inspected
#   $h   - response headers, read with the lower-cased header name as a
#          hash key (NOTE(review): this reaches into the headers object
#          rather than using an accessor - kept as in the original)
#
# Plain if/else replaces given/when (deprecated smartmatch). $Wait is only
# updated when a reset time is actually present; previously a missing
# header set it to undef and logged an epoch-0 reset time.
sub handle_api_error {
    my ( $msg, $h ) = @_;

    my $reason = ref($msg) eq 'HASH' ? $msg->{error} : undef;
    my $reset  = $h ? $h->{ lc "X-RateLimit-Reset" } : undef;

    if (   defined $reason
        && $reason =~ m{^Rate limit}
        && defined $reset )
    {
        error( "RateLimit reset at " . localtime($reset) );
        $Wait = $reset;
    }
    else {
        error( Dumper($msg) );
    }

    return;
}
# Look a user up by id: local cache first, then the REST API, and as a last
# resort a stub whose screen_name is simply the id.
sub get_user {
    my $id = shift;

    my $frozen = $Tc->get("user:$id");
    my $cached = $frozen ? thaw($frozen) : $frozen;
    return $cached if $cached;

    my $fetched = get_user_by_api($id);
    return $fetched if $fetched;

    return +{ screen_name => $id };
}
# Look a tweet up by id: local cache first, then the REST API, and as a last
# resort a stub whose text is simply the id.
sub get_tweet {
    my $id = shift;

    my $frozen = $Tc->get("tweet:$id");
    my $cached = $frozen ? thaw($frozen) : $frozen;
    return $cached if $cached;

    my $fetched = get_tweet_by_api($id);
    return $fetched if $fetched;

    return +{ text => $id };
}
# Fetch a user from the REST API and cache it.
#
# Returns the decoded user hash, or nothing when REST calls are currently
# suspended (time < $Wait), the request failed, or the body was unusable.
#
# The body is decoded with a UTF-8 aware JSON decoder: $res->content is raw
# bytes, and from_json() would leave multi-byte characters undecoded. The
# decode is wrapped in try because error responses are not guaranteed to be
# JSON, and an exception here would escape into the stream read callback.
sub get_user_by_api {
    my $id = shift;
    return if time < $Wait;

    my $res = $Ua->get("http://api.twitter.com/1/users/show.json?user_id=$id");

    my $payload = try { JSON->new->utf8->decode( $res->content ) }
    catch {
        error("users/show $id: unparsable response body: $_");
        undef;
    };
    my $is_object = ref($payload) eq 'HASH';

    unless ( $res->is_success ) {
        if ($is_object) {
            handle_api_error( $payload, $res->headers );
        }
        else {
            error( "users/show $id failed: " . $res->status_line );
        }
        return;
    }
    return unless $is_object;

    $Remaining = $res->headers->{ lc "X-RateLimit-Remaining" };
    $Wait      = time - 1;

    set_user($payload);
    return $payload;
}
# Fetch a tweet from the REST API and cache it together with its author.
#
# Returns the decoded tweet hash with its "user" entry removed, or nothing
# when REST calls are currently suspended (time < $Wait), the request
# failed, or the body was unusable.
#
# The body is decoded with a UTF-8 aware JSON decoder: $res->content is raw
# bytes, and from_json() would leave multi-byte tweet text undecoded. The
# decode is wrapped in try because error responses are not guaranteed to be
# JSON, and an exception here would escape into the stream read callback.
sub get_tweet_by_api {
    my $id = shift;
    return if time < $Wait;

    my $res = $Ua->get("http://api.twitter.com/1/statuses/show/${id}.json");

    my $payload = try { JSON->new->utf8->decode( $res->content ) }
    catch {
        error("statuses/show $id: unparsable response body: $_");
        undef;
    };
    my $is_object = ref($payload) eq 'HASH';

    unless ( $res->is_success ) {
        if ($is_object) {
            handle_api_error( $payload, $res->headers );
        }
        else {
            error( "statuses/show $id failed: " . $res->status_line );
        }
        return;
    }
    return unless $is_object;

    $Remaining = $res->headers->{ lc "X-RateLimit-Remaining" };
    $Wait      = time - 1;

    # set_tweet() stores both the tweet and its embedded author, so the
    # author must still be attached when it is called. Stripping it first
    # left set_tweet() with an undefined user to store.
    set_tweet($payload);
    delete $payload->{user};    # returned tweet carries no user, as before

    return $payload;
}
# Cache a tweet under "tweet:<id>" and, when it carries an embedded author,
# cache that author under "user:<id>".
#
# The tweet is stored without its "user" entry. A shallow copy is used so
# the caller's hash is left untouched, and the author is only written when
# it really is a hash with an id - otherwise the old code stored a junk
# "user:" record (or died inside nfreeze on a non-reference).
sub set_tweet {
    my $tweet = shift;
    return unless ref($tweet) eq 'HASH';

    my %record = %$tweet;
    my $user   = delete $record{user};

    $Tc->put( "tweet:$record{id}" => nfreeze( \%record ) )
        or warn "failed\n";

    if ( ref($user) eq 'HASH' && defined $user->{id} ) {
        $Tc->put( "user:$user->{id}" => nfreeze($user) )
            or warn "failed\n";
    }

    return;
}
# Cache a user hash under "user:<id>"; warns "failed" when the write does
# not succeed.
sub set_user {
    my $user = shift;

    my $key    = "user:$user->{id}";
    my $stored = $Tc->put( $key, nfreeze($user) );

    return $stored || warn("failed\n");
}
# Read callback for the stream handle: render one decoded JSON object and
# queue the next read.
#
#   $handle - the stream handle the object was read from
#   $data   - the decoded JSON object
#
# Objects are dispatched on their shape: events, deletions, ordinary tweets
# (which are also cached), and anything else is dumped as pretty JSON.
#
# The whole dispatch runs inside try: previously only the event branch was
# guarded, so an exception in any other branch skipped the push_read below
# and the stream stopped being read.
sub parse_json {
    my ( $handle, $data ) = @_;

    try {
        if ( $data->{event} ) {
            do_event($data);
        }
        elsif ( my $del = $data->{delete} ) {
            esprintf(
                qq{<font color="orange">DEL \@%s が delete: %s\n</font>},
                get_user( $del->{status}->{user_id} )->{screen_name},
                to_html( get_tweet( $del->{status}->{id} )->{text} ),
            );
        }
        elsif ( $data->{text} ) {
            esprintf(
                qq{<img src="%s" width="32" height="32"><a href="http://twitter.com/%s" target="_blank">%s</a>: %s\n},
                $data->{user}->{profile_image_url},
                $data->{user}->{screen_name},
                $data->{user}->{screen_name},
                to_html( $data->{text} ),
            );
            set_tweet($data);
        }
        else {
            esprintf( "%s\n", JSON->new->pretty->encode($data) );
        }
    }
    catch {
        error($_);
    };

    # Always re-arm the reader, whatever happened above.
    $handle->push_read( json => \&parse_json );
}
# Asynchronous handler behind "/stream": waits for the next batch of queue
# events, writes one message as text/plain and finishes the response.
package StreamWriter;
use parent qw(Tatsumaki::Handler);
__PACKAGE__->asynchronous(1);

sub get {
    my $self = shift;
    $self->response->content_type('text/plain; charset=utf-8');

    # esprintf() publishes the pair ( "message", $text ), so the text is the
    # second callback argument.
    # NOTE(review): only that second argument is written; if several
    # messages are delivered in one batch the later ones look like they are
    # dropped - confirm against Tatsumaki::MessageQueue buffering.
    my $deliver = sub {
        my ( undef, $text ) = @_;
        $self->write($text);
        $self->finish;
    };

    # Every request polls under the same client id, 1.
    $main::MQ->poll_once( 1, $deliver );
}
# Handler behind "/": serves the single-page client that long-polls
# "<request uri>stream" and prepends every message it receives, keeping at
# most 100 entries on the page.
package MainHandler;
use parent qw(Tatsumaki::Handler);
use Text::MicroTemplate qw / :all /;

# Text::MicroTemplate source. The line starting with "?" is template code
# and has to stay at the start of its line. The doctype was previously the
# malformed "<!Docutype HTML>".
my $tmpl = <<'_END_';
? my $req = $_[0]->request;
<!DOCTYPE html>
<html>
<head>
  <meta charset="utf-8">
  <title>UserStreamsClient</title>
  <script src="http://www.google.com/jsapi"></script>
  <script> google.load("jquery", "1.4") </script>
  <style>
    div { border-bottom: 1px solid #999; }
  </style>
</head>
<body>
  <div id="tweets"></div>
  <script>
    var tweets_buf = [];
    var id = 0;
    var generate_id = function() {
      id = id + 1;
      return "tweet-" + id;
    };
    var clear_buf = function() {
      while (tweets_buf.length > 100) {
        var id = tweets_buf.shift();
        $("#"+id).remove();
      }
    };
    var load_more = function() {
      $.get(
        "<?= $req->uri ?>stream",
        function(data) {
          if (data != "") {
            var id = generate_id();
            $("#tweets").prepend("<div id='" + id + "'>" + data + "</div>");
            tweets_buf.push(id);
            clear_buf();
          }
          load_more();
        }
      );
    };
    load_more();
  </script>
</body>
</html>
_END_

# Render the template with the handler itself as its only argument and send
# the result.
sub get {
    my $self = shift;
    $self->write( render_mt( $tmpl, $self )->as_string );
}
package main;

# Route table: the page itself and its long-poll endpoint.
my @routes = (
    "/"       => "MainHandler",
    "/stream" => "StreamWriter",
);

# The application object is this file's return value, for whatever loads it.
my $app = Tatsumaki::Application->new( \@routes );
return $app;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment