Skip to content

Instantly share code, notes, and snippets.

@I159
Created June 13, 2019 09:27
Show Gist options
  • Save I159/bef3748239ad0bee4bc6154135f59b05 to your computer and use it in GitHub Desktop.
Save I159/bef3748239ad0bee4bc6154135f59b05 to your computer and use it in GitHub Desktop.
def _get_shard_iterator(self, shard, stream_arn):
return self.client.get_shard_iterator(
StreamArn=stream_arn,
ShardId=shard["ShardId"],
ShardIteratorType="TRIM_HORIZON",
)
def _process_signals(self, shards, remaining_items, termination_moment):
for shard_iterator in shards:
while remaining_items and dt.datetime.utcnow() <= termination_moment:
records_response = self.client.get_records(ShardIterator=shard_iterator)
if records_response["Records"]:
records = records_response["Records"]
for j in records:
key = dynamo_formatter.loads(j["dynamodb"]["Keys"])
key = key["Id"]
if key in remaining_items:
remaining_items.remove(key)
shard_iterator = records_response.get("NextShardIterator")
if not shard_iterator:
break
time.sleep(1)
return remaining_items
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment