Skip to content

Instantly share code, notes, and snippets.

@kzinglzy
Last active August 29, 2015 14:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kzinglzy/a0e7ddcc8e775fd00480 to your computer and use it in GitHub Desktop.
Save kzinglzy/a0e7ddcc8e775fd00480 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# coding: utf-8
import asyncio
import time
import fcntl
import os
import ctypes
import errno
""" Async file wrapper for asyncio.
Forked from https://gist.github.com/l04m33/1aa059b1a85c73bc7222. Thanks a lot.
"""
def is_async(result):
    """Tell whether *result* has to be waited on before it yields a value.

    Returns True for ``asyncio.Future`` instances and for objects that
    ``asyncio.iscoroutine`` recognises; False for everything else.
    """
    if isinstance(result, asyncio.Future):
        return True
    return asyncio.iscoroutine(result)
class AsyncFile:
    """Asyncio wrapper around an already-open binary file object.

    The wrapped file is switched to non-blocking mode.  Reads and writes
    are carried out chunk by chunk from event-loop callbacks; when the
    file reports that it would block, the step is rescheduled with
    ``loop.call_soon`` (polling, not readiness notification).

    ``read``, ``write`` and ``copy_to`` are generator-based coroutines:
    drive them with ``yield from`` or hand them to the event loop.  Reads
    share ``self.buffer``, so keep at most one read in flight at a time.
    """

    # Upper bound on the number of bytes requested per low-level read.
    MAX_BLOCK_SIZE = 8192

    def __init__(self, *, loop=None, file=None, mode='rb'):
        """Wrap *file* and put its descriptor into non-blocking mode.

        Args:
            loop: Event loop used for scheduling; defaults to
                ``asyncio.get_event_loop()``.
            file: Open file object.  It must expose a ``mode`` string
                containing ``'b'`` and be acceptable to ``fcntl.fcntl``.
            mode: Unused; kept so existing callers keep working.

        Raises:
            TypeError: If *file* is not supplied.
            RuntimeError: If *file* was not opened in binary mode.
            OSError: If the descriptor flags cannot be read or changed.
        """
        if file is None:
            raise TypeError("AsyncFile() requires the 'file' keyword argument")
        if 'b' not in file.mode:
            raise RuntimeError('Only binary mode is supported')
        # fcntl.fcntl() raises OSError by itself on failure, so there is no
        # return code to inspect here.
        flags = fcntl.fcntl(file, fcntl.F_GETFL)
        fcntl.fcntl(file, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        self.file = file
        self.loop = loop or asyncio.get_event_loop()
        self.buffer = bytearray()
        # Handles of the most recently scheduled callbacks, for close().
        self.read_handler = None
        self.write_handler = None

    def seek(self, offset, whence=None):
        """Forward to the wrapped file's ``seek`` and return its result."""
        if whence is None:
            return self.file.seek(offset)
        return self.file.seek(offset, whence)

    @asyncio.coroutine
    def read(self, n=-1):
        """Read up to *n* bytes; a negative *n* (or ``None``) reads to EOF.

        Returns:
            ``bytes`` with exactly *n* bytes, or fewer if EOF comes first.
            ``n == 0`` yields ``b''`` without touching the file.
        """
        future = asyncio.Future(loop=self.loop)
        if n is None:
            n = -1
        if n == 0:
            future.set_result(b'')
        else:
            block = self.MAX_BLOCK_SIZE
            first_chunk = min(block, n) if n > 0 else block
            # Every read starts from an empty accumulator.
            self.buffer.clear()
            self.read_handler = self.loop.call_soon(
                self._read, future, first_chunk, n)
        return future

    def _read(self, future, n, total):
        """Read one chunk of at most *n* bytes and continue or finish.

        *total* is the overall number of bytes requested by ``read``
        (non-positive means "until EOF").  The result or the raised
        exception is delivered through *future*.
        """
        if future.done():
            # The waiter was cancelled; stop rescheduling.
            return
        try:
            chunk = self.file.read(n)
        except Exception as exc:
            future.set_exception(exc)
            return

        if chunk is None:
            # Would block: retry the same request on a later iteration.
            next_size = n
        elif not chunk:
            # EOF: hand over whatever has been collected so far.
            future.set_result(bytes(self.buffer))
            return
        else:
            self.buffer.extend(chunk)
            if total > 0:
                missing = total - len(self.buffer)
                if missing <= 0:
                    # Slice by the overall request size, not by the size of
                    # the last chunk, which may be much smaller.
                    result = bytes(self.buffer[:total])
                    del self.buffer[:total]
                    future.set_result(result)
                    return
                next_size = min(self.MAX_BLOCK_SIZE, missing)
            else:
                next_size = self.MAX_BLOCK_SIZE

        self.read_handler = self.loop.call_soon(
            self._read, future, next_size, total)

    @asyncio.coroutine
    def write(self, data):
        """Write all of *data* and return the number of bytes written."""
        future = asyncio.Future(loop=self.loop)
        if len(data) == 0:
            future.set_result(0)
        else:
            self.write_handler = self.loop.call_soon(
                self._write, future, data, 0)
        return future

    def _write(self, future, data, written):
        """Write as much of *data* as possible, then continue or finish.

        *written* is the byte count already accepted by earlier steps; the
        final total is delivered through *future*.
        """
        if future.done():
            # The waiter was cancelled; stop rescheduling.
            return
        try:
            size = self.file.write(data)
        except BlockingIOError as exc:
            # Part of the data may have been accepted before blocking;
            # skip it on retry so it is not written twice.
            size = getattr(exc, 'characters_written', 0)
        except Exception as exc:
            future.set_exception(exc)
            return
        if size is None:
            # Raw non-blocking files report "would block" by returning None.
            size = 0

        total = written + size
        if size < len(data):
            self.write_handler = self.loop.call_soon(
                self._write, future, data[size:], total)
        else:
            future.set_result(total)

    @asyncio.coroutine
    def copy_to(self, dest, copy_len=-1):
        """Copy up to *copy_len* bytes from this file into *dest*.

        Args:
            dest: Object with a ``write(data)`` method.  If the call
                returns a future or coroutine it is waited on.
            copy_len: Number of bytes to copy; negative copies until EOF.

        Returns:
            The number of bytes read from this file and passed to *dest*.
        """
        copied = 0
        remaining = copy_len
        block = self.MAX_BLOCK_SIZE
        while remaining != 0:
            want = min(remaining, block) if remaining > 0 else block
            chunk = yield from self.read(want)
            if not chunk:
                break
            outcome = dest.write(chunk)
            if is_async(outcome):
                yield from outcome
            copied += len(chunk)
            if remaining > 0:
                remaining -= len(chunk)
        return copied

    def close(self):
        """Cancel scheduled callbacks, then close the wrapped file.

        Futures of operations still in flight are left pending.
        """
        for name in ('read_handler', 'write_handler'):
            handle = getattr(self, name, None)
            if handle is not None:
                handle.cancel()
        self.file.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, trace):
        self.close()
def async_file_read(file_path, n=0, loop=None):
    """Read the start of a file in the loop's default executor.

    This is a generator-based coroutine; drive it with ``yield from``.

    Args:
        file_path: Path of the file, opened in binary mode.
        n: Maximum number of bytes to read.  ``0`` (the default) means
            8192; a negative value reads the whole file.
        loop: Event loop whose executor is used; defaults to
            ``asyncio.get_event_loop()``.

    Returns:
        The ``bytes`` that were read.
    """
    def _read_chunk(path, size):
        # Runs in a worker thread, so the blocking open/read is fine here.
        with open(path, 'rb') as f:
            return f.read(size)

    loop = loop or asyncio.get_event_loop()
    # The executor future already carries the return value; no shared
    # output list is needed.
    data = yield from loop.run_in_executor(None, _read_chunk, file_path,
                                           n or 8192)
    return data
# ------------- TEST ---------------
@asyncio.coroutine
def test_async_file():
    """Read the whole sample file through AsyncFile and return its bytes."""
    with open('/web/tmp/test.txt', 'rb') as raw:
        wrapper = AsyncFile(file=raw)
        content = yield from wrapper.read()
    return content
@asyncio.coroutine
def test_async_read():
    """Read the sample file via async_file_read and return its bytes."""
    path = '/web/tmp/test.txt'
    data = yield from async_file_read(path)
    return data
if __name__ == '__main__':
    # Crude benchmark: time 1000 sequential runs of one read strategy.
    event_loop = asyncio.get_event_loop()
    started = time.time()
    for _ in range(1000):
        # Swap in test_async_file() here to time the AsyncFile path instead.
        event_loop.run_until_complete(test_async_read())
    print(time.time() - started)
# --------- Sample results ----------
# Time used, AsyncFile: 1.4s
# Time used, async_file_read: 2.4s
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment