Skip to content

Instantly share code, notes, and snippets.

@dasch
Last active February 14, 2023 15:35
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save dasch/af38c28e62117e19f6ba to your computer and use it in GitHub Desktop.
Save dasch/af38c28e62117e19f6ba to your computer and use it in GitHub Desktop.
class ParseEntries
def process(message)
entry = JSON.parse(message.value)
[entry]
end
end
class CombineEntries
def initialize
@open_transactions = {}
end
def process(entry)
txnid = entry.fetch("txnid")
if entry.fetch("commit")
entries = @open_transactions.delete(txnid)
entries << entry
[entries]
else
@open_transactions[txnid] ||= []
@open_transactions[txnid] << entry
[]
end
end
end
class PublishTransactions
def initialize
kafka = Kafka.new(...)
@producer = kafka.producer
end
def process(transaction_entries)
data = JSON.dump(transaction_entries)
@producer.produce(data, topic: "maxwell-transactions")
@producer.deliver
[]
end
end
class Pipeline
def initialize(topic:, group_id:)
brokers = ENV.fetch("KAFKA_BROKERS").split(",")
kafka = Kafka.new(seed_brokers: brokers)
@consumer = kafka.consumer(group_id: group_id)
@consumer.subscribe(topic)
@stages = []
end
def add_stage(stage)
@stages << stage
end
def run
@consumer.each_message do |message|
@stages.inject([message] do |records, stage|
records.flat_map {|record| stage.process(record) }
end
end
end
end
pipeline = Pipeline.new(topic: "maxwell", group_id: "maxwell-transaction-combiner")
pipeline.add_stage(ParseEntries.new)
pipeline.add_stage(CombineEntries.new)
pipeline.add_stage(PublishTransactions.new)
pipeline.run
@ayqazi
Copy link

ayqazi commented Aug 3, 2021

I believe line 61 should read:

      @stages.inject([message]) do |records, stage|

(closing parenthesis on method call, before the do)

@saiqulhaq-hh
Copy link

correct @ayqazi

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment