Created
June 1, 2013 14:27
-
-
Save toretore/5690589 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
rails_env = ENV['RAILS_ENV'] || ENV['RACK_ENV'] || 'production' | |
worker_processes (rails_env == 'production' ? 5 : 3) | |
preload_app true | |
timeout 30 | |
listen 20000 | |
########################################################################################### | |
# ZMQ LOGGER ############################################################################## | |
########################################################################################### | |
require 'ffi-rzmq' | |
require 'json' | |
require 'time' | |
require 'timeout' | |
require 'securerandom' | |
# ZMQ logger | |
# | |
# Sets up a logger pushing out log messages and events on a ZMQ PUB socket | |
# | |
# * PUB tcp://0.0.0.0:21000 | |
# | |
# ZMQLoggerSink | |
# | |
# Opens up PUB and PUSH sockets in a separate thread. Additionally opens | |
# an inproc PAIR socket to receive messages (both events and log messages) | |
# that will be published. | |
# | |
# Messages are in two parts: The first part contains the message type. | |
# | |
# * Log messages will have a type starting with "log". E.g. "log.user.create" | |
# These will be published on the PUSH socket. | |
# | |
# * Events will have a type starting with "event". E.g. "event.user.create" | |
# These will be published on the PUB socket. | |
# | |
#TODO: Is this thread really needed? ZMQ will shove messages into its own threads and send asynchronously. | |
# Just need to make sure that all ZMQ calls are async? | |
class ZMQLoggerSink | |
attr_reader :context, :pair_uri, :push_uri | |
def initialize(atts={}) | |
atts.each{|k,v| instance_variable_set("@#{k}", v) } | |
@context ||= ZMQ::Context.new | |
@pair_uri ||= "inproc://zmq-log-#{SecureRandom.uuid}" | |
@push_uri ||= 'tcp://127.0.0.1:21000' | |
@t = Thread.new do | |
logger = context.socket(ZMQ::PAIR) | |
logger.bind(pair_uri) | |
#Events & log messages | |
push = context.socket(ZMQ::PUSH) | |
push.connect(push_uri) | |
parts = nil | |
loop do | |
parts = [] | |
logger.recv_strings(parts) | |
push.send_strings(parts)#TODO: Why doesn't this block when PULL is down? | |
Rails.logger.info "SENDING MSG: #{parts[0]}" | |
end | |
end | |
end | |
end | |
# ZMQLogger | |
# | |
# Logs messages on a PAIR socket connected to a ZMQLoggerSink. Messages | |
# conform to those required by ZMQLoggerSink. | |
class ZMQLogger | |
def initialize(sink=ZMQLoggerSink.new) | |
@sink = sink | |
@logger = sink.context.socket(ZMQ::PAIR) | |
c = 0 | |
loop do# As the other end of the PAIR is in another thread, we have to wait until it's bound before this connect succeeds | |
if c >= 10# Max retries = 10; give up after that | |
Rails.logger.error "[ZMQ LOGGER] Couldn't connect to inproc URI #{sink.pair_uri}" | |
break | |
end | |
break if @logger.connect(sink.pair_uri) == 0 | |
c += 1 | |
sleep 0.05#50ms (* 10 = 500ms before giving up) | |
end | |
end | |
# Log a message | |
# | |
# {'type' => 'user.create', message: 'User was created', data: {user_id: 345}} | |
def log(atts) | |
type = atts.delete('type') || 'unknown' | |
atts['time'] ||= Time.now.iso8601(3) | |
atts['id'] ||= SecureRandom.uuid | |
atts['source'] ||= "#{Rails.env}.#{Socket.gethostname}.#{$$}" | |
json = JSON.dump(atts) | |
Timeout.timeout 1 do #Shouldn't really timeout as it's nonblocking, but you never know (whether i know what i'm doing or not) | |
ZMQ::Util.error_check 'humbaba', @logger.send_string("log.#{type}", ZMQ::SNDMORE | ZMQ::NonBlocking) | |
ZMQ::Util.error_check 'humbaba', @logger.send_string(json, ZMQ::NonBlocking) | |
end | |
rescue Exception => e #Don't care what happens; this should never result in a user-facing error. It's not critical if it doesn't work. | |
Rails.logger.error "[ZMQ LOGGER] #{e.class}: #{e.message}\n#{e.backtrace.map{|l| " #{l}"}.join("\n")}" | |
end | |
# Emit an event | |
# | |
# {type: 'printer.update', printer_ip: '1.2.3.4', ...} | |
def emit(atts) | |
type = atts.delete('type') || 'unknown' | |
atts['time'] ||= Time.now.iso8601(3) | |
atts['id'] ||= SecureRandom.uuid | |
atts['source'] ||= "#{Rails.env}.#{Socket.gethostname}.#{$$}" | |
Timeout.timeout 1 do | |
ZMQ::Util.error_check 'humbaba', @logger.send_string("event.#{type}", ZMQ::SNDMORE | ZMQ::NonBlocking) | |
ZMQ::Util.error_check 'humbaba', @logger.send_string(JSON.dump(atts), ZMQ::NonBlocking) | |
end | |
rescue Exception => e | |
Rails.logger.error "[ZMQ LOGGER] #{e.class}: #{e.message}\n#{e.backtrace.map{|l| " #{l}"}.join("\n")}" | |
end | |
end | |
########################################################################################### | |
# AFTER FORK ############################################################################## | |
########################################################################################### | |
after_fork do |server, worker| | |
ActiveRecord::Base.establish_connection | |
$ZMQ_LOGGER = ZMQLogger.new | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment