Skip to content

Instantly share code, notes, and snippets.

@voloko
Created October 9, 2009 08:46
Show Gist options
  • Save voloko/205871 to your computer and use it in GitHub Desktop.
Save voloko/205871 to your computer and use it in GitHub Desktop.
Twitter realtime API client
require 'eventmachine'
require 'em/buftok'
require 'base64'
module Twitter
  # EventMachine client for Twitter's realtime (streaming) JSON API.
  #
  # It sends a single HTTP/1.1 request, parses the status line and headers,
  # then passes every body line that starts with '{' to the +each_item+
  # callback as a raw JSON string. When the connection closes and #stop was
  # not called, it reconnects using the back-off policy in the constants below.
  #
  # Usage:
  #   stream = Twitter::JSONStream.connect(:auth => 'user:pass', :method => 'POST',
  #                                        :content => 'track=ruby')
  #   stream.each_item         { |json| ... }
  #   stream.on_error          { |message| ... }
  #   stream.on_reconnect      { |timeout| ... }
  #   stream.on_max_reconnects { |timeout, retries| ... }
  class JSONStream < EventMachine::Connection
    # Size limit handed to BufferedTokenizer. Exceeding it is expected to make
    # #extract raise, which #receive_data reports through on_error.
    MAX_LINE_LENGTH = 16*1024

    # Network (no HTTP status received) failures: linear back-off starting at
    # 0.25s, adding 0.25s per attempt, capped at 16s.
    NF_RECONNECT_START = 0.25
    NF_RECONNECT_ADD = 0.25
    NF_RECONNECT_MAX = 16

    # Application (HTTP-level) failures: exponential back-off starting at 10s,
    # doubling per attempt.
    AF_RECONNECT_START = 10
    AF_RECONNECT_MUL = 2

    # Stop reconnecting once the next delay would exceed this many seconds...
    RECONNECT_MAX = 320
    # ...or after this many consecutive reconnect attempts.
    RETRIES_MAX = 10

    DEFAULT_OPTIONS = {
      :method => 'GET',
      :path => '/1/statuses/filter.json',
      :content_type => "application/x-www-form-urlencoded",
      :content => '',
      :host => 'stream.twitter.com',
      :port => 80,
      :auth => 'test:test'
    }

    attr_accessor :code
    attr_accessor :headers
    attr_accessor :nf_last_reconnect
    attr_accessor :af_last_reconnect
    attr_accessor :reconnect_retries

    # Opens a connection to options[:host]:options[:port] and returns the
    # JSONStream handler. Options not given fall back to DEFAULT_OPTIONS.
    def self.connect(options = {})
      options = DEFAULT_OPTIONS.merge(options)
      EventMachine.connect(options[:host], options[:port], self, options)
    end

    def initialize(options = {})
      # merge again in case initialize is called directly rather than via .connect
      @options = DEFAULT_OPTIONS.merge(options)
      @gracefully_closed = false
      @nf_last_reconnect = nil
      @af_last_reconnect = nil
      @reconnect_retries = 0
    end

    # Registers the block called with each body line beginning with '{'
    # (the raw JSON text, not a parsed object).
    def each_item(&block)
      @each_item_callback = block
    end

    # Registers the block called with an error message string.
    def on_error(&block)
      @error_callback = block
    end

    # Registers the block called with the delay (seconds) before each
    # reconnect attempt.
    def on_reconnect(&block)
      @reconnect_callback = block
    end

    # Registers the block called with (timeout, retries) when the client gives
    # up reconnecting because RECONNECT_MAX or RETRIES_MAX was exceeded.
    def on_max_reconnects(&block)
      @max_reconnects_callback = block
    end

    # Closes the connection and suppresses any further reconnects,
    # including one that is already scheduled.
    def stop
      @gracefully_closed = true
      close_connection
    end

    # EventMachine callback: the connection closed. Processes any partial
    # line left in the buffer, then reconnects unless #stop was called.
    def unbind
      receive_line(@buffer.flush) unless @buffer.empty?
      schedule_reconnect unless @gracefully_closed
    end

    # EventMachine callback: splits incoming bytes into lines and parses them.
    # A StandardError raised while parsing (including one raised by a user
    # callback) is reported via on_error and the connection is closed, which
    # leads to #unbind and the reconnect logic.
    def receive_data(data)
      @buffer.extract(data).each { |line| receive_line(line) }
    rescue StandardError => e
      fail_connection(e.message)
    end

    # EventMachine callback run once when the handler is created.
    def post_init
      reset_state
    end

    # EventMachine callback: the TCP connection is established. The request is
    # sent here rather than in post_init so it is re-sent after every
    # #reconnect, which reuses this handler without re-running post_init.
    def connection_completed
      send_request
    end

    protected

    # Computes the next back-off delay and either schedules a reconnect or,
    # once a limit is exceeded, stops retrying and notifies on_max_reconnects.
    def schedule_reconnect
      timeout = reconnect_timeout
      @reconnect_retries += 1
      if timeout <= RECONNECT_MAX && @reconnect_retries <= RETRIES_MAX
        reconnect_after(timeout)
      elsif @max_reconnects_callback
        @max_reconnects_callback.call(timeout, @reconnect_retries)
      end
    end

    # Notifies on_reconnect, then after +timeout+ seconds resets the parser
    # state and reconnects this handler to the configured host/port.
    def reconnect_after(timeout)
      @reconnect_callback.call(timeout) if @reconnect_callback
      EventMachine.add_timer(timeout) do
        # #stop may have been called while we were waiting.
        unless @gracefully_closed
          # The handler object is reused, so parse the new response from scratch.
          reset_state
          reconnect(@options[:host], @options[:port])
        end
      end
    end

    # Returns the delay in seconds before the next reconnect attempt.
    # @code is still 0 when no HTTP status line was received (network
    # failure), which selects linear back-off; otherwise exponential.
    def reconnect_timeout
      if @code == 0
        @nf_last_reconnect = @nf_last_reconnect ? @nf_last_reconnect + NF_RECONNECT_ADD : NF_RECONNECT_START
        [@nf_last_reconnect, NF_RECONNECT_MAX].min
      else
        @af_last_reconnect = @af_last_reconnect ? @af_last_reconnect * AF_RECONNECT_MUL : AF_RECONNECT_START
        @af_last_reconnect
      end
    end

    # Clears per-response state: status code, headers, parser state and the
    # line buffer (lines are split on "\r"; the leftover "\n" is stripped later).
    def reset_state
      @code = 0
      @headers = []
      @state = :init
      @buffer = BufferedTokenizer.new("\r", MAX_LINE_LENGTH)
    end

    # Writes the HTTP request (with Basic auth and, for POST, a form body).
    def send_request
      data = []
      data << "#{@options[:method]} #{@options[:path]} HTTP/1.1"
      data << "Host: #{@options[:host]}"
      data << "User-agent: ruby image client"
      data << "Authorization: Basic " + [@options[:auth]].pack('m').delete("\r\n")
      if @options[:method] == 'POST'
        data << "Content-type: #{@options[:content_type]}"
        # Content-Length counts bytes, not characters (matters for non-ASCII bodies).
        data << "Content-length: #{@options[:content].bytesize}"
      end
      # Joining with a trailing "\r\n" element produces the blank line that ends the headers.
      data << "\r\n"
      send_data data.join("\r\n") + @options[:content]
    end

    # Dispatches one line according to the parser state. Lines received in
    # the :failed state (after fail_connection) are ignored.
    def receive_line(ln)
      case @state
      when :init
        parse_response_line ln
      when :headers
        parse_header_line ln
      when :stream
        parse_stream_line ln
      end
    end

    def receive_error(e)
      @error_callback.call(e) if @error_callback
    end

    # Reports +message+ via on_error, ignores further input on this connection
    # and closes it (which leads to #unbind and the reconnect logic).
    def fail_connection(message)
      @state = :failed
      receive_error(message)
      close_connection
    end

    # Passes non-blank body lines beginning with '{' to each_item; other
    # lines are dropped.
    def parse_stream_line(ln)
      ln.strip!
      return if ln.empty?
      @each_item_callback.call(ln) if ln[0, 1] == '{' && @each_item_callback
    end

    # Collects header lines. The blank line ending the headers switches to
    # stream parsing on HTTP 200; any other status is reported as an error and
    # the connection closed, so the app-failure back-off is not reset.
    def parse_header_line(ln)
      ln.strip!
      if ln.empty?
        if @code == 200
          reset_timeouts
          @state = :stream
        else
          fail_connection("invalid status code: #{@code}")
        end
      else
        headers << ln
      end
    end

    # Parses "HTTP/1.x NNN ..." into @code; anything else fails the connection.
    def parse_response_line(ln)
      if ln =~ /\AHTTP\/1\.[01] ([\d]{3})/
        @code = $1.to_i
        @state = :headers
      else
        fail_connection('invalid response')
      end
    end

    # Restarts both back-off sequences and the retry counter after a
    # successful connection.
    def reset_timeouts
      @nf_last_reconnect = @af_last_reconnect = nil
      @reconnect_retries = 0
    end
  end
end
# Moved JSONStream to a separate project.
# See: http://github.com/voloko/twitter-stream
# or simply: gem install twitter-stream
require 'rubygems'
require 'twitter/json_stream'

EventMachine.run do
  stream = Twitter::JSONStream.connect(
    :path => '/1/statuses/filter.json',
    :auth => 'LOGIN:PASSWORD',
    :method => 'POST',
    :content => 'track=something'
  )

  stream.each_item do |item|
    # do something useful here (like posting to a message queue or saving to a db)
    puts "found: #{item}"
  end

  stream.on_error do |message|
    # for logging and debugging purposes
    # you might want to examine stream.headers here
    puts "error: #{message}"
  end

  stream.on_reconnect do |timeout|
    # timeout adheres to http://apiwiki.twitter.com/Streaming-API-Documentation#Connecting
    puts "reconnecting in: #{timeout} seconds"
  end

  # Shut down cleanly on kill (TERM) and Ctrl-C (INT): stop the stream so it
  # does not schedule a reconnect, then stop the reactor.
  %w[TERM INT].each do |signal|
    trap(signal) do
      stream.stop
      EventMachine.stop if EventMachine.reactor_running?
    end
  end
end

puts "The event loop has ended"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment