Created
June 6, 2017 10:10
-
-
Save FMNSSun/641ce44a8963d07dfe6d941c56c210b3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import bson | |
import datetime | |
import sys | |
import queue | |
import threading | |
import time | |
# Small accessors over the unwound measurement dicts produced by ``_copy``.


def path(x):
    """Return ``x['path']``."""
    return x['path']


def target(x):
    """Return ``x['target']``."""
    return x['target']


def condition(x):
    """Return ``x['condition']``."""
    return x['condition']


def paths(x):
    """Lazily map :func:`path` over the iterable *x* (returns a ``map``)."""
    return map(path, x)


def targets(x):
    """Lazily map :func:`target` over the iterable *x* (returns a ``map``)."""
    return map(target, x)


def conditions(x):
    """Lazily map :func:`condition` over the iterable *x* (returns a ``map``)."""
    return map(condition, x)


def const(a):
    """Return a one-argument function that ignores its argument and returns *a*."""
    def const_(b):
        return a
    return const_
class IteratorThread(threading.Thread):
    """Daemon thread that drains an iterable into a bounded queue.

    Items are put on ``dest`` when one is supplied; otherwise they go to
    ``self.queue`` (maxsize 128), from which :meth:`to_generator` reads them.
    """

    # End-of-stream marker always put on ``self.queue`` when ``run`` exits.
    _DONE = object()

    def __init__(self, source, dest=None):
        super().__init__()
        self.source = source
        self.queue = queue.Queue(maxsize=128)
        self.dest = dest
        self.daemon = True

    def run(self):
        """Copy every item of ``self.source`` into the target queue."""
        print("Iterator start")
        sink = self.queue if self.dest is None else self.dest
        try:
            for item in self.source:
                sink.put(item)
            print("Iterator end")
        finally:
            # Signal end-of-stream even if ``source`` raised, so readers of
            # ``self.queue`` stop immediately instead of polling liveness
            # (which could race with the final put and drop the last item).
            self.queue.put(self._DONE)

    def to_generator(self):
        """Yield items from ``self.queue`` until the producer has finished."""
        while True:
            try:
                item = self.queue.get(timeout=1)
            except queue.Empty:
                # Fallback for a thread that never started or died abnormally.
                if self.is_alive():
                    continue
                break
            if item is self._DONE:
                break
            # ``yield`` sits outside the ``try`` so GeneratorExit from
            # ``close()`` propagates normally.
            yield item
class IntersectorThread(threading.Thread):
    """Daemon thread that emits keys occurring in both of two streams.

    Each source is drained by its own ``IteratorThread`` into a bounded
    queue. Every item is mapped through ``p`` and the result is used as a
    dict key, so ``p`` must return hashable values. A key is put on
    ``self.queue`` exactly once: the first time it has been seen on both
    sides. :meth:`to_generator` yields those keys.
    """

    # Per-key bookkeeping values used by ``run``.
    _SEEN_LEFT = 1
    _SEEN_RIGHT = 2
    _EMITTED = 3
    # End-of-stream marker put on ``self.queue`` when ``run`` exits.
    _DONE = object()

    def __init__(self, p, source_left, source_right):
        super().__init__()
        self.left_queue = queue.Queue(maxsize=8)
        self.right_queue = queue.Queue(maxsize=16)
        self.left_thread = IteratorThread(source_left, self.left_queue)
        self.right_thread = IteratorThread(source_right, self.right_queue)
        self.p = p
        self.queue = queue.Queue(maxsize=256)
        self.left_thread.start()
        self.right_thread.start()
        self.daemon = True

    def run(self):
        """Alternate between both input queues until both are exhausted."""
        seen = {}
        # (input queue, producer thread, this side's mark, other side's mark)
        sides = (
            (self.left_queue, self.left_thread,
             self._SEEN_LEFT, self._SEEN_RIGHT),
            (self.right_queue, self.right_thread,
             self._SEEN_RIGHT, self._SEEN_LEFT),
        )
        active = [True, True]
        try:
            while any(active):
                for index, (side_queue, producer, mine, other) in enumerate(sides):
                    if not active[index]:
                        continue
                    # Sample liveness *before* reading: if the producer had
                    # already exited and the queue is still empty after the
                    # timeout, nothing can ever arrive on this side again.
                    # A timeout while the producer is alive is just slowness.
                    producer_alive = producer.is_alive()
                    try:
                        raw = side_queue.get(timeout=1)
                    except queue.Empty:
                        if not producer_alive:
                            active[index] = False
                        continue
                    try:
                        key = self.p(raw)
                    except Exception as e:
                        # Best-effort: report and skip items ``p`` cannot key.
                        print(e)
                        continue
                    mark = seen.get(key)
                    if mark is None:
                        seen[key] = mine
                    elif mark == other:
                        seen[key] = self._EMITTED
                        self.queue.put(key)
            print("Both subthreads are dead")
        finally:
            self.queue.put(self._DONE)

    def to_generator(self):
        """Yield intersecting keys until this thread has finished."""
        while True:
            try:
                item = self.queue.get(timeout=1)
            except queue.Empty:
                # Fallback for a thread that never started or died abnormally.
                if self.is_alive():
                    continue
                break
            if item is self._DONE:
                break
            yield item
def _par(it):
    """Iterate *it* on a background ``IteratorThread``; return its generator."""
    worker = IteratorThread(it)
    worker.start()
    items = worker.to_generator()
    return items
def _par_consume(it):
    """Drain *it* through a background thread and return all items as a list."""
    return list(_par(it))
def _consume(it): | |
xs = [] | |
for x in it: | |
xs.append(x) | |
return xs | |
def _source(src):
    """Open the BSON file at path *src* and return a generator over ``_feed``.

    The file is opened eagerly, so a bad path fails at call time, as it did
    before. It is closed once the returned generator is exhausted or closed.
    (Previously the argument was ignored in favour of ``sys.argv[1]``, and
    the handle was never closed.)
    """
    f = open(src, 'rb')

    def _docs():
        with f:
            for doc in _feed(bson.decode_file_iter(f)):
                yield doc

    return _docs()
def equals(p, r):
    """Return a predicate that is true for *x* exactly when ``p(x) == r(x)``."""
    return lambda x: p(x) == r(x)
def filter(p):
    """Return a lazy stage that yields only the items for which ``p`` is truthy.

    NOTE: this module-level name shadows the built-in ``filter``.
    """
    def filter_(xs):
        for item in xs:
            if not p(item):
                continue
            yield item
    return filter_
def _copy(dic, cond): | |
ndic = {} | |
for key in dic: | |
if key == 'conditions': | |
continue | |
ndic[key] = dic[key] | |
ndic['target'] = ndic['path'][-1] | |
ndic['source'] = ndic['path'][0] | |
ndic['condition'] = cond | |
return ndic | |
def _unwind(dic):
    """Expand *dic* into one ``_copy`` per entry of ``dic['conditions']``."""
    return [_copy(dic, cond) for cond in dic['conditions']]
def _feed(it):
    """Unwind each document of *it*, yielding only those marked valid.

    An unwound document passes when ``doc['action_ids'][0]['valid']`` is
    truthy. This assumes ``'action_ids'`` is a non-empty list of dicts.
    """
    print("New feeder")
    for doc in it:
        for unwound in _unwind(doc):
            if unwound['action_ids'][0]['valid']:
                yield unwound
    print("Feeder done")
def intersect_on(p, a, b):
    """Build a stage running pipelines *a* and *b* over two independent reads.

    The returned function opens *src* twice (once per side), feeds the two
    streams to an ``IntersectorThread`` keyed by *p*, and returns that
    thread's generator of keys found on both sides.
    """
    def intersect_on_(src):
        left = a(_source(src))
        right = b(_source(src))
        worker = IntersectorThread(p, left, right)
        worker.start()
        return worker.to_generator()
    return intersect_on_
def run(f, src, take=None):
    """Yield the items produced by ``f(_source(src))``, at most *take* of them.

    ``take=None`` yields everything. ``take <= 0`` yields nothing and does not
    open *src*. (Previously a non-positive *take* still yielded one item,
    because the count was checked only after yielding.)
    """
    import itertools

    if take is None:
        for item in f(_source(src)):
            yield item
        return
    if take <= 0:
        return
    # islice stops without pulling an extra item from the pipeline.
    for item in itertools.islice(f(_source(src)), take):
        yield item
# Pipeline stages: keep unwound measurements by condition, then intersect the
# two streams on their target. ``filter`` here is this module's own helper.
query_works = filter(equals(condition, const("ecn.connectivity.works")))
query_broken = filter(equals(condition, const("ecn.connectivity.broken")))
both = intersect_on(target, query_works, query_broken)

if __name__ == "__main__":
    if len(sys.argv) < 2:
        # sys.exit with a string prints it to stderr and exits with status 1.
        sys.exit("usage: {} FILE.bson".format(sys.argv[0]))
    result = run(both, sys.argv[1], 8192)
    for r in result:
        print(r)
    print("done")
    # sys.exit rather than quit(): quit is a site-module convenience that is
    # not guaranteed to exist (e.g. under ``python -S``).
    sys.exit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment