Skip to content

Instantly share code, notes, and snippets.

@Delaunay
Created March 6, 2024 15:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Delaunay/746f9d72142fbc3741bdcb795d6be129 to your computer and use it in GitHub Desktop.
Save Delaunay/746f9d72142fbc3741bdcb795d6be129 to your computer and use it in GitHub Desktop.
SHR_ON = 1
def _worker(in_queue, out_queue, shared_state, cls, cls_args):
shared_state[SHR_ON] = 1
with cls(*cls_args, out_queue=out_queue) as handler:
while shared_state[SHR_ON]:
try:
args, kwargs = in_queue.get(True, timeout=0.01)
handler(args, kwargs)
except Empty:
continue
except Exception:
traceback.print_exc()
class ProcWorker:
def __init__(self, observer_cls, observer_args=tuple(), size=20):
self.mm = multiprocessing.Manager()
self.in_queue = self.mm.Queue(size)
self.out_queue = self.mm.Queue(size)
self.state = self.mm.dict()
self.worker = None
self.observer_cls = observer_cls
self.observer_args = observer_args
def __enter__(self):
self.mm.__enter__()
self.state[SHR_ON] = 0
self._init_worker()
self._wait_worker_init()
return self
def _init_worker(self):
self.worker = multiprocessing.Process(
target=_worker,
args=(
self.in_queue,
self.out_queue,
self.state,
self.observer_cls,
self.observer_args
),
)
self.worker.start()
def _wait_worker_init(self, timeout=None):
s = time.time()
while True:
is_ready = self.state[SHR_ON]
if is_ready:
break
if timeout is not None and (time.time() - s > timeout):
raise TimeoutError()
def send(self, *args, **kwargs):
self.in_queue.put((args, kwargs))
def wait(self, timeout=None):
s = time.time()
while self.state[SHR_ON]:
if self.in_queue.empty():
break
if timeout is not None and (time.time() - s > timeout):
raise TimeoutError()
def wait_output(self, timeout=None):
s = time.time()
while self.state[SHR_ON]:
if timeout is not None and (time.time() - s > timeout):
raise TimeoutError()
try:
return self.out_queue.get_nowait()
except Empty:
continue
def __exit__(self, *args):
self.wait()
self.state[SHR_ON] = 0
self.worker.join()
return self.mm.__exit__(*args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment