Skip to content

Instantly share code, notes, and snippets.

@sahglie
Created October 26, 2018 17:47
Show Gist options
  • Save sahglie/79b266b0f2cf2bb8603b27682f094875 to your computer and use it in GitHub Desktop.
Save sahglie/79b266b0f2cf2bb8603b27682f094875 to your computer and use it in GitHub Desktop.
Racecar config
class DhcpConsumer < Racecar::Consumer
self.group_id = "sock-dhcp-#{Rails.env.to_s}"
TOPICS = ["dhcp-infoblox"]
subscribes_to(*TOPICS, start_from_beginning: false, max_bytes_per_partition: 250_000)
MAX_THREADS = 5
def initialize
set_db_pool_size(6) unless Rails.env.test?
end
def process_batch(batch)
threads = []
dhcp_logs = []
batch.messages.each do |message|
attrs = JSON.parse(message.value)
dhcp_logs.push(attrs)
if save_threshold_reached?(dhcp_logs.size)
threads << save_dhcp_logs(dhcp_logs.dup)
dhcp_logs.clear
end
if threads.size == MAX_THREADS
threads.each(&:join)
threads.clear
end
end
if dhcp_logs.size > 0
threads << save_dhcp_logs(dhcp_logs)
end
threads.each(&:join)
end
private
def set_db_pool_size(pool_size)
ActiveRecord::Base.connection.disconnect!
db_config = Rails.application.config.database_configuration[Rails.env].merge("pool" => pool_size)
ActiveRecord::Base.establish_connection(db_config)
end
def save_dhcp_logs(dhcp_logs)
Thread.new do
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
dhcp_logs.each { |attrs| NosData::InfobloxDhcpLog.create(attrs) }
end
end
end
end
def save_threshold_reached?(num)
return true if Rails.env.test?
num >= 500
end
end
Racecar.configure do |config|
config.brokers = KafkaConfig.kafka_brokers
config.client_id = "sock-client"
config.offset_commit_threshold = 1_000
config.connect_timeout = 30
config.socket_timeout = 30
config.session_timeout = 200
config.heartbeat_interval = 10
config.log_level = 'info'
config.max_fetch_queue_size = 500
config.logger = Rails.logger
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment