-
-
Save kdgregory/85ba011913cba70b8cdc824bb84f5817 to your computer and use it in GitHub Desktop.
Demonstrates PutRecords failure when called with stream name for newly created stream
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2024-05-15 21:04:24,624 - Found credentials in environment variables. | |
2024-05-15 21:04:24,667 - creating stream: example-b849a0c6-4cfe-4fc8-bc77-23c73e731fae | |
2024-05-15 21:04:24,912 - status after create: CREATING | |
2024-05-15 21:04:25,039 - status after create: CREATING | |
2024-05-15 21:04:25,214 - status after create: CREATING | |
2024-05-15 21:04:25,342 - status after create: CREATING | |
2024-05-15 21:04:25,470 - status after create: CREATING | |
2024-05-15 21:04:25,606 - status after create: CREATING | |
2024-05-15 21:04:25,740 - status after create: CREATING | |
2024-05-15 21:04:25,865 - status after create: CREATING | |
2024-05-15 21:04:25,993 - status after create: CREATING | |
2024-05-15 21:04:26,123 - status after create: CREATING | |
2024-05-15 21:04:26,249 - status after create: CREATING | |
2024-05-15 21:04:26,377 - status after create: CREATING | |
2024-05-15 21:04:26,518 - status after create: CREATING | |
2024-05-15 21:04:26,644 - status after create: CREATING | |
2024-05-15 21:04:26,770 - status after create: CREATING | |
2024-05-15 21:04:26,899 - status after create: CREATING | |
2024-05-15 21:04:27,027 - status after create: CREATING | |
2024-05-15 21:04:27,154 - status after create: CREATING | |
2024-05-15 21:04:27,282 - status after create: CREATING | |
2024-05-15 21:04:27,410 - status after create: CREATING | |
2024-05-15 21:04:27,540 - status after create: CREATING | |
2024-05-15 21:04:27,669 - status after create: CREATING | |
2024-05-15 21:04:27,795 - status after create: CREATING | |
2024-05-15 21:04:27,922 - status after create: CREATING | |
2024-05-15 21:04:28,051 - status after create: CREATING | |
2024-05-15 21:04:28,177 - status after create: CREATING | |
2024-05-15 21:04:28,306 - status after create: CREATING | |
2024-05-15 21:04:28,436 - status after create: CREATING | |
2024-05-15 21:04:28,562 - status after create: CREATING | |
2024-05-15 21:04:28,691 - status after create: CREATING | |
2024-05-15 21:04:28,819 - status after create: CREATING | |
2024-05-15 21:04:28,947 - status after create: CREATING | |
2024-05-15 21:04:29,090 - status after create: CREATING | |
2024-05-15 21:04:29,212 - status after create: CREATING | |
2024-05-15 21:04:29,340 - status after create: CREATING | |
2024-05-15 21:04:29,468 - status after create: CREATING | |
2024-05-15 21:04:29,597 - status after create: CREATING | |
2024-05-15 21:04:29,725 - status after create: CREATING | |
2024-05-15 21:04:29,852 - status after create: CREATING | |
2024-05-15 21:04:29,982 - status after create: CREATING | |
2024-05-15 21:04:30,109 - status after create: CREATING | |
2024-05-15 21:04:30,238 - status after create: CREATING | |
2024-05-15 21:04:30,364 - status after create: CREATING | |
2024-05-15 21:04:30,494 - status after create: ACTIVE | |
2024-05-15 21:04:30,495 - sending 1 batch of 10 records | |
2024-05-15 21:04:30,518 - An error occurred (ResourceNotFoundException) when calling the PutRecords operation: Stream example-b849a0c6-4cfe-4fc8-bc77-23c73e731fae under account 123456789012 not found. | |
2024-05-15 21:04:30,518 - sending 2 batch of 10 records | |
2024-05-15 21:04:30,663 - failed record count: 0 | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
## | |
## This program creates a Kinesis stream, waits for it to become active, then writes two | |
## batches of messages to it. The first fails (as of 2024-05-15) with ResourceNotFoundException, | |
## indicating that it can't find the stream. The second batch succeeds. | |
## | |
import boto3 | |
import logging | |
import time | |
import uuid | |
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s") | |
def main(client, stream_name): | |
create_stream(client, stream_name) | |
wait_until_active(client, stream_name) | |
write_batch(client, stream_name, 1, 10) | |
write_batch(client, stream_name, 2, 10) | |
def get_stream_status(client, stream_name): | |
resp = client.describe_stream_summary(StreamName=stream_name) | |
return resp['StreamDescriptionSummary']['StreamStatus'] | |
def create_stream(client, stream_name): | |
logging.info(f"creating stream: {stream_name}") | |
client.create_stream( | |
StreamName=stream_name, | |
ShardCount=1, | |
StreamModeDetails={ 'StreamMode': 'PROVISIONED' } | |
) | |
status = get_stream_status(client, stream_name) | |
logging.info(f"status after create: {status}") | |
def wait_until_active(client, stream_name): | |
timeout = time.time() + 60 | |
while time.time() < timeout: | |
time.sleep(0.1) | |
status = get_stream_status(client, stream_name) | |
logging.info(f"waiting until active: {status}") | |
if status == "ACTIVE": | |
return | |
raise Exception("timeout waiting for stream to become active") | |
def write_batch(client, stream_name, batch_id, num_recs): | |
logging.info(f"sending {batch_id} batch of {num_recs} records") | |
records = [{'Data': f"batch {batch_id} message {x}".encode('utf-8'), 'PartitionKey': str(x)} for x in range(num_recs)] | |
try: | |
resp = client.put_records( StreamName=stream_name, Records = records ) | |
logging.info(f"failed record count: {resp['FailedRecordCount']}") | |
except Exception as ex: | |
logging.error(ex) | |
if __name__ == "__main__": | |
client = boto3.client('kinesis') | |
stream_name = "example-" + str(uuid.uuid4()) | |
main(client, stream_name) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
## | |
## This program creates a Kinesis stream, waits for it to become active, then writes two | |
## batches of messages to it. Both batches succeed. | |
## | |
## The difference between this version and the version that fails is that it passes the | |
## stream ARN to PutRecords, not the stream name. | |
## | |
import boto3 | |
import logging | |
import time | |
import uuid | |
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s") | |
def main(client, stream_name): | |
stream_arn = create_stream(client, stream_name) | |
wait_until_active(client, stream_name) | |
write_batch(client, stream_arn, 1, 10) | |
write_batch(client, stream_arn, 2, 10) | |
def get_stream_status(client, stream_name): | |
resp = client.describe_stream_summary(StreamName=stream_name) | |
status = resp['StreamDescriptionSummary']['StreamStatus'] | |
stream_arn = resp['StreamDescriptionSummary']['StreamARN'] | |
return status, stream_arn | |
def create_stream(client, stream_name): | |
logging.info(f"creating stream: {stream_name}") | |
client.create_stream( | |
StreamName=stream_name, | |
ShardCount=1, | |
StreamModeDetails={ 'StreamMode': 'PROVISIONED' } | |
) | |
status, stream_arn = get_stream_status(client, stream_name) | |
logging.info(f"waiting until active: {status}") | |
return stream_arn | |
def wait_until_active(client, stream_name): | |
timeout = time.time() + 60 | |
while time.time() < timeout: | |
time.sleep(0.1) | |
status, _ = get_stream_status(client, stream_name) | |
logging.info(f"current status: {status}") | |
if status == "ACTIVE": | |
return | |
raise Exception("timeout waiting for stream to become active") | |
def write_batch(client, stream_arn, batch_id, num_recs): | |
logging.info(f"sending {batch_id} batch of {num_recs} records") | |
records = [{'Data': f"batch {batch_id} message {x}".encode('utf-8'), 'PartitionKey': str(x)} for x in range(num_recs)] | |
try: | |
resp = client.put_records( StreamARN=stream_arn, Records = records ) | |
logging.info(f"failed record count: {resp['FailedRecordCount']}") | |
except Exception as ex: | |
logging.error(ex) | |
if __name__ == "__main__": | |
client = boto3.client('kinesis') | |
stream_name = "example-" + str(uuid.uuid4()) | |
main(client, stream_name) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Inserting a 0.5 second sleep after the call to
wait_until_active()
also avoids the problem, while a 0.25 second sleep does not.