Skip to content

Instantly share code, notes, and snippets.

@braingineer
Last active February 24, 2017 21:45
Show Gist options
Save braingineer/35180f30280d75684d81de5ac7714d9d to your computer and use it in GitHub Desktop.
parallel spacy parse for jupyter notebooks
# Example driver for the parallel-parsing helpers defined below.
import eidos  # NOTE(review): project-local package (provides .parallel and ProgressBar) -- not on PyPI, confirm availability
from toolz import partition
import spacy
nlp = spacy.load('en')  # pipeline kept in the parent process; its vocab is reused below to deserialize docs
def parse(nlp, input_, n_threads, batch_size):
    """Run ``input_`` through spaCy's multi-threaded pipe and serialize each doc.

    The matcher is dropped first: serialized docs are rebuilt in the parent
    process, so per-worker matcher state is dead weight. Returns a list of
    ``doc.to_bytes()`` payloads, one per input text.
    """
    nlp.matcher = None
    return [doc.to_bytes()
            for doc in nlp.pipe(input_, batch_size=batch_size, n_threads=n_threads)]
# Tuning knobs: 5 worker processes, 2 spaCy pipe threads per worker,
# 500 docs per spaCy batch, 5000 docs handed to a worker per job.
n_workers, n_threads, spacy_batchsize, worker_batchsize = 5, 2, 500, 5000
factory = eidos.parallel.SpacyWorkerFactory(parse)
workers = [next(factory) for _ in range(n_workers)]
# NOTE(review): `raw_data` is assumed to be an iterable of raw texts defined
# elsewhere -- this is gist snippet code; confirm before running.
coop = eidos.parallel.Cooperate(workers, ((part, n_threads, spacy_batchsize)
for part in partition(worker_batchsize, raw_data)))
results = coop.run()
# Rehydrate the serialized docs in the parent process against the main vocab.
data = [spacy.tokens.doc.Doc(nlp.vocab).from_bytes(doc_bytestring)
for worker_group in results
for doc_bytestring in worker_group]
import multiprocessing
import eidos
class Cooperate(multiprocessing.Process):
    """Distribute indexed jobs to linked workers and gather their solutions.

    Jobs from ``iterable`` are enumerated, cached by index (so stragglers can
    be recovered with :meth:`flush`), and pushed onto a shared job queue.
    Workers are linked to both queues, then :meth:`run` starts them and drains
    the solution queue until both queues look empty.
    """

    def __init__(self, workers, iterable, use_progress=True):
        # BUG FIX: Process.__init__ was never called, leaving the Process
        # machinery (name, daemon flag, etc.) uninitialized.
        super(Cooperate, self).__init__()
        self.job_queue = multiprocessing.Queue()
        self.solution_queue = multiprocessing.Queue()
        # Cache every job by its index so unfinished work is recoverable.
        self.cached = dict(enumerate(iterable))
        for indexed_job in self.cached.items():
            self.job_queue.put(indexed_job)
        self.workers = [worker.link(self.job_queue, self.solution_queue, w_i)
                        for w_i, worker in enumerate(workers)]
        self.total_n = self.job_queue.qsize()
        # BUG FIX: self.progress is now always defined; run() used to raise
        # AttributeError when use_progress was False.
        self.progress = eidos.ProgressBar(total=self.total_n) if use_progress else None

    def run(self):
        """Start the workers and collect one solution per cached job.

        Returns the list of solutions (arrival order, not job order).
        """
        for w in self.workers:
            w.start()
        self.out = []
        # NOTE(review): qsize() checks are inherently racy (a worker may hold
        # a job that is on neither queue); the leftover check below is the
        # guard for that case.
        while self.job_queue.qsize() > 0 or self.solution_queue.qsize() > 0:
            solution_index, solution = self.solution_queue.get()
            if self.progress:
                self.progress.update(1)
            del self.cached[solution_index]
            self.out.append(solution)
        print("I think I'm done;")
        print("solution queue: {}".format(self.solution_queue.qsize()))
        print("job queue: {}".format(self.job_queue.qsize()))
        print("total_n: {}".format(self.total_n))
        print("out array size: {}".format(len(self.out)))
        print("left in cache: {}".format(len(self.cached)))
        if len(self.out) != self.total_n:
            print("leftover jobs; sigsegv and annoying bug.")
            print("safer to quit and let you decide what to do")
            print("consider calling this.flush() to get remaining")
        return self.out

    def flush(self):
        """Drain queued solutions, then run leftover cached jobs inline."""
        print('solution_queue: ', self.solution_queue.qsize())
        print('cache :', len(self.cached))
        while self.solution_queue.qsize() > 0:
            solution_index, solution = self.solution_queue.get()
            self.out.append(solution)
            del self.cached[solution_index]
        print("Manually running {} left in cache".format(len(self.cached)))
        # Fall back to handling the stragglers in this process.
        for key in list(self.cached.keys()):
            self.out.append(self.workers[0].handle(self.cached.pop(key)))
class QueueWorker(multiprocessing.Process):
    """Process that pulls ``(index, job)`` pairs from a job queue, applies
    ``func``, and pushes ``(index, result)`` onto a solution queue.

    ``unpack=True`` means each job is an argument tuple splatted into
    ``func``; otherwise the job is passed as a single argument. Must be
    :meth:`link`-ed to its queues before :meth:`run`/``start``.
    """

    def __init__(self, func, unpack=False, name="worker"):
        # BUG FIX: the original assigned ``self.name = name`` *before*
        # Process.__init__(), which then overwrote the name with the default
        # "QueueWorker-N". Pass it through so the requested name sticks.
        super(QueueWorker, self).__init__(name=name)
        self.func = func
        self.unpack = unpack
        self.linked = False

    def link(self, job_queue, solution_queue, worker_id):
        """Attach the shared queues and a worker id; returns self (fluent)."""
        self.worker_id = worker_id
        self.job_queue = job_queue
        self.solution_queue = solution_queue
        self.linked = True
        return self

    def run(self):
        """Consume jobs until the job queue looks empty (qsize-based, racy)."""
        assert self.linked, "Workers should be called with job_queue, solution_queue, and worker_id first"
        while self.job_queue.qsize() > 0:
            job_index, job = self.job_queue.get()
            self.solution_queue.put((job_index, self.handle(job)))

    def handle(self, job):
        """Apply ``func`` to one job, splatting it when ``unpack`` is set."""
        if self.unpack:
            out = self.func(*job)
        else:
            out = self.func(job)
        return out
class SharedStateFactory(object):
    """Build ``cls(**cls_kwargs)`` instances that all share one object.

    Every instance produced gets the pre-built ``obj`` stored directly in its
    ``__dict__`` under attribute ``name``, so the factory's products share a
    single piece of state. The factory itself is callable.
    """

    def __init__(self, name, obj, cls, cls_kwargs):
        self.obj_name = name
        self.shared_obj = obj
        self.cls = cls
        self.cls_kwargs = cls_kwargs

    def make(self):
        """Construct one instance and inject the shared object into it."""
        instance = self.cls(**self.cls_kwargs)
        # Write into __dict__ directly so the shared object lands on the
        # instance regardless of class-level attributes of the same name.
        instance.__dict__[self.obj_name] = self.shared_obj
        return instance

    def __call__(self):
        """Alias for :meth:`make`, so the factory can be used as a callable."""
        return self.make()
class SpacyWorker(QueueWorker):
    """QueueWorker whose ``func`` always receives a spaCy pipeline first.

    The pipeline is provided either by a SharedStateFactory (which injects
    ``_nlp`` ahead of time) or loaded lazily in :meth:`link`. Typical use::

        factory = eidos.parallel.SpacyWorkerFactory(parse)
        workers = [next(factory) for _ in range(n_workers)]
        coop = eidos.parallel.Cooperate(workers, job_iterable)
        results = coop.run()
    """

    def link(self, *args, **kwargs):
        """Ensure a pipeline exists, then defer to QueueWorker.link."""
        # A SharedStateFactory may have injected _nlp already; only load
        # spaCy here when this worker has no pipeline of its own.
        if not hasattr(self, '_nlp'):
            import spacy
            self._nlp = spacy.load('en')
            print("{} has loaded spacy".format(self))
        return super(SpacyWorker, self).link(*args, **kwargs)

    def handle(self, job):
        """Call ``func`` with the pipeline prepended to the job arguments."""
        if self.unpack:
            return self.func(self._nlp, *job)
        return self.func(self._nlp, job)
def SpacyWorkerFactory(func, unpack=True, shared_nlp=False, name=None):
    """Infinite generator of SpacyWorker instances wrapping ``func``.

    With ``shared_nlp=True`` a single spaCy pipeline is loaded up front and
    injected into every worker (as ``_nlp``) via SharedStateFactory;
    otherwise each worker loads its own pipeline when linked. Generator
    semantics: nothing (including the prints and the load) happens until
    the first ``next()``.
    """
    if name is None:
        name = "spacy-worker"
    if shared_nlp:
        print("sharing nlp object")
        import spacy
        build = SharedStateFactory('_nlp', spacy.load('en'), SpacyWorker,
                                   dict(func=func, unpack=unpack, name=name))
    else:
        print("No shared NLP object")
        def build():
            return SpacyWorker(func, unpack, name=name)
    while True:
        yield build()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment