Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
#!/usr/bin/env ruby
require 'aws-sdk'
require 'base64'
# Minimal Kinesis consumer: tails a single shard of a stream and prints each
# batch of records it receives until the shard is exhausted or the process is
# interrupted.
AWS.config(region: 'us-east-1')
client = AWS.kinesis.client

# Stream and shard this script reads from.
STREAM_NAME = 'hello'.freeze
SHARD_ID = 'shardId-000000000000'.freeze

# To look up the shard IDs of the stream, uncomment:
# res = client.describe_stream(stream_name: STREAM_NAME)
# p res[:stream_description][:shards].map { |shard| shard[:shard_id] }

# Consumer
th = Thread.start do
  # Request a 'LATEST' iterator for the shard (per the Kinesis API this means
  # reading only records added after this point, not the existing backlog).
  shard_iterator = client.get_shard_iterator(
    stream_name: STREAM_NAME,
    shard_id: SHARD_ID,
    shard_iterator_type: 'LATEST'
  )[:shard_iterator]

  loop do
    res = client.get_records(shard_iterator: shard_iterator)
    payloads = res[:records].map { |record| Base64.decode64(record[:data]) }
    puts "[Consumer] get_records: #{payloads.inspect}"

    # Kinesis returns no next iterator once the shard has been closed and
    # fully read; calling get_records again with nil would raise, so stop.
    shard_iterator = res[:next_shard_iterator]
    break unless shard_iterator

    # Pause between polls to avoid hammering the API.
    sleep 0.3
  end
end

# Block the main thread until the consumer finishes (join also re-raises any
# exception that terminated the thread).
th.join
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment