@kapilt
Last active March 29, 2017 15:55
demonstrate memory leak issue with pypy and s3/boto using threads.
"""Pypy memory leak issue
this needs to process some large buckets to show the continual memory
accumulation. growth after 100 pages processed shows increase from 240mb rss
to 1221 mb rss.
interestingly playing around with chunk size shows dramatic effects on memory
growth rate.
"""
import argparse
from collections import defaultdict
import gc
import sys
import time
import boto3
from concurrent.futures import ThreadPoolExecutor, as_completed


def chunks(iterable, size=50):
    """Break an iterable into lists of size."""
    batch = []
    for n in iterable:
        batch.append(n)
        if len(batch) % size == 0:
            yield batch
            batch = []
    if batch:
        yield batch


def scan_keys(creds, bucket, region, key_set):
    # each worker builds its own session/client from the frozen credentials
    s = boto3.Session(
        aws_access_key_id=creds.access_key,
        aws_secret_access_key=creds.secret_key,
        aws_session_token=creds.token)
    client = s.client('s3', region_name=region)
    stats = {'error': 0, 'objects': 0}
    for k in key_set:
        try:
            client.head_object(Bucket=bucket, Key=k['Key'])
        except Exception:
            stats['error'] += 1
        stats['objects'] += 1
    return stats


def scan_bucket(bucket, region):
    session = boto3.Session()
    creds = session.get_credentials().get_frozen_credentials()
    client = session.client('s3', region_name=region)
    counters = {'error': 0, 'objects': 0}
    page_count = 0
    paginator = client.get_paginator('list_objects')
    for page in paginator.paginate(Bucket=bucket):
        page_count += 1
        print "processing page", page_count, counters
        objects = page.get('Contents')
        futures = {}
        # normally we do this in a distributed setup, passing around the
        # pages; for local usage the thread pool as the outer loop would
        # be better.
        with ThreadPoolExecutor(max_workers=5) as w:
            for key_set in chunks(objects, 100):
                futures[w.submit(
                    scan_keys, creds, bucket, region, key_set)] = key_set
            for f in as_completed(futures):
                if f.exception():
                    counters['error'] += 1
                    continue
                stats = f.result()
                for k in stats:
                    counters[k] += stats[k]
        # https://bitbucket.org/pypy/pypy/issues/1124/memory-usage-parsing-json
        # note the json here is the sdk's service definition files; s3 itself
        # is xml.
        #
        # even with this we still see slow and steady memory growth, not quite
        # as dramatic as the actual application, but hopefully enough to track.
        if getattr(sys, 'pypy_version_info', None):
            gc.collect()


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-b', '--bucket', required=True)
    parser.add_argument('-r', '--region', required=True)
    options = parser.parse_args()
    scan_bucket(options.bucket, options.region)


if __name__ == '__main__':
    main()
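
To observe the RSS growth described in the docstring (roughly 240 MB climbing to 1221 MB over 100 pages), the resident set size can be sampled once per page. A minimal sketch, assuming psutil is installed; the rss_mb() helper and the sample print line are illustrative and not part of the original gist:

import psutil

def rss_mb():
    """Current resident set size of this process, in megabytes."""
    return psutil.Process().memory_info().rss / (1024.0 * 1024.0)

# e.g. inside the page loop of scan_bucket:
#     print "page", page_count, "rss", rss_mb()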
kapilt commented Mar 29, 2017

Switching from creating a session inside each thread to passing a shared client in does seem to reduce the issue, but it still needs the gc.collect() to keep memory usage down.
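
A minimal sketch of that variant, passing one shared client into the workers instead of building a boto3.Session per thread; the name scan_keys_shared is illustrative, not from the gist. Sharing is reasonable here since boto3 documents its low-level clients as thread safe:

def scan_keys_shared(client, bucket, key_set):
    stats = {'error': 0, 'objects': 0}
    for k in key_set:
        try:
            client.head_object(Bucket=bucket, Key=k['Key'])
        except Exception:
            stats['error'] += 1
        stats['objects'] += 1
    return stats

# in scan_bucket the submit call then becomes:
#     futures[w.submit(scan_keys_shared, client, bucket, key_set)] = key_set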
