Skip to content

Instantly share code, notes, and snippets.

@kdgregory
Last active May 16, 2024 10:58
Show Gist options
  • Save kdgregory/85ba011913cba70b8cdc824bb84f5817 to your computer and use it in GitHub Desktop.
Save kdgregory/85ba011913cba70b8cdc824bb84f5817 to your computer and use it in GitHub Desktop.
Demonstrates PutRecords failure when called with stream name for newly created stream
2024-05-15 21:04:24,624 - Found credentials in environment variables.
2024-05-15 21:04:24,667 - creating stream: example-b849a0c6-4cfe-4fc8-bc77-23c73e731fae
2024-05-15 21:04:24,912 - status after create: CREATING
2024-05-15 21:04:25,039 - status after create: CREATING
2024-05-15 21:04:25,214 - status after create: CREATING
2024-05-15 21:04:25,342 - status after create: CREATING
2024-05-15 21:04:25,470 - status after create: CREATING
2024-05-15 21:04:25,606 - status after create: CREATING
2024-05-15 21:04:25,740 - status after create: CREATING
2024-05-15 21:04:25,865 - status after create: CREATING
2024-05-15 21:04:25,993 - status after create: CREATING
2024-05-15 21:04:26,123 - status after create: CREATING
2024-05-15 21:04:26,249 - status after create: CREATING
2024-05-15 21:04:26,377 - status after create: CREATING
2024-05-15 21:04:26,518 - status after create: CREATING
2024-05-15 21:04:26,644 - status after create: CREATING
2024-05-15 21:04:26,770 - status after create: CREATING
2024-05-15 21:04:26,899 - status after create: CREATING
2024-05-15 21:04:27,027 - status after create: CREATING
2024-05-15 21:04:27,154 - status after create: CREATING
2024-05-15 21:04:27,282 - status after create: CREATING
2024-05-15 21:04:27,410 - status after create: CREATING
2024-05-15 21:04:27,540 - status after create: CREATING
2024-05-15 21:04:27,669 - status after create: CREATING
2024-05-15 21:04:27,795 - status after create: CREATING
2024-05-15 21:04:27,922 - status after create: CREATING
2024-05-15 21:04:28,051 - status after create: CREATING
2024-05-15 21:04:28,177 - status after create: CREATING
2024-05-15 21:04:28,306 - status after create: CREATING
2024-05-15 21:04:28,436 - status after create: CREATING
2024-05-15 21:04:28,562 - status after create: CREATING
2024-05-15 21:04:28,691 - status after create: CREATING
2024-05-15 21:04:28,819 - status after create: CREATING
2024-05-15 21:04:28,947 - status after create: CREATING
2024-05-15 21:04:29,090 - status after create: CREATING
2024-05-15 21:04:29,212 - status after create: CREATING
2024-05-15 21:04:29,340 - status after create: CREATING
2024-05-15 21:04:29,468 - status after create: CREATING
2024-05-15 21:04:29,597 - status after create: CREATING
2024-05-15 21:04:29,725 - status after create: CREATING
2024-05-15 21:04:29,852 - status after create: CREATING
2024-05-15 21:04:29,982 - status after create: CREATING
2024-05-15 21:04:30,109 - status after create: CREATING
2024-05-15 21:04:30,238 - status after create: CREATING
2024-05-15 21:04:30,364 - status after create: CREATING
2024-05-15 21:04:30,494 - status after create: ACTIVE
2024-05-15 21:04:30,495 - sending 1 batch of 10 records
2024-05-15 21:04:30,518 - An error occurred (ResourceNotFoundException) when calling the PutRecords operation: Stream example-b849a0c6-4cfe-4fc8-bc77-23c73e731fae under account 123456789012 not found.
2024-05-15 21:04:30,518 - sending 2 batch of 10 records
2024-05-15 21:04:30,663 - failed record count: 0
#!/usr/bin/env python
##
## This program creates a Kinesis stream, waits for it to become active, then writes two
## batches of messages to it. The first fails (as of 2024-05-15) with ResourceNotFoundException,
## indicating that it can't find the stream. The second batch succeeds.
##
import boto3
import logging
import time
import uuid
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")
def main(client, stream_name):
create_stream(client, stream_name)
wait_until_active(client, stream_name)
write_batch(client, stream_name, 1, 10)
write_batch(client, stream_name, 2, 10)
def get_stream_status(client, stream_name):
resp = client.describe_stream_summary(StreamName=stream_name)
return resp['StreamDescriptionSummary']['StreamStatus']
def create_stream(client, stream_name):
logging.info(f"creating stream: {stream_name}")
client.create_stream(
StreamName=stream_name,
ShardCount=1,
StreamModeDetails={ 'StreamMode': 'PROVISIONED' }
)
status = get_stream_status(client, stream_name)
logging.info(f"status after create: {status}")
def wait_until_active(client, stream_name):
timeout = time.time() + 60
while time.time() < timeout:
time.sleep(0.1)
status = get_stream_status(client, stream_name)
logging.info(f"waiting until active: {status}")
if status == "ACTIVE":
return
raise Exception("timeout waiting for stream to become active")
def write_batch(client, stream_name, batch_id, num_recs):
logging.info(f"sending {batch_id} batch of {num_recs} records")
records = [{'Data': f"batch {batch_id} message {x}".encode('utf-8'), 'PartitionKey': str(x)} for x in range(num_recs)]
try:
resp = client.put_records( StreamName=stream_name, Records = records )
logging.info(f"failed record count: {resp['FailedRecordCount']}")
except Exception as ex:
logging.error(ex)
if __name__ == "__main__":
client = boto3.client('kinesis')
stream_name = "example-" + str(uuid.uuid4())
main(client, stream_name)
#!/usr/bin/env python
##
## This program creates a Kinesis stream, waits for it to become active, then writes two
## batches of messages to it. Both batches succeed.
##
## The difference between this version and the version that fails is that it passes the
## stream ARN to PutRecords, not the stream name.
##
import boto3
import logging
import time
import uuid
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")
def main(client, stream_name):
stream_arn = create_stream(client, stream_name)
wait_until_active(client, stream_name)
write_batch(client, stream_arn, 1, 10)
write_batch(client, stream_arn, 2, 10)
def get_stream_status(client, stream_name):
resp = client.describe_stream_summary(StreamName=stream_name)
status = resp['StreamDescriptionSummary']['StreamStatus']
stream_arn = resp['StreamDescriptionSummary']['StreamARN']
return status, stream_arn
def create_stream(client, stream_name):
logging.info(f"creating stream: {stream_name}")
client.create_stream(
StreamName=stream_name,
ShardCount=1,
StreamModeDetails={ 'StreamMode': 'PROVISIONED' }
)
status, stream_arn = get_stream_status(client, stream_name)
logging.info(f"waiting until active: {status}")
return stream_arn
def wait_until_active(client, stream_name):
timeout = time.time() + 60
while time.time() < timeout:
time.sleep(0.1)
status, _ = get_stream_status(client, stream_name)
logging.info(f"current status: {status}")
if status == "ACTIVE":
return
raise Exception("timeout waiting for stream to become active")
def write_batch(client, stream_arn, batch_id, num_recs):
logging.info(f"sending {batch_id} batch of {num_recs} records")
records = [{'Data': f"batch {batch_id} message {x}".encode('utf-8'), 'PartitionKey': str(x)} for x in range(num_recs)]
try:
resp = client.put_records( StreamARN=stream_arn, Records = records )
logging.info(f"failed record count: {resp['FailedRecordCount']}")
except Exception as ex:
logging.error(ex)
if __name__ == "__main__":
client = boto3.client('kinesis')
stream_name = "example-" + str(uuid.uuid4())
main(client, stream_name)
@kdgregory
Copy link
Author

Inserting a 0.5 second sleep after the call to wait_until_active() also avoids the problem, while a 0.25 second sleep does not.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment