Skip to content

Instantly share code, notes, and snippets.

@MarcusHoile
Created December 23, 2014 10:09
Show Gist options
  • Save MarcusHoile/2f3f7237b737fbff03b0 to your computer and use it in GitHub Desktop.
Save MarcusHoile/2f3f7237b737fbff03b0 to your computer and use it in GitHub Desktop.
bunny
require "bunny"
require 'json'
require 'yaml'
# Run this message_queue by passing a queue name like this:
# ruby -rubygems ./app/services/message_queue.rb queue_name
module Bunny
  # Blocking queue consumer built on the bunny gem.
  #
  # Each delivery is expected to be a JSON document describing a game. When the
  # document carries a score (or belongs to one of the fixtures listed in
  # Parser::Factory::NEWSPORTS) it is turned into a message hash and handed to
  # a parser obtained from Bunny::Parser::Factory, whose #update_score is called.
  # Every delivery is acknowledged, whether or not it could be processed.
  class Listener
    attr_accessor :queue_name, :conn, :ch, :q
    attr_reader :bunny_conf

    # @param a_queue_name [String, nil] queue to consume from; when nil/empty the
    #   :queue entry of the current environment in config/bunny.yml is used
    # @param a_conn [Object, nil] pre-built connection (replaced by #connect)
    # @param a_ch [Object, nil] pre-built channel (replaced by #connect)
    # @param a_q [Object, nil] pre-built queue (replaced by #connect)
    def initialize(a_queue_name = nil, a_conn = nil, a_ch = nil, a_q = nil)
      @conn = a_conn
      @ch = a_ch
      @q = a_q
      # config/bunny.yml is rendered through ERB before being parsed as YAML.
      @bunny_conf = HashWithIndifferentAccess.new(
        YAML.load(ERB.new(File.read(Rails.root.join("config", "bunny.yml"))).result)
      )
      @queue_name = a_queue_name
    end

    # Opens the connection, creates a channel and declares the durable queue.
    #
    # A queue name supplied to the constructor (or assigned through
    # #queue_name=) takes precedence; the configured :queue value is only a
    # fallback. Previously the configured value always overwrote it, which made
    # the constructor argument dead.
    #
    # @return [Object] whatever ch.prefetch returns
    def connect
      settings = bunny_conf[Rails.env]
      @conn = Bunny.new(settings)
      @queue_name = settings[:queue] if queue_name.to_s.empty?
      conn.start
      @ch = conn.create_channel
      @q = ch.queue(queue_name, :durable => true, :exclusive => false, :auto_delete => false)
      # Ask for a single unacknowledged delivery at a time.
      ch.prefetch(1)
    end

    # Closes the connection; a no-op when no connection was ever opened.
    def disconnect
      conn.close if conn
    end

    # This is for timing how long it takes to process
    # def report_time start_time, match_import
    # match_import.output_log("count: #{q.message_count}, #{q.message_count == 0}")
    # if q.message_count == 0
    # end_time = Time.now
    # difference_seconds = (end_time - start_time).to_i
    # match_import.output_log("message_count: #{message_count}, difference_seconds: #{difference_seconds}, start: #{start_time}")
    # end
    # end

    # Consumes the queue, blocking the calling thread, until interrupted
    # (CTRL+C), at which point the connection is closed.
    #
    # NOTE(review): newer bunny releases call the :ack option :manual_ack --
    # confirm against the installed gem version before renaming it.
    def subscribe
      # start_time = Time.now
      puts " [*] Waiting for messages. To exit press CTRL+C"
      begin
        q.subscribe(:ack => true, :block => true) do |delivery_info, _metadata, payload|
          # puts " [x] Received '#{payload}'"
          process_payload(payload)
          ch.ack(delivery_info.delivery_tag)
          # report_time(start_time, match_import)
        end
      rescue Interrupt
        disconnect
      end
    end

    private

    # Handles one raw delivery body. Any StandardError (malformed JSON, a
    # document without a "property" object, ...) is reported and swallowed so a
    # single bad message can neither stop the consumer nor skip its ack.
    def process_payload(payload)
      if payload.start_with?('{"status"')
        puts "Got one status message"
        return
      end

      game = JSON.parse(payload)
      if score_update_needed?(game)
        apply_score_update(build_message(game))
      else
        puts " [x] No score to update"
      end
    rescue => e
      puts " [ERROR] #{e.message}"
    end

    # Truthy when the fixture is listed in NEWSPORTS or the document carries a
    # "teamA.goals" value (0 counts: only nil/false are falsy in Ruby).
    def score_update_needed?(game)
      properties = game["property"]
      Parser::Factory::NEWSPORTS.include?(properties["game.fixture"]) || properties["teamA.goals"]
    end

    # Builds the hash passed to the parser factory from a parsed game document.
    def build_message(game)
      properties = game["property"]
      {
        :entity => game["entity"],
        # :league_query_string => game["league.query.string"],
        # :league_season_name => game["league.season.name"],
        :match_datetime_int => properties["game.kickoff"],
        :game_fixture => properties["game.fixture"],
        :team_a_goals => properties["teamA.goals"],
        :team_b_goals => properties["teamB.goals"],
        :message_array => game
      }
    end

    # Runs the parser for one message; failures are logged (through the parser
    # too, when one was built) and do not propagate.
    def apply_score_update(message)
      puts " [x] Start update"
      begin
        parser = Bunny::Parser::Factory.instance.make_parser(message)
        parser.update_score
      rescue => e
        parser.output_log(e.message) if parser
        puts " [ERROR] #{e.message}"
      end
      puts " [x] End update"
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment