Skip to content

Instantly share code, notes, and snippets.

@FMNSSun
Created June 6, 2017 10:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save FMNSSun/641ce44a8963d07dfe6d941c56c210b3 to your computer and use it in GitHub Desktop.
Save FMNSSun/641ce44a8963d07dfe6d941c56c210b3 to your computer and use it in GitHub Desktop.
import bson
import datetime
import sys
import queue
import threading
import time
# Field accessors for the measurement documents handled below, plus lazy
# "plural" versions that map an accessor over an iterable of documents.

def path(x):
    """Return the ``'path'`` entry of document *x*."""
    return x['path']


def target(x):
    """Return the ``'target'`` entry of document *x*."""
    return x['target']


def condition(x):
    """Return the ``'condition'`` entry of document *x*."""
    return x['condition']


def paths(x):
    """Lazily yield ``path(d)`` for every document *d* in *x* (a ``map`` object)."""
    return map(path, x)


def targets(x):
    """Lazily yield ``target(d)`` for every document *d* in *x* (a ``map`` object)."""
    return map(target, x)


def conditions(x):
    """Lazily yield ``condition(d)`` for every document *d* in *x* (a ``map`` object)."""
    return map(condition, x)


def const(a):
    """Return a one-argument function that ignores its argument and returns *a*."""
    def _always(b):
        return a
    return _always
class IteratorThread(threading.Thread):
    """Daemon thread that drains an iterable into a bounded queue.

    Items are put on ``dest`` when one is given, otherwise on the thread's
    own ``self.queue`` (maxsize 128), which ``to_generator`` reads back.
    ``queue.Queue.put`` blocks while the target queue is full, so a slow
    consumer throttles the producer.
    """

    def __init__(self, source, dest=None):
        """
        source: any iterable; it is iterated on this thread once started.
        dest: optional queue to fill instead of ``self.queue``. When given,
              ``to_generator`` has nothing to read (nothing fills self.queue).
        """
        threading.Thread.__init__(self)
        self.source = source
        self.queue = queue.Queue(maxsize=128)
        self.dest = dest
        # Daemon so a consumer that stops early cannot keep the process alive
        # while this thread is blocked on a full queue.
        self.daemon = True

    def run(self):
        """Copy every item of ``source`` onto the target queue, in order."""
        print("Iterator start")
        sink = self.queue if self.dest is None else self.dest
        for item in self.source:
            sink.put(item)
        print("Iterator end")

    def to_generator(self, timeout=1):
        """Yield items from ``self.queue`` until the producer has finished.

        timeout: seconds to wait for each item before re-checking whether
                 the producer thread is still alive (default 1, as before).

        Once the producer has exited, every item it put is already in the
        queue. The remaining items are then drained without waiting, so no
        item is lost to the race between the ``get`` timeout and the
        ``is_alive()`` check.
        """
        while True:
            try:
                item = self.queue.get(timeout=timeout)
            except queue.Empty:
                if self.is_alive():
                    continue
                while True:
                    try:
                        yield self.queue.get_nowait()
                    except queue.Empty:
                        return
            yield item
class IntersectorThread(threading.Thread):
    """Daemon thread that emits the keys present in both of two item streams.

    Each source is drained by its own IteratorThread into a bounded queue.
    Every item is mapped to a key through ``p``. A key is put on
    ``self.queue`` the first time it has been seen on both sides, and is
    never emitted again.
    """

    # Per-key state codes stored in the ``seen`` dict inside run().
    _LEFT, _RIGHT, _BOTH = 1, 2, 3

    def __init__(self, p, source_left, source_right, poll_timeout=1):
        """
        p: key function applied to every item; keys must be hashable.
        source_left, source_right: iterables to intersect. Their feeder
            threads are started immediately.
        poll_timeout: seconds each queue read waits before re-checking
            whether the corresponding feeder has finished (default 1).
        """
        threading.Thread.__init__(self)
        self.left_queue = queue.Queue(maxsize=8)
        self.right_queue = queue.Queue(maxsize=16)
        self.left_thread = IteratorThread(source_left, self.left_queue)
        self.right_thread = IteratorThread(source_right, self.right_queue)
        self.p = p
        self.poll_timeout = poll_timeout
        self.queue = queue.Queue(maxsize=256)
        self.left_thread.start()
        self.right_thread.start()
        self.daemon = True

    def run(self):
        """Alternate between both input queues until both sides are exhausted.

        A side is retired only when its feeder thread has exited AND its
        queue is empty. A momentary timeout while a feeder is still working
        (e.g. a sparse filter) no longer ends the intersection, and items
        still queued after a feeder exits are no longer dropped.
        """
        seen = {}
        # (input queue, feeder thread, this side's mark, the other side's mark)
        sides = [
            (self.left_queue, self.left_thread, self._LEFT, self._RIGHT),
            (self.right_queue, self.right_thread, self._RIGHT, self._LEFT),
        ]
        while sides:
            for side in list(sides):
                source_queue, feeder, mark, other = side
                try:
                    raw = source_queue.get(timeout=self.poll_timeout)
                except queue.Empty:
                    # Check is_alive() before empty(): once the feeder has
                    # exited, all of its puts have completed, so an empty
                    # queue really is final.
                    if not feeder.is_alive() and source_queue.empty():
                        sides.remove(side)
                    continue
                try:
                    key = self.p(raw)
                    state = seen.get(key)
                except Exception as e:
                    # Malformed item or unhashable key: report it and skip it.
                    print(e)
                    continue
                if state is None:
                    seen[key] = mark
                elif state == other:
                    seen[key] = self._BOTH
                    self.queue.put(key)
        print("Both subthreads are dead")

    def to_generator(self):
        """Yield emitted keys until this intersector thread has finished.

        Termination is decided by this thread's own liveness, not by the
        feeders'. The intersector may still be working through queued items
        after both feeders exit. Once it has stopped, any remaining output is
        drained without waiting.
        """
        while True:
            try:
                item = self.queue.get(timeout=self.poll_timeout)
            except queue.Empty:
                if self.is_alive():
                    continue
                while True:
                    try:
                        yield self.queue.get_nowait()
                    except queue.Empty:
                        return
            yield item
def _par(it):
    """Iterate *it* on a background IteratorThread and return a generator of its items."""
    worker = IteratorThread(it)
    worker.start()
    items = worker.to_generator()
    return items
def _par_consume(it):
    """Iterate *it* on a background thread and collect every item into a list."""
    background = _par(it)
    return _consume(background)
def _consume(it):
xs = []
for x in it:
xs.append(x)
return xs
def _source(src):
    """Open a BSON file and return the ``_feed`` stream of its documents.

    src: path of the BSON file (str, bytes or os.PathLike). NOTE(review):
         intersect_on passes an already-built document stream here rather
         than a path. For anything that is not a path, this falls back to
         sys.argv[1], the script's input file.

    The file is opened eagerly, so a bad path fails at the call site. It is
    closed when iteration of the returned stream ends, rather than being
    left for the garbage collector.
    """
    import os

    path = src if isinstance(src, (str, bytes, os.PathLike)) else sys.argv[1]
    f = open(path, 'rb')

    def _documents():
        # Tie the handle's lifetime to the iteration of the decoded stream.
        with f:
            yield from bson.decode_file_iter(f)

    return _feed(_documents())
def equals(p, r):
    """Build a predicate that is true when ``p(x) == r(x)``.

    Both *p* and *r* are one-argument functions applied to the same item.
    Pair *r* with ``const(value)`` to compare a field against a fixed value.
    """
    def _matches(item):
        lhs = p(item)
        rhs = r(item)
        return lhs == rhs
    return _matches
def filter(p):
    """Return a generator function that yields only the items satisfying *p*.

    The returned function is lazy: its input is not touched until the
    resulting generator is first advanced.
    NOTE: this shadows the built-in ``filter`` within this module.
    """
    def _keep(items):
        for item in items:
            if not p(item):
                continue
            yield item
    return _keep
def _copy(dic, cond):
ndic = {}
for key in dic:
if key == 'conditions':
continue
ndic[key] = dic[key]
ndic['target'] = ndic['path'][-1]
ndic['source'] = ndic['path'][0]
ndic['condition'] = cond
return ndic
def _unwind(dic):
    """Expand *dic* into one ``_copy`` per entry of ``dic['conditions']``, in order."""
    return [_copy(dic, cond) for cond in dic['conditions']]
def _feed(it):
    """Lazily unwind each document of *it* and yield the valid per-condition copies.

    A copy is kept only when ``copy['action_ids'][0]['valid']`` is truthy.
    Progress messages are printed when iteration starts and when the input
    is exhausted.
    """
    print("New feeder")
    for record in it:
        for flat in _unwind(record):
            if flat['action_ids'][0]['valid']:
                yield flat
    print("Feeder done")
def intersect_on(p, a, b):
    """Build a query that yields keys ``p(item)`` produced by both queries *a* and *b*.

    The returned function takes *src*, builds two independent streams with
    ``_source(src)`` (one per side), runs them through *a* and *b* on an
    IntersectorThread, and returns that thread's result generator.
    NOTE(review): run() passes a document stream here, not a path. This only
    works because _source re-opens the input file itself; confirm before
    changing _source.
    """
    def _start(src):
        left = a(_source(src))
        right = b(_source(src))
        intersector = IntersectorThread(p, left, right)
        intersector.start()
        return intersector.to_generator()
    return _start
def run(f, src, take=None):
    """Apply query *f* to ``_source(src)`` and lazily yield its results.

    f: callable taking the stream returned by ``_source(src)`` and
       returning an iterable of results.
    src: forwarded to ``_source``.
    take: maximum number of results to yield; ``None`` means unlimited.
          A value <= 0 yields nothing, and the query is not started.

    After the take-th result the loop stops without pulling another item,
    so a blocking result stream is never waited on needlessly.
    """
    if take is not None and take <= 0:
        return
    stream = f(_source(src))
    if take is None:
        yield from stream
        return
    for count, item in enumerate(stream, 1):
        yield item
        if count >= take:
            break
# Query: targets observed with ECN connectivity both working and broken.
query_works = filter(equals(condition, const("ecn.connectivity.works")))
query_broken = filter(equals(condition, const("ecn.connectivity.broken")))
both = intersect_on(target, query_works, query_broken)

if __name__ == "__main__":
    # Usage: <script> INPUT.bson [LIMIT]  (LIMIT defaults to 8192 results)
    if len(sys.argv) < 2:
        sys.exit("usage: %s INPUT.bson [LIMIT]" % sys.argv[0])
    limit = int(sys.argv[2]) if len(sys.argv) > 2 else 8192
    result = run(both, sys.argv[1], limit)
    for r in result:
        print(r)
    print("done")
    # sys.exit instead of quit(): quit() is injected by the `site` module
    # and is not guaranteed to exist (e.g. `python -S`, embedded builds).
    # Exiting also discards any daemon workers still blocked on full queues.
    sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment