@w3irdrobot
Last active February 4, 2022 15:02
Reading Data From Kinesis
```python
from datetime import datetime, timedelta
import json
import time

import boto
from boto.kinesis.exceptions import (
    ExpiredIteratorException,
    ProvisionedThroughputExceededException,
)


def get_kinesis_data_iterator(stream_name, minutes_running):
    # Connect to Kinesis (credentials and region come from the usual boto config)
    kinesis = boto.connect_kinesis()
    # Get data about Kinesis stream for Tag Monitor
    kinesis_stream = kinesis.describe_stream(stream_name)
    # Get the shards in that stream
    shards = kinesis_stream['StreamDescription']['Shards']
    # Collect together the shard IDs
    shard_ids = [shard['ShardId'] for shard in shards]
    # Get a shard iterator for the first shard only, starting at the oldest record
    iter_response = kinesis.get_shard_iterator(stream_name, shard_ids[0], 'TRIM_HORIZON')
    shard_iterator = iter_response['ShardIterator']
    # Track the last sequence number so an expired iterator can resume after it
    last_sequence = None
    # Calculate end time
    end_time = datetime.now() + timedelta(minutes=minutes_running)
    try:
        while True:
            try:
                # Get data
                record_response = kinesis.get_records(shard_iterator)
                now = datetime.now()
                print('Time: {0}'.format(now.strftime('%Y/%m/%d %H:%M:%S')))
                # Yield data to the outside calling iterator
                for record in record_response['Records']:
                    last_sequence = record['SequenceNumber']
                    yield json.loads(record['Data'])
                # Get next iterator for shard from previous request.
                # It is None only once the shard is closed and fully read; an empty
                # 'Records' list alone does not mean the stream is drained.
                shard_iterator = record_response.get('NextShardIterator')
                # Only run for a certain amount of time, or until the shard closes
                if end_time < now or shard_iterator is None:
                    break
                # GetRecords allows at most five calls per second per shard
                time.sleep(0.2)
            # Catch exception meaning hitting API too much
            except ProvisionedThroughputExceededException:
                print('ProvisionedThroughputExceededException found. Sleeping for 0.5 seconds...')
                time.sleep(0.5)
            # Catch exception meaning iterator has expired
            except ExpiredIteratorException:
                if last_sequence is None:
                    # Nothing read yet, so start over from the oldest record
                    iter_response = kinesis.get_shard_iterator(
                        stream_name, shard_ids[0], 'TRIM_HORIZON')
                else:
                    # Resume right after the last record that was yielded
                    iter_response = kinesis.get_shard_iterator(
                        stream_name, shard_ids[0], 'AFTER_SEQUENCE_NUMBER', last_sequence)
                shard_iterator = iter_response['ShardIterator']
    finally:
        # Close the connection even if the caller stops iterating early
        kinesis.close()
```
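
`get_kinesis_data_iterator` only ever reads `shard_ids[0]`, so records written to any other shard are never seen. Below is a minimal sketch of round-robin polling over every shard, assuming a boto 2 `KinesisConnection` (or any object with the same `describe_stream`, `get_shard_iterator` and `get_records` methods) is passed in as `kinesis`. The function name `iterate_all_shards` and its `max_rounds` and `poll_interval` parameters are hypothetical. Throttling and expired-iterator handling are left out for brevity, and it assumes `describe_stream` returns every shard in a single page (large streams page through shards via `HasMoreShards`).

```python
import json
import time


def iterate_all_shards(kinesis, stream_name, max_rounds=None, poll_interval=0.2):
    """Round-robin over every shard of a stream, yielding (shard_id, record) pairs.

    Hypothetical helper: `kinesis` is assumed to expose boto 2 style
    describe_stream / get_shard_iterator / get_records methods.
    """
    description = kinesis.describe_stream(stream_name)['StreamDescription']
    # One iterator per shard, each starting at the oldest available record
    iterators = {}
    for shard in description['Shards']:
        shard_id = shard['ShardId']
        response = kinesis.get_shard_iterator(stream_name, shard_id, 'TRIM_HORIZON')
        iterators[shard_id] = response['ShardIterator']

    rounds = 0
    # Keep polling while some shard is still open and the round budget allows it
    while iterators and (max_rounds is None or rounds < max_rounds):
        for shard_id in list(iterators):
            response = kinesis.get_records(iterators[shard_id])
            for record in response['Records']:
                yield shard_id, json.loads(record['Data'])
            next_iterator = response.get('NextShardIterator')
            if next_iterator is None:
                # Shard is closed and fully read; stop polling it
                del iterators[shard_id]
            else:
                iterators[shard_id] = next_iterator
        rounds += 1
        # Pace the loop: GetRecords allows at most five calls per second per shard
        time.sleep(poll_interval)
```

Usage would look like `for shard_id, data in iterate_all_shards(boto.connect_kinesis(), 'awesome_data_stream'): ...`. Passing the connection in, rather than creating it inside, also lets the function be exercised with a stub object.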
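
```python
# The generator above must be importable as a module named `kinesis`
# (e.g. saved as kinesis.py next to this script)
import kinesis

STREAM_NAME = 'awesome_data_stream'
MINUTES_RUNNING = 60

# Get Kinesis generator
kinesis_data = kinesis.get_kinesis_data_iterator(STREAM_NAME, MINUTES_RUNNING)
# Iterate over records
for data in kinesis_data:
    # Do something crazy with your data
    pass
```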
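
If the "do something crazy" step is cheaper per batch than per record (bulk database writes, for example), the generator's output can be grouped into fixed-size lists. Here is a minimal sketch; `batched` is a hypothetical helper name here, and Python 3.12+ ships a similar `itertools.batched` that yields tuples instead of lists.

```python
from itertools import islice


def batched(records, batch_size):
    """Group any iterator of records into lists of at most `batch_size` items."""
    if batch_size < 1:
        raise ValueError('batch_size must be at least 1')
    iterator = iter(records)
    while True:
        # Pull up to batch_size items; an empty list means the source is exhausted
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch
```

With the script above this becomes `for batch in batched(kinesis_data, 100): save_batch(batch)`, where `save_batch` stands in for your own (hypothetical) handler. Because the final partial batch is still yielded, nothing is lost when the time-limited generator stops.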
@ishangupta3

After getting the shards, how would it be possible to extract frame-level information?
