Skip to content

Instantly share code, notes, and snippets.

@l04m33
Last active April 10, 2017 14:45
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save l04m33/1aa059b1a85c73bc7222 to your computer and use it in GitHub Desktop.
Save l04m33/1aa059b1a85c73bc7222 to your computer and use it in GitHub Desktop.
Async file wrapper for asyncio
class AsyncFileWrapper:
    """Asyncio-friendly wrapper around a binary file object.

    The wrapped file is switched to non-blocking mode and is read or
    written in chunks of at most ``DEFAULT_BLOCK_SIZE`` bytes.  Every chunk
    is processed in its own event-loop callback, so other tasks get a
    chance to run between chunks.

    ``read``, ``write`` and ``copy_to`` are native coroutines (this needs
    Python 3.5+; ``asyncio.coroutine`` was removed in Python 3.11).

    Exactly one of ``filename`` and ``fileobj`` must be given.  Only binary
    mode is supported.
    """

    DEFAULT_BLOCK_SIZE = 8192

    def __init__(self, loop=None, filename=None,
                 fileobj=None, mode='rb'):
        """Wrap ``fileobj``, or open ``filename`` with ``mode`` and wrap it.

        Args:
            loop: Event loop used for scheduling; defaults to
                ``asyncio.get_event_loop()``.
            filename: Path of a file to open (mutually exclusive with
                ``fileobj``).
            fileobj: An already opened binary file object.
            mode: Mode passed to ``open`` when ``filename`` is given.

        Raises:
            RuntimeError: If both or neither of ``filename``/``fileobj``
                are given, or if the file is not in binary mode.
            OSError: If the descriptor cannot be made non-blocking.
        """
        # Both missing or both present are equally invalid.
        if (filename is None) == (fileobj is None):
            raise RuntimeError('Confilicting arguments')

        opened_here = filename is not None
        if opened_here:
            if 'b' not in mode:
                raise RuntimeError('Only binary mode is supported')
            fileobj = open(filename, mode=mode)
        elif 'b' not in fileobj.mode:
            raise RuntimeError('Only binary mode is supported')

        # fcntl.fcntl() reports failure by raising, not through its return
        # value, so cleanup has to happen in an exception handler.
        try:
            flags = fcntl.fcntl(fileobj, fcntl.F_GETFL)
            fcntl.fcntl(fileobj, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        except Exception:
            # Only close what this constructor opened itself.
            if opened_here:
                fileobj.close()
            raise

        self.fileobj = fileobj
        if loop is None:
            loop = asyncio.get_event_loop()
        self.loop = loop
        # Accumulates the chunks of the read() call currently in progress.
        self.rbuffer = bytearray()
        # Handles of the most recently scheduled callbacks (see close()).
        self.read_handle = None
        self.write_handle = None

    def seek(self, offset, whence=None):
        """Reposition the wrapped file; returns what ``fileobj.seek`` returns."""
        if whence is None:
            return self.fileobj.seek(offset)
        return self.fileobj.seek(offset, whence)

    def read_ready(self, future, n, total):
        """Event-loop callback performing one chunk of a ``read`` call.

        Args:
            future: Future resolved with the collected bytes (or an error).
            n: Number of bytes to request from the file in this step.
            total: Total bytes wanted by the caller; negative means
                "read until EOF".
        """
        if future.done():
            # The waiter was cancelled; setting a result would raise.
            return

        try:
            chunk = self.fileobj.read(n)
        except Exception as exc:
            future.set_exception(exc)
            return

        if chunk is None:
            # Would block: retry on the next loop iteration (busy polling).
            self.read_handle = self.loop.call_soon(
                self.read_ready, future, n, total)
            return

        if not chunk:
            # EOF: hand over whatever was collected so far.
            future.set_result(bytes(self.rbuffer))
            return

        self.rbuffer.extend(chunk)

        if total < 0:
            # Unbounded read: keep going until EOF.
            self.read_handle = self.loop.call_soon(
                self.read_ready, future, self.DEFAULT_BLOCK_SIZE, total)
            return

        remaining = total - len(self.rbuffer)
        if remaining > 0:
            self.read_handle = self.loop.call_soon(
                self.read_ready, future,
                min(self.DEFAULT_BLOCK_SIZE, remaining), total)
            return

        # Enough data collected.  Slice by ``total``: ``n`` is only the
        # size of the last chunk and would truncate multi-chunk reads.
        result = bytes(self.rbuffer[:total])
        del self.rbuffer[:total]
        future.set_result(result)

    async def read(self, n=-1):
        """Read up to ``n`` bytes, or everything until EOF when ``n < 0``.

        Returns:
            The bytes read; fewer than ``n`` only if EOF was reached.
        """
        if n == 0:
            return b''

        self.rbuffer.clear()
        if n < 0:
            first_block = self.DEFAULT_BLOCK_SIZE
        else:
            first_block = min(self.DEFAULT_BLOCK_SIZE, n)

        future = self.loop.create_future()
        self.read_handle = self.loop.call_soon(
            self.read_ready, future, first_block, n)
        return await future

    def write_ready(self, future, data, written):
        """Event-loop callback performing one step of a ``write`` call.

        Args:
            future: Future resolved with the total number of bytes written.
            data: Bytes that still have to be written.
            written: Number of bytes written by previous steps.
        """
        if future.done():
            # The waiter was cancelled; setting a result would raise.
            return

        try:
            count = self.fileobj.write(data)
        except BlockingIOError as exc:
            # Buffered writers may have accepted part of the data before
            # blocking; skip that part instead of writing it twice.
            count = getattr(exc, 'characters_written', 0)
        except Exception as exc:
            future.set_exception(exc)
            return

        if count is None:
            # Raw non-blocking streams return None when nothing was written.
            count = 0

        written += count
        if count < len(data):
            self.write_handle = self.loop.call_soon(
                self.write_ready, future, data[count:], written)
        else:
            future.set_result(written)

    async def write(self, data):
        """Write all of ``data``; returns the number of bytes written."""
        if len(data) == 0:
            return 0

        future = self.loop.create_future()
        self.write_handle = self.loop.call_soon(
            self.write_ready, future, data, 0)
        return await future

    async def copy_to(self, dest, copy_len=-1):
        """Copy data from this file to ``dest`` block by block.

        Args:
            dest: Object with a ``write(data)`` method.  If ``write``
                returns a future or a coroutine, it is awaited.
            copy_len: Maximum number of bytes to copy; negative copies
                until EOF.

        Returns:
            The number of bytes copied.
        """
        copied_size = 0
        while copy_len != 0:
            if copy_len >= 0:
                read_size = min(copy_len, self.DEFAULT_BLOCK_SIZE)
            else:
                read_size = self.DEFAULT_BLOCK_SIZE

            chunk = await self.read(read_size)
            if not chunk:
                break  # EOF

            outcome = dest.write(chunk)
            if isinstance(outcome, asyncio.Future) \
                    or asyncio.iscoroutine(outcome):
                await outcome

            copied_size += len(chunk)
            if copy_len > 0:
                copy_len -= len(chunk)
        return copied_size

    def close(self):
        """Close the wrapped file and cancel the scheduled callbacks.

        NOTE: only the most recently scheduled read/write callbacks are
        cancelled; the future of an operation that is still in flight is
        left unresolved.
        """
        self.fileobj.close()
        for handle in (getattr(self, 'read_handle', None),
                       getattr(self, 'write_handle', None)):
            if handle is not None:
                handle.cancel()
@kzinglzy
Copy link

Hi,  今晚翻邮件, 看到一年前因为这代码向你发过邮件.
很抱歉没有在你回复我的时候就私自使用修改了这个gist的代码(个人项目)
如果不被允许, 请联系我吧: )
谢谢.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment