Recursively walk S3 file contents from a root URL
#!/usr/bin/env python
import json
import re
import sys
from datetime import datetime

import boto3


def json_serial(obj):
    """JSON serializer for objects not serializable by default json code."""
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError("Type not serializable")


class InvalidArgument(RuntimeError):
    """Exception for invalid input parameters."""
    pass


class S3URL:
    """Simple wrapper for an S3 URL.

    This class parses an S3 URL and provides accessors to each component.
    """
    S3URL_PATTERN = re.compile(r'(s3[n]?)://([^/]+)[/]?(.*)')

    def __init__(self, uri):
        """Parse an S3 URL into protocol, bucket, and path."""
        try:
            self.proto, self.bucket, self.path = S3URL.S3URL_PATTERN.match(uri).groups()
            self.proto = 's3'  # normalize s3n => s3
        except Exception:
            raise InvalidArgument('Invalid S3 URI: %s' % uri)

    def __str__(self):
        """Return the original S3 URL."""
        return S3URL.combine(self.proto, self.bucket, self.path)

    def get_fixed_path(self):
        """Get the fixed part of the path, up to the first wildcard component."""
        fi = []
        for p in self.path.split(PATH_SEP):
            if '*' in p or '?' in p:
                break
            fi.append(p)
        return PATH_SEP.join(fi)

    @staticmethod
    def combine(proto, bucket, path):
        """Combine the components into an S3 URL string; no path normalization
        here. The path should not start with a slash.
        """
        return '%s://%s/%s' % (proto, bucket, path)

    @staticmethod
    def is_valid(uri):
        """Check whether the given URI is a valid S3 URL."""
        return S3URL.S3URL_PATTERN.match(uri) is not None


def s3walk(s3url, s3dir):
    """Recursively walk all subdirectories under the given prefix."""
    result = []
    paginator = client.get_paginator('list_objects')
    for page in paginator.paginate(
            Bucket=s3url.bucket,
            Prefix=s3dir,
            Delimiter=PATH_SEP,
            PaginationConfig={'PageSize': 1000}):
        # Recurse into subdirectories (CommonPrefixes) first.
        for obj in page.get('CommonPrefixes') or []:
            obj_name = obj['Prefix']
            result.extend(s3walk(s3url, obj_name))
        # Then collect all objects directly in this folder.
        for obj in page.get('Contents') or []:
            obj_name = obj['Key']
            result.append({
                'Bucket': s3url.bucket,
                'Key': obj_name,
                'name': S3URL.combine(s3url.proto, s3url.bucket, obj_name),
                'is_dir': False,
                'size': obj['Size'],
                'last_modified': obj['LastModified'],
            })
    return result


if len(sys.argv) > 1:
    s3path = sys.argv[1]
    if s3path and S3URL.is_valid(s3path):
        s3url = S3URL(s3path)
    else:
        raise ValueError("Invalid S3 url")
else:
    raise ValueError("Uh... you need an S3 path")

client = boto3.client("s3")
PATH_SEP = "/"
result = s3walk(s3url, s3url.get_fixed_path())
print(json.dumps(result, default=json_serial))
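
For comparison, here is a minimal sketch (not part of the original gist) of a flat listing: omitting the Delimiter makes the paginator return every key under the prefix directly, so no recursion over CommonPrefixes is needed. The bucket and prefix names below are placeholders.

#!/usr/bin/env python
# Flat-listing sketch: with no Delimiter, list_objects_v2 pagination yields
# every key under the prefix, so subdirectories need no special handling.
# "my-bucket" and "some/prefix/" are placeholder values.
import boto3

client = boto3.client("s3")
paginator = client.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="my-bucket", Prefix="some/prefix/"):
    for obj in page.get("Contents") or []:
        print(obj["Key"], obj["Size"], obj["LastModified"].isoformat())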
Lifted lots from s4cmd.py.
Intended to be executed like
s3walk.py "s3://myBucket/some/long/crazy/path"
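
The script prints a JSON array with one object per key found. The record below is illustrative only (hypothetical key, size, and timestamp), but the fields match what s3walk emits:

[{"Bucket": "myBucket", "Key": "some/long/crazy/path/file.txt", "name": "s3://myBucket/some/long/crazy/path/file.txt", "is_dir": false, "size": 1024, "last_modified": "2017-02-09T23:36:00+00:00"}]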