Skip to content

Instantly share code, notes, and snippets.

@reizist
Created July 29, 2019 07:17
Show Gist options
  • Save reizist/5727f7213b3db8a1e71b59fce9055344 to your computer and use it in GitHub Desktop.
Save reizist/5727f7213b3db8a1e71b59fce9055344 to your computer and use it in GitHub Desktop.
require 'aws-sdk-kinesis'
require 'pry-byebug'
# Tails every shard of a Kinesis stream, printing each non-empty GetRecords
# response to stdout. Iterators are requested with type "LATEST", so only
# records that arrive after start-up are shown.
class GetRecords
  # Stream that is tailed when no explicit stream name is given.
  DEFAULT_STREAM_NAME = 'performance-processor-source-staging-a'.freeze

  # Seconds to pause between two GetRecords calls on the same shard.
  # NOTE(review): chosen from the AWS guidance of polling each shard about
  # once per second; tune it if lower latency is needed.
  DEFAULT_POLL_INTERVAL = 1

  # Name of the stream being tailed.
  attr_reader :stream_name

  # client        - object responding to list_shards, get_shard_iterator,
  #                 get_records and list_streams; a new Aws::Kinesis::Client
  #                 is built when omitted.
  # stream_name   - name of the stream to read from.
  # poll_interval - seconds to sleep between polls of one shard (0 disables).
  def initialize(client: nil, stream_name: DEFAULT_STREAM_NAME, poll_interval: DEFAULT_POLL_INTERVAL)
    @client = client || Aws::Kinesis::Client.new
    @stream_name = stream_name
    @poll_interval = poll_interval
  end

  # Returns the ids of the shards reported by list_shards for the stream.
  def shard_ids
    @client.list_shards(stream_name: stream_name).shards.map(&:shard_id)
  end

  # Returns one "LATEST" shard iterator per shard, in shard_ids order.
  def shard_iterators
    shard_ids.map do |shard_id|
      @client.get_shard_iterator(
        stream_name: stream_name,
        shard_id: shard_id,
        shard_iterator_type: 'LATEST'
      ).shard_iterator
    end
  end

  # Starts one polling thread per shard and blocks until all of them finish.
  # A thread finishes when its shard stops handing out a next iterator.
  # Prints stream/shard diagnostics first when ENV["DEBUG"] is set.
  # Returns the array of (joined) threads.
  def get_records
    debug if ENV['DEBUG']

    threads = shard_iterators.map do |shard_iterator|
      # Hand the iterator over as a thread argument so every thread owns
      # its own copy.
      Thread.new(shard_iterator) { |iterator| poll_shard(iterator) }
    end
    threads.each(&:join)
  end

  # Prints the list_streams response and the shard ids of the stream.
  def debug
    puts "streams: #{@client.list_streams}"
    puts "shard_ids: #{shard_ids}"
  end

  private

  # Follows a single shard: fetches records with the current iterator, prints
  # non-empty responses and continues with the iterator from the response.
  # Stops once no further iterator is available (nil or empty string).
  def poll_shard(iterator)
    until iterator.nil? || iterator.empty?
      res = @client.get_records(shard_iterator: iterator)
      puts res unless res.records.empty?

      # Advance to the iterator returned by this call; re-using the initial
      # one would request the same position forever.
      iterator = res.next_shard_iterator
      sleep(@poll_interval) if iterator && @poll_interval.positive?
    end
  end
end
# Start tailing only when this file is executed directly, so that requiring it
# (for example from a console) does not block on the polling loop.
GetRecords.new.get_records if __FILE__ == $PROGRAM_NAME
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment