Skip to content

Instantly share code, notes, and snippets.

@IlyaDonskikh
Created August 20, 2019 18:42
Show Gist options
  • Save IlyaDonskikh/5044abd6064f3bbcee7ce2864c12cf6f to your computer and use it in GitHub Desktop.
Save IlyaDonskikh/5044abd6064f3bbcee7ce2864c12cf6f to your computer and use it in GitHub Desktop.
class KafkaProducer
include Singleton
class << self
def assign_kafka
kafka = Kafka.new(
ENV.fetch('KAFKA_BROKERS').split(','),
sasl_scram_username: ENV['KAFKA_USERNAME'],
sasl_scram_password: ENV['KAFKA_PASSWORD'],
sasl_scram_mechanism: ENV['KAFKA_MECHANISM'],
ssl_ca_cert_file_path: tmp_ca_file.try(:path),
logger: Rails.logger
)
class_variable_set(:@@kafka, kafka)
assign_kafka_producer
end
def assign_kafka_producer
kafka = class_variable_get(:@@kafka)
kafka_producer = kafka.async_producer(
delivery_interval: 10,
)
class_variable_set(:@@kafka_producer, kafka_producer)
end
def tmp_ca_file
return unless ENV['KAFKA_TRUSTED_CERT']
tmp_ca_file = Tempfile.new('kafka_ca_certs', "#{Rails.root.to_s}/tmp/")
tmp_ca_file.write(ENV.fetch("KAFKA_TRUSTED_CERT"))
tmp_ca_file.close
tmp_ca_file
end
def method_missing(message, *args, &block)
kafka = class_variable_get(:@@kafka)
kafka.send(message, *args)
end
end
assign_kafka
assign_kafka_producer
end
at_exit do
KafkaProducer.class_variable_get(:@@kafka_producer).shutdown
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment