Skip to content

Instantly share code, notes, and snippets.

@numberoverzero
Last active September 6, 2016 03:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save numberoverzero/8bde1089b5def6cc8c6d5fba61866702 to your computer and use it in GitHub Desktop.
Inspect the 'traversing a gap' vs 'caught up to head' behavior of GetRecords on open shards
#!/usr/bin/env bash
# Run the full GetRecords experiment: clean up, generate writes, diagnose.
#
# Abort on the first failing step: generating on top of a failed cleanup, or
# diagnosing a failed generation run, only produces misleading output.
set -euo pipefail

# Resolve the step scripts relative to this file so the runner works from any
# working directory. Set PYTHON=... to pick a different interpreter; the steps
# use Python 3 print() keywords (end=, flush=).
cd "$(dirname "$0")"
PYTHON="${PYTHON:-python}"

echo "00_runner -- Full test run for GetRecords."
"$PYTHON" 01_cleanup.py
"$PYTHON" 02_generate.py
"$PYTHON" 03_diagnose.py
print("01_cleanup -- Deleting existing tables.\n")
import boto3

dynamo = boto3.client("dynamodb")

# WARNING: this deletes EVERY table visible to the configured credentials and
# region, not only the ones 02_generate creates. Run it only against an
# account/region dedicated to this experiment.
#
# Collect every name first (ListTables is paginated) so deleting tables does
# not disturb the pagination we are still walking.
table_names = [
    name
    for page in dynamo.get_paginator("list_tables").paginate()
    for name in page["TableNames"]
]
for table in table_names:
    dynamo.delete_table(TableName=table)

# DeleteTable is async. Rather than sleeping a fixed 60 seconds and hoping
# that was long enough, poll until each table is actually gone so the next
# step can recreate it under the same name.
waiter = dynamo.get_waiter("table_not_exists")
for table in table_names:
    waiter.wait(TableName=table)
print("02_generate -- Creating different update patterns for 6 streams.\n")
import time
from bloop import BaseModel, Engine, Column, Integer

# Id for the next saved item; a single counter shared by every model.
next_id = 0


def push(cls):
    """Save one new ``cls`` item stamped with the current countdown.

    Reads the module-level ``remaining`` countdown (no ``global`` needed just
    to read it) and advances the shared ``next_id`` counter afterwards.
    """
    global next_id
    item = cls(id=next_id, remaining=remaining)
    engine.save(item)
    next_id = next_id + 1


def pause(duration):
    """Sleep ``duration`` seconds, then deduct them from ``remaining``."""
    global remaining
    time.sleep(duration)
    remaining = remaining - duration
# Six models with the same shape: an integer hash key stored as "i" plus an
# integer "remaining" countdown. Each has a stream configured with
# include ["new", "old"] (presumably new-and-old images — confirm against the
# bloop docs). Only the write pattern the generator loop applies differs.
class StreamOneStart(BaseModel):
    class Meta:
        stream = {
            "include": ["new", "old"]
        }
    # Stored as "i" to match the other five models.
    id = Column(Integer, hash_key=True, name="i")
    remaining = Column(Integer)


class StreamOneMiddle(BaseModel):
    class Meta:
        stream = {
            "include": ["new", "old"]
        }
    id = Column(Integer, hash_key=True, name="i")
    remaining = Column(Integer)


class StreamOneEnd(BaseModel):
    class Meta:
        stream = {
            "include": ["new", "old"]
        }
    id = Column(Integer, hash_key=True, name="i")
    remaining = Column(Integer)


class StreamPeriodHour(BaseModel):
    class Meta:
        stream = {
            "include": ["new", "old"]
        }
    id = Column(Integer, hash_key=True, name="i")
    remaining = Column(Integer)


class StreamPeriodMinute(BaseModel):
    class Meta:
        stream = {
            "include": ["new", "old"]
        }
    id = Column(Integer, hash_key=True, name="i")
    remaining = Column(Integer)


class StreamPeriodSecond(BaseModel):
    class Meta:
        # NOTE(review): presumably raised because this table is written every
        # second, unlike the others — confirm the intended headroom.
        write_units = 2
        stream = {
            "include": ["new", "old"]
        }
    id = Column(Integer, hash_key=True, name="i")
    remaining = Column(Integer)
# ============================================================================
stream_creation_buffer = 5  # 5 seconds to create the table streams
total = 4 * 60 * 60  # 4 hours is a single shard rotation
remaining = total

engine = Engine()
engine.bind(BaseModel)
# Give streams a bit to finish creating. Not sure if writes between table
# creation and stream creation are buffered anywhere, but I'd suspect not.
time.sleep(stream_creation_buffer)

# Schedule each one-second tick against a fixed monotonic start time.
# Sleeping a flat second per iteration lets the latency of every save() call
# accumulate. That stretches the run beyond `total` seconds of wall time and
# shifts the "middle" and "end" writes away from where they should land.
start = time.monotonic()
while remaining > 0:
    # Once at start/middle/end
    if remaining == total:
        push(StreamOneStart)
    if remaining == total // 2:
        push(StreamOneMiddle)
    if remaining == 120:
        # Not exactly the last possible second, but let's be safe:
        # clock skew, creation buffer... 2 minutes is basically the
        # same as 0 seconds for a single data point in a 4 hour window.
        push(StreamOneEnd)

    # Every hour/minute/second
    if remaining % 3600 == 0:
        print("\n+", end="", flush=True)
        push(StreamPeriodHour)
    if remaining % 60 == 0:
        print(".", end="", flush=True)
        push(StreamPeriodMinute)
    push(StreamPeriodSecond)

    # Sleep until this tick's deadline. If the saves overran it, don't sleep
    # at all, so we catch up instead of drifting further.
    ticks_done = total - remaining + 1
    time.sleep(max(0.0, start + ticks_done - time.monotonic()))
    remaining -= 1
print("03_diagnose -- Summary view of iterating each stream.\n")
import boto3
import collections

# One entry in a shard summary. type == "records": count is the number of
# records on one page. type == "empty": count is a run of consecutive empty
# pages.
Description = collections.namedtuple("Description", ["type", "count"])
dynamo = boto3.client("dynamodb")
streams = boto3.client("dynamodbstreams")


def _describe_all_shards(arn):
    """Return every shard of stream ``arn``, following DescribeStream paging."""
    shard_list, kwargs = [], {"StreamArn": arn}
    while True:
        description = streams.describe_stream(**kwargs)["StreamDescription"]
        shard_list.extend(description["Shards"])
        last_shard_id = description.get("LastEvaluatedShardId")
        if last_shard_id is None:
            return shard_list
        kwargs["ExclusiveStartShardId"] = last_shard_id


# ListTables is paginated; walk every page. Tables without a stream have
# nothing to diagnose, so skip them instead of failing with a KeyError.
tables, arns = [], {}
for page in dynamo.get_paginator("list_tables").paginate():
    for table in page["TableNames"]:
        arn = dynamo.describe_table(TableName=table)["Table"].get("LatestStreamArn")
        if arn is None:
            continue
        tables.append(table)
        arns[table] = arn
shards = {table: _describe_all_shards(arns[table]) for table in tables}


def is_empty(description):
    """Return True if ``description`` summarizes a run of empty pages."""
    return description.type == "empty"
def describe_shard(stream, shard, max_empty_calls=25):
    """Walk one shard from TRIM_HORIZON and summarize each GetRecords page.

    Args:
        stream: ARN of the stream that owns the shard.
        shard: ShardId to iterate.
        max_empty_calls: stop after this many consecutive GetRecords calls
            return no records. Open shards keep handing back a next iterator
            even when caught up, so this budget is what ends the walk there.

    Returns:
        A list of Description tuples, with one "records" entry per non-empty
        page (count = records on that page) and one "empty" entry per run of
        consecutive empty pages (count = pages in the run).
    """
    summary = []
    calls_left = max_empty_calls
    iterator = streams.get_shard_iterator(
        StreamArn=stream,
        ShardId=shard,
        ShardIteratorType="TRIM_HORIZON")["ShardIterator"]
    while iterator is not None and calls_left > 0:
        calls_left -= 1
        response = streams.get_records(ShardIterator=iterator)
        records = response.get("Records", [])
        # A missing NextShardIterator means a closed shard has been exhausted.
        iterator = response.get("NextShardIterator")
        if records:
            summary.append(Description(type="records", count=len(records)))
            # Found data: reset the budget so only consecutive empties count.
            calls_left = max_empty_calls
        elif summary and is_empty(summary[-1]):
            # Extend the current run of empty pages.
            summary[-1] = summary[-1]._replace(count=summary[-1].count + 1)
        else:
            # First empty page at the start or right after a records page.
            summary.append(Description(type="empty", count=1))
    return summary
def format(summary):
    """Render a shard summary as one comma-separated line, e.g. ``(3), 1, (25)``.

    Runs of empty pages are shown in parentheses; pages that returned
    records are shown as their bare record count.
    """
    parts = []
    for description in summary:
        text = str(description.count)
        if description.type == "empty":
            text = "(" + text + ")"
        parts.append(text)
    return ", ".join(parts)
def _in_lineage_order(shard_list):
    """Return ``shard_list`` reordered so every shard comes after its parent.

    DescribeStream does not promise lineage order, so follow ParentShardId
    links instead of trusting the response order. A shard whose parent is
    absent from the list (the oldest shard, or one whose parent was trimmed)
    starts a chain. Siblings keep the order in which they were listed.
    """
    known_ids = {s["ShardId"] for s in shard_list}
    children = {}
    roots = []
    for s in shard_list:
        parent_id = s.get("ParentShardId")
        if parent_id in known_ids:
            children.setdefault(parent_id, []).append(s)
        else:
            roots.append(s)
    ordered = []
    pending = list(reversed(roots))
    while pending:
        current = pending.pop()
        ordered.append(current)
        pending.extend(reversed(children.get(current["ShardId"], [])))
    return ordered


for table in tables:
    arn = arns[table]
    print(table)
    print("=" * len(table))
    for shard in _in_lineage_order(shards[table]):
        shard_id = shard["ShardId"]
        summary = describe_shard(arn, shard_id)
        # Only closed shards carry an EndingSequenceNumber.
        is_closed = "EndingSequenceNumber" in shard["SequenceNumberRange"]
        # Pad to the width of "CLOSED" so the summary column lines up.
        mode = ("CLOSED" if is_closed else "OPEN").ljust(len("CLOSED"))
        print("{} :: {} :: {}".format(shard_id, mode, format(summary)))
    print()
@numberoverzero
Copy link
Author

[2016-09-05 13:19:23] ~/get_records_tests $ ./00_runner.sh 
00_runner -- Full test run for GetRecords.
01_cleanup -- Deleting existing tables.
02_generate -- Creating different update patterns for 6 streams.
+............................................................
+............................................................
+............................................................
+............................................................
03_diagnose -- Summary view of iterating each stream.

StreamOneEnd
============
shardId-00000001473106832774-2426678f :: OPEN   :: (3), 1, (25)

StreamOneMiddle
===============
shardId-00000001473106832810-12ca3004 :: CLOSED :: (2), 1, (1)
shardId-00000001473120731990-bf68ed0a :: OPEN   :: (25)

StreamOneStart
==============
shardId-00000001473106832804-8e51611e :: CLOSED :: 1, (3)
shardId-00000001473121135344-bf29c1aa :: OPEN   :: (25)

StreamPeriodHour
================
shardId-00000001473106832859-404a1114 :: CLOSED :: 2, 1, 1, (1)
shardId-00000001473121446918-553fb15f :: OPEN   :: (25)

StreamPeriodMinute
==================
shardId-00000001473106832829-8c2934c7 :: OPEN   :: 58, 57, 58, 59, 8, (25)

StreamPeriodSecond
==================
shardId-00000001473106834149-cac7a2e4 :: OPEN   :: 787, 792, 799, 798, 774, 802, 787, 790, 788, 774, 788, 781, 786, 800, 781, 778, 794, 798, 203, (25)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment