Skip to content

Instantly share code, notes, and snippets.

@rooterkyberian
Last active March 27, 2024 15:15
Show Gist options
  • Save rooterkyberian/a2c12fc6269c86bcf4e199149eb6b9ec to your computer and use it in GitHub Desktop.
Save rooterkyberian/a2c12fc6269c86bcf4e199149eb6b9ec to your computer and use it in GitHub Desktop.
python asyncio file access libs test
# derived from https://github.com/mosquito/aiofile/issues/18#issuecomment-497983435
# I do not vouch for the correctness of these benchmark results
# pip install aiofile aiofiles tabulate uvloop
import asyncio
import os
import platform
import random
import tempfile
import time
from importlib.metadata import version
from uuid import uuid4
import aiofile
import uvloop
import tabulate
from aiofile import AIOFile, LineReader, Writer
from aiofiles import open as aio_open
# Number of scratch files each benchmark target creates.
_files = 10
# Lines written to each file by the write helpers; main() rebinds this per run.
_iters = 10**4
# Exclusive upper bound of the random integers written; the sync helpers also
# size their per-value counter list from it.
_rand_max = 10
def read_sync(fname):
    """Read ``fname`` with blocking I/O and tally the integers it contains.

    Each line is parsed as an integer and the counter at that index is
    decremented, so the returned list of ``_rand_max`` counters is the
    negative histogram of the file's contents.
    """
    tally = [0] * _rand_max
    with open(fname, "r") as handle:
        for raw_line in handle:
            # Decrement (rather than increment) so that adding the matching
            # write histogram yields all zeros.
            tally[int(raw_line.strip())] -= 1
    return tally
def write_sync(fname):
    """Write ``_iters`` random integers to ``fname`` with blocking I/O.

    One integer in ``range(0, _rand_max)`` is written per line. Returns a
    list of ``_rand_max`` counters holding how often each value was written.
    """
    tally = [0] * _rand_max
    with open(fname, "w") as sink:
        for _ in range(_iters):
            drawn = random.randrange(0, _rand_max)
            sink.write(f"{drawn}\n")
            tally[drawn] += 1
    return tally
def test_sync():
    """Write, then read back, ``_files`` files one after another.

    Returns a dict mapping each file name to the element-wise sum of its
    write and read histograms.
    """
    names = [f"{uuid4()}.txt" for _ in range(_files)]

    # Finish writing every file before the first read starts.
    written = {name: write_sync(name) for name in names}
    read_back = {name: read_sync(name) for name in names}

    combined = {}
    for name in names:
        pairs = zip(written[name], read_back[name])
        combined[name] = [w + r for w, r in pairs]
    return combined
async def read_aiofile(fname):
    """Read ``fname`` through aiofile and tally the integers it contains.

    Lines are pulled from a ``LineReader`` wrapped around the opened
    ``AIOFile``; each is parsed as an integer and the counter at that index
    is decremented. Returns a list of ``_rand_max`` counters.
    """
    # Size the histogram from the shared constant instead of a literal 10 so
    # this helper agrees with the sync implementation.
    freqs = [0] * _rand_max
    async with AIOFile(fname, "r") as fp:
        async for line in LineReader(fp):
            freqs[int(line.strip())] -= 1
    return freqs
async def write_aiofile(fname):
    """Write ``_iters`` random integers to ``fname`` through aiofile.

    One integer in ``range(0, _rand_max)`` is written per line by awaiting
    a ``Writer`` wrapped around the opened ``AIOFile``. Returns a list of
    ``_rand_max`` counters holding how often each value was written.
    """
    # Use the shared constant for both the histogram size and the random
    # range instead of a literal 10, matching write_sync.
    freqs = [0] * _rand_max
    async with AIOFile(fname, "w") as fp:
        w = Writer(fp)
        for _ in range(_iters):
            num = random.randrange(0, _rand_max)
            freqs[num] += 1
            await w(f"{num}\n")
    return freqs
async def read_aiofiles(fname):
    """Read ``fname`` through aiofiles and tally the integers it contains.

    The file object returned by ``aio_open`` is iterated asynchronously;
    each line is parsed as an integer and the counter at that index is
    decremented. Returns a list of ``_rand_max`` counters.
    """
    # Size the histogram from the shared constant instead of a literal 10 so
    # this helper agrees with the sync implementation.
    freqs = [0] * _rand_max
    async with aio_open(fname, "r") as fp:
        async for line in fp:
            freqs[int(line.strip())] -= 1
    return freqs
async def write_aiofiles(fname):
    """Write ``_iters`` random integers to ``fname`` through aiofiles.

    One integer in ``range(0, _rand_max)`` is written per line with an
    awaited ``fp.write``. Returns a list of ``_rand_max`` counters holding
    how often each value was written.
    """
    # Use the shared constant for both the histogram size and the random
    # range instead of a literal 10, matching write_sync.
    freqs = [0] * _rand_max
    async with aio_open(fname, "w") as fp:
        for _ in range(_iters):
            num = random.randrange(0, _rand_max)
            freqs[num] += 1
            await fp.write(f"{num}\n")
    return freqs
async def test_async(reader, writer):
    """Write, then read back, ``_files`` files concurrently.

    ``writer`` and ``reader`` are coroutine functions taking a file name and
    returning a histogram list. All writes are gathered before any read is
    started. Returns a dict mapping each file name to the element-wise sum
    of its write and read histograms.
    """
    names = [f"{uuid4()}.txt" for _ in range(_files)]

    write_results = await asyncio.gather(*(writer(name) for name in names))
    written = dict(zip(names, write_results))

    read_results = await asyncio.gather(*(reader(name) for name in names))
    read_back = dict(zip(names, read_results))

    combined = {}
    for name in names:
        pairs = zip(written[name], read_back[name])
        combined[name] = [w + r for w, r in pairs]
    return combined
async def test_executor():
    """Run the whole blocking ``test_sync`` benchmark in the default executor.

    Returns whatever ``test_sync`` returns.
    """
    # Inside a coroutine the running loop is the one we want;
    # get_running_loop() states that explicitly and never creates a loop.
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default executor.
    return await loop.run_in_executor(None, test_sync)
async def test_multi_job_executor():
    """Run ``test_executor`` while a second coroutine keeps the loop busy.

    A task that sleeps in 10 ms steps forever runs alongside the executor
    job. Once the first task completes, the remaining ones are cancelled and
    the completed task's result is returned.
    """

    async def sleeper():
        # Never finishes by itself; it only ends when cancelled below.
        while True:
            await asyncio.sleep(0.01)

    worker = asyncio.ensure_future(test_executor())
    ticker = asyncio.ensure_future(sleeper())

    done, pending = await asyncio.wait(
        (worker, ticker),
        return_when=asyncio.FIRST_COMPLETED,
    )
    for task in pending:
        task.cancel()

    # Awaiting the finished task yields its result (or re-raises its error).
    return await next(iter(done))
def test_sync_one():
    """Write and read back a single file with blocking I/O.

    Returns a ``(file name, histogram)`` tuple where the histogram is the
    element-wise sum of the write and read histograms.
    """
    name = f"{uuid4()}.txt"
    written = write_sync(name)
    read_back = read_sync(name)
    combined = [w + r for w, r in zip(written, read_back)]
    return (name, combined)
async def test_executor_parallel():
    """Run ``_files`` independent ``test_sync_one`` jobs in the default executor.

    Returns a dict built from the ``(file name, histogram)`` pairs the jobs
    produce.
    """
    # Inside a coroutine the running loop is the one we want;
    # get_running_loop() states that explicitly and never creates a loop.
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default executor for every job.
    jobs = [loop.run_in_executor(None, test_sync_one) for _ in range(_files)]
    return dict(await asyncio.gather(*jobs))
def timed_test(cb):
    """Time one call of ``cb`` inside a throw-away working directory.

    ``cb`` is called with no arguments while the current working directory
    is a fresh temporary directory, so the relative file names used by the
    benchmark targets land there and are removed afterwards.

    Returns the elapsed wall-clock seconds measured with
    ``time.perf_counter``.

    Raises:
        AssertionError: if ``check`` rejects the value returned by ``cb``.
    """
    previous_cwd = os.getcwd()
    with tempfile.TemporaryDirectory(prefix="io_pref_test") as tmpdir:
        os.chdir(tmpdir)
        try:
            started = time.perf_counter()
            ret = cb()
            timed = time.perf_counter() - started
        finally:
            # Leave the temporary directory before it is removed: deleting
            # the current working directory fails on Windows and leaves the
            # process in a non-existent directory elsewhere.
            os.chdir(previous_cwd)
    # Raise explicitly so the validation still runs under ``python -O``,
    # which strips ``assert`` statements.
    if not check(ret):
        raise AssertionError("write/read frequencies do not cancel out")
    return timed
def check(freqs):
    """Return True when every counter in every histogram of ``freqs`` is zero.

    ``freqs`` is a mapping whose values are iterables of numbers; an empty
    mapping or empty histograms count as valid.
    """
    for histogram in freqs.values():
        for counter in histogram:
            if not counter == 0:
                return False
    return True
def main():
    """Run every benchmark target at several line counts and print the timings.

    Prints the interpreter, platform and package versions first, then times
    each target with ``timed_test`` and renders the collected timings as a
    GitHub-style table via ``tabulate``.
    """
    # Timings keyed by line count, then by target label.
    stats = {}
    # Line counts to benchmark; floats so they can be formatted with ``:.0e``.
    counts = [1e3, 1e4, 1e5, 1e6]
    print(f"Python version: {platform.python_version()}")
    print(f"Platform: {platform.platform()}")
    for package in ["aiofiles", "aiofile", "uvloop"]:
        print(f"{package} version: {version(package)}")
    print("aiofile default context", aiofile.aio.get_default_context())
    # Label -> zero-argument callable handed to timed_test. Each async target
    # is driven by its own asyncio.run / uvloop.run call.
    targets = {
        "sync": test_sync,
        "async executor 'dumb'": lambda: asyncio.run(test_executor()),
        "async executor w/ coroutines": lambda: asyncio.run(test_multi_job_executor()),
        "async multiple executors": lambda: asyncio.run(test_executor_parallel()),
        "async aiofiles": lambda: asyncio.run(test_async(read_aiofiles, write_aiofiles)),
        "async aiofile": lambda: asyncio.run(test_async(read_aiofile, write_aiofile)),
        "aiofiles@uvloop": lambda: uvloop.run(test_async(read_aiofiles, write_aiofiles)),
        "aiofile@uvloop": lambda: uvloop.run(test_async(read_aiofile, write_aiofile)),
    }
    # The write helpers read the module-level _iters, so it is rebound for
    # every line count below.
    global _iters
    for n in counts:
        _iters = int(n)
        iteration_stats = stats[n] = {}
        print(f"Running tests for {n:.0e} iterations")
        for target, cb in targets.items():
            print(f"Running {target} test")
            iteration_stats[target] = timed_test(cb)
    # NOTE(review): indentation was lost in this copy of the script; the table
    # is placed after the loop here, but it may originally have been printed
    # after each line count -- confirm against the upstream gist.
    # Header row: "iterations" followed by the target labels of the first run.
    table = [["iterations"] + list(stats[counts[0]].keys())]
    for n_, iteration_stats in stats.items():
        table.append([f"{n_:.0e}"] + [f"{round(v, 3)}" for v in iteration_stats.values()])
    print(tabulate.tabulate(table, tablefmt="github", headers="firstrow"))
# Run the benchmark only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment