Skip to content

Instantly share code, notes, and snippets.

@jaycosaur
Last active February 1, 2020 12:16
Show Gist options
  • Save jaycosaur/d62f61ad9617e0dbbef0b651feff3239 to your computer and use it in GitHub Desktop.
Save jaycosaur/d62f61ad9617e0dbbef0b651feff3239 to your computer and use it in GitHub Desktop.
Select statement golang emulation in python.
from queue import Queue
from threading import Thread
from typing import Any, List, Iterator
class PipeClose:
pass
class PipeExtra:
"""Like QueuePipe however message on output queue is a tuple containing input queue and sent message (Queue, Message)
To be used by Select class.
"""
def __init__(self, queue_in: Queue, queue_out: Queue) -> None:
self._queue_in = queue_in
self._queue_out = queue_out
self.thread = Thread(target=self._pipe, args=(self._queue_in, self._queue_out,))
self.thread.start()
def _pipe(self, queue_in: Queue, queue_out: Queue) -> None:
while True:
mes_in = queue_in.get()
queue_out.put((queue_in, mes_in))
if isinstance(mes_in, PipeClose):
return
def stop(self) -> None:
self._queue_in.put(PipeClose())
class Select:
"""Analogue to select in Golang to be able to wait on multiple queues simultaneously.
This is similar in functionality to the builtin python select
but acting on queues instead of unix pipes.
Usage:
q1 = Queue()
q2 = Queue()
kill = Queue()
queue_select = Select(q1, q2, kill)
for (which_q, message) in queue_select:
if which_q is q1:
print(f"I got a message {message} from queue 1")
elif which_q is q2:
print(f"I got a message {message} from queue 2")
elif which_q is kill:
print("I got a message from the killer queue! Stopping now")
queue_select.stop()
"""
def __init__(
self, *queues: Queue,
):
self.__multiplexed_queue = Queue(maxsize=0)
self._input_queues = queues
self._pipes: List[PipeExtra] = [
PipeExtra(q_in, self.__multiplexed_queue) for q_in in self._input_queues
]
self.is_stopped = False
def stop(self) -> None:
for pipe in self._pipes:
pipe.stop()
self.is_stopped = True
def ouput(self) -> Queue:
return self.__multiplexed_queue
def __call__(self) -> Any:
return self.__multiplexed_queue.get()
def __next__(self) -> Any:
if self.is_stopped:
raise StopIteration
return self.__multiplexed_queue.get()
def __iter__(self) -> Iterator[Any]:
return self
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment