Skip to content

Instantly share code, notes, and snippets.

@sdia-zz
Last active April 27, 2018 22:01
Show Gist options
  • Save sdia-zz/5e33374bd125934760efb422707dfca8 to your computer and use it in GitHub Desktop.
Save sdia-zz/5e33374bd125934760efb422707dfca8 to your computer and use it in GitHub Desktop.
import os
import time

import boto3
# Shared Kinesis client. No region or credentials are passed explicitly, so
# boto3's default configuration applies.
KINESIS = boto3.client('kinesis')
# Stream and shard to consume. The literals are placeholders; set the
# KINESIS_STREAM_NAME / KINESIS_SHARD_ID environment variables to point the
# script at a real stream without editing the file.
STREAM_NAME = os.environ.get('KINESIS_STREAM_NAME', 'my-kinesis-stream')
SHARD_ID = os.environ.get('KINESIS_SHARD_ID', 'my-shard-id')
def get_sequence_number():
    """Return the checkpointed (max) sequence number, or ``None`` if absent.

    Placeholder: a real implementation should look this up in the store that
    ``put_sequence_number`` writes to (presumably the same database). This
    stub ignores any store and always hands back a fixed dummy value.
    """
    dummy_checkpoint = '0123456789'
    return dummy_checkpoint
def put_sequence_number(sequence_number=None):
    """Persist *sequence_number* as the consumer's checkpoint.

    Placeholder: a real implementation should write the value to the database
    so ``get_sequence_number`` can resume after it. This stub stores nothing
    and always returns ``'Ok.'``.

    :param sequence_number: sequence number of the last fully processed
        record. It defaults to ``None`` so zero-argument calls keep working.
    :returns: the status string ``'Ok.'``.
    """
    return 'Ok.'
def gen_records(poll_interval=1.0):
    """Yield the ``Data`` payload of each record in the shard, oldest first.

    Reading resumes just after the checkpoint returned by
    ``get_sequence_number()``. When that is ``None``, reading starts at the
    oldest retained record (``TRIM_HORIZON``). Once every record of a
    non-empty batch has been consumed, the sequence number of the batch's
    last record is saved with ``put_sequence_number``.

    The generator ends only when ``GetRecords`` stops returning a
    ``NextShardIterator``. Otherwise it keeps polling indefinitely.

    :param poll_interval: seconds to sleep after a ``GetRecords`` call that
        returned no records, so an idle shard is not polled in a tight loop.
        Pass ``0`` to disable the pause.
    """
    sequence_number = get_sequence_number()
    if sequence_number is not None:
        # Resume strictly after the last checkpointed record.
        iterator_resp = KINESIS.get_shard_iterator(
            StreamName=STREAM_NAME,
            ShardId=SHARD_ID,
            ShardIteratorType='AFTER_SEQUENCE_NUMBER',
            StartingSequenceNumber=sequence_number,
        )
    else:
        # No checkpoint yet: start from the earliest available record.
        iterator_resp = KINESIS.get_shard_iterator(
            StreamName=STREAM_NAME,
            ShardId=SHARD_ID,
            ShardIteratorType='TRIM_HORIZON',
        )
    shard_iterator = iterator_resp['ShardIterator']

    while shard_iterator is not None:
        records_resp = KINESIS.get_records(ShardIterator=shard_iterator)
        records = records_resp['Records']
        for record in records:
            yield record['Data']
        if records:
            # Execution only gets here after the consumer has asked for the
            # item following the batch's last record. At that point it has
            # finished with the whole batch, so checkpointing is safe. A
            # consumer that stops mid-batch leaves the old checkpoint in
            # place, so that batch is re-read on restart.
            put_sequence_number(records[-1]['SequenceNumber'])
        elif poll_interval:
            time.sleep(poll_interval)
        # No NextShardIterator means there is nothing more to read.
        shard_iterator = records_resp.get('NextShardIterator')
if __name__ == '__main__':
    # Guarded so importing this module does not start an endless poll loop.
    for rec in gen_records():
        # do stuff with ``rec`` (one record's ``Data`` payload)...
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment