
@bcavagnolo
Created October 26, 2017 17:59
A better kinesis consumer example in python?
# Most of the kinesis examples out there do not seem to elucidate the
# opportunities to parallelize processing of kinesis streams, nor the
# interactions with the service limits. You may want to start your
# journey by familiarizing yourself with the concepts (e.g., what is a
# shard?) and the service limits:
# http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
#
# The idea is to spawn a process per shard in order to maximize
# parallelization while respecting the service limits. If you run
# multiple instances of this script (or equivalent), you will exhaust
# the service limits, and in that scenario you may have to futz with
# the constants below. Why would you do this? Maybe because you have
# diverse and unrelated processing steps that you want to run on the
# data. Or maybe you want to improve availability by processing the
# stream in multiple AZs.
#
# If you need to increase your read bandwidth, you must split your
# stream into additional shards. As written, this script must be
# restarted in that scenario (see the sketch after the script for one
# way to detect resharding automatically).
#
# Notably, the "KCL" does offer a high-availability story for python,
# but it involves dynamodb and some sort of java-wrapped-in-python
# thing that smelled like a terrible amount of service sprawl.
#
# Some credit for this code goes to
# https://www.parse.ly/help/rawdata/code/#python-code-for-kinesis-with-boto3
# ...but that code fails to actually run.
import json
import time
import boto3
from multiprocessing import Process
from botocore.exceptions import ClientError
STREAM = "my-test-stream"
REGION = 'us-gov-west-1'
# Each shard can support up to 5 transactions per second for reads, up
# to a maximum total data read rate of 2 MB per second.
MAX_REQUESTS_PER_SECOND = 5
# The same 2 MB per second read limit, expressed in bytes.
MAX_BYTES_PER_SECOND = 2 * 1024 ** 2  # 2MB
# The approximate maximum size of your records, in bytes. This is a
# property of your data.
MAX_RECORD_SIZE = 200
# There does not appear to be any sort of blocking read support for
# kinesis streams, and no automatic way to respect the read
# bandwidth. So we must explicitly sleep to achieve these
# things. Seems like a bit of an antipattern in the context of
# streams. Shrug.
SLEEP_PERIOD = 1.0/MAX_REQUESTS_PER_SECOND
MAX_RECORDS_PER_SECOND = MAX_BYTES_PER_SECOND // MAX_RECORD_SIZE
MAX_RECORDS_PER_REQUEST = MAX_RECORDS_PER_SECOND // MAX_REQUESTS_PER_SECOND
print('Planning to read up to {} records every {} seconds'
      .format(MAX_RECORDS_PER_REQUEST, SLEEP_PERIOD))
kinesis = boto3.client('kinesis', region_name=REGION)
def get_kinesis_shards(stream):
    """Return list of shard iterators, one for each shard of stream."""
    descriptor = kinesis.describe_stream(StreamName=stream)
    shards = descriptor['StreamDescription']['Shards']
    shard_ids = [shard['ShardId'] for shard in shards]
    shard_iters = [kinesis.get_shard_iterator(
                       StreamName=stream,
                       ShardId=shard_id,
                       ShardIteratorType="LATEST")
                   for shard_id in shard_ids]
    return shard_iters
def process_shard(shard_number, shard):
    shard_iterator = shard['ShardIterator']
    while True:
        try:
            response = kinesis.get_records(ShardIterator=shard_iterator,
                                           Limit=MAX_RECORDS_PER_REQUEST)
        except ClientError as e:
            code = e.response['Error']['Code']
            if code != 'ProvisionedThroughputExceededException':
                raise
            print('Throughput exceeded!')
            time.sleep(SLEEP_PERIOD)
            continue
        start = time.time()
        for record in response['Records']:
            datum = json.loads(record['Data'])
            # here's where you do your processing
            print(shard_number, json.dumps(datum))
        shard_iterator = response['NextShardIterator']
        delta = time.time() - start
        # sleep() raises ValueError on negative arguments, so clamp at zero
        # when processing took longer than the sleep period.
        time.sleep(max(0, SLEEP_PERIOD - delta))
shard_iters = get_kinesis_shards(STREAM)
while True:
    processes = []
    for i, shard in enumerate(shard_iters):
        p = Process(target=process_shard, args=(i, shard))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
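
As noted in the header comments, this script enumerates shards only once at startup, so it must be restarted after a reshard. The following is a minimal sketch, not part of the original gist, of how the main loop above could be replaced so that a change in the shard set is detected and the workers are restarted automatically. The SHARD_POLL_PERIOD constant and the helper names are assumptions introduced here; get_kinesis_shards and process_shard are reused from the script.

SHARD_POLL_PERIOD = 60  # seconds between shard-set checks (assumed value)

def current_shard_ids(stream):
    """Return the set of shard IDs currently attached to the stream."""
    descriptor = kinesis.describe_stream(StreamName=stream)
    return {shard['ShardId']
            for shard in descriptor['StreamDescription']['Shards']}

def run_until_reshard(stream):
    """Spawn one worker per shard and return once the shard set changes."""
    known_shards = current_shard_ids(stream)
    processes = []
    for i, shard in enumerate(get_kinesis_shards(stream)):
        p = Process(target=process_shard, args=(i, shard))
        p.start()
        processes.append(p)
    while True:
        time.sleep(SHARD_POLL_PERIOD)
        if current_shard_ids(stream) != known_shards:
            # A reshard happened. Terminating is abrupt (any record a worker
            # is midway through processing is dropped), but keeps the sketch
            # simple.
            for p in processes:
                p.terminate()
            for p in processes:
                p.join()
            return

while True:
    run_until_reshard(STREAM)

The describe_stream call is itself rate limited by AWS, so keep the poll period generous.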

rex commented Jul 15, 2019

Just wanted to let you know that this just saved me and my team literal hours of work. We had been struggling to find an "easy" way to read from a kinesis stream so we could test a new integration, and the process of repeatedly getting the next shard iterator and running get-records was difficult and tedious. This program made it not just possible, but easy. The only changes required were to STREAM and REGION, plus a new line to select a profile (right above kinesis = boto3.client()):

boto3.setup_default_session(profile_name="preprod")

THANK YOU!
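
For anyone using this consumer the same way, to test an integration, a complementary producer can put JSON records onto the stream so the consumer above has something to print. The sketch below is not part of the original gist; the payload fields are made up, and it assumes the same STREAM and REGION values as the consumer.

import json
import time

import boto3

STREAM = "my-test-stream"
REGION = 'us-gov-west-1'

kinesis = boto3.client('kinesis', region_name=REGION)

# Put a handful of made-up JSON records onto the stream. The partition key
# is hashed to pick a shard, so varying it tends to spread records across
# shards.
for i in range(10):
    payload = {'sequence': i, 'message': 'hello from the test producer'}
    kinesis.put_record(StreamName=STREAM,
                       Data=json.dumps(payload).encode('utf-8'),
                       PartitionKey=str(i))
    time.sleep(0.2)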
