Skip to content

Instantly share code, notes, and snippets.

@mightymercado
Last active March 2, 2024 09:02
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mightymercado/4efba1f070a6ba6526c3e237f0eb0443 to your computer and use it in GitHub Desktop.
Save mightymercado/4efba1f070a6ba6526c3e237f0eb0443 to your computer and use it in GitHub Desktop.
Send data to a multiprocessing.Process running an asyncio event loop
# A rare scenario: communicating with a child process that is running an asyncio event loop
import asyncio
from asyncio import StreamReader, StreamReaderProtocol
from multiprocessing import Process
import os
class Worker:
    """Feed task numbers to a child process that runs an asyncio event loop.

    The parent keeps the write end of an ``os.pipe()``. The child wraps the
    read end in an asyncio ``StreamReader`` and schedules ``do_task`` for
    every newline-terminated integer it receives.

    The child is handed raw file-descriptor numbers, which are only
    meaningful when the child is a fork of the parent, so ``start`` always
    uses the ``fork`` start method (POSIX only).
    """

    def __init__(self):
        # Both pipe ends are created in the parent; the read end is handed to
        # the child in start().
        self.read_fd, self.write_fd = os.pipe()
        # Unbuffered so that every add() reaches the pipe immediately.
        self.writer = os.fdopen(self.write_fd, 'wb', 0)
        # Populated by start() so callers can join() the child.
        self.process = None

    def start(self):
        """Fork the child process and give it the read end of the pipe."""
        # Function-scope import: the module itself only imports ``Process``.
        from multiprocessing import get_context

        os.set_inheritable(self.read_fd, True)
        # Force ``fork``: with spawn/forkserver the fd numbers passed below
        # would not refer to this pipe inside the child.
        self.process = get_context('fork').Process(
            target=Worker.worker,
            args=(self.read_fd, self.write_fd),
        )
        self.process.start()
        # The parent never reads from the pipe.
        os.close(self.read_fd)

    def close(self):
        """Close the parent's write end so the child sees end-of-input."""
        self.writer.close()

    @staticmethod
    async def do_task(task_number):
        """Handle one task number (placeholder: does nothing)."""
        pass

    @staticmethod
    async def accept_tasks(read_fd):
        """Read task numbers from ``read_fd`` until EOF, one task per line.

        Each line is parsed with ``int`` and passed to ``do_task`` in its own
        asyncio task. When the pipe reaches EOF the coroutine waits for the
        tasks that are still running and then returns.

        Raises:
            ValueError: if a line is not a valid integer.
        """
        loop = asyncio.get_running_loop()
        reader = StreamReader()
        transport, _ = await loop.connect_read_pipe(
            lambda: StreamReaderProtocol(reader),
            os.fdopen(read_fd, 'rb', 0),
        )
        # The loop only keeps weak references to tasks; hold strong ones so a
        # task cannot be garbage-collected before it finishes.
        pending = set()
        try:
            while True:
                line = await reader.readline()
                if not line:
                    # readline() returns b'' at EOF: every writer has closed.
                    break
                task = loop.create_task(Worker.do_task(int(line)))
                pending.add(task)
                task.add_done_callback(pending.discard)
            if pending:
                # asyncio.wait() rejects an empty collection, hence the guard.
                await asyncio.wait(pending)
        finally:
            transport.close()

    @staticmethod
    def worker(read_fd, write_fd=None):
        """Child-process entry point: run the task-accepting loop to EOF.

        Args:
            read_fd: read end of the pipe inherited from the parent.
            write_fd: optional write end inherited through fork. It is closed
                here because the reader only sees EOF once *every* copy of
                the write end is closed, including the child's own.
        """
        if write_fd is not None:
            os.close(write_fd)
        asyncio.run(Worker.accept_tasks(read_fd))

    def add(self, task):
        """Queue ``task`` by writing it to the pipe as one text line."""
        self.writer.write(f'{task}\n'.encode())
if __name__ == '__main__':
    # Guarded so that importing this module (which multiprocessing itself does
    # in the child under the spawn/forkserver start methods) does not start
    # another worker process.
    worker = Worker()
    worker.start()
    worker.add('1')
    worker.add('2')
    # Every task is queued; release the parent's write end of the pipe.
    worker.writer.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment