Skip to content

Instantly share code, notes, and snippets.

Created September 4, 2020 01:20
What would you like to do?
# Asynchronous file open/close
import asyncio
class _AsyncOpen():
    """Async context manager that opens and closes a file off the event loop.

    Both ``open()`` and ``close()`` are blocking calls, so each one is handed
    to the loop's default executor and the event loop stays free to run
    other tasks while they complete.
    """

    def __init__(self, args, kwargs):
        """Store the positional and keyword arguments to forward to ``open()``."""
        self._args = args
        self._kwargs = kwargs

    def __aenter__(self):
        """Start ``open()`` on an executor thread.

        Returns the executor future itself; ``async with`` awaits it, so the
        ``as`` target is bound to the opened file object. An exception
        raised by ``open()`` propagates from that await.
        """
        def thread_open():
            return open(*self._args, **self._kwargs)
        loop = asyncio.get_running_loop()
        self._future = loop.run_in_executor(None, thread_open)
        return self._future

    async def __aexit__(self, exc_type, exc, tb):
        """Close the file on an executor thread.

        Returns ``None``, so an exception raised inside the ``async with``
        body is not suppressed.
        """
        # __aexit__ only runs after the await on __aenter__'s future
        # succeeded, so this await just retrieves the already-opened file.
        file = await self._future
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, file.close)


def aopen(*args, **kwargs):
    """Return an async context manager wrapping ``open(*args, **kwargs)``.

    Usage::

        async with aopen(path, 'w') as f:
            ...

    All arguments are forwarded unchanged to the built-in ``open()``.
    """
    return _AsyncOpen(args, kwargs)
## Test / demo
async def heartbeat(interval=0.5):
    """Sleep in a loop forever, waking every ``interval`` seconds.

    The coroutine never returns on its own; it ends only when its task is
    cancelled, at which point ``asyncio.CancelledError`` propagates out of
    the pending ``asyncio.sleep``.

    Args:
        interval: Seconds to sleep between wake-ups (default 0.5).
    """
    while True:
        await asyncio.sleep(interval)
async def write(i):
    """Create ``/tmp/<i>`` and print ``i`` into it.

    The file is opened in write mode through the ``aopen`` async context
    manager; the ``print`` call itself is an ordinary synchronous write.
    """
    target = f'/tmp/{i}'
    async with aopen(target, 'w') as sink:
        print(i, file=sink)
async def main():
    """Run five ``write`` tasks concurrently alongside a ``heartbeat`` task.

    Waits for every write to finish, then cancels the heartbeat so the
    background task does not outlive this coroutine.
    """
    beat = asyncio.create_task(heartbeat())
    try:
        tasks = [asyncio.create_task(write(i)) for i in range(5)]
        await asyncio.gather(*tasks)
    finally:
        # heartbeat() loops forever; stop it whether the writes succeeded
        # or raised, instead of leaving it pending on the loop.
        beat.cancel()
/* $ cc -shared -fPIC -o open64.so open64.c */
#define _GNU_SOURCE
#include <dlfcn.h>
#include <string.h>
#include <unistd.h>
/*
 * Interposed open64(): delays opens of paths under "/tmp/" and then
 * forwards every call to the next open64 definition found by the
 * dynamic linker.
 *
 * Returns whatever the forwarded open64 returns.
 */
int
open64(const char *path, int flags, int mode)
{
    if (!strncmp(path, "/tmp/", 5)) {
        /* NOTE(review): the statement for this branch is absent from the
         * listing this was recovered from. <unistd.h> is included and
         * otherwise unused, so a sleep() is presumed; the 3-second
         * duration is a reconstruction -- confirm before relying on it. */
        sleep(3);
    }

    /* RTLD_NEXT skips this definition and resolves the next "open64"
     * in library search order. */
    int (*orig)(const char *, int, int) = dlsym(RTLD_NEXT, "open64");
    return orig(path, flags, mode);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment