@HarlemSquirrel
Last active May 9, 2024 17:47
Dynamic Karafka Subscription
# karafka.rb
class KarafkaApp < Karafka::App
  setup do |config|
    # setup
  end

  # Unique consumer group for our dynamic consumers
  DYNAMIC_CONSUMER_GROUP_NAME = "#{ENV['KAFKA_PREFIX']}dyn-group"

  # Track the topics we're currently subscribed to.
  # This is a class-level instance variable, so only class methods can see it.
  @dynamic_subscribed_topics = []

  # Defined as a class method so it shares state with @dynamic_subscribed_topics
  def self.register_dynamic_subscribers
    # Load current state of configurations from the database
    topic_names = DynamicConfig.enabled.pluck(:topic)
    # Compare sorted copies so a different row order doesn't look like a change
    return if @dynamic_subscribed_topics.sort == topic_names.sort # already subscribed

    new_topics = topic_names - @dynamic_subscribed_topics
    routes.draw do
      consumer_group DYNAMIC_CONSUMER_GROUP_NAME do
        # NOTE: Not entirely sure how/if this will work after the initial load
        new_topics.each do |topic_name|
          topic topic_name do
            consumer DynamicConsumer
          end
        end
      end
    end
    # Record the new topics out here: the routing blocks are assumed to run via
    # instance_eval, so an @ivar inside them would not be this class's.
    @dynamic_subscribed_topics.concat(new_topics)

    disabled_topics = @dynamic_subscribed_topics - topic_names
    # TODO: Somehow unsubscribe from disabled_topics
  end

  # Run a separate process for our dynamic subscriptions to avoid them bumping heads with our other subscriptions
  if ENV["KARAFKA_DYNAMIC_SUBSCRIBER"] == "true"
    # This is a dynamic subscription process
    register_dynamic_subscribers
    ActiveSupport::Notifications.subscribe(DYNAMIC_CONFIG_CHANGED_NOTIFICATION_KEY) do
      register_dynamic_subscribers
    end
  else
    # This is not a dynamic subscription process so subscribe normally
    routes.draw do
      topic :example do
        consumer ExampleConsumer
      end
    end
  end
end
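
The bookkeeping in `register_dynamic_subscribers` (the "already subscribed" check plus the `new_topics` and `disabled_topics` subtractions) boils down to a set difference between the topics already registered and the topics currently enabled. Below is a minimal sketch of that step in plain Ruby, with no Karafka or database involved. `TopicDiff` and `diff_topics` are hypothetical names made up for illustration; they are not part of Karafka or of this gist. Comparing plain arrays with `==` is order-sensitive, so the sketch works on sets instead.

```ruby
require "set"

# Hypothetical value object (an assumption, not a Karafka class):
# holds the topics to start consuming and the topics to stop consuming.
TopicDiff = Struct.new(:added, :removed) do
  # True when there is anything to register or unregister
  def changed?
    !(added.empty? && removed.empty?)
  end
end

# Hypothetical helper: compares the topics already subscribed with the topics
# currently enabled, ignoring order and duplicates.
def diff_topics(subscribed, enabled)
  subscribed_set = subscribed.to_set
  enabled_set = enabled.to_set

  TopicDiff.new(
    (enabled_set - subscribed_set).to_a.sort, # enabled but not yet subscribed
    (subscribed_set - enabled_set).to_a.sort  # subscribed but no longer enabled
  )
end

diff = diff_topics(%w[orders payments], %w[payments refunds])
diff.added    # => ["refunds"]
diff.removed  # => ["orders"]
diff.changed? # => true
```

Keeping this step free of side effects makes it easy to test on its own; the routing calls and the still-open question of how to unsubscribe can then act on `added` and `removed` separately.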