Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
#!/usr/bin/env perl
use feature ":5.10";
use strict;
use utf8;
use Tatsumaki::Error;
use Tatsumaki::Application;
use Tatsumaki::HTTPClient;
use Tatsumaki::Server;
use Tatsumaki::MessageQueue;
# Shared message queue instance, kept in a package global so every package in
# this file can reach it as $main::MQ: esprintf() publishes rendered lines to
# it and StreamWriter polls it on behalf of the browser.
$main::MQ = Tatsumaki::MessageQueue->instance("main");
package StreamCrowler;
use JSON;
use Data::Dumper;
use Encode;
use Try::Tiny;
use AnyEvent::HTTP;
use Time::Piece;
use LWP::UserAgent;
use TokyoCabinet;
use Storable qw/ nfreeze thaw /;
use Config::Pit;
use MIME::Base64;
use Term::ANSIColor;
# --- Process-wide state for the stream crawler -----------------------------

# Turn on autoflush for the currently selected output handle.
$| = 1;

# Twitter credentials, looked up through Config::Pit under "www.twitter.com".
my $Config = pit_get(
    "www.twitter.com",
    require => {
        username => "username",
        password => "password",
    },
);

# On-disk hash database used as a cache for users and tweets
# (keys "user:<id>" and "tweet:<id>").
my $Tc = TokyoCabinet::HDB->new;
unless ( $Tc->open( "stream.hdb", $Tc->OWRITER | $Tc->OCREAT ) ) {
    die $Tc->errmsg( $Tc->ecode );
}

my $Flap      = 1;           # not referenced anywhere else in the visible code
my $Wait      = time - 1;    # REST lookups are skipped while time < $Wait
my $Ua        = LWP::UserAgent->new;
my $Remaining = "?";         # last X-RateLimit-Remaining value seen
$Ua->env_proxy;

# Close the database cleanly on HUP/TERM/INT before exiting; the same handler
# is installed for all three signals.
@SIG{qw(HUP TERM INT)} = (
    sub {
        info("exiting...");
        $Tc->close;
        info("done");
        exit;
    }
) x 3;
# Open the long-lived user-stream connection. The response body is not
# downloaded by AnyEvent::HTTP; instead the raw connection handle is taken over
# (want_body_handle) and JSON values are read from it one at a time by
# parse_json().
http_get "http://chirpstream.twitter.com/2b/user.json",
    headers => {
        # HTTP Basic auth needs the "Basic " scheme prefix, and encode_base64
        # must be given an empty end-of-line string: by default it appends a
        # newline (and wraps long output), which would corrupt the header.
        Authorization => "Basic " . encode_base64(
            $Config->{username} . ":" . $Config->{password}, ""
        ),
    },
    on_header => sub {
        my ($headers) = @_;
        print Dumper $headers;
        # A true return value tells AnyEvent::HTTP to continue the request;
        # do not depend on print's own return value for that.
        return 1;
    },
    want_body_handle => 1, # for some reason on_body => sub {} doesn't work :/
    sub {
        my ($handle, $headers) = @_;

        # On connection or HTTP errors there may be no usable handle, and a
        # non-2xx body is not the JSON stream: report and stop instead of
        # calling a method on undef or feeding an error page to the parser.
        my $status = defined $headers->{Status} ? $headers->{Status} : "";
        unless ( $handle && $status =~ /^2/ ) {
            my $reason = defined $headers->{Reason} ? $headers->{Reason} : "";
            error("stream request failed: $status $reason");
            $handle->destroy if $handle;
            return;
        }

        # The handle is ours from here on; without an on_error callback a
        # later read error or disconnect would surface as an uncaught croak.
        $handle->on_error(
            sub {
                my ($h, $fatal, $message) = @_;
                error("stream connection error: $message");
                $h->destroy;
            }
        );

        $handle->push_read( json => \&parse_json );
    }
;
# Publish one timestamped line to the shared message queue.
#
#   esprintf($text)           - $text is published verbatim (no sprintf pass,
#                               so a literal '%' in it is safe)
#   esprintf($format, @args)  - sprintf($format, @args) is published
#
# Every line is prefixed with "[YYYY-MM-DD HH:MM:SS] " taken from the current
# local time and published as the pair (message => $line).
sub esprintf {
    my ($format, @args) = @_;

    my $now   = localtime;
    my $stamp = sprintf "[%s %s] ", $now->ymd, $now->hms;

    # A single argument is treated as plain text rather than a format.
    my $body = @_ == 1 ? $format : sprintf( $format, @args );

    $main::MQ->publish( message => $stamp . $body );
}
# Turn URLs and @mentions in a piece of tweet text into HTML links.
#
#   http(s)://...  ->  <a href="URL" target="_blank">URL</a>
#   @name          ->  <a href="http://twitter.com/name">@name</a>
#
# Both patterns are applied in ONE pass over the string. Running the mention
# substitution as a second pass (as this function used to) also rewrote
# "@name" occurrences inside the href/text of a URL that had just been linked
# (the URL character class allows '@'), yielding nested, broken anchors.
#
# Returns the converted string; an undefined argument is returned unchanged.
# No HTML escaping is performed on the rest of the text.
sub to_html {
    my $str = shift;
    return $str unless defined $str;

    my $url_re = qr{https?://[-_.!~*'()a-zA-Z0-9;/?:\@&=+\$,%#]+};

    # Leftmost match wins, so a URL is consumed whole before any '@' inside
    # it can be seen as the start of a mention.
    $str =~ s{($url_re)|\@([0-9a-zA-Z_-]+)}{
        defined $1
            ? qq{<a href="$1" target="_blank">$1</a>}
            : qq{<a href="http://twitter.com/$2">\@$2</a>}
    }ge;

    return $str;
}
# Render one stream "event" message (follow/unfollow, retweet,
# favorite/unfavorite) as a coloured HTML line and publish it via esprintf().
#
#   $data - decoded event hash; reads {event}, {source}{id}, {target}{id} and,
#           for retweet/favorite events, {target_object}{id}.
#
# Unrecognised event names are reported through error() as pretty-printed JSON.
#
# Implementation notes:
#  * Plain if/elsif replaces given/when, which is an experimental feature that
#    newer Perls deprecate.
#  * The event name is passed to sprintf as an argument instead of being
#    interpolated into the format string, so a '%' in it cannot be mistaken
#    for a conversion.
#  * The follow pattern is anchored at the start like the favorite pattern.
sub do_event {
    my $data  = shift;
    my $event = $data->{event};

    if ( $event =~ m{^(?:un)?follow} ) {
        esprintf(
            qq{<font color="green">\@%s\@%s%s</font>\n},
            get_user( $data->{source}->{id} )->{screen_name},
            get_user( $data->{target}->{id} )->{screen_name},
            $event,
        );
    }
    elsif ( $event eq "retweet" ) {
        my $tweet = get_tweet( $data->{target_object}->{id} );
        esprintf(
            qq{<font color="cyan">RT \@%s\@%s%s: %s\n</font>},
            get_user( $data->{source}->{id} )->{screen_name},
            get_user( $data->{target}->{id} )->{screen_name},
            $event,
            to_html( $tweet->{text} ),
        );
    }
    elsif ( $event =~ m{^(?:un)?favorite} ) {
        my $tweet = get_tweet( $data->{target_object}->{id} );
        esprintf(
            qq{<font color="magenta">★ \@%s\@%s%s: %s\n</font>},
            get_user( $data->{source}->{id} )->{screen_name},
            get_user( $data->{target}->{id} )->{screen_name},
            $event,
            to_html( $tweet->{text} ),
        );
    }
    else {
        error( "unknown: " . JSON->new->pretty->encode($data) );
    }
}
# Emit the arguments on STDERR (via warn) as a single red line; the arguments
# are joined the same way string interpolation of a list would join them.
sub error {
    my $line = colored( ["red"], join( $", @_ ) );
    warn "$line\n";
}
# Emit the arguments on STDERR (via warn) as a single blue line; the arguments
# are joined the same way string interpolation of a list would join them.
sub info {
    my $line = colored( ["blue"], join( $", @_ ) );
    warn "$line\n";
}
# React to an error payload returned by the REST API.
#
#   $msg - decoded JSON error body; its {error} string is inspected
#   $h   - the response headers: the callers in this file pass $res->headers
#          (an HTTP::Headers object); a plain hash with lower-cased keys is
#          still accepted
#
# When the error text starts with "Rate limit", the X-RateLimit-Reset value is
# stored in $Wait so the get_*_by_api helpers stay quiet until then. Any other
# error is dumped through error().
#
# if/else replaces given/when, which is an experimental feature that newer
# Perls deprecate.
sub handle_api_error {
    my ($msg, $h) = @_;
    my $reason = ref($msg) eq 'HASH' ? $msg->{error} : undef;

    if ( defined $reason && !ref $reason && $reason =~ m{^Rate limit} ) {
        # Prefer the public accessor over poking at the header object's
        # internal hash; fall back to hash access for plain hashes.
        my $reset = eval { $h->can('header') }
            ? $h->header('X-RateLimit-Reset')
            : $h->{ lc "X-RateLimit-Reset" };

        if ( defined $reset && $reset =~ /\A\d+\z/ ) {
            error( "RateLimit reset at " . localtime($reset) );
            $Wait = $reset;
        }
        else {
            # Without a usable reset time, leave $Wait alone rather than
            # overwriting it with undef and logging a bogus date.
            error("RateLimit hit, but no usable X-RateLimit-Reset header");
        }
    }
    else {
        error( Dumper($msg) );
    }
}
# Look up a user record by numeric id.
#
# Resolution order: the local cache (key "user:<id>"), then the REST API via
# get_user_by_api(), and finally a stub hash whose screen_name is the id
# itself, so callers can always read ->{screen_name}.
sub get_user {
    my ($id) = @_;

    my $frozen = $Tc->get("user:$id");
    my $cached = $frozen ? thaw($frozen) : $frozen;
    return $cached if $cached;

    return get_user_by_api($id) || +{ screen_name => $id };
}
# Look up a tweet record by id.
#
# Resolution order: the local cache (key "tweet:<id>"), then the REST API via
# get_tweet_by_api(), and finally a stub hash whose text is the id itself, so
# callers can always read ->{text}.
sub get_tweet {
    my ($id) = @_;

    my $frozen = $Tc->get("tweet:$id");
    my $cached = $frozen ? thaw($frozen) : $frozen;
    return $cached if $cached;

    return get_tweet_by_api($id) || +{ text => $id };
}
# Fetch a user record from the REST API and cache it with set_user().
#
#   $id - numeric user id
#
# Returns the decoded user hash on success. Returns nothing when lookups are
# currently suppressed (time < $Wait) or when the request fails; API error
# payloads are passed to handle_api_error().
#
# The response body is decoded inside try{}: a failed request does not
# necessarily carry JSON (e.g. a transport error or an HTML error page), and
# an uncaught from_json exception would propagate into the stream reader.
#
# NOTE(review): $Ua->get is a blocking call; it appears to run inside the
# event-loop callbacks of this script - confirm that the stall is acceptable.
sub get_user_by_api {
    my $id = shift;
    return if time < $Wait;

    my $res  = $Ua->get("http://api.twitter.com/1/users/show.json?user_id=$id");
    my $body = try { from_json( $res->content ) } catch { undef };

    unless ( $res->is_success ) {
        if ( ref($body) eq 'HASH' ) {
            handle_api_error( $body, $res->headers );
        }
        else {
            error( "users/show $id failed: " . $res->status_line );
        }
        return;
    }

    unless ( ref($body) eq 'HASH' ) {
        error( "users/show $id: unexpected response body (" . $res->status_line . ")" );
        return;
    }

    # Use the public accessor instead of the header object's internal hash.
    $Remaining = $res->header("X-RateLimit-Remaining");
    $Wait      = time - 1;
    set_user($body);
    return $body;
}
# Fetch a tweet from the REST API and cache it (and its author).
#
#   $id - tweet id
#
# Returns the decoded tweet hash on success; its embedded {user} has been
# moved into the user cache by set_tweet(), so the returned hash has no
# {user} key. Returns nothing when lookups are currently suppressed
# (time < $Wait) or when the request fails; API error payloads are passed to
# handle_api_error().
#
# Fixes over the previous version:
#  * The response body is decoded inside try{}: a failed request does not
#    necessarily carry JSON, and an uncaught from_json exception would
#    propagate into the stream reader.
#  * {user} is no longer deleted here before calling set_tweet(). set_tweet()
#    deletes {user} itself, so it used to receive a tweet without one and
#    wrote a bogus "user:" record keyed on an undefined id.
#
# NOTE(review): $Ua->get is a blocking call; it appears to run inside the
# event-loop callbacks of this script - confirm that the stall is acceptable.
sub get_tweet_by_api {
    my $id = shift;
    return if time < $Wait;

    my $res  = $Ua->get("http://api.twitter.com/1/statuses/show/${id}.json");
    my $body = try { from_json( $res->content ) } catch { undef };

    unless ( $res->is_success ) {
        if ( ref($body) eq 'HASH' ) {
            handle_api_error( $body, $res->headers );
        }
        else {
            error( "statuses/show $id failed: " . $res->status_line );
        }
        return;
    }

    unless ( ref($body) eq 'HASH' ) {
        error( "statuses/show $id: unexpected response body (" . $res->status_line . ")" );
        return;
    }

    # Use the public accessor instead of the header object's internal hash.
    $Remaining = $res->header("X-RateLimit-Remaining");
    $Wait      = time - 1;

    # Stores the tweet and its embedded user, removing {user} from the hash.
    set_tweet($body);
    return $body;
}
# Cache a tweet and its embedded author.
#
#   $tweet - decoded tweet hash. NOTE: its {user} entry is removed from the
#            hash (the caller's data is modified) and stored separately under
#            "user:<id>"; the remainder is stored under "tweet:<id>".
#
# The user record is only written when the tweet actually carried a user hash
# with an id; otherwise a meaningless "user:" key would be written.
# Always returns a true value; storage failures are reported with warn.
sub set_tweet {
    my $tweet = shift;
    my $user  = delete $tweet->{user};

    $Tc->put( "tweet:$tweet->{id}" => nfreeze $tweet ) or warn "failed\n";

    return 1 unless ref($user) eq 'HASH' && defined $user->{id};

    $Tc->put( "user:$user->{id}" => nfreeze $user ) or warn "failed\n";
    return 1;
}
# Cache a user record under "user:<id>"; a storage failure is reported with
# warn.
sub set_user {
    my ($user) = @_;

    my $key    = "user:$user->{id}";
    my $frozen = nfreeze($user);

    return $Tc->put( $key, $frozen ) || warn "failed\n";
}
# push_read callback: handle one decoded JSON value from the stream, then
# queue itself again for the next one.
#
#   $handle - the connection handle the value was read from
#   $data   - the decoded JSON value
#
# Dispatch:
#   {event}  -> do_event()
#   {delete} -> a "DEL" line naming the author and the deleted text
#   {text}   -> a regular tweet line; the tweet is cached with set_tweet()
#   other    -> pretty-printed JSON dump
#
# The whole dispatch runs inside try{} (previously only the event branch
# did): an exception in any branch, or a value that is not a hash, must not
# prevent the push_read below, because without it no further stream messages
# would ever be processed.
sub parse_json {
    my ($handle, $data) = @_;

    try {
        # Non-hash values fall through to the dump branch.
        my $msg = ref($data) eq 'HASH' ? $data : {};

        if ( $msg->{event} ) {
            do_event($msg);
        }
        elsif ( my $del = $msg->{delete} ) {
            esprintf(
                qq{<font color="orange">DEL \@%s が delete: %s\n</font>},
                get_user( $del->{status}->{user_id} )->{screen_name},
                to_html( get_tweet( $del->{status}->{id} )->{text} ),
            );
        }
        elsif ( $msg->{text} ) {
            esprintf(
                qq{<img src="%s" width="32" height="32"><a href="http://twitter.com/%s" target="_blank">%s</a>: %s\n},
                $msg->{user}->{profile_image_url},
                $msg->{user}->{screen_name},
                $msg->{user}->{screen_name},
                to_html( $msg->{text} ),
            );
            set_tweet($msg);
        }
        else {
            esprintf( "%s\n", JSON->new->pretty->encode($data) );
        }
    }
    catch {
        error($_);
    };

    # Always re-arm the reader for the next JSON value.
    $handle->push_read( json => \&parse_json );
}
package StreamWriter;
use parent qw(Tatsumaki::Handler);
__PACKAGE__->asynchronous(1);

# GET handler for the long-poll endpoint: waits on the shared message queue
# and answers with the rendered line(s) as text.
#
# esprintf() publishes every line as the flat pair (message => $text), so the
# callback's argument list is a sequence of such pairs. All of them are
# written out; the previous code wrote only $_[1], which would drop any
# further pairs delivered in the same call.
# NOTE(review): this assumes the queue can hand over several buffered
# publishes at once - confirm against Tatsumaki::MessageQueue.
#
# NOTE(review): the client id passed to poll_once is the constant 1, so all
# pollers appear to share one queue client - confirm that this is intended.
sub get {
    my $self = shift;
    $self->response->content_type('text/plain; charset=utf-8');
    $main::MQ->poll_once(
        1,
        sub {
            my @events = @_;

            # Consume (key, text) pairs; an empty list simply yields an
            # empty response.
            while ( my ( undef, $message ) = splice @events, 0, 2 ) {
                $self->write($message) if defined $message;
            }
            $self->finish;
        }
    );
}
package MainHandler;
use parent qw(Tatsumaki::Handler);
use Text::MicroTemplate qw / :all /;

# Page template (Text::MicroTemplate syntax: a line starting with '?' is Perl,
# <?= ... ?> is an expression). The embedded script long-polls "<uri>stream",
# prepends every non-empty response as a new <div>, and keeps at most 100 of
# them on the page.
#
# Changes over the previous template:
#  * "<!Docutype HTML>" was a typo for the doctype declaration.
#  * A failed poll used to end the polling loop for good, because $.get only
#    ran the success callback; the request now retries after a short delay.
my $tmpl = <<'_END_';
? my $req = $_[0]->request;
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>UserStreamsClient</title>
<script src="http://www.google.com/jsapi"></script>
<script> google.load("jquery", "1.4") </script>
<style>
div { border-bottom: 1px solid #999; }
</style>
</head>
<body>
<div id="tweets"></div>
<script>
var tweets_buf = [];
var id = 0;
var generate_id = function() {
id = id + 1;
return "tweet-" + id;
};
var clear_buf = function() {
while (tweets_buf.length > 100) {
var id = tweets_buf.shift();
$("#"+id).remove();
}
};
var load_more = function() {
$.ajax({
url: "<?= $req->uri ?>stream",
success: function(data) {
if (data != "") {
var id = generate_id();
$("#tweets").prepend("<div id='" + id + "'>" + data + "</div>");
tweets_buf.push(id);
clear_buf();
}
load_more();
},
error: function() {
setTimeout(load_more, 3000);
}
});
};
load_more();
</script>
</body>
</html>
_END_

# GET handler for the page itself: renders the template with the handler
# object as its only argument and writes the resulting HTML.
sub get {
    my $self = shift;
    $self->write( render_mt( $tmpl, $self )->as_string );
}
package main;

# Route table: the page itself and the long-poll endpoint it reads from.
my @routes = (
    "/"       => "MainHandler",
    "/stream" => "StreamWriter",
);

my $app = Tatsumaki::Application->new( \@routes );

# The application object is this file's return value.
# NOTE(review): a file-level return only works when the file is loaded with
# do/require - presumably by a PSGI launcher; confirm how it is started.
return $app;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.