Skip to content

Instantly share code, notes, and snippets.

@surajnarwade
Created April 27, 2020 07:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save surajnarwade/80604e965734e0a10982dde2fcc1ff98 to your computer and use it in GitHub Desktop.
Save surajnarwade/80604e965734e0a10982dde2fcc1ff98 to your computer and use it in GitHub Desktop.
AWS kinesis consumer
import boto
from boto.kinesis.exceptions import ResourceInUseException
import time
stream_name = "kinesis-fluent-bit-test"
region = 'eu-west-1'
kinesis = boto.kinesis.connect_to_region(region)
r = kinesis.describe_stream(stream_name)
description = r.get('StreamDescription')
status = description.get('StreamStatus')
my_shard_id = description['Shards'][0]['ShardId']
shard_iterator = kinesis.get_shard_iterator(stream_name,my_shard_id,'LATEST')
my_shard_iterator = shard_iterator['ShardIterator']
record_response = kinesis.get_records(my_shard_iterator)
while 'NextShardIterator' in record_response:
record_response = kinesis.get_records(record_response['NextShardIterator'], 25)
print(record_response)
# wait for 5 seconds
time.sleep(5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment