human (Dec 18, 2024, 08:52 PM)
I can't access the file at https://s3.amazonaws.com/test-public-bucket-simonw/4f798baccbf14570901b89bb61ab2f2b.png
Write me a Python CLI tool using Click and boto3 which takes a URL of that form - or of the form https://test-public-bucket-simonw.s3.us-east-1.amazonaws.com/4f798baccbf14570901b89bb61ab2f2b.png - and then uses EVERY single boto3 trick in the book to try and debug why the file is returning a 404
assistant (Dec 18, 2024, 08:53 PM)
I'll help you create a comprehensive debugging tool for S3 access issues. Here's a script that employs various boto3 techniques to investigate S3 access problems:
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "click",
# "boto3",
# "urllib3",
# "rich",
# ]
# ///
import click
import boto3
import re
from urllib.parse import urlparse
from rich.console import Console
from rich.table import Table
from botocore.exceptions import ClientError
import json
def parse_s3_url(url):
"""Extract bucket and key from either URL format."""
if 's3.amazonaws.com' in url:
# Format: https://s3.amazonaws.com/bucket-name/key
path_parts = urlparse(url).path.lstrip('/').split('/', 1)
return path_parts[0], path_parts[1]
else:
# Format: https://bucket-name.s3.region.amazonaws.com/key
bucket = urlparse(url).netloc.split('.')[0]
key = urlparse(url).path.lstrip('/')
return bucket, key
def check_bucket_exists(s3_client, bucket):
"""Verify if bucket exists and check its region."""
try:
response = s3_client.head_bucket(Bucket=bucket)
region = s3_client.get_bucket_location(Bucket=bucket)['LocationConstraint']
return True, region
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code', 'Unknown')
if error_code == '404':
return False, "Bucket does not exist"
elif error_code == '403':
return False, "Access denied to bucket"
return False, f"Error: {error_code}"
def check_object_metadata(s3_client, bucket, key):
"""Try to get object metadata and versions."""
results = []
try:
# Check if versioning is enabled
versioning = s3_client.get_bucket_versioning(Bucket=bucket)
results.append(("Bucket Versioning", versioning.get('Status', 'Not enabled')))
# Try to get object metadata
response = s3_client.head_object(Bucket=bucket, Key=key)
results.append(("Object exists", "Yes"))
results.append(("Content Type", response.get('ContentType', 'Unknown')))
results.append(("Size", f"{response.get('ContentLength', 0)} bytes"))
results.append(("Last Modified", str(response.get('LastModified', 'Unknown'))))
# Check for object versions
if versioning.get('Status') == 'Enabled':
versions = s3_client.list_object_versions(
Bucket=bucket,
Prefix=key
)
for version in versions.get('Versions', []):
results.append((
f"Version {version['VersionId']}",
f"Last Modified: {version['LastModified']}"
))
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code', 'Unknown')
results.append(("Error", f"{error_code}: {e.response['Error']['Message']}"))
# Check if object was deleted recently
try:
deleted = s3_client.list_object_versions(
Bucket=bucket,
Prefix=key
).get('DeleteMarkers', [])
if deleted:
results.append(("Delete Markers Found", "Yes"))
for marker in deleted:
results.append((
"Deleted on",
f"{marker['LastModified']} (Version: {marker['VersionId']})"
))
except ClientError:
pass
return results
def check_bucket_policy(s3_client, bucket):
"""Check bucket policy and ACLs."""
try:
policy = s3_client.get_bucket_policy(Bucket=bucket)
return json.loads(policy['Policy'])
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchBucketPolicy':
return "No bucket policy found"
return f"Error getting bucket policy: {e.response['Error']['Message']}"
@click.command()
@click.argument('url')
@click.option('--region', help='AWS region override')
@click.option('--profile', help='AWS profile name')
def debug_s3_access(url, region, profile):
"""Debug S3 access issues for a given URL."""
console = Console()
bucket, key = parse_s3_url(url)
console.print(f"\n[bold]Analyzing S3 access for:[/bold]")
console.print(f"Bucket: {bucket}")
console.print(f"Key: {key}\n")
# Initialize boto3 session
session = boto3.Session(profile_name=profile)
s3_client = session.client('s3', region_name=region)
# Check bucket existence and region
bucket_exists, bucket_region = check_bucket_exists(s3_client, bucket)
if not bucket_exists:
console.print(f"[red]Bucket check failed: {bucket_region}[/red]")
return
if bucket_region and bucket_region != region:
console.print(f"[yellow]Note: Bucket is in {bucket_region}, retrying with correct region[/yellow]\n")
s3_client = session.client('s3', region_name=bucket_region)
# Create results table
table = Table(title="S3 Access Analysis Results")
table.add_column("Check", style="cyan")
table.add_column("Result", style="green")
# Add bucket info
table.add_row("Bucket Exists", "Yes")
table.add_row("Bucket Region", bucket_region or "default")
# Check bucket policy
policy = check_bucket_policy(s3_client, bucket)
table.add_row("Bucket Policy", str(policy))
# Check object metadata and versions
for check, result in check_object_metadata(s3_client, bucket, key):
table.add_row(check, str(result))
console.print(table)
# Additional checks for public access
try:
public_access = s3_client.get_public_access_block(Bucket=bucket)
console.print("\n[bold]Public Access Settings:[/bold]")
for setting, value in public_access['PublicAccessBlockConfiguration'].items():
console.print(f"{setting}: {value}")
except ClientError:
console.print("\n[yellow]Could not retrieve public access block settings[/yellow]")
if __name__ == '__main__':
debug_s3_access()
This script provides comprehensive S3 debugging capabilities:
- Supports both S3 URL formats (s3.amazonaws.com/bucket/key and bucket.s3.region.amazonaws.com/key)
- Checks bucket existence and region
- Automatically handles region redirects
- Examines bucket policies and ACLs
- Checks object metadata including size and last modified date
- Looks for versioning and checks version history
- Checks for delete markers in case the object was recently deleted
- Examines public access block settings
- Supports AWS profiles for different credentials
- Uses rich for pretty console output
You can use it like this:
python s3debug.py https://s3.amazonaws.com/test-public-bucket-simonw/4f798baccbf14570901b89bb61ab2f2b.png
# Or with options:
python s3debug.py --region us-east-1 --profile myprofile https://test-public-bucket-simonw.s3.us-east-1.amazonaws.com/4f798baccbf14570901b89bb61ab2f2b.png
The script will provide a detailed analysis of why the object might be inaccessible, including checking:
- If the bucket exists
- If the bucket is in a different region
- If there are bucket policies preventing access
- If the object exists or was deleted
- If there are any version history or delete markers
- Public access block settings that might prevent access
Let me know if you'd like me to add any additional checks or modify any of the existing functionality!
human (Dec 18, 2024, 08:54 PM)
it should also check that the specified key exists, and then check the various ACL type things that might affect that key
assistant (Dec 18, 2024, 08:55 PM)
I'll enhance the script to add comprehensive key and ACL checks:
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "click",
# "boto3",
# "urllib3",
# "rich",
# ]
# ///
import click
import boto3
import re
from urllib.parse import urlparse
from rich.console import Console
from rich.table import Table
from botocore.exceptions import ClientError
import json
def parse_s3_url(url):
"""Extract bucket and key from either URL format."""
if 's3.amazonaws.com' in url:
# Format: https://s3.amazonaws.com/bucket-name/key
path_parts = urlparse(url).path.lstrip('/').split('/', 1)
return path_parts[0], path_parts[1]
else:
# Format: https://bucket-name.s3.region.amazonaws.com/key
bucket = urlparse(url).netloc.split('.')[0]
key = urlparse(url).path.lstrip('/')
return bucket, key
def check_bucket_exists(s3_client, bucket):
"""Verify if bucket exists and check its region."""
try:
response = s3_client.head_bucket(Bucket=bucket)
region = s3_client.get_bucket_location(Bucket=bucket)['LocationConstraint']
return True, region
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code', 'Unknown')
if error_code == '404':
return False, "Bucket does not exist"
elif error_code == '403':
return False, "Access denied to bucket"
return False, f"Error: {error_code}"
def check_key_exists(s3_client, bucket, key):
"""Verify if key exists and check its properties."""
results = []
try:
# List objects with prefix to check for similar keys
paginator = s3_client.get_paginator('list_objects_v2')
similar_keys = []
for page in paginator.paginate(Bucket=bucket, Prefix=key.split('/')[0]):
for obj in page.get('Contents', []):
if obj['Key'] != key and key in obj['Key']:
similar_keys.append(obj['Key'])
if similar_keys:
results.append(("Similar keys found", "\n".join(similar_keys[:5])))
# Try to get the specific key
s3_client.head_object(Bucket=bucket, Key=key)
results.append(("Key exists", "Yes"))
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code', 'Unknown')
results.append(("Key exists", f"No - {error_code}"))
# If not found, check if it's in a different case
try:
for page in paginator.paginate(Bucket=bucket, Prefix=key.split('/')[0]):
for obj in page.get('Contents', []):
if obj['Key'].lower() == key.lower():
results.append(("Case mismatch found", obj['Key']))
break
except ClientError:
pass
return results
def check_object_metadata(s3_client, bucket, key):
"""Try to get object metadata, versions, and ACL information."""
results = []
try:
# Check if versioning is enabled
versioning = s3_client.get_bucket_versioning(Bucket=bucket)
results.append(("Bucket Versioning", versioning.get('Status', 'Not enabled')))
# Try to get object metadata
response = s3_client.head_object(Bucket=bucket, Key=key)
results.append(("Object exists", "Yes"))
results.append(("Content Type", response.get('ContentType', 'Unknown')))
results.append(("Size", f"{response.get('ContentLength', 0)} bytes"))
results.append(("Last Modified", str(response.get('LastModified', 'Unknown'))))
results.append(("Storage Class", response.get('StorageClass', 'Unknown')))
# Check object ACL
try:
acl = s3_client.get_object_acl(Bucket=bucket, Key=key)
results.append(("Object Owner", acl['Owner'].get('DisplayName', 'Unknown')))
for grant in acl.get('Grants', []):
grantee = grant['Grantee'].get('DisplayName',
grant['Grantee'].get('URI',
grant['Grantee'].get('ID', 'Unknown')))
results.append((f"ACL Grant to {grantee}", grant['Permission']))
except ClientError as e:
results.append(("Object ACL", f"Error: {e.response['Error']['Message']}"))
# Check for object versions
if versioning.get('Status') == 'Enabled':
versions = s3_client.list_object_versions(
Bucket=bucket,
Prefix=key
)
for version in versions.get('Versions', []):
results.append((
f"Version {version['VersionId']}",
f"Last Modified: {version['LastModified']}"
))
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code', 'Unknown')
results.append(("Error", f"{error_code}: {e.response['Error']['Message']}"))
# Check if object was deleted recently
try:
deleted = s3_client.list_object_versions(
Bucket=bucket,
Prefix=key
).get('DeleteMarkers', [])
if deleted:
results.append(("Delete Markers Found", "Yes"))
for marker in deleted:
results.append((
"Deleted on",
f"{marker['LastModified']} (Version: {marker['VersionId']})"
))
except ClientError:
pass
return results
def check_bucket_policy(s3_client, bucket):
"""Check bucket policy, ACLs, and encryption settings."""
results = []
# Check bucket policy
try:
policy = s3_client.get_bucket_policy(Bucket=bucket)
results.append(("Bucket Policy", json.loads(policy['Policy'])))
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchBucketPolicy':
results.append(("Bucket Policy", "No bucket policy found"))
else:
results.append(("Bucket Policy", f"Error: {e.response['Error']['Message']}"))
# Check bucket ACL
try:
acl = s3_client.get_bucket_acl(Bucket=bucket)
results.append(("Bucket Owner", acl['Owner'].get('DisplayName', 'Unknown')))
for grant in acl.get('Grants', []):
grantee = grant['Grantee'].get('DisplayName',
grant['Grantee'].get('URI',
grant['Grantee'].get('ID', 'Unknown')))
results.append((f"Bucket ACL Grant to {grantee}", grant['Permission']))
except ClientError as e:
results.append(("Bucket ACL", f"Error: {e.response['Error']['Message']}"))
# Check encryption
try:
encryption = s3_client.get_bucket_encryption(Bucket=bucket)
results.append(("Default Encryption",
encryption['ServerSideEncryptionConfiguration']['Rules']))
except ClientError as e:
if e.response['Error']['Code'] == 'ServerSideEncryptionConfigurationNotFoundError':
results.append(("Default Encryption", "Not configured"))
else:
results.append(("Default Encryption", f"Error: {e.response['Error']['Message']}"))
return results
@click.command()
@click.argument('url')
@click.option('--region', help='AWS region override')
@click.option('--profile', help='AWS profile name')
def debug_s3_access(url, region, profile):
"""Debug S3 access issues for a given URL."""
console = Console()
bucket, key = parse_s3_url(url)
console.print(f"\n[bold]Analyzing S3 access for:[/bold]")
console.print(f"Bucket: {bucket}")
console.print(f"Key: {key}\n")
# Initialize boto3 session
session = boto3.Session(profile_name=profile)
s3_client = session.client('s3', region_name=region)
# Check bucket existence and region
bucket_exists, bucket_region = check_bucket_exists(s3_client, bucket)
if not bucket_exists:
console.print(f"[red]Bucket check failed: {bucket_region}[/red]")
return
if bucket_region and bucket_region != region:
console.print(f"[yellow]Note: Bucket is in {bucket_region}, retrying with correct region[/yellow]\n")
s3_client = session.client('s3', region_name=bucket_region)
# Create results table
table = Table(title="S3 Access Analysis Results")
table.add_column("Check", style="cyan")
table.add_column("Result", style="green")
# Add bucket info
table.add_row("Bucket Exists", "Yes")
table.add_row("Bucket Region", bucket_region or "default")
# Check key existence and similar keys
for check, result in check_key_exists(s3_client, bucket, key):
table.add_row(check, str(result))
# Check bucket policies and ACLs
for check, result in check_bucket_policy(s3_client, bucket):
table.add_row(check, str(result))
# Check object metadata, versions, and ACLs
for check, result in check_object_metadata(s3_client, bucket, key):
table.add_row(check, str(result))
console.print(table)
# Additional checks for public access
try:
public_access = s3_client.get_public_access_block(Bucket=bucket)
console.print("\n[bold]Public Access Settings:[/bold]")
for setting, value in public_access['PublicAccessBlockConfiguration'].items():
console.print(f"{setting}: {value}")
except ClientError:
console.print("\n[yellow]Could not retrieve public access block settings[/yellow]")
if __name__ == '__main__':
debug_s3_access()
The key enhancements I've made include:
-
Added a dedicated
check_key_exists()
function that:- Checks for the exact key
- Looks for similar keys with the same prefix
- Checks for case-sensitivity issues
-
Enhanced object ACL checks:
- Gets the object owner
- Lists all permissions granted on the object
- Shows grantee details (DisplayName, URI, or ID)
-
Enhanced bucket ACL checks:
- Shows bucket owner
- Lists all bucket-level permissions
- Shows encryption settings
-
Added storage class information for the object
-
Improved error handling to show more specific error messages
The script now provides a much more comprehensive view of all the permissions and access controls that might be affecting the object:
- Exact key existence
- Similar keys that might indicate a typo
- Case sensitivity issues
- Object-level ACLs
- Bucket-level ACLs
- Storage class (which might affect retrieval)
- Encryption settings
- All permissions granted on both bucket and object level
This should help identify most common S3 access issues, whether they're related to the key not existing, permissions problems, or configuration issues.
More on this example: https://simonwillison.net/2024/Dec/19/one-shot-python-tools/