Skip to content

Instantly share code, notes, and snippets.

@dukex
Created November 22, 2017 10:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dukex/6852b1ae72999dc7c88afcab7498fc57 to your computer and use it in GitHub Desktop.
Save dukex/6852b1ae72999dc7c88afcab7498fc57 to your computer and use it in GitHub Desktop.
# frozen_string_literal: true
module MyApp
module TaggedWorkerTracer
extend ActiveSupport::Concern
attr_reader :metadata, :delivery_info,
:routing_key, :correlation_id, :customer_id,
:timestamp
def work_with_params(message, delivery_info, metadata)
clear_context!
@metadata = metadata.to_hash.deep_symbolize_keys
# To cast delivery_info as_json must exclude fields :channel and :consumer
@delivery_info = delivery_info.to_hash.deep_symbolize_keys
@correlation_id = self.metadata.dig(:headers, :correlation_id)
@customer_id = self.metadata.dig(:headers, :customer_id)
@routing_key = self.delivery_info[:routing_key]
raw_timestamp = self.metadata.dig(:headers, :raw_timestamp)
@timestamp = raw_timestamp ? Time.at(raw_timestamp) : self.metadata[:timestamp]
tag_logger(metadata) do
begin
log_info(
'Msg received',
message,
self.class::QUEUE_NAME,
metadata,
delivery_info
)
parsed_message = parse(message)
logger.info 'Msg parsed'
RequestId.with_request_id(metadata.to_hash.dig(:headers, 'correlation_id')) do
work(parsed_message).tap { logger.info 'Msg processed' }
end
rescue => exception
handle_error!(
exception,
message,
self.class::QUEUE_NAME,
metadata,
delivery_info,
self.class.to_s
)
ensure
ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base)
end
end
end
def run
super
rescue => exception
Raven.capture_exception(exception)
sleep 1 # time to send to sentry
raise
end
private
def parse(message)
JSON.parse(message, symbolize_names: true)
end
def tag_logger(metadata)
correlation_id = metadata.to_hash.dig(:headers, 'correlation_id') || :unknown
message_id = metadata[:message_id] || :nil
Rails.logger.tagged("uuid-#{SecureRandom.uuid}", "cid-#{correlation_id}", "msg_id-#{message_id}") do
yield
end
end
def log_info(description, payload, queue_name, metadata, delivery_info)
logger.info("[DESCRIPTION: #{description}]")
logger.info("[QUEUE_NAME: #{queue_name}]")
logger.info("[PAYLOAD: #{payload.to_s.force_encoding(Encoding::UTF_8)}]")
logger.info("[DELIVERY_INFO: #{delivery_info}]")
logger.info("[METADATA: #{metadata}]")
end
def handle_error!(exception, payload, queue_name, metadata, delivery_info, worker)
log_error_message(exception, payload, queue_name, metadata, delivery_info)
raise exception
end
def log_error_message(exception, payload, queue_name, metadata, delivery_info)
logger.error("[ERROR: #{exception.message} | ERROR_CLASS: #{exception.class.name}]")
logger.error("[QUEUE_NAME: #{queue_name}]")
logger.error("[PAYLOAD: #{payload.to_s.force_encoding(Encoding::UTF_8)}]")
logger.error("[ADDITIONAL_ERROR_INFO: #{exception.try(:additional_error_info)}]")
logger.error("[BACKTRACE: #{exception.backtrace.take(50).join(', ')}]") if exception.backtrace
logger.error("[DELIVERY_INFO: #{delivery_info}]")
logger.error("[METADATA: #{metadata}]")
end
def clear_context!
Raven::Context.clear!
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment