Skip to content

Instantly share code, notes, and snippets.

@njouanin
Created October 5, 2014 09:02
Show Gist options
  • Save njouanin/9d2d1ce85bf5ed4c2540 to your computer and use it in GitHub Desktop.
Save njouanin/9d2d1ce85bf5ed4c2540 to your computer and use it in GitHub Desktop.
Asyncio test for file copy
import asyncio
import os
import io
import logging
import time
# Log at DEBUG level so the BEGIN/END traces emitted by the helpers are shown.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Repetition count passed to timeit for each copy variant.
num_tests = 100

# Both files are resolved relative to the directory containing this script.
_script_dir = os.path.dirname(os.path.realpath(__file__))
source_file = os.path.join(_script_dir, "./data")
dest_file = os.path.join(_script_dir, "./data.copy")
def test_copy_async(full_async):
    """Run a single asyncio-driven copy to completion.

    The coroutine built by ``do_copy_async`` is driven with
    ``run_until_complete`` on the loop returned by
    ``asyncio.get_event_loop()``. ``full_async`` is forwarded unchanged.
    """
    event_loop = asyncio.get_event_loop()
    copy_job = do_copy_async(event_loop, full_async)
    event_loop.run_until_complete(copy_job)
@asyncio.coroutine
def do_copy_async(_loop, full_async):
    """Copy ``source_file`` to ``dest_file`` using the asyncio helpers.

    Steps: open both files concurrently, read every line from the source,
    run ``do_something_async`` on them, write them to the destination, then
    close both files.

    Args:
        _loop: event loop handed to the helper coroutines.
        full_async: forwarded to every helper; selects between their
            executor-based and direct-call code paths.
    """
    (fr, fw) = yield from asyncio.gather(
        open_async(_loop, full_async, source_file, 'r'),
        open_async(_loop, full_async, dest_file, 'w'))
    try:
        lines = yield from readlines_async(_loop, full_async, fr)
        yield from do_something_async(lines)
        yield from writelines_async(fw, full_async, lines, _loop)
    finally:
        # Wait for both closes: without ``yield from`` the gather future is
        # only created, and this coroutine would finish (letting
        # run_until_complete return) before the files are actually closed.
        yield from asyncio.gather(
            close_async(_loop, full_async, fr),
            close_async(_loop, full_async, fw))
def test_copy_sync():
    """Copy ``source_file`` to ``dest_file`` with plain blocking file I/O.

    Reads all lines from the source, passes them to ``do_something`` and
    writes them to the destination. Both files are managed by a ``with``
    statement so they are closed even if one of those steps raises.
    """
    with open(source_file, 'r') as fr, open(dest_file, 'w') as fw:
        lines = fr.readlines()
        do_something(lines)
        fw.writelines(lines)
def do_something(lines):
    """Placeholder processing step: ignore ``lines`` and block for 0.1 second."""
    time.sleep(0.1)
@asyncio.coroutine
def do_something_async(lines):
    """Placeholder processing step for the asyncio copy.

    ``lines`` is not used; the coroutine logs a BEGIN trace, waits on
    ``asyncio.sleep(0.1)`` and logs an END trace.
    """
    logger.debug("BEGIN do_something_async()")
    pause = asyncio.sleep(0.1)
    yield from pause
    logger.debug("END do_something_async()")
@asyncio.coroutine
def open_async(_loop, full_async, file, mode='r'):
    """Open ``file`` with ``io.open`` and return the resulting file object.

    Args:
        _loop: event loop whose ``run_in_executor`` is used when
            ``full_async`` is true.
        full_async: when true, ``io.open`` is submitted via
            ``_loop.run_in_executor(None, ...)`` and awaited; otherwise it
            is called directly inside the coroutine.
        file: path passed to ``io.open``.
        mode: open mode passed to ``io.open`` (default ``'r'``).

    Returns:
        The opened file object, or ``None`` if opening raised an exception.
    """
    logger.debug("BEGIN open_async(%s)", file)
    try:
        if full_async:
            fd = yield from _loop.run_in_executor(None, io.open, file, mode)
        else:
            fd = io.open(file, mode)
    except Exception as e:
        # Keep the best-effort contract (callers receive None), but record
        # the cause instead of discarding it silently.
        logger.warning("open_async(%s) failed: %s", file, e)
        fd = None
    logger.debug("END open_async(%s)", file)
    return fd
@asyncio.coroutine
def close_async(_loop, full_async, file_object):
    """Close ``file_object``; do nothing when it is ``None``.

    Args:
        _loop: event loop whose ``run_in_executor`` is used when
            ``full_async`` is true.
        full_async: when true, ``file_object.close`` is submitted via
            ``_loop.run_in_executor(None, ...)`` and awaited, and any
            exception it raises is logged as a warning; otherwise ``close``
            is called directly and exceptions propagate.
        file_object: the file object to close, or ``None``.
    """
    if file_object is None:
        return
    name = file_object.name
    logger.debug("BEGIN close_async(%s)", name)
    if full_async:
        try:
            # Await the close so that errors surface inside this try block
            # and the END trace is only logged once the file is closed
            # (call_soon merely scheduled the call and returned at once).
            yield from _loop.run_in_executor(None, file_object.close)
        except Exception as e:
            # The message needs a placeholder for the exception argument.
            logger.warning("Erreur: %s", e)
    else:
        file_object.close()
    logger.debug("END close_async(%s)", name)
@asyncio.coroutine
def writelines_async(file_object, full_async, lines, _loop):
    """Write ``lines`` to ``file_object`` with ``writelines``.

    With a falsy ``full_async`` the call is made directly; otherwise it is
    submitted through ``_loop.run_in_executor(None, ...)`` and awaited.
    BEGIN/END debug traces bracket the write.
    """
    logger.debug("BEGIN writelines_async()")
    if not full_async:
        file_object.writelines(lines)
    else:
        pending_write = _loop.run_in_executor(None, file_object.writelines, lines)
        yield from pending_write
    logger.debug("END writelines_async()")
@asyncio.coroutine
def readlines_async(_loop, full_async, file_object):
    """Return the result of ``file_object.readlines()``.

    With a falsy ``full_async`` the call is made directly; otherwise it is
    submitted through ``_loop.run_in_executor(None, ...)`` and awaited.
    BEGIN/END debug traces carrying ``file_object.name`` bracket the read.
    """
    logger.debug("BEGIN readlines_async(%s)" % file_object.name)
    if not full_async:
        content = file_object.readlines()
    else:
        pending_read = _loop.run_in_executor(None, file_object.readlines)
        content = yield from pending_read
    logger.debug("END readlines_async(%s)" % file_object.name)
    return content
if __name__ == '__main__':
    import timeit

    # Run each asyncio variant once so its BEGIN/END debug traces can be
    # inspected before the timed runs.
    test_copy_async(False)
    logger.info("------")
    test_copy_async(True)

    # NOTE(review): a bare exit() used to sit here and made everything below
    # unreachable. It looked like leftover debugging, so it was removed to let
    # the comparison run; restore it if the early stop was intentional.
    logger.info("Running %d Sync/Async copy compare", num_tests)

    # Baseline: plain blocking copy.
    tps = timeit.timeit("test_copy_sync()", setup="from __main__ import test_copy_sync", number=num_tests)
    logger.info("Classic copy: %fs", tps)
    os.remove(dest_file)

    # Coroutines calling the file methods directly.
    tps = timeit.timeit("test_copy_async(False)", setup="import asyncio;from __main__ import test_copy_async", number=num_tests)
    logger.info("Asyncio copy with blocking I/O: %fs", tps)
    os.remove(dest_file)

    # Coroutines delegating the file methods to the executor.
    tps = timeit.timeit("test_copy_async(True)", setup="import asyncio;from __main__ import test_copy_async", number=num_tests)
    logger.info("Asyncio copy with non blocking I/O: %fs", tps)
    os.remove(dest_file)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment