@lukestadtler
Last active December 2, 2022 18:43
Random thoughts on the TurbineRb implementation.
TurbineRb.configure do |config|
  # This pattern allows other configuration to be added as it becomes available, beyond providing a register method.
  config.max_turbines = 20
  config.max_blades_per_turbine = 3
  # Provide a Turbine-able class to TurbineRb; when needed, TurbineRb can initialize it internally with app_class.new.
  # Alternatively, does MyApp need to be initialized at all? It seems that call could (or should) be a class-level
  # method that operates purely on app.
  # If it needs to be an instance, we can simply pass MyApp.new into register instead.
  #
  # The same may not be true for Passthrough, since it subclasses TurbineRb::Process and I don't know which
  # instance methods it relies on.
  # A user with custom implementations simply registers each of them:
  config.register(MyApp)
  config.register(AnotherApp)
  # If the processing flow in the example MyApp is common (which seems plausible), i.e.:
  #   1. define the source
  #   2. read records
  #   3. register secrets
  #   4. process records
  #   5. write the new records
  # then a TurbineRb user need only define a few things for simple implementations.
  # A common simple ELT implementation could be registered like this:
  # @param mode [String] the default implementation pattern to use
  # @param resource_name [String] the resource name for app.resource(name: resource_name)
  # @param read_collection [String] the collection name to use for database.records(collection: read_collection)
  # @param write_collection [String, Array<String>] the collection name(s) to use for
  #   database.write(records: processed_records, collection: collection_name)
  # @param process [Class, nil] a TurbineRb::Process subclass to apply to the extracted records. If none is passed,
  #   this is equivalent to "You can also decide to forgo Process and do a direct end-to-end stream of records or
  #   events." as described at https://docs.meroxa.com/turbine/develop/ruby#process
  # @param secrets [String, Array<String>] the secret name(s) to pass to app.register_secrets(secrets)
  config.register(mode: 'elt',
                  resource_name: 'demopg',
                  read_collection: 'events',
                  write_collection: 'events_copy',
                  process: Passthrough,
                  secrets: 'MY_ENV_TEST')
  config.register(mode: 'elt',
                  resource_name: 'demopg',
                  read_collection: 'events_copy',
                  write_collection: 'events_processed_another',
                  process: AnotherProcess,
                  secrets: %w[abc def])
  # If writing to multiple destinations in a single flow is desired, pass an array of collections:
  config.register(mode: 'elt',
                  resource_name: 'demopg',
                  read_collection: 'events',
                  write_collection: %w[events_copy events_copy_redundancy],
                  process: Passthrough,
                  secrets: 'MY_ENV_TEST')
  # When no MyApp class is provided, TurbineRb should fall back to this default pattern, filled in with the
  # params passed to register (shown here with the values from the first register call above):
  # def call(app)
  #   database = app.resource(name: 'demopg')
  #
  #   # ELT pipeline example (direct end-to-end stream, no Process):
  #   # records = database.records(collection: 'events')
  #   # database.write(records: records, collection: 'events_copy')
  #
  #   records = database.records(collection: 'events', configs: { "incrementing.column.name" => "id" })
  #
  #   # This registers the secret so it is available in the Turbine application
  #   app.register_secrets("MY_ENV_TEST")
  #   # You can also register several secrets at once:
  #   # app.register_secrets(["MY_ENV_TEST", "MY_OTHER_ENV_TEST"])
  #
  #   processed_records = app.process(records: records, process: Passthrough.new) # Passthrough just has to match the signature
  #   database.write(records: processed_records, collection: "events_copy")
  # end
end
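The configure block above assumes a `configure` method that yields a settings object exposing both plain settings and a `register` method. A minimal sketch of how that could look, assuming `register` accepts either an app (class or instance) or keyword options for a default pipeline. `MiniTurbine`, `Configuration`, and the default values are hypothetical names for illustration, not the real TurbineRb gem:

```ruby
# Hypothetical sketch of a TurbineRb-style configure/register API.
# MiniTurbine and Configuration are illustrative names, not the real gem.
module MiniTurbine
  class Configuration
    attr_accessor :max_turbines, :max_blades_per_turbine
    attr_reader :registrations

    def initialize
      @max_turbines = 10           # assumed default
      @max_blades_per_turbine = 3  # assumed default
      @registrations = []
    end

    # Accepts an app (class or instance) OR keyword options such as mode: 'elt'.
    def register(app = nil, **options)
      raise ArgumentError, 'pass an app or options, not both' if app && !options.empty?
      raise ArgumentError, 'nothing to register' if app.nil? && options.empty?

      @registrations << (app || options)
      self
    end
  end

  # One shared configuration, created on first access.
  def self.configuration
    @configuration ||= Configuration.new
  end

  # Yields the shared configuration so callers can set values and register apps.
  def self.configure
    yield configuration
    configuration
  end
end
```

Memoizing the configuration on the module and yielding it from `configure` is a common Ruby gem idiom; new settings can be added later as accessors without changing the signature of `register`.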
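The notes ask whether `register` should receive `MyApp` or `MyApp.new`. One way to support both is to normalize each registered entry into something that responds to `call(app)`, assuming that, as in the example `MyApp`, `call(app)` is the only method a registered app needs. `resolve_app` is a hypothetical helper, not part of TurbineRb:

```ruby
# Hypothetical helper: turn a registered entry into an object that responds to #call(app).
def resolve_app(entry)
  if entry.is_a?(Class)
    # Prefer a class-level call (no instance needed); otherwise instantiate with .new.
    entry.respond_to?(:call) ? entry : entry.new
  elsif entry.respond_to?(:call)
    # Already an instance (or a lambda) that can be called directly.
    entry
  else
    raise ArgumentError, "#{entry.inspect} does not respond to #call"
  end
end
```

Preferring a class-level `call` avoids creating an instance at all, while classes that only define an instance-level `call` are still initialized internally, matching both options raised in the notes.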
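The default `mode: 'elt'` flow could be driven entirely by the register params, following the five steps listed in the notes. A sketch under these assumptions: `DefaultElt`, `FakeApp`, and `FakeResource` are hypothetical in-memory stand-ins that mimic only the calls the gist uses (`resource`, `records`, `write`, `register_secrets`, `process`), so the real TurbineRb objects may behave differently; `process` is assumed to be a class instantiated with `.new`, as with `Passthrough.new` in the example.

```ruby
# Hypothetical in-memory stand-ins; NOT the real TurbineRb classes.
class FakeResource
  attr_reader :store

  def initialize
    @store = {}
  end

  # Returns a copy of the records stored under a collection.
  def records(collection:)
    @store.fetch(collection, []).dup
  end

  # Appends records to a collection, creating it if needed.
  def write(records:, collection:)
    (@store[collection] ||= []).concat(records)
  end
end

class FakeApp
  attr_reader :secrets

  def initialize
    @secrets = []
    @resources = Hash.new { |hash, name| hash[name] = FakeResource.new }
  end

  def resource(name:)
    @resources[name]
  end

  def register_secrets(names)
    @secrets.concat(Array(names))
  end

  # Assumption: process objects expose call(records:), like Passthrough in the example.
  def process(records:, process:)
    process.call(records: records)
  end
end

# Hypothetical default 'elt' flow built from the register keyword params.
class DefaultElt
  def initialize(resource_name:, read_collection:, write_collection:, process: nil, secrets: nil)
    @resource_name = resource_name
    @read_collection = read_collection
    @write_collections = Array(write_collection) # a String or an Array both work
    @process_class = process
    @secrets = secrets
  end

  def call(app)
    database = app.resource(name: @resource_name)            # 1. define the source
    records = database.records(collection: @read_collection) # 2. read records
    app.register_secrets(@secrets) if @secrets               # 3. register secrets
    if @process_class                                        # 4. process records
      records = app.process(records: records, process: @process_class.new)
    end                                                      # (no process: direct end-to-end stream)
    @write_collections.each do |collection|                  # 5. write the new records
      database.write(records: records, collection: collection)
    end
  end
end
```

Normalizing `write_collection` with `Array()` lets the single-destination and multi-destination register calls share one code path, and leaving `process` as `nil` gives the direct end-to-end stream mentioned in the `@param process` note.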