Skip to content

Instantly share code, notes, and snippets.

@thehesiod
Last active September 19, 2018 22:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thehesiod/dc04230a5cdac70f25905d3a1cad71ce to your computer and use it in GitHub Desktop.
aiobotocore / botocore leak test script
#!/usr/bin/env python3
import botocore.exceptions
import botocore.session
import tracemalloc
import aiobotocore.session
import gc
import os
import time
import sys
import asyncio
import aiohttp
import re
import wrapt
import linecache
import objgraph
import argparse
import ssl
from urllib.parse import urlparse
import socket
import logging
import glob
from functools import partial
# Placeholder AWS credential embedded in the pre-signed Authorization headers below.
CREDENTIAL = 'DUMMYAWSCREDENTIALS'

# Canned HTTP requests (method + pre-signed headers) replayed by the raw
# aiohttp/asyncio test modes.  The signatures are pre-computed, so the header
# values must be sent verbatim.
URLS = {
    'https://s3.us-west-2.amazonaws.com/archpi.dabase.com/style.css': {
        'method': 'get',
        'headers': {'User-Agent': 'Botocore/1.8.21 Python/3.6.4 Darwin/17.5.0', 'X-Amz-Date': '20180518T025044Z', 'X-Amz-Content-SHA256': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', 'Authorization': f'AWS4-HMAC-SHA256 Credential={CREDENTIAL}/20180518/us-west-2/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=ae552641b9aa9a7a267fcb4e36960cd5863e55d91c9b45fd39b30fdcd2e81489', 'Accept-Encoding': 'identity'}
    },
    'https://s3.ap-southeast-1.amazonaws.com/archpi.dabase.com/doesnotexist': {
        # BUG FIX: the original tested `sys.argv[1] == 'get_object'`, which never
        # matches when arguments are passed as "-method get_object" (argv[1] is
        # the flag) and raises IndexError when the script runs with no args.
        # Scanning the whole argv matches the intent of the -method option.
        'method': 'GET' if 'get_object' in sys.argv else 'HEAD',
        'headers': {'User-Agent': 'Botocore/1.8.21 Python/3.6.4 Darwin/17.5.0', 'X-Amz-Date': '20180518T025221Z', 'X-Amz-Content-SHA256': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', 'Authorization': f'AWS4-HMAC-SHA256 Credential={CREDENTIAL}/20180518/ap-southeast-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=7a7675ef6d70cb647ed59e02d532ffa80d437fb03976d8246ea9ef102d118794', 'Accept-Encoding': 'identity'}
    }
}
# tracemalloc filters that exclude allocations made by the tracing machinery
# itself, so snapshots only show growth attributable to the code under test.
_TRACE_FILTERS = (
    tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
    tracemalloc.Filter(False, tracemalloc.__file__, all_frames=True),  # needed because tracemalloc calls fnmatch
    tracemalloc.Filter(False, linecache.__file__),
    tracemalloc.Filter(False, os.path.abspath(__file__), all_frames=True),  # since we call weakref
)
def display_top(snapshot):
    """Print every traceback-grouped allocation stat in *snapshot*."""
    for entry in snapshot.statistics('traceback'):
        print("%s memory blocks: %.1f KiB" % (entry.count, entry.size / 1024))
        for frame_line in entry.traceback.format():
            print(frame_line)
class HttpClient(asyncio.streams.FlowControlMixin):
    """Bare-bones HTTP/1.1 protocol that replays one canned request from URLS.

    Used by the raw-asyncio leak test to exercise the transport/SSL layer with
    no HTTP client library on top.  The connection is closed as soon as any
    response data arrives.
    """

    transport = None

    def __init__(self, *args, **kwargs):
        # The target URL selects which canned request (method + headers) is sent.
        self.__url = kwargs.pop('url')
        self.__logger = logging.getLogger()
        super().__init__()

    def connection_made(self, transport):
        self.transport = transport
        parts = urlparse(self.__url)
        entry = URLS[self.__url]
        # Build the request line, then append the pre-signed headers verbatim.
        request = f'{entry["method"]} {parts.path} HTTP/1.1\r\nAccept: */*\r\nHost: {parts.hostname}\r\n'
        for header_name, header_value in entry['headers'].items():
            request += f'{header_name}: {header_value}\r\n'
        request += '\r\n'
        self.transport.write(request.encode('ascii'))
        self.__logger.info(f'data sent: {request}')

    def data_received(self, data):
        self.__logger.info(f'data received: {data}')
        # One chunk of response is enough for the leak test; hang up right away.
        self.transport.close()

    def eof_received(self):
        self.__logger.info('eof_received')

    def connection_lost(self, exc):
        self.__logger.info(f'connection lost: {exc}')
        super().connection_lost(exc)

    @classmethod
    def create_factory(cls, url: str):
        """Return a protocol factory that binds *url* into each new instance."""
        def factory(*args, **kwargs):
            return cls(*args, url=url, **kwargs)
        return factory
async def test_aiobotocore(app_args, s3_client):
    """Issue one head_object/get_object call for a missing key.

    A 404 ClientError is the expected outcome and is swallowed; any other
    error status propagates.
    """
    try:
        await getattr(s3_client, app_args.method)(Bucket='archpi.dabase.com', Key='doesnotexist')
    except botocore.exceptions.ClientError as e:
        if e.response['ResponseMetadata']['HTTPStatusCode'] != 404:
            raise
def test_botocore(app_args, s3_client):
    """Synchronous twin of test_aiobotocore: one request for a missing key.

    Only a 404 ClientError is tolerated; anything else is re-raised.
    """
    try:
        getattr(s3_client, app_args.method)(Bucket='archpi.dabase.com', Key='doesnotexist')
    except botocore.exceptions.ClientError as e:
        if e.response['ResponseMetadata']['HTTPStatusCode'] != 404:
            raise
async def test_aiohttp(session: aiohttp.ClientSession, url: str):
    """Perform one canned request from URLS and drain the response body.

    A KeyError (URL not present in URLS) is deliberately ignored.
    """
    try:
        request_kwargs = dict(URLS[url], url=url, allow_redirects=False)
        async with session.request(**request_kwargs) as response:
            await response.read()
    except KeyError:
        pass
async def test_asyncio(ssl_context):
    """Open one raw TLS connection and replay the canned request via HttpClient."""
    loop = asyncio.get_event_loop()
    url = 'https://s3.ap-southeast-1.amazonaws.com/archpi.dabase.com/doesnotexist'
    parts = urlparse(url)
    port = parts.port or (80 if parts.scheme == 'http' else 443)
    # Resolve up front so create_connection gets a numeric host (AI_NUMERICHOST).
    addr_infos = await loop.getaddrinfo(parts.hostname, port, family=socket.AF_INET)
    family, _sock_type, proto, _canonname, sockaddr = addr_infos[0]
    await loop.create_connection(
        HttpClient.create_factory(url), sockaddr[0], port,
        ssl=ssl_context, family=family, proto=proto,
        flags=socket.AI_NUMERICHOST, server_hostname=parts.hostname,
        local_addr=None)
async def aiobotocore_get_obj(s3_client):
    """Fetch one known-good object and fully read its body (warm-up request)."""
    response = await s3_client.get_object(Bucket='archpi.dabase.com', Key='style.css')
    # Read through the bound context-managed stream; the original entered the
    # context as `stream` but then ignored it and read response["Body"] again.
    async with response["Body"] as stream:
        await stream.read()
def botocore_get_obj(s3_client):
    """Synchronous warm-up: fetch a known-good object and consume its body."""
    body = s3_client.get_object(Bucket='archpi.dabase.com', Key='style.css')["Body"]
    body.read()
def _process_tick(app_args, start, count):
re._cache.clear()
gc.collect()
if count == 0:
if app_args.trace:
tracemalloc.start(40)
else:
objgraph.show_most_common_types(limit=50)
print(".", end='', flush=True)
if time.time() - start > (60 * app_args.time):
if app_args.trace:
snapshot = tracemalloc.take_snapshot()
snapshot = snapshot.filter_traces(_TRACE_FILTERS)
display_top(snapshot)
else:
print("")
objgraph.show_most_common_types(limit=50)
print(f"cycles: {count}")
return True
return False
async def aiobotocore_test(app_args):
    """Loop aiobotocore head/get requests until the configured time elapses."""
    boto_session = aiobotocore.session.get_session()
    boto_config = botocore.config.Config(connect_timeout=15, read_timeout=15, max_pool_connections=1)
    s3_client = boto_session.create_client('s3', verify=False, config=boto_config)

    # One warm-up fetch so lazily-created client state isn't counted as a leak.
    await aiobotocore_get_obj(s3_client)

    started = time.time()
    iteration = 0
    while True:
        await test_aiobotocore(app_args, s3_client)
        if _process_tick(app_args, started, iteration):
            return
        iteration += 1
async def botocore_test(app_args):
    """Loop synchronous botocore head/get requests until the test time elapses."""
    boto_session = botocore.session.get_session()
    boto_config = botocore.config.Config(connect_timeout=15, read_timeout=15, max_pool_connections=1)
    s3_client = boto_session.create_client('s3', verify=False, config=boto_config)

    # Warm up once so one-time client allocations don't show up as growth.
    botocore_get_obj(s3_client)

    started = time.time()
    iteration = 0
    while True:
        test_botocore(app_args, s3_client)
        if _process_tick(app_args, started, iteration):
            return
        iteration += 1
async def _aiohttp_test_helper(app_args, url: str, connector_args: dict = None):
    """Loop raw aiohttp requests against *url* until the test time elapses.

    connector_args, if given, is forwarded to aiohttp.TCPConnector (e.g.
    enable_cleanup_closed).  The default is None instead of the original
    mutable ``{}`` default, which is the shared-mutable-default pitfall.
    """
    conn = aiohttp.TCPConnector(limit=1, **(connector_args or {}))
    async with aiohttp.ClientSession(connector=conn) as session:
        # Warm-up request so session/connector setup isn't counted as a leak.
        await test_aiohttp(session, url)
        start = time.time()
        count = 0
        while True:
            req_start = time.time()
            await test_aiohttp(session, url)
            if _process_tick(app_args, start, count):
                break
            req_elapsed = time.time() - req_start
            sleep_s = max(0.0, 0.5 - req_elapsed)  # throttle: max 2 requests/s
            await asyncio.sleep(sleep_s)
            count += 1
# Pre-bound variants of the aiohttp helper, selectable via the -test option:
# HTTPS, plain HTTP, and HTTPS with the connector's cleanup_closed workaround.
aiohttp_test = partial(_aiohttp_test_helper, url='https://s3.ap-southeast-1.amazonaws.com/archpi.dabase.com/doesnotexist')
aiohttp_test_insecure = partial(_aiohttp_test_helper, url='http://s3.ap-southeast-1.amazonaws.com/archpi.dabase.com/doesnotexist')
aiohttp_test_cleanup = partial(_aiohttp_test_helper, url='https://s3.ap-southeast-1.amazonaws.com/archpi.dabase.com/doesnotexist', connector_args=dict(enable_cleanup_closed=True))
async def asyncio_test(app_args):
    """Loop raw asyncio TLS connections until the configured time elapses."""
    ssl_context = ssl.create_default_context()
    started = time.time()
    iteration = 0
    while True:
        await test_asyncio(ssl_context)
        if _process_tick(app_args, started, iteration):
            return
        iteration += 1
def _adjust_thread_count(wrapped, instance, args, kwargs):
num_threads = len(instance._threads)
while num_threads < instance._max_workers:
wrapped(*args, **kwargs)
num_threads = len(instance._threads)
def main():
    """Parse CLI options, patch the thread pool, and run the selected leak test."""
    parser = argparse.ArgumentParser(prog="leak tester")
    parser.add_argument("-method", choices=['head_object', 'get_object'], default='head_object')
    parser.add_argument("-loop", default="native", choices=['native', 'uvloop'], help="Which asyncio loop to use")
    parser.add_argument('-test', required=True, choices=['aiohttp_test', 'aiohttp_test_insecure', 'aiohttp_test_cleanup', 'aiobotocore_test', 'asyncio_test', 'botocore_test'])
    parser.add_argument('-time', type=int, default=5, help="test time in minutes")
    parser.add_argument('-trace', action='store_true', help="Enable tracemalloc tracing")
    app_args = parser.parse_args()

    logging.basicConfig(level=logging.WARNING)

    # adjust thread count by default only adds one thread at a time until it
    # hits max_workers; patch it so the pool spins up fully right away.
    wrapt.wrap_function_wrapper('concurrent.futures', 'ThreadPoolExecutor._adjust_thread_count', _adjust_thread_count)

    test_coro = globals()[app_args.test]
    if app_args.loop == "uvloop":
        import uvloop
        loop = uvloop.new_event_loop()
        asyncio.set_event_loop(loop)
    elif app_args.loop == "native":
        loop = asyncio.get_event_loop()
    else:
        assert False, "unknown loop type: {}".format(app_args.loop)
    loop.run_until_complete(test_coro(app_args))
# Standard script entry point.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment