Skip to content

Instantly share code, notes, and snippets.

@NeoCat
Created September 25, 2010 05:40
Show Gist options
  • Save NeoCat/596521 to your computer and use it in GitHub Desktop.
Save NeoCat/596521 to your computer and use it in GitHub Desktop.
Streaming API plugin and proxy server for twicli
// Number of times update() has been entered so far.
var chirp_cnt = 0;

/**
 * Requests the next batch of timeline data.
 *
 * Without a logged-in user (myname is falsy) it hands off to auth().
 * The very first call loads the regular home timeline through twitterAPI
 * with twShow as the JSONP callback. Every later call asks the local proxy
 * on port 10080 instead; the second call additionally sends "&last=1".
 */
function update() {
  if (!myname) return auth();
  callPlugins("update");

  var isFirstCall = (chirp_cnt++ == 0);
  var url;
  if (isFirstCall) {
    var count = since_id ? 200 : max_count;
    url = twitterAPI + 'statuses/home_timeline.json' +
      '?count=' + count +
      '&suppress_response_codes=true&callback=twShow';
  } else {
    url = 'http://localhost:10080/?seq=' + (seq++);
    // chirp_cnt has already been incremented above, so 2 means "second call".
    if (chirp_cnt == 2) url += '&last=1';
  }
  update_ele = loadXDomainScript(url, update_ele);
}
/**
 * Deliberately does nothing.
 * NOTE(review): presumably this replaces the host page's polling-timer reset,
 * since new messages are pulled from the local proxy instead - confirm.
 */
function resetUpdateTimer() {}
/**
 * JSONP callback invoked by the local proxy with a single message.
 * Displays the message via twShow() and immediately issues the next
 * request to the proxy, bumping the seq counter.
 */
function chirp(tw) {
  twShow([tw]);
  var nextUrl = 'http://localhost:10080/?seq=' + (seq++);
  update_ele = loadXDomainScript(nextUrl, update_ele);
}
// Interval handles for the periodic replies / direct-message checks.
var getRepliesTimer = null, checkDirectTimer = null;

/**
 * Renders an array of messages into the "tw" node and notifies plugins.
 *
 * An object carrying an `error` property is passed to error() instead.
 * On the first successful call the replies/direct-message checks are run
 * once and then scheduled every 1 and 5 minutes respectively. When this is
 * the response to the first update() call (chirp_cnt == 1), update() is
 * called again to start requesting from the proxy.
 */
function twShow(tw) {
  if (tw.error) return error(tw.error);

  // Notify plugins from the last element of tw to the first.
  for (var i = tw.length - 1; i >= 0; i--) {
    var msg = tw[i];
    if (msg && msg.user) callPlugins("gotNewMessage", msg);
  }

  if (nr_page == 0) {
    nr_page = 1;
    $("tw").appendChild(nextButton('get_old', nr_page));
  }

  var nr_shown = twShowToNode(tw, $("tw"), false, false, true, true, true);

  if (!getRepliesTimer) {
    var MINUTE = 1000 * 60;
    _getReplies();
    _checkDirect();
    getRepliesTimer = setInterval(_getReplies, MINUTE * 1);
    checkDirectTimer = setInterval(_checkDirect, MINUTE * 5);
  }

  callPlugins("noticeUpdate", tw, nr_shown);

  if (chirp_cnt == 1) {
    update();
  }
}
/**
 * Timer callback: fetches replies, but only once the "tw" node has an
 * oldest_id set.
 */
function _getReplies() {
  var timeline = $("tw");
  if (!timeline.oldest_id) return;
  getReplies();
}
/**
 * Timer callback: calls checkDirect() with no arguments and discards
 * its return value.
 */
function _checkDirect() { checkDirect(); }
#!/usr/local/bin/ruby
require 'webrick'
require 'oauth'
# Builds an OAuth 1.0 (HMAC-SHA1) signed GET URI for the user stream
# endpoint, prints it to stdout and returns it.
#
# A fresh nonce and timestamp are generated on every call, so each call
# yields a new signature. The consumer key, token and both secrets are
# the placeholder literals embedded below.
def signed_uri
  oauth_params = {
    "oauth_consumer_key" => "*****************",
    "oauth_token" => "*****-*****************",
    "oauth_nonce" => OAuth::Helper.generate_key,
    "oauth_timestamp" => OAuth::Helper.generate_timestamp,
    "oauth_signature_method" => "HMAC-SHA1",
    "oauth_version" => "1.0"
  }

  request = OAuth::RequestProxy.proxy(
    "method" => 'GET',
    "uri" => 'https://betastream.twitter.com/2b/user.json',
    "parameters" => oauth_params
  )
  request.sign!(
    :consumer_secret => "******************",
    :token_secret => "***************"
  )

  uri = request.signed_uri
  puts "URI: #{uri}"
  uri
end
# Shared state between the stream reader loop and the HTTP servlet.

# Lines read from the stream that have not been handed to a client yet.
$buffered = []
# The message most recently taken from $buffered by the servlet; it is
# sent again when a request carries last=1.
$last = nil
# The servlet thread currently sleeping until data arrives (nil if none);
# the reader loop wakes it after buffering a line.
$waiting_thread = nil
# Servlet that hands buffered stream messages to the browser, one per
# request, wrapped in a JavaScript call to chirp(...).
class LineServlet < WEBrick::HTTPServlet::AbstractServlet
  # Responds with 'chirp(<message>);' as text/javascript.
  #
  # If a message has already been delivered ($last is set) and the request
  # carries last=1, that previous message is sent again. Otherwise the
  # request blocks, re-checking at least every 10 seconds, until $buffered
  # is non-empty, then takes the oldest buffered message.
  def do_GET(req, res)
    res['Content-Type'] = 'text/javascript'

    # The query is only inspected when there is something to replay.
    replay_last = $last ? req.query['last'] == "1" : false

    until replay_last || !$buffered.empty?
      # Register this thread so the reader loop can wake it early.
      $waiting_thread = Thread.current
      sleep 10
    end
    $waiting_thread = nil

    $last = $buffered.shift unless replay_last
    res.body = 'chirp(' + $last + ');'
    puts "remaining #{$buffered.length} messages"
  end
end
# Local-only HTTP server (127.0.0.1:10080) serving LineServlet at '/'.
server = WEBrick::HTTPServer.new(
  :Port => 10080,
  :BindAddress => '127.0.0.1'
)
server.mount('/', LineServlet)

# Shut the server down on Ctrl-C or a TERM signal.
%w[INT TERM].each do |signal|
  trap(signal) { server.shutdown }
end

# Run the server in the background so the main thread can read the stream.
server_thread = Thread.new { server.start }
# Raised when a guarded operation exceeds its time limit.
# It derives directly from Exception, so a bare `rescue` (which only
# handles StandardError) does not catch it; it must be rescued by name.
TimeOut = Class.new(Exception)
# Runs the given block with a time limit.
#
# A watchdog thread sleeps for +sec+ seconds and then raises TimeOut in
# the calling thread if that thread is still alive. The watchdog is killed
# as soon as the block finishes, whether normally or by an exception.
#
# sec:: time limit in seconds, passed to Kernel#sleep.
#
# Returns true if the block was interrupted by TimeOut (including a
# TimeOut raised by the block itself), false if it ran to completion.
# Any other exception raised by the block propagates to the caller.
def timeout(sec)
  # Lower-case literals: the TRUE/FALSE constants no longer exist in
  # current Ruby versions.
  timed_out = false
  caller_thread = Thread.current
  watchdog = Thread.start do
    sleep sec
    caller_thread.raise TimeOut, "timeout" if caller_thread.alive?
  end

  begin
    yield
  rescue TimeOut
    timed_out = true
  ensure
    Thread.kill watchdog if watchdog.alive?
  end

  timed_out
end
# Reads one line from +stream+, waiting at most 60 seconds.
#
# Returns the line with its trailing line terminator removed. Because the
# terminator is stripped with chomp!, a line that has no terminator to
# strip yields nil instead of the text.
#
# Raises TimeOut if no line arrived within the limit. Exceptions raised by
# stream.readline itself are not handled here.
def read_stream(stream)
  line = ''
  timed_out = timeout(60) { line = stream.readline.chomp! }
  raise TimeOut, "timeout" if timed_out
  line
end
# Main reader loop: connect to the stream with curl up to 5 times, buffer
# every non-empty line in $buffered and wake the servlet thread waiting
# for data. A connection attempt ends when no line arrives within the
# read_stream time limit, when read_stream returns nil, or when curl
# closes the stream; the next attempt then starts with a freshly signed URI.
max_attempts = 5
max_attempts.times do
  # Array form runs curl directly without a shell, so the pid below is
  # curl's own pid and the URI needs no shell quoting.
  IO.popen(['curl', '-Ns', signed_uri]) do |stream|
    pid = stream.pid
    puts "curl pid: #{pid}"
    begin
      read_stream(stream) # skip friends line
      while l = read_stream(stream)
        next if l == ''
        $buffered.push l
        puts "buffered #{$buffered.length} messages : #{l[0, 40]}..."
        # Take a snapshot: the servlet thread may reset $waiting_thread to
        # nil (or finish) between the check and the wake-up.
        waiter = $waiting_thread
        waiter.run if waiter && waiter.alive?
      end
    rescue TimeOut
      begin
        Process.kill :TERM, pid
      rescue Errno::ESRCH
        # curl has already exited; nothing left to terminate.
      end
      puts "\atimeout!"
    rescue EOFError
      # readline raises EOFError once curl closes the stream; treat it as
      # the end of this attempt instead of aborting the whole script.
      puts "stream closed"
    end
  end
end

puts "exiting ..."
# The server thread is killed before shutdown is requested.
server_thread.kill if server_thread.alive?
server.shutdown
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment