Skip to content

Instantly share code, notes, and snippets.

@sekimura
Created August 3, 2009 06:21
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save sekimura/160388 to your computer and use it in GitHub Desktop.
Save sekimura/160388 to your computer and use it in GitHub Desktop.
watercoolr (pubsub via webhooks) in Perl
#!/usr/bin/perl
## Perl implementation of watercoolr : http://watercoolr.nuklei.com/
use strict;
use warnings;
use AnyEvent::HTTPD;
use AnyEvent::HTTP;
use Coro::AnyEvent;
use Compress::Zlib qw(crc32);
use JSON::XS;
use URI::Escape;
use Math::BaseCalc;
use DBI;
my $port = 8181;
my $dbh = DBI->connect($ENV{DATABASE_URL} || 'dbi:SQLite:dbname=watercoolr.db', '', '');
$dbh->do(<<END_OF_SQL);
CREATE TABLE IF NOT EXISTS channels (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(32)
);
END_OF_SQL
$dbh->do(<<END_OF_SQL);
CREATE TABLE IF NOT EXISTS subscribers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
channel_id INTEGER,
url VARCHAR(128)
);
END_OF_SQL
# $dbh->do(<<END_OF_SQL);
# INSERT INTO channels (name) VALUES ('hogehoge');
# END_OF_SQL
# $dbh->do(<<END_OF_SQL);
# INSERT INTO subscribers (channel_id, url) VALUES (1, 'http://labs.qootas.org');
# END_OF_SQL
sub gen_id {
my $base = int(rand(100000000));
my $salt = localtime();
my $calc36 = Math::BaseCalc->new(digits=>[0..9,'a'..'z']);
$calc36->to_base(crc32($base . $salt));
}
my $httpd = AnyEvent::HTTPD->new( port => $port );
$httpd->reg_cb(
'/channels' => sub {
my ($httpd, $req) = @_;
my $id = gen_id();
#$db{channels}{$id} = 1;
$dbh->do(q{INSERT INTO channels (name) VALUES (?)}, undef, $id);
$req->respond( { content => ['application/javascript',
JSON::XS->new->ascii->encode({ id => $id })
]});
},
'/subscribers' => sub {
my ($httpd, $req) = @_;
my $json = $req->parm('data');
my $data = JSON::XS->new->ascii->decode($json);
my $channel = $data->{channel};
my $url = $data->{url};
my $rec = $dbh->selectall_arrayref(q{SELECT * FROM channels WHERE name = ?}, {Slice => {}}, $channel)->[0];
if ($rec) {
$dbh->do(q{INSERT INTO subscrivers (channel_id, url) VALUES (?, ?)}, undef, $rec->{id}, $url);
}
$req->respond( { content => ['application/javascript',
JSON::XS->new->ascii->encode({ status => 'OK' })
]});
},
'/messages' => sub {
my ($httpd, $req) = @_;
my $json = $req->parm('data');
my $data = JSON::XS->new->ascii->decode($json);
my $channel = $data->{channel};
my $message = $data->{message};
my $rec = $dbh->selectall_arrayref(q{SELECT * FROM channels WHERE name = ?}, {Slice => {}}, $channel)->[0];
if ($rec) {
my $aref = $dbh->selectall_arrayref('SELECT * FROM subscribers WHERE channel_id = ?', {Slice => {}}, $rec->{id});
for my $sub (@$aref) {
my $url = $sub->{url};
warn "$url $message";
AnyEvent::HTTP::http_post $url, 'data='.uri_escape($message), sub {
my ($body, $hdr) = @_;
use Data::Dumper;
print Dumper $hdr;
};
}
}
$req->respond( { content => ['application/javascript',
JSON::XS->new->ascii->encode({ status => 'OK' })
]});
},
);
$httpd->run;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment