Last active
March 27, 2024 15:15
-
-
Save rooterkyberian/a2c12fc6269c86bcf4e199149eb6b9ec to your computer and use it in GitHub Desktop.
python asyncio file access libs test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Derived from https://github.com/mosquito/aiofile/issues/18#issuecomment-497983435
# I do not vouch for the correctness of these benchmark results.
# Requirements: pip install aiofile aiofiles tabulate uvloop
import asyncio | |
import os | |
import platform | |
import random | |
import tempfile | |
import time | |
from importlib.metadata import version | |
from uuid import uuid4 | |
import aiofile | |
import uvloop | |
import tabulate | |
from aiofile import AIOFile, LineReader, Writer | |
from aiofiles import open as aio_open | |
# Benchmark knobs, kept at module level so the worker functions can read them.
_files = 10  # number of files each scenario writes and then reads back
_iters = 10_000  # lines written per file; main() rebinds this for every run
_rand_max = 10  # exclusive upper bound of the random integers that get written
def read_sync(fname):
    """Tally the numbers stored in *fname* using plain blocking I/O.

    Every line is parsed as an integer and the matching slot of the tally is
    decremented, so adding this result element-wise to the tally returned by
    ``write_sync`` for the same file yields all zeros.

    Args:
        fname: Path of a text file holding one integer per line.

    Returns:
        A list of ``_rand_max`` integers; slot ``i`` is minus the number of
        lines whose value is ``i``.
    """
    tally = [0 for _ in range(_rand_max)]
    with open(fname, "r") as handle:
        for raw in handle:
            tally[int(raw.strip())] -= 1
    return tally
def write_sync(fname):
    """Fill *fname* with random integers using plain blocking I/O.

    Writes ``_iters`` lines, each holding one integer drawn from
    ``range(_rand_max)``, and counts how often every value was written.

    Args:
        fname: Path of the text file to create (or truncate).

    Returns:
        A list of ``_rand_max`` integers; slot ``i`` is the number of lines
        written with value ``i``.
    """
    tally = [0 for _ in range(_rand_max)]
    with open(fname, "w") as handle:
        for _ in range(_iters):
            value = random.randrange(_rand_max)
            handle.write(f"{value}\n")
            tally[value] += 1
    return tally
def test_sync():
    """Write and then read back ``_files`` files, one after another.

    All files are written first and read afterwards, matching the order used
    by the asynchronous scenarios.

    Returns:
        A dict mapping each generated file name to the element-wise sum of
        its write tally and its (negative) read tally; a correct round trip
        gives all zeros.
    """
    names = [f"{uuid4()}.txt" for _ in range(_files)]
    written = {name: write_sync(name) for name in names}
    read_back = {name: read_sync(name) for name in names}

    deltas = {}
    for name in names:
        deltas[name] = [sum(pair) for pair in zip(written[name], read_back[name])]
    return deltas
async def read_aiofile(fname):
    """Tally the numbers stored in *fname*, reading through ``aiofile``.

    The file is opened with ``AIOFile`` and iterated line by line with
    ``LineReader``.  Each parsed integer decrements its slot, mirroring
    ``read_sync`` so the result cancels out the matching write tally.

    Args:
        fname: Path of a text file holding one integer per line.

    Returns:
        A list of ``_rand_max`` integers; slot ``i`` is minus the number of
        lines whose value is ``i``.
    """
    # Size the table from the shared constant instead of a literal 10 so all
    # scenarios stay comparable if ``_rand_max`` is ever changed.
    freqs = [0] * _rand_max
    async with AIOFile(fname, "r") as fp:
        async for line in LineReader(fp):
            freqs[int(line.strip())] -= 1
    return freqs
async def write_aiofile(fname):
    """Fill *fname* with random integers, writing through ``aiofile``.

    Writes ``_iters`` lines, one awaited ``Writer`` call per line, each
    holding an integer drawn from ``range(_rand_max)``.

    Args:
        fname: Path of the text file to create.

    Returns:
        A list of ``_rand_max`` integers; slot ``i`` is the number of lines
        written with value ``i``.
    """
    # Use the shared bound rather than a literal 10 so this scenario produces
    # the same workload as ``write_sync`` even if ``_rand_max`` changes.
    freqs = [0] * _rand_max
    async with AIOFile(fname, "w") as fp:
        write = Writer(fp)
        for _ in range(_iters):
            num = random.randrange(0, _rand_max)
            freqs[num] += 1
            await write(f"{num}\n")
    return freqs
async def read_aiofiles(fname):
    """Tally the numbers stored in *fname*, reading through ``aiofiles``.

    The file is opened with ``aiofiles.open`` (imported as ``aio_open``) and
    iterated asynchronously line by line.  Each parsed integer decrements its
    slot, mirroring ``read_sync``.

    Args:
        fname: Path of a text file holding one integer per line.

    Returns:
        A list of ``_rand_max`` integers; slot ``i`` is minus the number of
        lines whose value is ``i``.
    """
    # Size the table from the shared constant instead of a literal 10 so all
    # scenarios stay comparable if ``_rand_max`` is ever changed.
    freqs = [0] * _rand_max
    async with aio_open(fname, "r") as fp:
        async for line in fp:
            freqs[int(line.strip())] -= 1
    return freqs
async def write_aiofiles(fname):
    """Fill *fname* with random integers, writing through ``aiofiles``.

    Writes ``_iters`` lines, one awaited ``write`` call per line, each
    holding an integer drawn from ``range(_rand_max)``.

    Args:
        fname: Path of the text file to create.

    Returns:
        A list of ``_rand_max`` integers; slot ``i`` is the number of lines
        written with value ``i``.
    """
    # Use the shared bound rather than a literal 10 so this scenario produces
    # the same workload as ``write_sync`` even if ``_rand_max`` changes.
    freqs = [0] * _rand_max
    async with aio_open(fname, "w") as fp:
        for _ in range(_iters):
            num = random.randrange(0, _rand_max)
            freqs[num] += 1
            await fp.write(f"{num}\n")
    return freqs
async def test_async(reader, writer):
    """Write and read back ``_files`` files concurrently on the event loop.

    All writer coroutines are gathered first; once every file is written,
    all reader coroutines are gathered.

    Args:
        reader: Coroutine function taking a file name and returning the
            (negative) read tally for that file.
        writer: Coroutine function taking a file name and returning the
            write tally for that file.

    Returns:
        A dict mapping each generated file name to the element-wise sum of
        its write and read tallies; a correct round trip gives all zeros.
    """
    names = [f"{uuid4()}.txt" for _ in range(_files)]
    written = await asyncio.gather(*(writer(name) for name in names))
    read_back = await asyncio.gather(*(reader(name) for name in names))

    # gather() preserves argument order, so results line up with ``names``.
    return {
        name: [sum(pair) for pair in zip(w_tally, r_tally)]
        for name, w_tally, r_tally in zip(names, written, read_back)
    }
async def test_executor():
    """Run the whole synchronous benchmark in the loop's default executor.

    Returns:
        Whatever ``test_sync`` returns: a dict of per-file tally sums.
    """
    # Inside a coroutine the running loop is the one we want;
    # get_running_loop() is the documented way to obtain it and never
    # creates a new loop implicitly.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, test_sync)
async def test_multi_job_executor():
    """Run ``test_executor`` while another coroutine keeps the loop busy.

    A second task that sleeps in 10 ms steps runs next to the executor job.
    The sleeper never finishes on its own, so it is cancelled as soon as the
    first task completes.

    Returns:
        The result of the task that completed first.
    """

    async def sleeper():
        # Endless background activity on the event loop.
        while True:
            await asyncio.sleep(0.01)

    worker = asyncio.ensure_future(test_executor())
    ticker = asyncio.ensure_future(sleeper())
    done, pending = await asyncio.wait(
        (worker, ticker),
        return_when=asyncio.FIRST_COMPLETED,
    )

    for task in pending:
        task.cancel()

    # Awaiting an already finished task yields its result (or re-raises).
    return await next(iter(done))
def test_sync_one():
    """Write one file and read it back with blocking I/O.

    Returns:
        A ``(file name, sums)`` tuple where ``sums`` is the element-wise sum
        of the write tally and the (negative) read tally.
    """
    name = f"{uuid4()}.txt"
    written = write_sync(name)
    read_back = read_sync(name)
    return name, [sum(pair) for pair in zip(written, read_back)]
async def test_executor_parallel():
    """Run ``_files`` independent write/read jobs in the default executor.

    Each job is one ``test_sync_one`` call submitted to the executor; all
    jobs are awaited together.

    Returns:
        A dict built from the ``(file name, sums)`` tuples returned by the
        jobs.
    """
    # Inside a coroutine, get_running_loop() is the documented way to obtain
    # the current loop; it never creates a new loop implicitly.
    loop = asyncio.get_running_loop()
    jobs = [loop.run_in_executor(None, test_sync_one) for _ in range(_files)]
    return dict(await asyncio.gather(*jobs))
def timed_test(cb):
    """Time one call of *cb* inside a throw-away working directory.

    The benchmark helpers create their files under relative names, so the
    process changes into a fresh temporary directory for the duration of the
    call.  The previous working directory is restored before the temporary
    directory is removed; otherwise the process would be left sitting in a
    deleted directory, and the removal itself can fail on platforms that
    refuse to delete the current directory.

    Args:
        cb: Zero-argument callable returning a mapping accepted by ``check``.

    Returns:
        Elapsed time of ``cb()`` in seconds, measured with
        ``time.perf_counter``.

    Raises:
        AssertionError: If ``check`` rejects the result of ``cb()``.
    """
    previous_cwd = os.getcwd()
    with tempfile.TemporaryDirectory(prefix="io_pref_test") as tmpdir:
        os.chdir(tmpdir)
        try:
            started = time.perf_counter()
            ret = cb()
            timed = time.perf_counter() - started
        finally:
            # Step out before the context manager deletes the directory.
            os.chdir(previous_cwd)
    # Verified outside the timed section so it does not skew the measurement.
    assert check(ret), "read tallies do not cancel out the write tallies"
    return timed
def check(freqs):
    """Report whether every tally in *freqs* is made up of zeros only.

    Args:
        freqs: Mapping whose values are iterables of numbers.

    Returns:
        ``True`` if no value contains a non-zero entry (which includes an
        empty mapping or empty iterables), ``False`` otherwise.
    """
    for deltas in freqs.values():
        for value in deltas:
            if value != 0:
                return False
    return True
def main():
    """Run every benchmark scenario at several sizes and print a table.

    Prints the Python, platform and package versions, then for each
    iteration count rebinds the module-level ``_iters``, times every
    scenario with ``timed_test`` and finally renders the timings (rounded to
    three decimals) as a GitHub-style table via ``tabulate``.
    """
    global _iters  # the write helpers read the iteration count from here

    def on_asyncio(factory, *args):
        # The coroutine is only created when the scenario is actually run.
        return lambda: asyncio.run(factory(*args))

    def on_uvloop(factory, *args):
        # Same as on_asyncio, but driven by uvloop's runner.
        return lambda: uvloop.run(factory(*args))

    counts = [1e3, 1e4, 1e5, 1e6]
    stats = {}

    print(f"Python version: {platform.python_version()}")
    print(f"Platform: {platform.platform()}")
    for package in ("aiofiles", "aiofile", "uvloop"):
        print(f"{package} version: {version(package)}")
    print("aiofile default context", aiofile.aio.get_default_context())

    targets = {
        "sync": test_sync,
        "async executor 'dumb'": on_asyncio(test_executor),
        "async executor w/ coroutines": on_asyncio(test_multi_job_executor),
        "async multiple executors": on_asyncio(test_executor_parallel),
        "async aiofiles": on_asyncio(test_async, read_aiofiles, write_aiofiles),
        "async aiofile": on_asyncio(test_async, read_aiofile, write_aiofile),
        "aiofiles@uvloop": on_uvloop(test_async, read_aiofiles, write_aiofiles),
        "aiofile@uvloop": on_uvloop(test_async, read_aiofile, write_aiofile),
    }

    for n in counts:
        _iters = int(n)
        timings = {}
        stats[n] = timings
        print(f"Running tests for {n:.0e} iterations")
        for target, cb in targets.items():
            print(f"Running {target} test")
            timings[target] = timed_test(cb)

    # First row is the header: scenario names in insertion order.
    header = ["iterations", *stats[counts[0]]]
    rows = [
        [f"{count:.0e}", *(f"{round(seconds, 3)}" for seconds in timings.values())]
        for count, timings in stats.items()
    ]
    print(tabulate.tabulate([header, *rows], tablefmt="github", headers="firstrow"))
# Run the benchmark only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment