Skip to content

Instantly share code, notes, and snippets.

@eliquious
Last active August 29, 2015 14:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eliquious/f3636f3b8048d496acc9 to your computer and use it in GitHub Desktop.
Save eliquious/f3636f3b8048d496acc9 to your computer and use it in GitHub Desktop.
Tributary with IPC
import tributary
from tributary.core import Engine, Message
from tributary.streams import StreamElement, StreamProducer
from tributary.events import StopMessage, STOP
import tributary.ext.fs as fs
import decimal
import time
import logging
import pyhash
import gipc
class FileLogger(StreamElement):
    """Stream element that logs the ``filename`` carried by each message."""

    def __init__(self, name):
        super(FileLogger, self).__init__(name)

    def process(self, message):
        """Log the message's ``filename`` entry, or ``None`` when it is absent."""
        filename = message.data.get('filename', None)
        self.log(filename)
class IPCProducer(StreamProducer):
    """Stream producer whose input is a pipe handle from another process.

    Every message read from ``cend`` is scattered to this node's children;
    a message whose ``channel`` equals ``STOP`` stops the producer instead.
    """

    def __init__(self, name, cend):
        super(IPCProducer, self).__init__(name)
        # pipe handle read with .get() (a gipc duplex handle in this file)
        self.cend = cend

    def process(self, message=None):
        """Receive messages from IPC until stopped; the argument is ignored."""
        while self.running:
            incoming = self.cend.get()
            if incoming.channel != STOP:
                # forward to child nodes
                self.scatter(incoming)
                continue
            self.stop()
            break

    def handleException(self, ex):
        """Delegate to ``log_exception`` with the ``'IPC'`` tag; ``ex`` is unused here."""
        self.log_exception('IPC')
class ChildFileProcessor(StreamElement):
    """Child-side stream element that reads each file it is sent and acks it.

    For every message carrying a ``filename`` the file is read line by line,
    then a status message is put on ``cend``. Exactly one reply is sent per
    filename, including on I/O failure, because the sender waits for a reply
    after each file it forwards. Messages without a filename get no reply.

    :param name: element name.
    :param cend: pipe handle back to the parent; replies are sent with ``put``.
    :param id: numeric id of this child (stored; not otherwise used here).
    """

    def __init__(self, name, cend, id):
        super(ChildFileProcessor, self).__init__(name)
        self.cend = cend
        self.id = id

    def process(self, message):
        filename = message.data.get('filename', None)
        if filename is None:
            return
        try:
            with open(filename) as fh:
                self.log('Reading: %s' % filename)
                # Placeholder workload: consume every line of the file.
                for _ in fh:
                    pass
        except (IOError, OSError) as err:
            # Still reply: a missing ack would leave the waiting parent
            # blocked forever on its pipe read.
            self.cend.put(status_message('error', 'Failed: %s (%s)' % (filename, err)))
            return
        # ack
        self.cend.put(status_message('ok', 'Completed: %s' % filename))
def status_message(msgtype, msg):
    """Build a ``Message`` whose ``type`` is *msgtype* and ``message`` is *msg*."""
    fields = {'type': msgtype, 'message': msg}
    return Message(**fields)
def file_processor(cend):
    """Entry point of each child process started with ``gipc.start_process``.

    The first message read from ``cend`` must carry ``logfile``, ``name`` and
    ``id``. If any is missing, an ``'error'`` status message is sent back and
    the function returns without building an engine. Otherwise a local
    ``Engine`` is created with an ``IPCProducer`` feeding a
    ``ChildFileProcessor`` and started.

    :param cend: this process's end of the pipe to the parent.
    """
    meta = cend.get()
    # NOTE(review): logfile is validated but not otherwise used here.
    logfile = meta.data.get('logfile', None)
    name = meta.data.get('name', None)
    ID = meta.data.get('id', None)

    # Any missing field ends the child: running the pipeline with None
    # metadata would be meaningless.
    if not logfile:
        cend.put(status_message('error', 'Missing "logfile" information'))
        return
    if not name:
        cend.put(status_message('error', 'Missing "name" information'))
        return
    if ID is None:
        # 0 is a legitimate id, so only an absent value counts as missing.
        cend.put(status_message('error', 'Missing "ID" information'))
        return

    # create internal process engine
    eng = Engine()

    # IPC msg handler
    ipc = IPCProducer('IPC #%s' % ID, cend)
    ipc.add(ChildFileProcessor(name, cend, ID))
    eng.add(ipc)
    eng.start()
# Module-level 64-bit FNV-1a hasher used to shard filenames across dispatchers.
hasher = pyhash.fnv1a_64()


class FileDispatcher(StreamElement):
    """Forward one shard of the incoming file messages to a child process.

    Each dispatcher starts one gipc child running ``file_processor``. A file
    is handled by the dispatcher whose ``num`` equals
    ``hasher(filename) % total``, so ``total`` dispatchers numbered
    ``0 .. total - 1`` together cover every file exactly once.

    :param name: element name.
    :param num: zero-based shard index of this dispatcher.
    :param total: number of dispatchers sharing the stream; defaults to 8,
        the modulus previously hard-coded here.
    """

    def __init__(self, name, num, total=8):
        super(FileDispatcher, self).__init__(name)
        self.name = name
        self.id = num
        self.total = total
        cend, pend = gipc.pipe(duplex=True)
        self.pend = pend
        self.child = gipc.start_process(file_processor, args=(cend,))
        # init child: file_processor reads this metadata message first
        self.pend.put(Message(logfile='child.txt',
                              name='Child Process #%s' % (num + 1),
                              id=num + 1))

    def process(self, message):
        """Send *message* to the child if its filename belongs to this shard."""
        filename = message.data.get('filename', None)
        if filename is None:
            return
        if hasher(filename) % self.total != self.id:
            # another dispatcher owns this file
            return
        self.pend.put(message)
        # Wait for the child's status reply; the reply content is not inspected.
        self.pend.get()

    def postProcess(self, message=None):
        """Tell the child to stop, wait for it to exit, then close the pipe."""
        self.log('Stopping child process')
        self.pend.put(StopMessage)
        self.log('Waiting for child process to stop')
        self.child.join()
        # Release the parent-side pipe handle once the child is gone.
        self.pend.close()
        self.log('Child process stopped')
if __name__ == '__main__':
    engine = Engine()
    files = fs.GlobFileStream("Data Files", "./data/*txt")
    # Optional debugging aid: files.add(FileLogger("Logger"))

    # Eight dispatchers named "Dispatcher #1".."#8", with shard indices 0..7,
    # added in that order.
    for shard in range(8):
        files.add(FileDispatcher("Dispatcher #%d" % (shard + 1), shard))

    engine.add(files)
    engine.start()
import os
import random
import cStringIO
import string
# Data domain: the pool of characters that random source strings draw from.
# NOTE(review): string.octdigits is '01234567', so '8' and '9' are absent;
# if a hexadecimal alphabet was intended, use string.digits + 'abcdef'.
alphabet = ''.join((string.octdigits, 'abcdef'))
def generate_source_file(filename, count, size, symbols=None):
    """Generate ``count`` random strings, write them to *filename*, return them.

    Each string is built with ``random.sample``, so no character repeats
    within a single string. Strings are written newline-separated with no
    trailing newline. All strings are generated before the file is opened,
    so invalid arguments never truncate an existing file.

    :param filename: output path (overwritten).
    :param count: number of strings to generate.
    :param size: characters per string.
    :param symbols: pool of characters to draw from; defaults to the
        module-level ``alphabet``.
    :returns: list of the generated strings, in file order.
    :raises ValueError: if ``size`` exceeds ``len(symbols)`` (from ``random.sample``).
    """
    if symbols is None:
        symbols = alphabet
    possible = [''.join(random.sample(symbols, size)) for _ in range(count)]
    # write to file
    with open(filename, 'w') as fh:
        fh.write('\n'.join(possible))
    # return truth
    return possible
def generate_sample_file(filename, sample):
    """Write the strings in *sample* to *filename*, one per line.

    The file is overwritten and has no trailing newline.
    """
    with open(filename, 'w') as out:
        joined = '\n'.join(sample)
        out.write(joined)
def main(directory, filecount, combinations, chars, sample):
    """Create the fake input data set.

    Writes ``combinations`` random strings of ``chars`` characters to
    ``source.txt`` in the current working directory. Then writes
    ``filecount`` files named ``0.txt``, ``1.txt``, ... into *directory*,
    each holding ``sample`` strings drawn without replacement from the
    source strings.

    :param directory: output directory for sample files; created (including
        missing parents) if it does not exist.
    :param filecount: number of sample files to write.
    :param combinations: number of source strings to generate.
    :param chars: length of each source string.
    :param sample: number of strings per sample file; must not exceed
        ``combinations`` (``random.sample`` raises ``ValueError`` otherwise).
    """
    if not os.path.exists(directory):
        # makedirs also creates missing parent directories, unlike mkdir.
        os.makedirs(directory)

    # generate source file
    # NOTE(review): source.txt is written to the CWD rather than *directory*,
    # which keeps it out of a "<directory>/*txt" glob — confirm this is intended.
    possible = generate_source_file('source.txt', combinations, chars)

    # generate sample files
    for index in range(filecount):
        target = os.path.join(directory, '%s.txt' % index)
        generate_sample_file(target, random.sample(possible, sample))
if __name__ == '__main__':
    # 500 sample files of 100,000 lines each, drawn from 1,000,000
    # ten-character source strings.
    main(
        directory='./data',
        filecount=500,
        combinations=1000000,
        chars=10,
        sample=100000,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment