Last active
September 6, 2016 03:32
-
-
Save numberoverzero/8bde1089b5def6cc8c6d5fba61866702 to your computer and use it in GitHub Desktop.
Inspect how DynamoDB Streams GetRecords behaves on open shards: 'traversing a gap' vs. 'caught up to head'.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash
# Full GetRecords experiment: wipe tables, generate ~4h of writes, then
# summarize how GetRecords pages through each stream's shards.
#
# Abort on the first failing step so 03_diagnose never runs against a
# half-deleted or half-generated set of tables.
set -euo pipefail

echo "00_runner -- Full test run for GetRecords."
python 01_cleanup.py
python 02_generate.py
python 03_diagnose.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
print("01_cleanup -- Deleting existing tables.\n")
import boto3
import time

# WARNING: deletes *every* DynamoDB table visible to the current credentials
# and region, not only the tables created by 02_generate.
dynamo = boto3.client("dynamodb")

# ListTables returns a limited number of names per call; follow pagination
# so no table is left behind.
table_names = [
    name
    for page in dynamo.get_paginator("list_tables").paginate()
    for name in page["TableNames"]
]

for table in table_names:
    dynamo.delete_table(TableName=table)

# DeleteTable is async. Wait until each table is actually gone instead of
# guessing with a fixed sleep, so 02_generate can recreate them safely.
waiter = dynamo.get_waiter("table_not_exists")
for table in table_names:
    waiter.wait(TableName=table)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
print("02_generate -- Creating different update patterns for 6 streams.\n")
import time
from bloop import BaseModel, Engine, Column, Integer

# Hash key for the next saved item; bumped after every save so each write
# creates a new item.
next_id = 0


def push(cls):
    """Save one new *cls* item tagged with the current countdown value.

    Reads the module-level ``engine`` and ``remaining``; increments the
    module-level ``next_id`` after saving.
    """
    global next_id
    item = cls(id=next_id, remaining=remaining)
    engine.save(item)
    next_id += 1
# Monotonic-clock deadline of the next tick; initialised by the first pause().
_next_tick = None


def pause(duration):
    """Wait out the rest of the current tick, then count *duration* down.

    Sleeping a flat *duration* after each iteration's saves makes the loop
    fall behind wall-clock time by however long those saves took, on every
    tick. Over a multi-hour run that drift can push the late writes past the
    window they were meant to land in. Scheduling against a monotonic
    deadline keeps ticks on a fixed cadence. If work overruns a tick, the
    sleep is skipped so the schedule catches up.

    Decrements the module-level ``remaining`` by *duration*.
    """
    global remaining, _next_tick
    now = time.monotonic()
    if _next_tick is None:
        _next_tick = now
    _next_tick += duration
    time.sleep(max(0.0, _next_tick - now))
    remaining -= duration
class StreamOneStart(BaseModel):
    """Receives a single write, at the very start of the run."""

    class Meta:
        # Include both new and old item images in stream records.
        stream = {
            "include": ["new", "old"]
        }

    # Backed by attribute "i", consistent with every other model in this script.
    id = Column(Integer, hash_key=True, name="i")
    remaining = Column(Integer)
class StreamOneMiddle(BaseModel):
    """Receives a single write, halfway through the run."""

    class Meta:
        # Include both new and old item images in stream records.
        stream = {"include": ["new", "old"]}

    id = Column(Integer, name="i", hash_key=True)
    remaining = Column(Integer)
class StreamOneEnd(BaseModel):
    """Receives a single write, near the end of the run."""

    class Meta:
        # Include both new and old item images in stream records.
        stream = {"include": ["new", "old"]}

    id = Column(Integer, name="i", hash_key=True)
    remaining = Column(Integer)
class StreamPeriodHour(BaseModel):
    """Receives one write per hour of the run."""

    class Meta:
        # Include both new and old item images in stream records.
        stream = {"include": ["new", "old"]}

    id = Column(Integer, name="i", hash_key=True)
    remaining = Column(Integer)
class StreamPeriodMinute(BaseModel):
    """Receives one write per minute of the run."""

    class Meta:
        # Include both new and old item images in stream records.
        stream = {"include": ["new", "old"]}

    id = Column(Integer, name="i", hash_key=True)
    remaining = Column(Integer)
class StreamPeriodSecond(BaseModel):
    """Receives one write per second of the run."""

    class Meta:
        # NOTE(review): presumably extra write capacity for the steady
        # once-per-second writes -- confirm intent.
        write_units = 2
        # Include both new and old item images in stream records.
        stream = {"include": ["new", "old"]}

    id = Column(Integer, name="i", hash_key=True)
    remaining = Column(Integer)
# ============================================================================
stream_creation_buffer = 5  # 5 seconds to create the table streams
total = 4 * 60 * 60  # 4 hours is a single shard rotation
remaining = total

engine = Engine()
engine.bind(BaseModel)
# Give streams a bit to finish creating. Not sure if writes between table
# creation and stream creation are buffered anywhere, but I'd suspect not.
time.sleep(stream_creation_buffer)

# One iteration per second, counting `remaining` down from `total` to 1.
while remaining > 0:
    # Once at start/middle/end
    if remaining == total:
        push(StreamOneStart)
    # Exact integer check for the halfway point (no float division).
    if remaining * 2 == total:
        push(StreamOneMiddle)
    if remaining == 120:
        # Not exactly the last possible second, but let's be safe:
        # clock skew, creation buffer... 2 minutes is basically the
        # same as 0 seconds for a single data point in a 4 hour window.
        push(StreamOneEnd)
    # Every hour/minute/second; "+" and "." are progress markers.
    if remaining % 3600 == 0:
        print("\n+", end="", flush=True)
        push(StreamPeriodHour)
    if remaining % 60 == 0:
        print(".", end="", flush=True)
        push(StreamPeriodMinute)
    push(StreamPeriodSecond)
    pause(1)

# Terminate the progress line so the next script's output starts on its own line.
print()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
print("03_diagnose -- Summary view of iterating each stream.\n")
import boto3
import collections

# One run in a shard summary: type is "records" (count = records on that page)
# or "empty" (count = number of consecutive empty pages).
Description = collections.namedtuple("Description", ["type", "count"])

dynamo = boto3.client("dynamodb")
streams = boto3.client("dynamodbstreams")


def _list_all_tables():
    """Return every table name, following ListTables pagination."""
    pages = dynamo.get_paginator("list_tables").paginate()
    return [name for page in pages for name in page["TableNames"]]


def _describe_all_shards(stream_arn):
    """Return every shard of *stream_arn*, following DescribeStream pagination."""
    found = []
    start_after = None
    while True:
        request = {"StreamArn": stream_arn}
        if start_after is not None:
            request["ExclusiveStartShardId"] = start_after
        description = streams.describe_stream(**request)["StreamDescription"]
        found.extend(description["Shards"])
        start_after = description.get("LastEvaluatedShardId")
        if start_after is None:
            return found


tables = _list_all_tables()
# NOTE(review): raises KeyError for any table without a stream enabled.
arns = {table: dynamo.describe_table(TableName=table)["Table"]["LatestStreamArn"] for table in tables}
shards = {table: _describe_all_shards(arns[table]) for table in tables}
def is_empty(description):
    """Return True if *description* represents a run of empty GetRecords pages."""
    return description.type == "empty"
def describe_shard(stream, shard, max_empty_calls=25):
    """Summarize the GetRecords pages of one shard, read from TRIM_HORIZON.

    Each non-empty page becomes a "records" Description whose count is the
    page's record count. Consecutive empty pages collapse into a single
    "empty" Description whose count is the number of such pages.

    :param stream: stream ARN.
    :param shard: shard id within *stream*.
    :param max_empty_calls: stop after this many consecutive GetRecords calls
        that return no records. The budget resets whenever a page has records.
        This bound is what ends iteration for a shard that keeps returning a
        NextShardIterator.
    :returns: list of Description, in page order.
    """
    summary = []
    budget = max_empty_calls
    iterator = streams.get_shard_iterator(
        StreamArn=stream,
        ShardId=shard,
        ShardIteratorType="TRIM_HORIZON")["ShardIterator"]
    while iterator is not None and budget > 0:
        budget -= 1
        response = streams.get_records(ShardIterator=iterator)
        records = response.get("Records", [])
        iterator = response.get("NextShardIterator", None)
        if records:
            # Non-empty records page; refill the empty-call budget.
            summary.append(Description(type="records", count=len(records)))
            budget = max_empty_calls
        elif summary and is_empty(summary[-1]):
            # Empty page following an empty page: extend the current run.
            summary[-1] = Description(type="empty", count=summary[-1].count + 1)
        else:
            # First empty page after records (or at the start): open a new run.
            summary.append(Description(type="empty", count=1))
    return summary
# NOTE: this name shadows the builtin format() for the rest of the script;
# the str.format() method calls below are unaffected.
def format(summary):
    """Render a shard summary as one comma-separated line.

    Record pages appear as their bare record count. Runs of empty pages
    appear as the number of consecutive empty pages in parentheses.
    """
    rendered = []
    for description in summary:
        if is_empty(description):
            rendered.append("({})".format(description.count))
        else:
            rendered.append(str(description.count))
    return ", ".join(rendered)
def _lineage_order(shard_list):
    """Return *shard_list* reordered so every parent shard precedes its children.

    A shard whose ParentShardId is missing, or refers to a shard not in the
    list, is treated as a root. Each root is walked depth-first. Siblings keep
    their original relative order, so a single parent->child chain that is
    already in order comes back unchanged.
    """
    known_ids = {s["ShardId"] for s in shard_list}
    children = collections.defaultdict(list)
    roots = []
    for s in shard_list:
        parent = s.get("ParentShardId")
        if parent in known_ids:
            children[parent].append(s)
        else:
            roots.append(s)
    ordered = []
    pending = list(reversed(roots))
    while pending:
        current = pending.pop()
        ordered.append(current)
        pending.extend(reversed(children[current["ShardId"]]))
    return ordered


# Print each table's shards in lineage order (parents before children)
# instead of trusting the order DescribeStream happened to return.
for table in tables:
    arn = arns[table]
    print(table)
    print("=" * len(table))
    for shard in _lineage_order(shards[table]):
        shard_id = shard["ShardId"]
        summary = describe_shard(arn, shard_id)
        is_closed = "EndingSequenceNumber" in shard["SequenceNumberRange"]
        # "open " keeps the trailing space so both labels have the same width.
        mode = ("closed" if is_closed else "open ").upper()
        print("{} :: {} :: {}".format(shard_id, mode, format(summary)))
    print()
Author
numberoverzero
commented
Sep 6, 2016
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment