Skip to content

Instantly share code, notes, and snippets.

@bschwartz
Last active August 29, 2015 14:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bschwartz/d9b82670c48bcf6a5b7d to your computer and use it in GitHub Desktop.
Save bschwartz/d9b82670c48bcf6a5b7d to your computer and use it in GitHub Desktop.
#
# Put this in a directory and create a `Gemfile` with this in it:
#
# source 'https://rubygems.org'
# gem 'nsq-cluster'
# gem 'nsq-ruby'
#
# Then run `bundle install` to install the require gems.
# Then run this like so: `ruby fin-failer.rb`
#
require 'bundler/setup'
require 'nsq-cluster'
require 'nsq'
require 'logger'
# Start nsqd
# Change verbose to true if you want tons of NSQD logs
$cluster = NsqCluster.new(
nsqd_count: 1,
verbose: false,
nsqd_options: { verbose: true }
)
$cluster.block_until_running
at_exit { $cluster.destroy }
NSQD = "#{$cluster.nsqd.first.host}:#{$cluster.nsqd.first.tcp_port}"
TOPIC = 'fin-failer'
# Publish messages forever in a separate thread
pub_thread = Thread.new do
producer = Nsq::Producer.new(nsqd: NSQD, topic: TOPIC)
n = 0
loop do
if n % 1000 == 0
#puts "Wrote: #{n}"
sleep 1
end
producer.write(n)
n += 1
end
end
$messages = {}
# Find errors like this, and report what message they were from
# "Error received: E_FIN_FAILED FIN 06dbb0680b436005 failed ID not in flight"
class FailLogger < Logger
def error(msg)
if msg =~ /E_FIN_FAILED FIN (\w+)/
msg_id = $1
puts "FIN Failed for: #{$messages[msg_id]}"
end
end
end
# Read messages forever
consumer = Nsq::Consumer.new(nsqd: NSQD, topic: TOPIC, channel: 'fin-failer-channel', max_in_flight: 50)
Nsq.logger = FailLogger.new(nil)
Nsq.logger.level = Logger::ERROR
loop do
msg = consumer.pop
$messages[msg.id] = msg.body
msg.finish
# print a . every 1000
if msg.body.to_i % 1000 == 0
puts '.'
end
# quit after 5000
if msg.body.to_i >= 5000
exit
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment