Skip to content

Instantly share code, notes, and snippets.

@domenukk
Created January 3, 2019 20:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save domenukk/716ea606e0ff915ef4a5d4ebb9f3cbf3 to your computer and use it in GitHub Desktop.
Save domenukk/716ea606e0ff915ef4a5d4ebb9f3cbf3 to your computer and use it in GitHub Desktop.
Calling Foreground Functions from Background Processes in Python
#!/bin/env python
"""
A simple example calling any functon in the foreground from background threads, including a response, using Queues.
"""
from multiprocessing import Queue, Process, cpu_count
from collections import namedtuple
import traceback
Event = namedtuple("Event", "thread_id function args kwargs")
def done(thread_id, request_queue, response_queue):
request_queue.put(Event(thread_id, "done", [], {}))
def call_from_bg(thread_id, request_queue, response_queue, func, *args, **kwargs):
request_queue.put(Event(thread_id, func, args, kwargs))
return response_queue.get()
def background_thread(thread_id, *queues):
try:
def call(func, *args, **kwargs):
return call_from_bg(thread_id, *queues, func, *args, **kwargs)
for i in range(1000):
print("{}: calling on main".format(thread_id))
resp = call("on_main", "{}: Hello from the foreground :) ({})".format(thread_id, i))
print("{}: Foreground responded: {}".format(thread_id, resp))
except Exception as ex:
print("An exception occurred: {}".format(ex))
traceback.print_exc()
finally:
done(thread_id, *queues)
# Example foo
counter = 0
def on_main(*args):
global counter
counter += 1
print(*args)
return "Already counted to: {}".format(counter)
def main_thread(n_threads):
request_queue = Queue()
response_queues = []
threads = []
done = []
for i in range(n_threads):
response_queue = Queue()
response_queues.append(response_queue)
threads.append(Process(target=background_thread, args=[i, request_queue, response_queue]))
threads[i].start()
while not len(done) == n_threads:
event = request_queue.get()
if event.function == "done":
done.append(threads[event.thread_id])
else:
try:
response_queues[event.thread_id].put(globals()[event.function](*event.args, **event.kwargs))
except Exception as ex:
response_queues[event.thread_id].put(ex)
for p in done:
p.join()
if __name__ == '__main__':
main_thread(cpu_count() * 2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment