IPC tributary POC
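
Proof of concept for parallel file processing over tributary's IPC extension. A GlobFileStream feeds *.txt filenames through a NaivePool, which round-robins them across eight FileDispatcher workers; each dispatcher forks a child process (via SimpleEngineFactory) that counts the FNV-1a hash of every line in its files, and the shared HashAggregator service merges the counts and writes the sorted totals to output.txt.
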
from tributary.core import Engine, Message, ExecutionContext, Service
from tributary.streams import StreamElement, StreamProducer
from tributary.events import StopMessage, STOP
import tributary.ext.fs as fs
import tributary.ext.ipc as ipc
from collections import defaultdict
import operator
import logging
import pyhash
import gevent
# 64-bit FNV-1a hash used to fingerprint each line
hasher = pyhash.fnv1a_64()

class FileLogger(StreamElement):
    """Logs the filename carried by each incoming message."""

    def __init__(self, name):
        super(FileLogger, self).__init__(name)

    def process(self, message):
        self.log(message.data.get('filename', None))

def child_main(cend):
    """Entry point for each child process. Reads its configuration from
    the pipe, then runs a local engine that hashes dispatched files."""
    meta = cend.get()
    logfile = meta.data.get('logfile', None)
    name = meta.data.get('name', None)
    ID = meta.data.get('id', None)
    if not logfile:
        cend.put(status_message('error', 'Missing "logfile" information'))
        return
    if not name:
        cend.put(status_message('error', 'Missing "name" information'))
        return
    if not ID:
        cend.put(status_message('error', 'Missing "ID" information'))
        return

    # create internal process engine
    eng = Engine()

    # IPC msg handler feeds dispatched messages into the processor
    child = ipc.IPCSubscriber('IPC #%s' % ID, cend)
    child.add(ChildFileProcessor(name, cend, ID))
    eng.add(child)
    eng.start()
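
# Parent/child handshake, as implemented in this file: the dispatcher first
# sends an init Message (logfile/name/id), then streams filename messages;
# the child answers each filename with an 'ok' status carrying the hash counts.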

class ChildFileProcessor(StreamElement):
    """Hashes every line of each dispatched file and acks the parent
    with the per-hash counts."""

    def __init__(self, name, cend, id):
        super(ChildFileProcessor, self).__init__(name)
        self.cend = cend
        self.id = id

    def postProcess(self, msg):
        self.log('Exiting...')

    def process(self, message):
        filename = message.data.get('filename', None)
        if filename is not None:
            hashes = defaultdict(int)
            with open(filename) as fh:
                self.log('Reading: %s' % filename)
                for line in fh:
                    hashes[hasher(line)] += 1

            # ack the parent with the results
            self.cend.put(status_message('ok', 'Completed: %s' % filename, hashes=hashes))

def status_message(msgtype, msg, **kwargs):
    """Compose status message"""
    return Message(type=msgtype, message=msg, **kwargs)
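
# Example (illustrative values only): a child acks a finished file with
#   status_message('ok', 'Completed: data1.txt', hashes={1234: 2, 5678: 1})
# and the parent reads that reply back via self.pipe.get() in
# FileDispatcher.process below.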

class FileDispatcher(ipc.IPCDispatcher):
    """Forwards filenames to one child process and feeds the returned
    hash counts into the shared HashAggregator service."""

    def __init__(self, name, factory, num, workers):
        super(FileDispatcher, self).__init__(name, factory)
        self.id = num
        self.workers = workers

    def onConnection(self):
        self.log('Sending initialization data')
        self.pipe.put(Message(logfile='child.txt', name='Child Process #%s' % (self.id + 1), id=self.id + 1))
        self.getContext().getService('HashAggregator').open(self.name)

    def process(self, message):
        filename = message.data.get('filename', None)
        if filename is not None:
            self.pipe.put(message)

            # block until the child acks with its hash counts
            response = self.pipe.get()
            hashes = response.data.get('hashes', None)
            if hashes is not None:
                self.getContext().getService('HashAggregator').addAll(hashes)

    def onClose(self):
        self.getContext().getService('HashAggregator').close(self.name)

class NaivePool(StreamElement):
    """Round-robins incoming messages across its child elements."""

    def __init__(self, name):
        super(NaivePool, self).__init__(name)
        self.offset = -1

    def process(self, msg):
        self.offset += 1
        # children are stored as (index, element) pairs; pick the one
        # whose index matches the current round-robin offset
        for index, element in sorted(self._children.values()):
            if self.offset % len(self) == index:
                element.insert(msg)

class HashAggregator(Service):
    """Service that merges the hash counts reported by every dispatcher
    and writes the totals, sorted by count, to an output file."""

    def __init__(self, name, outputfile):
        super(HashAggregator, self).__init__(name)
        self.hashagg = defaultdict(int)
        self.processes = 0
        self.outputfile = outputfile

    def open(self, name):
        self.processes += 1

    def close(self, name):
        self.processes -= 1

    def addAll(self, hashes):
        for k, v in hashes.iteritems():
            self.hashagg[k] += v

    def join(self):
        # wait until every dispatcher has closed, then flush to disk
        while self.processes > 0:
            gevent.sleep(0.1)
        self.writeFile()

    def writeFile(self):
        self.log_info('Logging output')
        with open(self.outputfile, 'w') as fh:
            # one line per hash, e.g. "12345678901234567890   : 42"
            for k, v in self:
                fh.write('{0:<22} : {1}\n'.format(k, v))
        self.log_info('Completed output')

    def __iter__(self):
        # iterate hashes from most to least frequent
        for k, v in sorted(self.hashagg.items(), key=lambda x: x[1], reverse=True):
            yield (k, v)

if __name__ == '__main__':
    ctx = ExecutionContext()
    agg = HashAggregator('HashAggregator', 'output.txt')
    ctx.addService(agg)

    # create engine
    engine = Engine(ctx)
    files = fs.GlobFileStream("Data Files", "../tributary-dist/parallel-gevent/data/*txt")
    files.add(FileLogger("Logger"))

    factory = ipc.SimpleEngineFactory(child_main)
    pool = NaivePool('Pool')
    workers = 8
    for i in xrange(workers):
        pool.add(FileDispatcher("Dispatcher #{0}".format(i + 1), factory, i, workers))
    files.add(pool)

    engine.add(files)
    engine.start()
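
Note: output.txt is only written by HashAggregator.join(), which blocks until every dispatcher has called close(). Whether join() is invoked automatically depends on the tributary version's service shutdown behavior; if the output file never appears, calling agg.join() after engine.start() returns (assuming start() blocks until the stream is exhausted) should flush it explicitly.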