Demonstrates a memory leak issue with PyPy and S3/boto3 when using threads.
"""Pypy memory leak issue | |
this needs to process some large buckets to show the continual memory | |
accumulation. growth after 100 pages processed shows increase from 240mb rss | |
to 1221 mb rss. | |
interestingly playing around with chunk size shows dramatic effects on memory | |
growth rate. | |
""" | |
from __future__ import print_function

import argparse
import gc
import sys

import boto3
from concurrent.futures import ThreadPoolExecutor, as_completed
def chunks(iterable, size=50):
    """Break an iterable into lists of at most ``size`` elements."""
    batch = []
    for n in iterable:
        batch.append(n)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch
def scan_keys(creds, bucket, region, key_set):
    # Each worker thread constructs its own session/client from the
    # frozen credentials, since boto3 sessions are not thread safe.
    s = boto3.Session(
        aws_access_key_id=creds.access_key,
        aws_secret_access_key=creds.secret_key,
        aws_session_token=creds.token)
    client = s.client('s3', region_name=region)
    stats = {'error': 0, 'objects': 0}
    for k in key_set:
        try:
            client.head_object(Bucket=bucket, Key=k['Key'])
        except Exception:
            stats['error'] += 1
        stats['objects'] += 1
    return stats
def scan_bucket(bucket, region):
    session = boto3.Session()
    creds = session.get_credentials().get_frozen_credentials()
    client = session.client('s3', region_name=region)
    counters = {'error': 0, 'objects': 0}
    page_count = 0

    paginator = client.get_paginator('list_objects')
    for page in paginator.paginate(Bucket=bucket):
        page_count += 1
        print("processing page", page_count, counters)
        objects = page.get('Contents', [])
        futures = {}
        # Normally we do this in a distributed setup, passing around the
        # pages; for local usage the thread pool as the outer loop would
        # be better.
        with ThreadPoolExecutor(max_workers=5) as w:
            for key_set in chunks(objects, 100):
                futures[w.submit(
                    scan_keys, creds, bucket, region, key_set)] = key_set
            for f in as_completed(futures):
                if f.exception():
                    counters['error'] += 1
                    continue
                stats = f.result()
                for k in stats:
                    counters[k] += stats[k]
        # https://bitbucket.org/pypy/pypy/issues/1124/memory-usage-parsing-json
        # Note the json in that issue refers to the sdk's service
        # definition files; the s3 responses themselves are xml.
        #
        # Even with this we see slow and steady memory growth, not quite
        # as dramatic as the actual application, but hopefully enough to
        # track.
        if getattr(sys, 'pypy_version_info', None):
            gc.collect()
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-b', '--bucket', required=True)
    parser.add_argument('-r', '--region', required=True)
    options = parser.parse_args()
    scan_bucket(options.bucket, options.region)


if __name__ == '__main__':
    main()
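
To watch the per-page growth described in the docstring, a small helper can log the process RSS alongside the page counters. This is a sketch, not part of the original gist; it assumes Linux, where /proc/self/statm reports resident pages (the 240 MB to 1221 MB figures above were presumably observed externally, e.g. via ps).

import os

def rss_mb():
    # Current resident set size in MB, read from /proc (Linux only).
    with open('/proc/self/statm') as f:
        resident_pages = int(f.read().split()[1])
    return resident_pages * os.sysconf('SC_PAGE_SIZE') / (1024.0 * 1024.0)

# e.g. inside the page loop:
#   print("processing page", page_count, counters, "rss mb", rss_mb())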
So switching from creating the session in the thread to passing in a client does seem to reduce the issue, but it still needs the gc.collect() to keep memory usage down.
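
A minimal sketch of that variant, assuming scan_keys is rewritten to take the shared client and the submit call is updated to match (this is the commenter's suggestion, not the gist's original code); boto3 clients are generally safe to share across threads, unlike sessions:

def scan_keys(client, bucket, region, key_set):
    # Share one client across worker threads instead of building a
    # session per thread.
    stats = {'error': 0, 'objects': 0}
    for k in key_set:
        try:
            client.head_object(Bucket=bucket, Key=k['Key'])
        except Exception:
            stats['error'] += 1
        stats['objects'] += 1
    return stats

# and in scan_bucket, submit with the shared client:
#   futures[w.submit(scan_keys, client, bucket, region, key_set)] = key_set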