Skip to content

Instantly share code, notes, and snippets.

@oxguy3
Created April 10, 2020 23:21
Show Gist options
  • Save oxguy3/859a028941e3c04419979e8a69e1491e to your computer and use it in GitHub Desktop.
Save oxguy3/859a028941e3c04419979e8a69e1491e to your computer and use it in GitHub Desktop.
Command-line tool for retrieving the listing of a public S3 bucket and saving it to a CSV file
#! /usr/bin/env python3
from lxml import etree
import requests
import argparse
import csv
from pprint import pprint
import time
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def getBucketList(bucket, startAfter=None, timeout=30):
    """Fetch one page of a bucket listing and return the parsed XML root.

    Sends a GET request with ``list-type=2`` to
    ``https://<bucket>.s3.amazonaws.com/``.

    Args:
        bucket: Name of the S3 bucket; used as the host-name prefix.
        startAfter: Optional key. When not ``None`` it is sent as the
            ``start-after`` query parameter so the listing resumes after
            that key.
        timeout: Seconds handed to ``requests.get`` so a stalled
            connection cannot hang the program indefinitely.

    Returns:
        The root element produced by ``etree.fromstring`` for the response
        body.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status
            (for example when the bucket does not exist or is not public).
    """
    params = {'list-type': '2'}
    if startAfter is not None:
        params['start-after'] = startAfter

    response = requests.get(
        'https://' + bucket + '.s3.amazonaws.com/',
        params=params,
        # NOTE(review): certificate checking is disabled on purpose by the
        # original author -- presumably because bucket names containing
        # dots do not match S3's wildcard certificate; confirm before
        # changing.
        verify=False,  # TODO: partial verification would be nice
        timeout=timeout,
    )
    # Fail loudly on an error status instead of parsing the error document
    # as if it were a listing.
    response.raise_for_status()
    return etree.fromstring(response.content)
def main(bucket, startAfter):
    """Page through a bucket listing and save it to ``<bucket>.csv``.

    Pages are requested with ``getBucketList`` until the response no longer
    reports ``IsTruncated`` as ``true``. Rows are buffered in memory and
    appended to the CSV file periodically so that very large buckets do not
    have to be held in memory all at once.

    Args:
        bucket: Name of the S3 bucket; also used to build the output file
            name (``bucket + '.csv'``).
        startAfter: Key to resume the listing after, or ``None`` to start
            from the beginning. When ``None`` the output file is
            (re)created with a header row; otherwise rows are appended to
            whatever file already exists and no header is written.
    """
    xmlns = {'s3': "http://s3.amazonaws.com/doc/2006-03-01/"}
    keys = ['Key', 'LastModified', 'ETag', 'Size', 'StorageClass']
    filename = bucket + '.csv'
    pagesPerFlush = 30

    def flushRows(rows):
        # Append the buffered rows to the CSV file and empty the buffer.
        # extrasaction='ignore' keeps an unexpected child element of
        # <Contents> from aborting the run with a ValueError.
        with open(filename, 'a', newline='') as csvFile:
            writer = csv.DictWriter(csvFile, keys, extrasaction='ignore')
            writer.writerows(rows)
        rows.clear()

    # The header must be decided before the loop: inside the loop
    # startAfter is overwritten with the last key seen, so it is no longer
    # a reliable "fresh run" marker there.
    if startAfter is None:
        with open(filename, 'w', newline='') as csvFile:
            csv.DictWriter(csvFile, keys).writeheader()

    files = []
    count = 0
    isTruncated = True

    # while there are still more pages left
    while isTruncated:
        print(startAfter)
        root = getBucketList(bucket, startAfter)

        # check if there's still more pages left
        truncatedText = root.xpath(
            "/s3:ListBucketResult/s3:IsTruncated//text()",
            namespaces=xmlns
        )
        isTruncated = bool(truncatedText) and truncatedText[0] == "true"

        contents = root.xpath(
            "/s3:ListBucketResult/s3:Contents",
            namespaces=xmlns
        )
        if not contents:
            # Empty bucket or empty page: there is no last key to resume
            # from, so stop instead of indexing into an empty list or
            # requesting the same page again.
            break

        for cont in contents:
            # Tags look like "{namespace}Name"; keep only "Name".
            files.append(
                {child.tag.rpartition('}')[2]: child.text for child in cont}
            )
        startAfter = files[-1]['Key']

        if count % pagesPerFlush == 0:
            # write to disk and clear memory every pagesPerFlush pages
            flushRows(files)
            print("################ WROTE TO DISK ################")

        count += 1
        time.sleep(0.1)  # small pause between successive requests

    # write any remaining files to disk
    flushRows(files)
# Command-line interface: one positional bucket name and an optional key to
# resume the listing from.
parser = argparse.ArgumentParser(description='Get full listing of S3 bucket')
parser.add_argument('bucket', type=str,
                    help='ID of the S3 bucket')
parser.add_argument('--start', type=str, default=None,
                    help='file to start download from')

# Only parse arguments and start downloading when run as a script, so that
# importing this module has no side effects.
if __name__ == '__main__':
    args = parser.parse_args()
    main(args.bucket, args.start)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment