Skip to content

Instantly share code, notes, and snippets.

@hashbrowncipher
Last active July 15, 2020 17:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hashbrowncipher/69f4ce9f8bfa6d5bb88d9c2562e7529d to your computer and use it in GitHub Desktop.
from aiohttp.client_exceptions import ClientError
import asyncio
from asyncio import Queue
from aiohttp.resolver import AsyncResolver
import json
import os
import random
from blake3 import blake3
from hashlib import md5
from socket import socketpair
from socket import AF_UNIX
from socket import SOCK_DGRAM
from socket import SHUT_WR
import sys
from multiprocessing import Process
from time import perf_counter
import aiohttp
from boto3 import Session
from botocore.awsrequest import AWSRequest
from botocore.auth import SigV4Auth
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
# Module-level configuration shared by all worker processes.
backend = default_backend()
# One random AES-256 key per run; used only to generate incompressible data.
aes = algorithms.AES(os.urandom(32))
MB = 1024 * 1024
# 1 MiB of zeroes: the plaintext that AES-CTR turns into pseudo-random bytes.
MB_ZEROES = b'\x00' * MB
# Size of each uploaded object (128 MiB).
CHUNK_SIZE = 128 * MB
CHUNK_ZEROES = b'\x00' * CHUNK_SIZE
# Total number of objects to upload across all processes.
JOB_COUNT = 4096
# Number of uploader processes to fork.
CONCURRENCY = 8
# Target S3 bucket, taken from the first command-line argument.
BUCKET = sys.argv[1]
class RandomPayload(aiohttp.Payload):
    """aiohttp request payload that streams pseudo-random bytes.

    AES-CTR-encrypting a zeroed buffer under a fresh random IV yields an
    incompressible pseudo-random stream of ``limit`` bytes without having
    to draw that much entropy up front.  A blake3 hash of the stream is
    computed alongside; its digest is discarded, so presumably it exists
    only to include hashing cost in the benchmark — TODO confirm.
    """

    def __init__(self, limit):
        # Fresh IV per payload so every upload produces a distinct stream.
        self._iv = os.urandom(16)
        self._limit = limit
        super().__init__(self._iv)

    async def write(self, writer):
        cipher = Cipher(aes, modes.CTR(self._iv), backend=backend)
        encryptor = cipher.encryptor()
        remaining = self._limit
        # update_into() requires room for up to one extra cipher block.
        buf = bytearray(MB + 16)
        hasher = blake3()
        while remaining > 0:
            # Encrypt at most 1 MiB per iteration.  The final chunk may be
            # shorter; the original subtracted a full MB unconditionally,
            # which looped forever whenever ``limit`` was not a multiple
            # of MB (it never terminated on a negative remainder).
            n = min(remaining, MB)
            encryptor.update_into(memoryview(MB_ZEROES)[:n], buf)
            view = memoryview(buf)[:n]
            hasher.update(view)
            await writer.write(view)
            remaining -= n
        hasher.digest()
# Resolve AWS credentials once at import time; every signed request reuses them.
credentials = Session().get_credentials()
def make_request(to, region="us-east-1"):
    """Build a SigV4-signed PUT request description for s3://BUCKET/<to>.

    Returns a plain dict (method / headers / url) so the job can be
    JSON-serialised and shipped to a worker process over a socket.  The
    body is declared UNSIGNED-PAYLOAD so workers can generate it lazily.

    ``region`` defaults to "us-east-1", the value the original hard-coded
    into the signer, so existing callers are unaffected.
    """
    headers = {
        "Host": f"{BUCKET}.s3.amazonaws.com",
        "Content-Length": str(CHUNK_SIZE),
        "X-Amz-Content-SHA256": "UNSIGNED-PAYLOAD",
    }
    request = AWSRequest(
        method="PUT",
        url=f"https://{BUCKET}.s3.amazonaws.com/{to}",
        headers=headers,
    )
    SigV4Auth(credentials, "s3", region).add_auth(request)
    prepared = request.prepare()
    return dict(
        method=prepared.method,
        headers=dict(prepared.headers),
        url=prepared.url,
    )
async def do_request(session, job, counts):
    """Execute one prepared upload job; return True iff it returned 200.

    Failures are tallied into ``counts`` — keyed by exception type for
    aiohttp client errors, and under "timeout" for timeouts — and reported
    as False so the caller can retry the job.
    """
    url = job["url"]
    try:
        resp = await session.request(
            job["method"],
            url,
            headers=job["headers"],
            data=RandomPayload(CHUNK_SIZE),
        )
        async with resp:
            if resp.status != 200:
                print(resp)
            # The original had an unreachable ``return True`` after this
            # return; it has been removed.
            return resp.status == 200
    except ClientError as ex:
        counts[type(ex)] = counts.get(type(ex), 0) + 1
        print(f"{type(ex)} {url}")
    except asyncio.TimeoutError:
        counts["timeout"] += 1
    return False
class RandomAsyncResolver(AsyncResolver):
    """Async DNS resolver that returns its answers in randomised order.

    Shuffling the record list spreads new connections across every address
    a hostname resolves to, instead of always hitting the first entry.
    """

    async def resolve(self, *args, **kwargs):
        records = await super().resolve(*args, **kwargs)
        random.shuffle(records)
        return records
async def async_worker(q, loop, count, identifier):
    """Consume jobs from ``q`` forever, uploading each until it succeeds.

    Progress is tracked in the shared ``count`` dict: "got" increments on
    dequeue, "done" on success, and "active" is their difference.
    """
    timeout = aiohttp.ClientTimeout(sock_read=1, total=3)
    connector = aiohttp.TCPConnector(
        resolver=RandomAsyncResolver(),
        loop=loop,
        use_dns_cache=False,
    )
    async with aiohttp.ClientSession(
        connector=connector,
        loop=loop,
        timeout=timeout,
    ) as session:
        while True:
            job = await q.get()
            count["got"] += 1
            # Retry indefinitely: a job only counts as done once one
            # request for it has succeeded.
            succeeded = False
            while not succeeded:
                succeeded = await do_request(session, job, count)
            count["done"] += 1
            count["active"] = count["got"] - count["done"]
            q.task_done()
async def dispatcher(q, loop, sock, counts):
    """Feed jobs from the parent's datagram socket into the worker queue.

    An empty datagram is the shutdown sentinel: stop reading, wait for the
    queue to drain, then print this process's counters and the second
    field of /proc/<pid>/schedstat (per the kernel docs, time spent
    waiting on a runqueue — TODO confirm that is the intended metric).
    """
    while True:
        payload = await loop.sock_recv(sock, 16384)
        if not payload:
            break
        job = json.loads(payload)
        await q.put(job)
    print("Finished dispatch")
    await q.join()
    pid = os.getpid()
    print((pid, counts))
    # Use a context manager so the /proc handle is closed deterministically
    # instead of being leaked until GC.
    with open(f"/proc/{pid}/schedstat") as f:
        print(f.read().split()[1])
async def async_parent(loop, sock, identifier):
    """Run one process's upload pipeline: a dispatcher plus six workers.

    Returns once the first task finishes.  The workers loop forever, so
    FIRST_COMPLETED effectively means "wait for the dispatcher", which
    completes after the shutdown sentinel arrives and the queue drains.
    """
    counts = dict(got=0, done=0, timeout=0, payload=0, os=0)
    # maxsize=1 keeps back-pressure on the dispatcher.  The ``loop=``
    # kwarg was dropped (deprecated in 3.8, removed in 3.10); this
    # coroutine already runs on ``loop``, so the queue binds to it anyway.
    q = Queue(1)
    tasks = [
        loop.create_task(async_worker(q, loop, counts, identifier))
        for _ in range(6)
    ]
    # asyncio.wait() no longer accepts bare coroutines (removed in 3.11),
    # hence the explicit create_task wrapping.
    tasks.append(loop.create_task(dispatcher(q, loop, sock, counts)))
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(done)
def worker(sock, identifier):
    """Child-process entry point: run the async upload pipeline on ``sock``."""
    event_loop = asyncio.get_event_loop()
    # The socket must be non-blocking for loop.sock_recv().
    sock.setblocking(False)
    event_loop.run_until_complete(async_parent(event_loop, sock, identifier))
def main():
    """Fork CONCURRENCY uploader processes and feed them signed PUT jobs.

    Jobs travel as JSON datagrams over an AF_UNIX SOCK_DGRAM socketpair;
    each empty datagram shuts down exactly one child process.
    """
    s1, s2 = socketpair(AF_UNIX, SOCK_DGRAM)
    processes = [Process(target=worker, args=(s1, i)) for i in range(CONCURRENCY)]
    for p in processes:
        p.start()
    # JOB_COUNT (4096) objects of CHUNK_SIZE (128 MiB) each -> 512 GiB.
    # (The original comment said "1024 files -> 128 GiB", which no longer
    # matched the constants.)
    for i in range(JOB_COUNT):
        payload = json.dumps(make_request(f"{sys.argv[2]}/{i:04}")).encode("utf8")
        s2.send(payload)
    # One empty datagram per worker process acts as a shutdown sentinel.
    for i in range(CONCURRENCY):
        s2.send(b"")
    s2.close()
    # Wait for the children so the script only exits once uploads finish.
    for p in processes:
        p.join()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment