Skip to content

Instantly share code, notes, and snippets.

@jeremyvdw
Created March 5, 2012 18:27
Show Gist options
  • Save jeremyvdw/1980156 to your computer and use it in GitHub Desktop.
Save jeremyvdw/1980156 to your computer and use it in GitHub Desktop.
apublish & asubscribe PubNub methods (wrapped in Fibers)
require 'eventmachine'
require 'em-http-request'
require 'yajl'
module PubSub
class Pubnub
#**
#* Pubnub
#*
#* Init the Pubnub Client API
#*
#* @param string publish_key required key to send messages.
#* @param string subscribe_key required key to receive messages.
#* @param string secret_key required key to sign messages.
#* @param boolean ssl required for 2048 bit encrypted messages.
#*
def initialize(publish_key, subscribe_key, secret_key, ssl_on)
@publish_key = publish_key
@subscribe_key = subscribe_key
@secret_key = secret_key
@ssl = ssl_on
@origin = 'pubsub.pubnub.com'
@limit = 1_800
@timetoken = 0
if @ssl
@origin = 'https://' + @origin
else
@origin = 'http://' + @origin
end
end
#**
#* Apublish
#*
#* This is NON-BLOCKING.
#*
def apublish(channel, message, callback = nil, errback = nil)
## Capture User Input
message = message.to_json
if ENV['VERBOSE_PUBNUB']
puts "\n ==> PUB to #{channel.inspect}: #{message}\n"
end
## Sign Message
signature = @secret_key.length > 0 ? Digest::MD5.hexdigest([
@publish_key,
@subscribe_key,
@secret_key,
channel,
message
].join('/')) : '0'
## Fail if message (in bytes) too long.
## Hard limit set to 1_800 bytes (http://www.pubnub.com/tutorial/developer-intro-tutorial)
if message.size_in_bytes > @limit
puts('Message TOO LONG (' + @limit.to_s + ' LIMIT)')
return [ 0, 'Message Too Long.' ]
end
## Send Message
request = [
'publish',
@publish_key,
@subscribe_key,
signature,
channel,
'0',
message
]
## Construct Request URL
url = @origin + '/' + request.map{ |bit| bit.split('').map{ |ch|
' ~`!@#$%^&*()+=[]\\{}|;\':",./<>?'.index(ch) ?
'%' + ch.unpack('H2')[0].to_s.upcase : URI.encode(ch)
}.join('') }.join('/')
Fiber.new {
http = EventMachine::HttpRequest.new(url).get(:keepalive => true)
start = Time.now.to_f
http.errback {
message = "Failed to publish message: #{http.error}"
errback.call message
}
http.callback {
if ENV['VERBOSE_PUBNUB']
duration = '%.3f' % (Time.now.to_f - start)
puts " <== PUBNUB responded in #{duration} seconds: #{http.response.inspect} - for message #{request.last}\n"
end
callback.call Yajl::Parser.parse(http.response)
}
}.resume
end
#**
#* Asubscribe
#*
#* This is NON-BLOCKING.
#*
def asubscribe(channel, callback = nil, errback = nil, timetoken = 0)
request = [
'subscribe',
@subscribe_key,
channel,
'0',
timetoken.to_s
]
## Construct Request URL
url = @origin + '/' + request.map{ |bit| bit.split('').map{ |ch|
' ~`!@#$%^&*()+=[]\\{}|;\':",./<>?'.index(ch) ?
'%' + ch.unpack('H2')[0].to_s.upcase : URI.encode(ch)
}.join('') }.join('/')
Fiber.new {
http = EM::HttpRequest.new(url).get(:keepalive => true)
http.errback {
# Check if there's a *real* error to call the errback
EM::Timer.new(1) do
asubscribe(channel, callback, errback, timetoken)
end
}
http.callback {
response = Yajl::Parser.parse(http.response)
messages = response[0]
timetoken = response[1]
## If it was a timeout
next if !messages.length
## Run user Callback and Reconnect if user permits.
messages.each do |message|
callback.call(message)
end
asubscribe(channel, callback, errback, timetoken)
}
}.resume
end
##################### WIP ##################
#**
#* History
#*
#* Load history from a channel.
#*
#* @param array args with 'channel' and 'limit'.
#* @return mixed false on fail, array on success.
#*
def history(args)
## Capture User Input
limit = +args['limit'] ? +args['limit'] : 10
channel = args['channel']
## Fail if bad input.
if (!channel)
puts 'Missing Channel.'
return false
end
## Get History
return self._request([
'history',
@subscribe_key,
channel,
'0',
limit.to_s
])
end
#**
#* Time
#*
#* Timestamp from PubNub Cloud.
#*
#* @return int timestamp.
#*
def time()
return self._request([
'time',
'0'
])[0]
end
#**
#* Request URL
#*
#* @param array request of url directories.
#* @return array from JSON response.
#*
def _request(request)
## Construct Request URL
url = @origin + '/' + request.map{ |bit| bit.split('').map{ |ch|
' ~`!@#$%^&*()+=[]\\{}|;\':",./<>?'.index(ch) ?
'%' + ch.unpack('H2')[0].to_s.upcase : URI.encode(ch)
}.join('') }.join('/')
response = ''
begin
start = Time.now.to_f
open(url) do |f|
response = f.read
end
if ENV['VERBOSE_PUBNUB']
duration = '%.3f' % (Time.now.to_f - start)
puts " <== PUBNUB responded in #{duration} seconds: #{response.inspect} - for message #{request.last}\n"
end
return JSON.parse(response)
rescue OpenURI::HTTPError => err
puts "\n\nPubNub HTTP Error on request. Message:"
puts request.last
puts "\n Response:"
puts err.io.try(:read)
puts "\n\n"
raise err
end
end
end
end
class String
def size_in_bytes
self.unpack("C*").size
end
end
require 'apubnub'
EM.run {
channel = 'test'
message = 'Hello World'
pubnub = PubSub::Pubnub.new(Settings.pubnub.publish_key, Settings.pubnub.subscribe_key, Settings.pubnub.secret_key, true)
cb = Proc.new { |reply|
puts "Message: #{reply}"
}
errb = Proc.new { |error|
puts "Error: #{error}"
}
pubnub.asubscribe(channel, cb, errb)
pubnub.apublish(channel, message, cb, errb)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment