Skip to content

Instantly share code, notes, and snippets.

@toretore
Created June 1, 2013 14:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save toretore/5690589 to your computer and use it in GitHub Desktop.
Save toretore/5690589 to your computer and use it in GitHub Desktop.
rails_env = ENV['RAILS_ENV'] || ENV['RACK_ENV'] || 'production'
worker_processes (rails_env == 'production' ? 5 : 3)
preload_app true
timeout 30
listen 20000
###########################################################################################
# ZMQ LOGGER ##############################################################################
###########################################################################################
require 'ffi-rzmq'
require 'json'
require 'time'
require 'timeout'
require 'securerandom'
# ZMQ logger
#
# Sets up a logger pushing out log messages and events on a ZMQ PUB socket
#
# * PUB tcp://0.0.0.0:21000
#
# ZMQLoggerSink
#
# Opens up PUB and PUSH sockets in a separate thread. Additionally opens
# an inproc PAIR socket to receive messages (both events and log messages)
# that will be published.
#
# Messages are in two parts: The first part contains the message type.
#
# * Log messages will have a type starting with "log". E.g. "log.user.create"
# These will be published on the PUSH socket.
#
# * Events will have a type starting with "event". E.g. "event.user.create"
# These will be published on the PUB socket.
#
#TODO: Is this thread really needed? ZMQ will shove messages into its own threads and send asynchronously.
# Just need to make sure that all ZMQ calls are async?
class ZMQLoggerSink
attr_reader :context, :pair_uri, :push_uri
def initialize(atts={})
atts.each{|k,v| instance_variable_set("@#{k}", v) }
@context ||= ZMQ::Context.new
@pair_uri ||= "inproc://zmq-log-#{SecureRandom.uuid}"
@push_uri ||= 'tcp://127.0.0.1:21000'
@t = Thread.new do
logger = context.socket(ZMQ::PAIR)
logger.bind(pair_uri)
#Events & log messages
push = context.socket(ZMQ::PUSH)
push.connect(push_uri)
parts = nil
loop do
parts = []
logger.recv_strings(parts)
push.send_strings(parts)#TODO: Why doesn't this block when PULL is down?
Rails.logger.info "SENDING MSG: #{parts[0]}"
end
end
end
end
# ZMQLogger
#
# Logs messages on a PAIR socket connected to a ZMQLoggerSink. Messages
# conform to those required by ZMQLoggerSink.
class ZMQLogger
def initialize(sink=ZMQLoggerSink.new)
@sink = sink
@logger = sink.context.socket(ZMQ::PAIR)
c = 0
loop do# As the other end of the PAIR is in another thread, we have to wait until it's bound before this connect succeeds
if c >= 10# Max retries = 10; give up after that
Rails.logger.error "[ZMQ LOGGER] Couldn't connect to inproc URI #{sink.pair_uri}"
break
end
break if @logger.connect(sink.pair_uri) == 0
c += 1
sleep 0.05#50ms (* 10 = 500ms before giving up)
end
end
# Log a message
#
# {'type' => 'user.create', message: 'User was created', data: {user_id: 345}}
def log(atts)
type = atts.delete('type') || 'unknown'
atts['time'] ||= Time.now.iso8601(3)
atts['id'] ||= SecureRandom.uuid
atts['source'] ||= "#{Rails.env}.#{Socket.gethostname}.#{$$}"
json = JSON.dump(atts)
Timeout.timeout 1 do #Shouldn't really timeout as it's nonblocking, but you never know (whether i know what i'm doing or not)
ZMQ::Util.error_check 'humbaba', @logger.send_string("log.#{type}", ZMQ::SNDMORE | ZMQ::NonBlocking)
ZMQ::Util.error_check 'humbaba', @logger.send_string(json, ZMQ::NonBlocking)
end
rescue Exception => e #Don't care what happens; this should never result in a user-facing error. It's not critical if it doesn't work.
Rails.logger.error "[ZMQ LOGGER] #{e.class}: #{e.message}\n#{e.backtrace.map{|l| " #{l}"}.join("\n")}"
end
# Emit an event
#
# {type: 'printer.update', printer_ip: '1.2.3.4', ...}
def emit(atts)
type = atts.delete('type') || 'unknown'
atts['time'] ||= Time.now.iso8601(3)
atts['id'] ||= SecureRandom.uuid
atts['source'] ||= "#{Rails.env}.#{Socket.gethostname}.#{$$}"
Timeout.timeout 1 do
ZMQ::Util.error_check 'humbaba', @logger.send_string("event.#{type}", ZMQ::SNDMORE | ZMQ::NonBlocking)
ZMQ::Util.error_check 'humbaba', @logger.send_string(JSON.dump(atts), ZMQ::NonBlocking)
end
rescue Exception => e
Rails.logger.error "[ZMQ LOGGER] #{e.class}: #{e.message}\n#{e.backtrace.map{|l| " #{l}"}.join("\n")}"
end
end
###########################################################################################
# AFTER FORK ##############################################################################
###########################################################################################
after_fork do |server, worker|
ActiveRecord::Base.establish_connection
$ZMQ_LOGGER = ZMQLogger.new
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment