Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import json
import os
import time
import boto3
# A partition key is always required when writing a record; it matters most when the stream has more than 1 shard. Kinesis computes the MD5 hash of the partition key to decide which shard stores the record (if you describe the stream you'll see each shard's hash range as part of the shard description).
#
# Each shard can only accept 1,000 records and/or 1 MB per second. If you attempt to write to a single shard faster than this limit you'll get a ProvisionedThroughputExceededException. With multiple shards you can scale this limit: 8 shards give you 8,000 records and/or 8 MB per second.
#
# Use varied partition keys: if all of your records share the same partition key, distribution will not occur and every record will be written to a single shard.
#
# if you're writing from multiple processes then it might be sufficient to use the process ID, server's IP address, or hostname. If you're writing from a single process then you can either use information that's in the record (for example, a unique record ID) or generate a random string.
#
# Partition key counts against the total write size and is stored in the stream.
#
# Lastly, if you're using PutRecords (recommended when writing many records in a batch), individual records in the request may be rejected while others are accepted. This happens when those records went to a shard that has already reached its write limits, so it's important to re-send them (after a predefined delay).
# Read AWS credentials and stream configuration from the environment.
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_DEFAULT_REGION = os.getenv('AWS_DEFAULT_REGION')
AWS_APP_STREAM_NAME = os.getenv('AWS_APP_STREAM_NAME')

# Build the Kinesis client with explicit credentials instead of relying
# on the default boto3 credential provider chain.
kinesis_client = boto3.client(
    'kinesis',
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name=AWS_DEFAULT_REGION,
)

# Describe the stream to discover its shards; we read from the first shard.
stream_description = kinesis_client.describe_stream(StreamName=AWS_APP_STREAM_NAME)
# print(json.dumps(stream_description, indent=4, sort_keys=True, default=str))

# Shard id used to request the shard iterator for the first record in the
# chain of records.
next_shard_id = stream_description['StreamDescription']['Shards'][0]['ShardId']
print("next_shard_id={}".format(next_shard_id))
# AT_SEQUENCE_NUMBER - Start reading from the position denoted by a specific sequence number, provided in the value StartingSequenceNumber.
#
# AFTER_SEQUENCE_NUMBER - Start reading right after the position denoted by a specific sequence number, provided in the value StartingSequenceNumber.
#
# AT_TIMESTAMP - Start reading from the position denoted by a specific time stamp, provided in the value Timestamp.
#
# TRIM_HORIZON - Start reading at the last untrimmed record in the shard in the system, which is the oldest data record in the shard.
# This iterator type means that records should be returned beginning with the first record added to the shard-rather than beginning with the most recently added record, also known as the tip
#
# LATEST - Start reading just after the most recent record in the shard, so that you always read the most recent data in the shard.
# Obtain a shard iterator positioned at the oldest untrimmed record
# (TRIM_HORIZON) so the shard is replayed from the beginning.
shard_iterator = kinesis_client.get_shard_iterator(StreamName=AWS_APP_STREAM_NAME,
                                                   ShardId=next_shard_id,
                                                   ShardIteratorType='TRIM_HORIZON')
print("shard_iterator={}".format(shard_iterator))
my_shard_iterator = shard_iterator['ShardIterator']

# First read primes the loop with a response carrying a NextShardIterator.
record_response = kinesis_client.get_records(ShardIterator=my_shard_iterator, Limit=2)
print(record_response)

# Since data comes back via an iterator chain we keep looping until the
# chain ends. GetRecords can legitimately return zero records even when
# more data exists further along the shard, so empty batches are normal;
# per AWS guidance the application should wait between calls (at least
# 1 second) and there is roughly a 3-second latency from when a record is
# added to when it becomes available via GetRecords.
# NOTE(review): the gist had lost its indentation (loop body at column 0,
# a SyntaxError); the structure below is the reconstructed intent.
while 'NextShardIterator' in record_response:
    print("Running :{}".format(record_response['NextShardIterator']))
    record_response = kinesis_client.get_records(
        ShardIterator=record_response['NextShardIterator'], Limit=10)
    # Only print when the batch actually contains records.
    if len(record_response["Records"]) > 0:
        # default=str keeps non-JSON-serializable values (e.g. datetimes)
        # printable; pretty-printing may not apply if the payload is not JSON.
        print(json.dumps(record_response, indent=4, sort_keys=True, default=str))
    # Wait 5 seconds between polls to stay under per-shard read limits.
    time.sleep(5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.