Skip to content

Instantly share code, notes, and snippets.

@piroyoung
Last active March 27, 2018 17:06
Show Gist options
  • Save piroyoung/bd03d87123cbfcb8dfcb3f999b311bb7 to your computer and use it in GitHub Desktop.
Save piroyoung/bd03d87123cbfcb8dfcb3f999b311bb7 to your computer and use it in GitHub Desktop.
シングルノードマルチスレッドでサクッとパイプライン処理できたらいいなと思ったけど,普通にアクターモデルとかに乗っかった方がいい気がするのでボツ
import queue
import threading
from typing import Callable, Generator, Any
# Module-wide shutdown flag: Worker.run keeps looping until this event is set.
# NOTE(review): nothing in the visible code ever sets it - confirm who is
# expected to call _TERM.set().
_TERM = threading.Event()
class Worker(threading.Thread):
    """Thread that batches items from one queue and forwards results to another.

    Each loop iteration hands ``callback`` a generator yielding up to
    ``batch_size`` items taken from ``source`` and puts whatever the callback
    returns on ``destination``.  The loop ends once the module-level ``_TERM``
    event is set; a batch that is only partially filled at that point is
    flushed with the items collected so far instead of being stranded.

    Args:
        source: Queue the input items are read from.
        destination: Queue that receives one callback result per batch.
        batch_size: Maximum number of items per batch; must be at least 1.
        callback: Called with the batch generator; its return value is put on
            ``destination``.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """

    # Seconds to wait on an empty source before re-checking ``_TERM``.
    _POLL_INTERVAL = 0.1

    def __init__(self, source: queue.Queue, destination: queue.Queue, batch_size: int,
                 callback: Callable[[Generator], Any]):
        if batch_size < 1:
            # A zero-length batch never waits on the source, so ``run`` would
            # spin and flood ``destination`` with results of empty batches.
            raise ValueError("batch_size must be >= 1, got {!r}".format(batch_size))
        super().__init__()
        self._callback = callback
        self._batch_size = batch_size
        self._source = source
        self._destination = destination

    def _batching(self):
        """Yield up to ``batch_size`` items from the source queue.

        Waits for items by polling, so that a shutdown request is noticed even
        while the source is empty.  The batch ends early only when the source
        is empty and ``_TERM`` has been set.
        """
        remaining = self._batch_size
        while remaining:
            try:
                item = self._source.get(timeout=self._POLL_INTERVAL)
            except queue.Empty:
                if _TERM.is_set():
                    return
                continue
            remaining -= 1
            yield item

    @staticmethod
    def _with_first(first, rest):
        """Yield ``first`` followed by everything left in ``rest``."""
        yield first
        yield from rest

    def run(self):
        """Process batches until ``_TERM`` is set."""
        while not _TERM.is_set():
            batch = self._batching()
            try:
                # Wait for the first item before involving the callback, so a
                # shutdown while idle does not push an empty-batch result.
                first = next(batch)
            except StopIteration:
                # batch_size >= 1, so an empty batch means shutdown was requested.
                break
            self._destination.put(self._callback(self._with_first(first, batch)))
class Step:
    """One pipeline stage: a pool of ``Worker`` threads between two queues.

    Creating a ``Step`` immediately starts its workers.  They read batches of
    ``batch_size`` items from ``source``, pass each batch to ``callback`` and
    put the callback's return value on this step's ``destination`` queue.

    Args:
        source: Queue the workers read their input from.
        batch_size: Number of items per batch; must be at least 1.
        callback: Called with a generator over one batch; its return value
            becomes one item on ``destination``.
        num_workers: Number of worker threads to start (default 4); must be
            at least 1.

    Raises:
        ValueError: If ``batch_size`` or ``num_workers`` is smaller than 1.
    """

    def __init__(self, source: queue.Queue, batch_size: int, callback: Callable[[Generator], Any],
                 num_workers: int = 4):
        # Validate before any thread is started so a bad argument has no side effects.
        if batch_size < 1:
            raise ValueError("batch_size must be >= 1, got {!r}".format(batch_size))
        if num_workers < 1:
            raise ValueError("num_workers must be >= 1, got {!r}".format(num_workers))
        self._source = source
        self._destination = queue.Queue()
        self._batch_size = batch_size
        self._callback = callback
        self._workers = [self._get_worker() for _ in range(num_workers)]
        for worker in self._workers:
            worker.start()

    @property
    def destination(self) -> queue.Queue:
        """Queue on which this step's workers put their results."""
        return self._destination

    def _get_worker(self) -> "Worker":
        """Build (but do not start) one worker wired to this step's queues."""
        return Worker(self._source, self._destination, self._batch_size, self._callback)

    def put(self, row: Any) -> None:
        """Feed one item into this step's source queue."""
        self._source.put(row)

    def pipe(self, batch_size: int, callback: Callable[[Generator], Any],
             num_workers: int = 4) -> "Step":
        """Chain a new step that consumes this step's ``destination`` queue.

        Args:
            batch_size: Batch size of the new step.
            callback: Batch callback of the new step.
            num_workers: Number of worker threads of the new step (default 4).

        Returns:
            The newly created (and already running) downstream ``Step``.
        """
        return Step(self.destination, batch_size, callback, num_workers)
def to_list(gen) -> list:
    """Return all items produced by the iterable ``gen`` as a new list."""
    return list(gen)
def sum_each(gen) -> list:
    """Return a list holding ``sum(row)`` for every row produced by ``gen``."""
    return [sum(row) for row in gen]
# Demo: fill the input queue with the integers 0..999 before any step exists.
source = queue.Queue()
for i in range(1000):
    source.put(i)

# Two chained stages: the first turns every 10 queued integers into a list
# (to_list); the second takes 20 of those lists at a time and sums each one
# (sum_each).  `result` is the output queue of the second stage.
result = (
    Step(source, 10, to_list)
    .pipe(20, sum_each)
    .destination
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment