Debugging S3 Access Issues with Python CLI

human (Dec 18, 2024, 08:52 PM)

I can't access the file at https://s3.amazonaws.com/test-public-bucket-simonw/4f798baccbf14570901b89bb61ab2f2b.png

Write me a Python CLI tool using Click and boto3 which takes a URL of that form - or of the form https://test-public-bucket-simonw.s3.us-east-1.amazonaws.com/4f798baccbf14570901b89bb61ab2f2b.png - and then uses EVERY single boto3 trick in the book to try and debug why the file is returning a 404

assistant (Dec 18, 2024, 08:53 PM)

I'll help you create a comprehensive debugging tool for S3 access issues. Here's a script that employs various boto3 techniques to investigate S3 access problems:

# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "click",
#     "boto3",
#     "rich",
# ]
# ///

import json
from urllib.parse import urlparse

import boto3
import click
from botocore.exceptions import ClientError
from rich.console import Console
from rich.table import Table

def parse_s3_url(url):
    """Extract bucket and key from either URL format."""
    parsed = urlparse(url)
    if parsed.netloc.startswith(('s3.', 's3-')):
        # Path-style: https://s3.amazonaws.com/bucket-name/key
        bucket, _, key = parsed.path.lstrip('/').partition('/')
        return bucket, key
    # Virtual-hosted-style: https://bucket-name.s3.region.amazonaws.com/key
    bucket = parsed.netloc.split('.')[0]
    key = parsed.path.lstrip('/')
    return bucket, key

def check_bucket_exists(s3_client, bucket):
    """Verify if bucket exists and check its region."""
    try:
        s3_client.head_bucket(Bucket=bucket)
        # LocationConstraint is None for buckets in us-east-1
        region = s3_client.get_bucket_location(Bucket=bucket)['LocationConstraint']
        return True, region
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code', 'Unknown')
        if error_code == '404':
            return False, "Bucket does not exist"
        elif error_code == '403':
            return False, "Access denied to bucket"
        return False, f"Error: {error_code}"

def check_object_metadata(s3_client, bucket, key):
    """Try to get object metadata and versions."""
    results = []
    try:
        # Check if versioning is enabled
        versioning = s3_client.get_bucket_versioning(Bucket=bucket)
        results.append(("Bucket Versioning", versioning.get('Status', 'Not enabled')))

        # Try to get object metadata
        response = s3_client.head_object(Bucket=bucket, Key=key)
        results.append(("Object exists", "Yes"))
        results.append(("Content Type", response.get('ContentType', 'Unknown')))
        results.append(("Size", f"{response.get('ContentLength', 0)} bytes"))
        results.append(("Last Modified", str(response.get('LastModified', 'Unknown'))))
        
        # Check for object versions
        if versioning.get('Status') == 'Enabled':
            versions = s3_client.list_object_versions(
                Bucket=bucket,
                Prefix=key
            )
            for version in versions.get('Versions', []):
                results.append((
                    f"Version {version['VersionId']}",
                    f"Last Modified: {version['LastModified']}"
                ))
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code', 'Unknown')
        results.append(("Error", f"{error_code}: {e.response['Error']['Message']}"))
        
        # Check if object was deleted recently
        try:
            deleted = s3_client.list_object_versions(
                Bucket=bucket,
                Prefix=key
            ).get('DeleteMarkers', [])
            if deleted:
                results.append(("Delete Markers Found", "Yes"))
                for marker in deleted:
                    results.append((
                        "Deleted on",
                        f"{marker['LastModified']} (Version: {marker['VersionId']})"
                    ))
        except ClientError:
            pass
    
    return results

def check_bucket_policy(s3_client, bucket):
    """Check bucket policy and ACLs."""
    try:
        policy = s3_client.get_bucket_policy(Bucket=bucket)
        return json.loads(policy['Policy'])
    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchBucketPolicy':
            return "No bucket policy found"
        return f"Error getting bucket policy: {e.response['Error']['Message']}"

@click.command()
@click.argument('url')
@click.option('--region', help='AWS region override')
@click.option('--profile', help='AWS profile name')
def debug_s3_access(url, region, profile):
    """Debug S3 access issues for a given URL."""
    console = Console()
    
    bucket, key = parse_s3_url(url)
    console.print(f"\n[bold]Analyzing S3 access for:[/bold]")
    console.print(f"Bucket: {bucket}")
    console.print(f"Key: {key}\n")

    # Initialize boto3 session
    session = boto3.Session(profile_name=profile)
    s3_client = session.client('s3', region_name=region)

    # Check bucket existence and region
    bucket_exists, bucket_region = check_bucket_exists(s3_client, bucket)
    
    if not bucket_exists:
        console.print(f"[red]Bucket check failed: {bucket_region}[/red]")
        return

    if bucket_region and bucket_region != region:
        console.print(f"[yellow]Note: Bucket is in {bucket_region}, retrying with correct region[/yellow]\n")
        s3_client = session.client('s3', region_name=bucket_region)

    # Create results table
    table = Table(title="S3 Access Analysis Results")
    table.add_column("Check", style="cyan")
    table.add_column("Result", style="green")

    # Add bucket info
    table.add_row("Bucket Exists", "Yes")
    table.add_row("Bucket Region", bucket_region or "default")

    # Check bucket policy
    policy = check_bucket_policy(s3_client, bucket)
    table.add_row("Bucket Policy", str(policy))

    # Check object metadata and versions
    for check, result in check_object_metadata(s3_client, bucket, key):
        table.add_row(check, str(result))

    console.print(table)

    # Additional checks for public access
    try:
        public_access = s3_client.get_public_access_block(Bucket=bucket)
        console.print("\n[bold]Public Access Settings:[/bold]")
        for setting, value in public_access['PublicAccessBlockConfiguration'].items():
            console.print(f"{setting}: {value}")
    except ClientError:
        console.print("\n[yellow]Could not retrieve public access block settings[/yellow]")

if __name__ == '__main__':
    debug_s3_access()

This script provides comprehensive S3 debugging capabilities:

  1. Supports both S3 URL formats: path-style (s3.amazonaws.com/bucket/key) and virtual-hosted-style (bucket.s3.region.amazonaws.com/key); a parsing example follows this list
  2. Checks bucket existence and region
  3. Automatically handles region redirects
  4. Examines the bucket policy
  5. Checks object metadata including size and last modified date
  6. Looks for versioning and checks version history
  7. Checks for delete markers in case the object was recently deleted
  8. Examines public access block settings
  9. Supports AWS profiles for different credentials
  10. Uses rich for pretty console output
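
As a quick check of point 1, both URL formats should parse to the same bucket and key (an illustrative interpreter session using the parse_s3_url function above):

>>> parse_s3_url("https://s3.amazonaws.com/test-public-bucket-simonw/4f798baccbf14570901b89bb61ab2f2b.png")
('test-public-bucket-simonw', '4f798baccbf14570901b89bb61ab2f2b.png')
>>> parse_s3_url("https://test-public-bucket-simonw.s3.us-east-1.amazonaws.com/4f798baccbf14570901b89bb61ab2f2b.png")
('test-public-bucket-simonw', '4f798baccbf14570901b89bb61ab2f2b.png')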

You can use it like this:

python s3debug.py https://s3.amazonaws.com/test-public-bucket-simonw/4f798baccbf14570901b89bb61ab2f2b.png
# Or with options:
python s3debug.py --region us-east-1 --profile myprofile https://test-public-bucket-simonw.s3.us-east-1.amazonaws.com/4f798baccbf14570901b89bb61ab2f2b.png
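
Because the script declares its dependencies in the inline # /// script block, you can also run it with uv (assuming you have uv installed), which resolves click, boto3 and rich automatically:

uv run s3debug.py https://s3.amazonaws.com/test-public-bucket-simonw/4f798baccbf14570901b89bb61ab2f2b.png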

The script will provide a detailed analysis of why the object might be inaccessible, including checking:

  • If the bucket exists
  • If the bucket is in a different region
  • If there are bucket policies preventing access
  • If the object exists or was deleted
  • If there are any object versions or delete markers
  • Public access block settings that might prevent access
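
One trick not included above: repeat the check anonymously, so you see exactly what an unauthenticated browser gets. A minimal sketch using botocore's UNSIGNED signature (not part of the script, just an illustration):

from botocore import UNSIGNED
from botocore.config import Config

# Anonymous client: no credentials are attached, so the response
# mirrors what a plain HTTPS request to the public URL would receive
anon_s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
anon_s3.head_object(Bucket=bucket, Key=key)  # raises ClientError on 403/404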

Let me know if you'd like me to add any additional checks or modify any of the existing functionality!

human (Dec 18, 2024, 08:54 PM)

it should also check that the specified key exists, and then check the various ACL type things that might affect that key

assistant (Dec 18, 2024, 08:55 PM)

I'll enhance the script to add comprehensive key and ACL checks:

# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "click",
#     "boto3",
#     "rich",
# ]
# ///

import json
from urllib.parse import urlparse

import boto3
import click
from botocore.exceptions import ClientError
from rich.console import Console
from rich.table import Table

def parse_s3_url(url):
    """Extract bucket and key from either URL format."""
    parsed = urlparse(url)
    if parsed.netloc.startswith(('s3.', 's3-')):
        # Path-style: https://s3.amazonaws.com/bucket-name/key
        bucket, _, key = parsed.path.lstrip('/').partition('/')
        return bucket, key
    # Virtual-hosted-style: https://bucket-name.s3.region.amazonaws.com/key
    bucket = parsed.netloc.split('.')[0]
    key = parsed.path.lstrip('/')
    return bucket, key

def check_bucket_exists(s3_client, bucket):
    """Verify if bucket exists and check its region."""
    try:
        s3_client.head_bucket(Bucket=bucket)
        # LocationConstraint is None for buckets in us-east-1
        region = s3_client.get_bucket_location(Bucket=bucket)['LocationConstraint']
        return True, region
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code', 'Unknown')
        if error_code == '404':
            return False, "Bucket does not exist"
        elif error_code == '403':
            return False, "Access denied to bucket"
        return False, f"Error: {error_code}"

def check_key_exists(s3_client, bucket, key):
    """Verify if key exists and check its properties."""
    results = []
    paginator = s3_client.get_paginator('list_objects_v2')
    try:
        # List objects with a shared prefix to surface similar keys
        similar_keys = []
        for page in paginator.paginate(Bucket=bucket, Prefix=key.split('/')[0]):
            for obj in page.get('Contents', []):
                if obj['Key'] != key and key in obj['Key']:
                    similar_keys.append(obj['Key'])

        if similar_keys:
            results.append(("Similar keys found", "\n".join(similar_keys[:5])))

        # Try to get the specific key
        s3_client.head_object(Bucket=bucket, Key=key)
        results.append(("Key exists", "Yes"))
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code', 'Unknown')
        results.append(("Key exists", f"No - {error_code}"))

        # If not found, check for a key that differs only in case
        # (this only catches mismatches sharing the same leading prefix)
        try:
            for page in paginator.paginate(Bucket=bucket, Prefix=key.split('/')[0]):
                for obj in page.get('Contents', []):
                    if obj['Key'].lower() == key.lower():
                        results.append(("Case mismatch found", obj['Key']))
                        break
        except ClientError:
            pass

    return results

def check_object_metadata(s3_client, bucket, key):
    """Try to get object metadata, versions, and ACL information."""
    results = []
    try:
        # Check if versioning is enabled
        versioning = s3_client.get_bucket_versioning(Bucket=bucket)
        results.append(("Bucket Versioning", versioning.get('Status', 'Not enabled')))

        # Try to get object metadata
        response = s3_client.head_object(Bucket=bucket, Key=key)
        results.append(("Object exists", "Yes"))
        results.append(("Content Type", response.get('ContentType', 'Unknown')))
        results.append(("Size", f"{response.get('ContentLength', 0)} bytes"))
        results.append(("Last Modified", str(response.get('LastModified', 'Unknown'))))
        results.append(("Storage Class", response.get('StorageClass', 'Unknown')))
        
        # Check object ACL
        try:
            acl = s3_client.get_object_acl(Bucket=bucket, Key=key)
            results.append(("Object Owner", acl['Owner'].get('DisplayName', 'Unknown')))
            for grant in acl.get('Grants', []):
                grantee = grant['Grantee'].get('DisplayName', 
                                             grant['Grantee'].get('URI', 
                                             grant['Grantee'].get('ID', 'Unknown')))
                results.append((f"ACL Grant to {grantee}", grant['Permission']))
        except ClientError as e:
            results.append(("Object ACL", f"Error: {e.response['Error']['Message']}"))
        
        # Check for object versions
        if versioning.get('Status') == 'Enabled':
            versions = s3_client.list_object_versions(
                Bucket=bucket,
                Prefix=key
            )
            for version in versions.get('Versions', []):
                results.append((
                    f"Version {version['VersionId']}",
                    f"Last Modified: {version['LastModified']}"
                ))
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code', 'Unknown')
        results.append(("Error", f"{error_code}: {e.response['Error']['Message']}"))
        
        # Check if object was deleted recently
        try:
            deleted = s3_client.list_object_versions(
                Bucket=bucket,
                Prefix=key
            ).get('DeleteMarkers', [])
            if deleted:
                results.append(("Delete Markers Found", "Yes"))
                for marker in deleted:
                    results.append((
                        "Deleted on",
                        f"{marker['LastModified']} (Version: {marker['VersionId']})"
                    ))
        except ClientError:
            pass
    
    return results

def check_bucket_policy(s3_client, bucket):
    """Check bucket policy, ACLs, and encryption settings."""
    results = []
    
    # Check bucket policy
    try:
        policy = s3_client.get_bucket_policy(Bucket=bucket)
        results.append(("Bucket Policy", json.loads(policy['Policy'])))
    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchBucketPolicy':
            results.append(("Bucket Policy", "No bucket policy found"))
        else:
            results.append(("Bucket Policy", f"Error: {e.response['Error']['Message']}"))
    
    # Check bucket ACL
    try:
        acl = s3_client.get_bucket_acl(Bucket=bucket)
        results.append(("Bucket Owner", acl['Owner'].get('DisplayName', 'Unknown')))
        for grant in acl.get('Grants', []):
            grantee = grant['Grantee'].get('DisplayName', 
                                         grant['Grantee'].get('URI', 
                                         grant['Grantee'].get('ID', 'Unknown')))
            results.append((f"Bucket ACL Grant to {grantee}", grant['Permission']))
    except ClientError as e:
        results.append(("Bucket ACL", f"Error: {e.response['Error']['Message']}"))
    
    # Check encryption
    try:
        encryption = s3_client.get_bucket_encryption(Bucket=bucket)
        results.append(("Default Encryption", 
                       encryption['ServerSideEncryptionConfiguration']['Rules']))
    except ClientError as e:
        if e.response['Error']['Code'] == 'ServerSideEncryptionConfigurationNotFoundError':
            results.append(("Default Encryption", "Not configured"))
        else:
            results.append(("Default Encryption", f"Error: {e.response['Error']['Message']}"))
    
    return results

@click.command()
@click.argument('url')
@click.option('--region', help='AWS region override')
@click.option('--profile', help='AWS profile name')
def debug_s3_access(url, region, profile):
    """Debug S3 access issues for a given URL."""
    console = Console()
    
    bucket, key = parse_s3_url(url)
    console.print(f"\n[bold]Analyzing S3 access for:[/bold]")
    console.print(f"Bucket: {bucket}")
    console.print(f"Key: {key}\n")

    # Initialize boto3 session
    session = boto3.Session(profile_name=profile)
    s3_client = session.client('s3', region_name=region)

    # Check bucket existence and region
    bucket_exists, bucket_region = check_bucket_exists(s3_client, bucket)
    
    if not bucket_exists:
        console.print(f"[red]Bucket check failed: {bucket_region}[/red]")
        return

    if bucket_region and bucket_region != region:
        console.print(f"[yellow]Note: Bucket is in {bucket_region}, retrying with correct region[/yellow]\n")
        s3_client = session.client('s3', region_name=bucket_region)

    # Create results table
    table = Table(title="S3 Access Analysis Results")
    table.add_column("Check", style="cyan")
    table.add_column("Result", style="green")

    # Add bucket info
    table.add_row("Bucket Exists", "Yes")
    table.add_row("Bucket Region", bucket_region or "default")

    # Check key existence and similar keys
    for check, result in check_key_exists(s3_client, bucket, key):
        table.add_row(check, str(result))

    # Check bucket policies and ACLs
    for check, result in check_bucket_policy(s3_client, bucket):
        table.add_row(check, str(result))

    # Check object metadata, versions, and ACLs
    for check, result in check_object_metadata(s3_client, bucket, key):
        table.add_row(check, str(result))

    console.print(table)

    # Additional checks for public access
    try:
        public_access = s3_client.get_public_access_block(Bucket=bucket)
        console.print("\n[bold]Public Access Settings:[/bold]")
        for setting, value in public_access['PublicAccessBlockConfiguration'].items():
            console.print(f"{setting}: {value}")
    except ClientError:
        console.print("\n[yellow]Could not retrieve public access block settings[/yellow]")

if __name__ == '__main__':
    debug_s3_access()

The key enhancements I've made include:

  1. Added a dedicated check_key_exists() function that:

    • Checks for the exact key
    • Looks for similar keys with the same prefix
    • Checks for case-sensitivity issues
  2. Enhanced object ACL checks (a sample public-read grant follows this list):

    • Gets the object owner
    • Lists all permissions granted on the object
    • Shows grantee details (DisplayName, URI, or ID)
  3. Enhanced bucket ACL checks:

    • Shows bucket owner
    • Lists all bucket-level permissions
    • Shows encryption settings
  4. Added storage class information for the object

  5. Improved error handling to show more specific error messages
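
For reference on the ACL checks: a publicly readable object carries a grant to the global AllUsers group. If you don't see something like the following in the output (and no bucket policy grants access), anonymous requests will generally be denied:

{
    "Grantee": {
        "Type": "Group",
        "URI": "http://acs.amazonaws.com/groups/global/AllUsers"
    },
    "Permission": "READ"
}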

The script now provides a much more comprehensive view of all the permissions and access controls that might be affecting the object:

  • Exact key existence
  • Similar keys that might indicate a typo
  • Case sensitivity issues
  • Object-level ACLs
  • Bucket-level ACLs
  • Storage class (which might affect retrieval)
  • Encryption settings
  • All permissions granted on both bucket and object level

This should help identify most common S3 access issues, whether they're related to the key not existing, permissions problems, or configuration issues.
