Last active
September 19, 2018 22:52
-
-
Save thehesiod/dc04230a5cdac70f25905d3a1cad71ce to your computer and use it in GitHub Desktop.
aiobotocore / botocore leak test script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import botocore.exceptions | |
import botocore.session | |
import tracemalloc | |
import aiobotocore.session | |
import gc | |
import os | |
import time | |
import sys | |
import asyncio | |
import aiohttp | |
import re | |
import wrapt | |
import linecache | |
import objgraph | |
import argparse | |
import ssl | |
from urllib.parse import urlparse | |
import socket | |
import logging | |
import glob | |
from functools import partial | |
CREDENTIAL = 'DUMMYAWSCREDENTIALS'

# Request descriptions (HTTP method + headers) keyed by URL.  Each entry is
# expanded as keyword arguments to ``session.request`` by the aiohttp test and
# is serialised by hand by ``HttpClient``.
URLS = {
    'https://s3.us-west-2.amazonaws.com/archpi.dabase.com/style.css': {
        'method': 'get',
        'headers': {
            'User-Agent': 'Botocore/1.8.21 Python/3.6.4 Darwin/17.5.0',
            'X-Amz-Date': '20180518T025044Z',
            'X-Amz-Content-SHA256': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
            'Authorization': f'AWS4-HMAC-SHA256 Credential={CREDENTIAL}/20180518/us-west-2/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=ae552641b9aa9a7a267fcb4e36960cd5863e55d91c9b45fd39b30fdcd2e81489',
            'Accept-Encoding': 'identity',
        },
    },
    'https://s3.ap-southeast-1.amazonaws.com/archpi.dabase.com/doesnotexist': {
        # The command line is parsed later (in main), so peek at the raw
        # arguments here.  Scanning every argument -- instead of indexing
        # sys.argv[1] -- avoids an IndexError when the script is started
        # without arguments and recognises both "-method get_object" and
        # "-method=get_object".
        'method': 'GET' if any(arg.split('=')[-1] == 'get_object' for arg in sys.argv[1:]) else 'HEAD',
        'headers': {
            'User-Agent': 'Botocore/1.8.21 Python/3.6.4 Darwin/17.5.0',
            'X-Amz-Date': '20180518T025221Z',
            'X-Amz-Content-SHA256': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
            'Authorization': f'AWS4-HMAC-SHA256 Credential={CREDENTIAL}/20180518/ap-southeast-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=7a7675ef6d70cb647ed59e02d532ffa80d437fb03976d8246ea9ef102d118794',
            'Accept-Encoding': 'identity',
        },
    },
}
# Exclusion filters applied to a tracemalloc snapshot before it is reported.
# Each spec is (filename pattern, all_frames).
_TRACE_FILTERS = tuple(
    tracemalloc.Filter(False, pattern, all_frames=match_all_frames)
    for pattern, match_all_frames in (
        ("<frozen importlib._bootstrap>", False),
        (tracemalloc.__file__, True),  # needed because tracemalloc calls fnmatch
        (linecache.__file__, False),
        (os.path.abspath(__file__), True),  # since we call weakref
    )
)
def display_top(snapshot):
    """Print every 'traceback' statistic of *snapshot*.

    For each statistic a summary line (block count and size in KiB) is
    printed, followed by the formatted traceback lines.
    """
    for entry in snapshot.statistics('traceback'):
        size_kib = entry.size / 1024
        print("%s memory blocks: %.1f KiB" % (entry.count, size_kib))
        for frame_line in entry.traceback.format():
            print(frame_line)
class HttpClient(asyncio.streams.FlowControlMixin):
    """asyncio protocol that sends one hand-built HTTP/1.1 request.

    When the connection is made it writes the request described by
    ``URLS[url]``; the transport is closed as soon as any data is received.
    """

    transport = None

    def __init__(self, *args, **kwargs):
        # Only the ``url`` keyword is consumed; anything else is ignored.
        self.__url = kwargs.pop('url')
        self.__logger = logging.getLogger()
        super().__init__()

    @classmethod
    def create_factory(cls, url: str):
        """Return a callable that builds instances of this class bound to *url*."""
        def build_protocol(*args, **kwargs):
            return cls(*args, url=url, **kwargs)

        return build_protocol

    def connection_made(self, transport):
        """Remember *transport* and write the request for the configured URL."""
        self.transport = transport

        spec = URLS[self.__url]
        parsed = urlparse(self.__url)

        request_head = f'{spec["method"]} {parsed.path} HTTP/1.1\r\nAccept: */*\r\nHost: {parsed.hostname}\r\n'
        header_block = ''.join(f'{name}: {value}\r\n' for name, value in spec['headers'].items())
        # A blank line terminates the header section.
        body = request_head + header_block + '\r\n'

        self.transport.write(body.encode('ascii'))
        self.__logger.info(f'data sent: {body}')

    def data_received(self, data):
        """Log the received bytes and close the transport."""
        self.__logger.info(f'data received: {data}')
        self.transport.close()

    def eof_received(self):
        """Log that the peer signalled end-of-file."""
        self.__logger.info('eof_received')

    def connection_lost(self, exc):
        """Log the disconnect, then defer to the flow-control mixin."""
        self.__logger.info(f'connection lost: {exc}')
        super().connection_lost(exc)
async def test_aiobotocore(app_args, s3_client):
    """Await ``s3_client.<app_args.method>`` for the 'doesnotexist' key.

    A ``ClientError`` whose HTTP status code is 404 is swallowed; any other
    ``ClientError`` is re-raised.
    """
    operation = getattr(s3_client, app_args.method)
    try:
        await operation(Bucket='archpi.dabase.com', Key='doesnotexist')
    except botocore.exceptions.ClientError as err:
        status_code = err.response['ResponseMetadata']['HTTPStatusCode']
        if status_code == 404:
            return
        raise
def test_botocore(app_args, s3_client):
    """Call ``s3_client.<app_args.method>`` for the 'doesnotexist' key.

    A ``ClientError`` whose HTTP status code is 404 is swallowed; any other
    ``ClientError`` is re-raised.
    """
    operation = getattr(s3_client, app_args.method)
    try:
        operation(Bucket='archpi.dabase.com', Key='doesnotexist')
    except botocore.exceptions.ClientError as err:
        status_code = err.response['ResponseMetadata']['HTTPStatusCode']
        if status_code == 404:
            return
        raise
async def test_aiohttp(session: aiohttp.ClientSession, url: str):
    """Issue the request described by ``URLS[url]`` and read the whole body.

    Redirects are not followed.  Any ``KeyError`` raised while looking up or
    performing the request is silently ignored.
    """
    try:
        request_kwargs = URLS[url]
        async with session.request(**request_kwargs, url=url, allow_redirects=False) as response:
            # The payload is read only for its side effect and then discarded.
            await response.read()
    except KeyError:
        pass
async def test_asyncio(ssl_context):
    """Open one connection to the 'doesnotexist' URL using ``HttpClient``.

    The host is resolved to an IPv4 address first, and the connection is then
    made to that numeric address while *ssl_context* validates against the
    original host name.
    """
    url = 'https://s3.ap-southeast-1.amazonaws.com/archpi.dabase.com/doesnotexist'
    target = urlparse(url)

    port = target.port
    if not port:
        # No explicit port in the URL: use the scheme's default.
        port = 80 if target.scheme == 'http' else 443

    loop = asyncio.get_event_loop()
    addr_infos = await loop.getaddrinfo(target.hostname, port, family=socket.AF_INET)
    family, _socktype, proto, _canonname, sockaddr = addr_infos[0]

    await loop.create_connection(
        HttpClient.create_factory(url),
        sockaddr[0],
        port,
        ssl=ssl_context,
        family=family,
        proto=proto,
        flags=socket.AI_NUMERICHOST,
        server_hostname=target.hostname,
        local_addr=None,
    )
async def aiobotocore_get_obj(s3_client):
    """Fetch 'style.css' with the async client and read its body to the end."""
    response = await s3_client.get_object(Bucket='archpi.dabase.com', Key='style.css')
    body = response["Body"]
    async with body:
        # The content itself is not needed; it is read and discarded.
        await body.read()
def botocore_get_obj(s3_client):
    """Fetch 'style.css' with the blocking client and read its body to the end."""
    body = s3_client.get_object(Bucket='archpi.dabase.com', Key='style.css')["Body"]
    # The content itself is not needed; it is read and discarded.
    body.read()
def _process_tick(app_args, start, count):
    """Do the per-cycle bookkeeping and report whether the test should stop.

    Args:
        app_args: parsed command-line namespace; ``trace`` (bool) and ``time``
            (test duration in minutes) are read.
        start: ``time.time()`` value taken when the test loop started.
        count: number of cycles completed before this one (0 on the first
            call).

    Returns:
        True once more than ``app_args.time`` minutes have passed since
        *start* (after printing the final report), otherwise False.
    """
    # Drop cached compiled regexes and collect garbage so neither shows up as
    # growth.  re.purge() is the public way to clear the regex caches; it
    # replaces reaching into the private re._cache dict.
    re.purge()
    gc.collect()

    if count == 0:
        # First cycle: start tracing (keeping up to 40 frames per allocation)
        # or print the baseline object counts.
        if app_args.trace:
            tracemalloc.start(40)
        else:
            objgraph.show_most_common_types(limit=50)

    # Progress indicator, one dot per cycle.
    print(".", end='', flush=True)

    if time.time() - start <= (60 * app_args.time):
        return False

    if app_args.trace:
        snapshot = tracemalloc.take_snapshot()
        snapshot = snapshot.filter_traces(_TRACE_FILTERS)
        display_top(snapshot)
    else:
        print("")
        objgraph.show_most_common_types(limit=50)

    print(f"cycles: {count}")
    return True
async def aiobotocore_test(app_args):
    """Run ``test_aiobotocore`` repeatedly until ``_process_tick`` says stop.

    One ``get_object`` for 'style.css' is issued before the timed loop starts.
    """
    session = aiobotocore.session.get_session()
    config = botocore.config.Config(connect_timeout=15, read_timeout=15, max_pool_connections=1)
    s3_client = session.create_client('s3', verify=False, config=config)

    await aiobotocore_get_obj(s3_client)

    start = time.time()
    count = 0
    await test_aiobotocore(app_args, s3_client)
    while not _process_tick(app_args, start, count):
        count += 1
        await test_aiobotocore(app_args, s3_client)
async def botocore_test(app_args):
    """Run ``test_botocore`` repeatedly until ``_process_tick`` says stop.

    One ``get_object`` for 'style.css' is issued before the timed loop starts.
    The botocore calls are synchronous; this is a coroutine only so it can be
    started the same way as the other tests.
    """
    session = botocore.session.get_session()
    config = botocore.config.Config(connect_timeout=15, read_timeout=15, max_pool_connections=1)
    s3_client = session.create_client('s3', verify=False, config=config)

    botocore_get_obj(s3_client)

    start = time.time()
    count = 0
    test_botocore(app_args, s3_client)
    while not _process_tick(app_args, start, count):
        count += 1
        test_botocore(app_args, s3_client)
async def _aiohttp_test_helper(app_args, url: str, connector_args: dict = None):
    """Request *url* through one aiohttp session until the test time is up.

    Args:
        app_args: parsed command-line namespace, forwarded to
            ``_process_tick``.
        url: key into ``URLS`` naming the request to repeat.
        connector_args: extra keyword arguments for ``aiohttp.TCPConnector``;
            ``None`` (the default) means no extra arguments.
    """
    # A None default replaces the shared mutable ``{}`` default argument.
    conn = aiohttp.TCPConnector(limit=1, **(connector_args or {}))
    async with aiohttp.ClientSession(connector=conn) as session:
        # One request is made before the timed loop starts.
        await test_aiohttp(session, url)

        start = time.time()
        count = 0
        while True:
            # Monotonic clock: the pacing below must not be disturbed by
            # wall-clock adjustments.
            req_start = time.monotonic()
            await test_aiohttp(session, url)

            if _process_tick(app_args, start, count):
                break

            req_elapsed = time.monotonic() - req_start
            sleep_s = max(0.0, 0.5 - req_elapsed)  # max 2 requests / s
            await asyncio.sleep(sleep_s)
            count += 1
# Variants of the aiohttp test, selectable by name from the command line.
# HTTPS request with default connector settings.
aiohttp_test = partial(
    _aiohttp_test_helper,
    url='https://s3.ap-southeast-1.amazonaws.com/archpi.dabase.com/doesnotexist',
)
# Same request over plain HTTP.
aiohttp_test_insecure = partial(
    _aiohttp_test_helper,
    url='http://s3.ap-southeast-1.amazonaws.com/archpi.dabase.com/doesnotexist',
)
# HTTPS request with the connector's enable_cleanup_closed option turned on.
aiohttp_test_cleanup = partial(
    _aiohttp_test_helper,
    url='https://s3.ap-southeast-1.amazonaws.com/archpi.dabase.com/doesnotexist',
    connector_args={'enable_cleanup_closed': True},
)
async def asyncio_test(app_args):
    """Run ``test_asyncio`` repeatedly until ``_process_tick`` says stop.

    A single default SSL context is created once and shared by every cycle.
    """
    start = time.time()
    count = 0
    ssl_context = ssl.create_default_context()

    await test_asyncio(ssl_context)
    while not _process_tick(app_args, start, count):
        count += 1
        await test_asyncio(ssl_context)
def _adjust_thread_count(wrapped, instance, args, kwargs):
    """wrapt-style wrapper: call *wrapped* until the pool is at full size.

    *wrapped* is invoked repeatedly, with the original arguments, for as long
    as ``instance._threads`` holds fewer than ``instance._max_workers``
    entries.
    """
    while len(instance._threads) < instance._max_workers:
        wrapped(*args, **kwargs)
def main():
    """Parse the command line and run the selected leak test to completion."""
    parser = argparse.ArgumentParser(prog="leak tester")
    parser.add_argument("-method", choices=['head_object', 'get_object'], default='head_object')
    parser.add_argument("-loop", default="native", choices=['native', 'uvloop'], help="Which asyncio loop to use")
    parser.add_argument('-test', required=True, choices=['aiohttp_test', 'aiohttp_test_insecure', 'aiohttp_test_cleanup', 'aiobotocore_test', 'asyncio_test', 'botocore_test'])
    parser.add_argument('-time', type=int, default=5, help="test time in minutes")
    parser.add_argument('-trace', action='store_true', help="Enable tracemalloc tracing")
    app_args = parser.parse_args()

    logging.basicConfig(level=logging.WARNING)

    # adjust thread count by default only adds one thread at a time until it hits max_workers
    wrapt.wrap_function_wrapper('concurrent.futures', 'ThreadPoolExecutor._adjust_thread_count', _adjust_thread_count)

    # The -test choices are names of module-level coroutine functions.
    selected_test = globals()[app_args.test]

    if app_args.loop == "uvloop":
        import uvloop
        loop = uvloop.new_event_loop()
        asyncio.set_event_loop(loop)
    elif app_args.loop == "native":
        loop = asyncio.get_event_loop()
    else:
        assert False, "unknown loop type: {}".format(app_args.loop)

    loop.run_until_complete(selected_test(app_args))


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment