Skip to content

Instantly share code, notes, and snippets.

@adamstrickland
Created June 5, 2020 00:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adamstrickland/bd28c7c6c3ce374547a38ca5dcdd0378 to your computer and use it in GitHub Desktop.
Save adamstrickland/bd28c7c6c3ce374547a38ca5dcdd0378 to your computer and use it in GitHub Desktop.
# frozen_string_literal: true
require "delivery_boy"
# Facade for publishing events to Kafka.
#
# Every public call is forwarded to a "deliverer" class nested under
# EventPublisher. The deliverer is chosen by the KAFKA_EVENT_DELIVERER
# environment variable and falls back to DEFAULT_EVENT_DELIVERER when the
# variable is not set.
#
# The forwarded API is available both on the class (EventPublisher.add_event)
# and on instances (EventPublisher.new.add_event).
class EventPublisher
  # Prefix prepended to the table name to build the Kafka topic. Read once at
  # load time; ENV.fetch raises KeyError when the variable is missing.
  KAFKA_TOPIC_PREFIX = ENV.fetch("KAFKA_TOPIC_PREFIX")
  # Key under which the pending-event queue is stored on Thread.current.
  THREAD_LOCAL_KEY = :event_publisher_transaction_queue
  DEFAULT_EVENT_DELIVERER = "ThreadLocalBackedDeliverer".freeze

  class << self
    # Class-level forwarding. Without a class-level `deliverer`, calling
    # EventPublisher.add_event raised NameError.
    delegate :add_event, :transaction_opened!, :transaction_committed!, :transaction_rolled_back!, to: :deliverer

    # Resolves the deliverer class on every call, so a change to
    # KAFKA_EVENT_DELIVERER is picked up without reloading this class.
    #
    # @return [Class] the constant EventPublisher::<deliverer name>
    # @raise [NameError] when the configured name is not a defined constant
    def deliverer
      deliverer_name = ENV.fetch("KAFKA_EVENT_DELIVERER", DEFAULT_EVENT_DELIVERER)
      "EventPublisher::#{deliverer_name}".constantize
    end
  end

  # Instance-level forwarding, kept for callers that hold an EventPublisher
  # instance.
  delegate :add_event, :transaction_opened!, :transaction_committed!, :transaction_rolled_back!, to: :deliverer

  # @return [Class] the same deliverer class the class-level API uses
  def deliverer
    self.class.deliverer
  end

  # Placeholder deliverer. Only transaction_opened! has a body (it lazily
  # initializes @messages); every other hook is an empty method, so an event
  # handed to add_event is dropped.
  # TODO: implement add_event / commit / rollback before selecting this deliverer.
  class SimpleArrayDeliverer
    def self.add_event(event)
    end

    def self.transaction_opened!
      @messages ||= []
    end

    def self.transaction_committed!
    end

    def self.transaction_rolled_back!
    end
  end

  # Placeholder deliverer: every hook is an empty method, so an event handed
  # to add_event is dropped.
  # TODO: implement before selecting this deliverer.
  class DeliveryBoyDeliverer
    def self.add_event(event)
    end

    # The original body was a lone `@`, which is a syntax error; it is removed
    # here so the file parses.
    def self.transaction_opened!
    end

    def self.transaction_committed!
    end

    def self.transaction_rolled_back!
    end
  end

  # Deliverer that buffers events in Thread.current[THREAD_LOCAL_KEY] while a
  # transaction is open and publishes them on commit. Outside a transaction,
  # events are published immediately.
  #
  # NOTE(review): Thread#[] storage is fiber-local in Ruby, so a queue opened
  # in one fiber is not visible from another fiber of the same thread.
  class ThreadLocalBackedDeliverer
    # Starts buffering. An already-open queue is kept as is.
    def self.transaction_opened!
      Thread.current[THREAD_LOCAL_KEY] ||= []
    end

    # Publishes every buffered event in insertion order, then drops the queue.
    def self.transaction_committed!
      # Array() turns a missing queue (commit without a matching open) into an
      # empty list instead of raising NoMethodError on nil.
      Array(transaction_events).each { |event| publish!(event) }
    ensure
      # Always reset, even if publishing raised, so stale events cannot leak
      # into the next transaction that runs on this thread.
      clear_transaction_events!
    end

    # Discards every buffered event without publishing.
    def self.transaction_rolled_back!
      clear_transaction_events!
    end

    # @return [Boolean] true while a queue Array is present for this thread
    def self.transaction_open?
      Thread.current[THREAD_LOCAL_KEY].is_a?(Array)
    end

    # Buffers the event when a transaction is open; publishes it right away
    # otherwise.
    #
    # @param event [Hash] read via event[:data] and serialized with #to_json
    def self.add_event(event)
      if transaction_open?
        transaction_events << event
      else
        publish!(event)
      end
    end

    # Sends one event to Kafka. Does nothing unless the :event_publishing
    # Flipper feature is enabled.
    #
    # @param event [Hash] see add_event
    def self.publish!(event)
      return unless Flipper.enabled?(:event_publishing)

      # NOTE(review): this only detects a KafkaClient constant that is set to
      # nil; an undefined constant raises NameError instead. Confirm how
      # KafkaClient is initialized.
      if KafkaClient.nil?
        log_event!(event, message: "Kafka client not initialized")
      else
        publish_event!(event)
      end
    end

    private_class_method def self.increment_metric!
      NewRelic::Agent.increment_metric("Custom/EventPublisher/publish_event!", 1)
    end

    # Delivers the event to the topic "<KAFKA_TOPIC_PREFIX><first key of
    # event[:data]>" and bumps the publish metric. Failures are reported to
    # Rollbar and logged, never raised.
    private_class_method def self.publish_event!(event)
      # The topic lookup sits inside the rescued body so that a malformed event
      # (for example one without :data) is reported instead of raised.
      table_name = event[:data].keys.first
      KafkaClient.deliver_message(event.to_json, topic: "#{KAFKA_TOPIC_PREFIX}#{table_name}")
      increment_metric!
    rescue StandardError => e
      # Broad rescue so that event publishing does not interfere with user experience/business logic
      Rollbar.error(e)
      log_event!(event, severity: :error, message: "Failed to send event to Kafka")
    end

    # Writes "<message>: <event as JSON>" to Rails.logger at the given severity.
    private_class_method def self.log_event!(event, severity: :debug, message: "Logging event")
      Rails.logger.public_send(severity, "#{message}: #{event.to_json}")
    end

    private_class_method def self.clear_transaction_events!
      Thread.current[THREAD_LOCAL_KEY] = nil
    end

    # @return [Array, nil] the queue, or nil when no transaction is open
    private_class_method def self.transaction_events
      Thread.current[THREAD_LOCAL_KEY]
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment