cat test.txt | python ./stream_text_processor.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
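"""
Stream text processor: runs a chain of 'processor' stages over line-oriented
input, like a shell pipeline.

Two executors are provided:

  * MultiProcessPipelineExecutor -- forks one child process per stage and
    connects the stages with named FIFOs on disk.
  * GeventPipelineExecutor -- runs each stage as a greenlet and connects the
    stages with in-process gevent queues.
"""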
import os
import sys
import hashlib
import gevent
import gevent.queue
import gevent.fileobject as gfo
# class AbstractSource(object):
#     def open(self):
#         pass
#
# class FileSource(AbstractSource):
#     def __init__(self, path):
#         self.path = path
#
#     def open(self):
#         fd = open(self.path, 'rb')
#         return gfo.FileObject(fd)
#
# class StdinSource(AbstractSource):
#     def open(self):
#         return gfo.FileObject(sys.stdin)
class AbstractProcessor(object):
    name = 'processor'

    def process(self, read_pipe, write_pipe):
        pass

    def debug(self, msg):
        print '[%s] %s' % (self.name, msg)

    def _close(self, fd):
        # best-effort close: the other end may already be gone
        try:
            fd.close()
        except Exception:
            pass
class ProcessorHead(AbstractProcessor):
    def __init__(self, n):
        self.n = n

    def process(self, read_pipe, write_pipe):
        for i in range(self.n):
            line = read_pipe.readline()
            write_pipe.write(line)
            write_pipe.flush()
            self.debug('[head] wrote %s' % line.rstrip())
        write_pipe.flush()
        self._close(write_pipe)
        self._close(read_pipe)
        self.debug('finished')
class ProcessorTranslate(AbstractProcessor):
    def __init__(self, find_chars, replacement_chars):
        self.tr_map = self._build_map(find_chars, replacement_chars)

    def process(self, read_pipe, write_pipe):
        for line in read_pipe:
            translated_line = self._tr(line, self.tr_map)
            write_pipe.write(translated_line)
            self.debug('[tr] wrote %s' % translated_line.rstrip())
        write_pipe.flush()
        self._close(write_pipe)
        self._close(read_pipe)
        self.debug('finished')

    def _tr(self, string, tr_map):
        # translate characters one by one, leaving unmapped ones intact
        return ''.join(tr_map.get(c, c) for c in string)

    def _build_map(self, fchars, rchars):
        assert len(fchars) == len(rchars)
        return dict(zip(fchars, rchars))
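
# Sketch: a hypothetical extra stage, to illustrate the AbstractProcessor
# interface. ProcessorGrep is an illustration only, not part of the original
# gist; it passes through only the lines containing a given substring.
class ProcessorGrep(AbstractProcessor):
    def __init__(self, needle):
        self.needle = needle

    def process(self, read_pipe, write_pipe):
        for line in read_pipe:
            if self.needle in line:
                write_pipe.write(line)
        write_pipe.flush()
        self._close(write_pipe)
        self._close(read_pipe)
        self.debug('finished')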
class MultiProcessPipelineExecutor(object):
    def __init__(self):
        self.r_bufsize = 1024 * 10
        self.w_bufsize = 1024 * 10

    def execute(self, src, out, processors):
        # called without any processor: just copy source to output
        if len(processors) == 0:
            for line in src:
                out.write(line)
            out.flush()
            return

        # decide task id
        task_id = self._gen_id()

        # create pipes
        num_processors = len(processors)
        fifo_paths = self._gen_fifo_paths(task_id, num_processors)
        self._prepare_fifos(fifo_paths)

        # execute processors as child processes
        for i in range(num_processors):
            if os.fork() == 0:
                # child: connect the FIFOs and run exactly one stage
                print '[processor-%d] hello' % (i + 1)
                r_fifo = self._open_read_fifo(fifo_paths[i])
                w_fifo = self._open_write_fifo(fifo_paths[i + 1])
                processor = processors[i]
                processor.name = 'processor-%d' % (i + 1)
                processor.process(r_fifo, w_fifo)
                os._exit(0)  # child must not fall through into the parent's code

        # parent: make initial stream from data source 'src'
        print '[parent] reading source and putting it into fifo-1'
        first_fifo = self._open_write_fifo(fifo_paths[0])
        try:
            for line in src:
                first_fifo.write(line)
                print '[parent] src -> fifo-1: %s' % line.rstrip()
            first_fifo.flush()
        except IOError:
            # the first stage may close its read end early (e.g. head)
            pass
        self._close(first_fifo)

        # parent: then wait for the result
        last_fifo = self._open_read_fifo(fifo_paths[-1])
        for line in last_fifo:
            out.write(line)
            print '[parent] fifo-last -> out: %s' % line.rstrip()
        out.flush()
        self._close(last_fifo)

        # remove FIFO files
        for fifo_path in fifo_paths:
            try:
                os.unlink(fifo_path)
            except OSError:
                pass
        print '[parent] all done.'
    def _gen_fifo_paths(self, task_id, num_processors):
        # one FIFO per pipe boundary: n processors need n+1 FIFOs
        fifo_paths = []
        for i in range(num_processors + 1):
            fifo_paths.append('fifo-%s-%d' % (task_id, i + 1))
        return fifo_paths

    def _gen_id(self):
        return hashlib.md5(os.urandom(100)).hexdigest()

    def _open_read_fifo(self, path):
        # fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
        fd = os.open(path, os.O_RDONLY)
        return gfo.FileObject(os.fdopen(fd, 'rb', self.r_bufsize))

    def _open_write_fifo(self, path):
        fd = open(path, 'wb', self.w_bufsize)
        return gfo.FileObject(fd)

    def _prepare_fifos(self, fifo_paths):
        for fifo_path in fifo_paths:
            try:
                os.unlink(fifo_path)
            except OSError:
                pass
            os.mkfifo(fifo_path)
            print '[prepare] created fifo: %s' % fifo_path

    def _close(self, fd):
        try:
            fd.close()
        except Exception:
            pass
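
# Note on the FIFO handshake above: os.open(path, os.O_RDONLY) blocks until
# some process opens the FIFO for writing, and open(path, 'wb') blocks until
# a reader shows up. That is why the parent opens the write end of the first
# FIFO only after forking the children, and opens the read end of the last
# FIFO only after feeding and closing the first one.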
class GeventQueueWrapper(object):
    """File-like wrapper around a gevent queue; close() enqueues a termination marker."""

    def __init__(self, gevent_queue):
        self.queue = gevent_queue

    def readline(self):
        line, termination = self.queue.get()
        if termination:
            raise StopIteration()
        return line

    def write(self, line):
        self.queue.put((line, False))

    def flush(self):
        pass  # nothing to flush: write() enqueues immediately

    def close(self):
        self.queue.put((None, True))

    def next(self):
        line, termination = self.queue.get()
        if termination:
            raise StopIteration()
        return line

    def __iter__(self):
        return self
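
# Minimal usage sketch for GeventQueueWrapper (illustrative only):
#   q = GeventQueueWrapper(gevent.queue.Queue())
#   q.write('a\n')
#   q.write('b\n')
#   q.close()
#   for line in q:  # yields 'a\n', 'b\n', then stops at the close marker
#       print line,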
class GeventPipelineExecutor(object):
    def execute(self, src, out, processors):
        # prepare fifos (in-process queues)
        p_num = len(processors)
        fifos = [self._create_fifo() for i in range(p_num + 1)]

        # spawn processors as greenlets
        p_threads = []
        for i in range(p_num):
            r_fifo = fifos[i]
            w_fifo = fifos[i + 1]
            processor = processors[i]
            processor.name = 'processor-%d' % (i + 1)
            p_threads.append(gevent.spawn(processor.process, r_fifo, w_fifo))

        # feed the source into the first queue
        first_fifo = fifos[0]
        for line in src:
            first_fifo.write(line)
            print '[parent] src -> fifo1: %s' % line.rstrip()
        first_fifo.flush()  # actually has no effect: write() enqueues immediately
        first_fifo.close()

        # drain the last queue into the output
        last_fifo = fifos[-1]
        for line in last_fifo:
            out.write(line)
            print '[parent] fifo-last -> out: %s' % line.rstrip()
        out.flush()
        gevent.joinall(p_threads)  # wait for every stage to finish

    def _create_fifo(self):
        return GeventQueueWrapper(gevent.queue.Queue())
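
# Unlike MultiProcessPipelineExecutor, this executor forks no children and
# creates no files on disk: every stage is a greenlet in a single process and
# the "fifos" are ordinary gevent queues. Because scheduling is cooperative,
# a CPU-bound stage can only ever use one core (see the author's note below).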
def main():
    processors = [ProcessorHead(3), ProcessorTranslate("hw", "HW")]
    executor = MultiProcessPipelineExecutor()
    g_stdin = gfo.FileObject(sys.stdin)
    g_stdout = gfo.FileObject(sys.stdout)
    gth = gevent.spawn(executor.execute, g_stdin, g_stdout, processors)
    gth.join()


def main2():
    processors = [ProcessorHead(3), ProcessorTranslate("hw", "HW")]
    g_stdin = gfo.FileObject(sys.stdin)
    g_stdout = gfo.FileObject(sys.stdout)
    executor = GeventPipelineExecutor()
    gth = gevent.spawn(executor.execute, g_stdin, g_stdout, processors)
    gth.join()


if __name__ == '__main__':
    # main()   # multiprocess version
    main2()    # gevent version
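
Example run (the contents of test.txt here are hypothetical, not part of the gist):

$ printf 'hello world\nfoo bar\nhello gevent\nnever reached\n' > test.txt
$ cat test.txt | python ./stream_text_processor.py

With ProcessorHead(3) followed by ProcessorTranslate("hw", "HW"), the first
three lines come out with 'h' -> 'H' and 'w' -> 'W' ("Hello World", "foo bar",
"Hello gevent"), interleaved with the debug lines printed by each stage.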
@tomotaka (Author):
multiprocess
The implementation still feels a bit unstable...
For multi-stage processing that burns a lot of CPU, gevent can only use a single core, so this version may have an advantage there.

gevent
Replacing shell scripts that push lots of small files through pipelines of several commands with this should cut process-startup overhead and improve efficiency.
The implementation is fairly stable, I think.
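
For comparison, the two stages configured in main()/main2() correspond roughly to this shell pipeline (shown only for illustration):

$ cat test.txt | head -n 3 | tr hw HW

Each command in the shell version costs a fork+exec per invocation; the gevent executor runs the equivalent stages as greenlets inside a single Python process.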
