Skip to content

Instantly share code, notes, and snippets.

@ecdedios
Created May 31, 2020 03:20
Show Gist options
  • Save ecdedios/60e8e4e7e8592d8b32af6ed78af854b3 to your computer and use it in GitHub Desktop.
Using joblib to process chunks in parallel.
from joblib import Parallel, delayed
def chunker(iterable, total_length, chunksize):
    """Yield successive slices of *iterable*, each at most *chunksize* long.

    ``total_length`` is the number of items to cover; the final slice may
    be shorter than ``chunksize``.
    """
    for start in range(0, total_length, chunksize):
        yield iterable[start:start + chunksize]
def flatten(list_of_lists):
    """Flatten a list of lists to a combined list."""
    combined = []
    for sublist in list_of_lists:
        combined.extend(sublist)
    return combined
def process_chunk(texts):
    """Run the spaCy pipeline over a chunk of texts.

    Returns one list per input text containing the entity strings whose
    label is in the fixed set below. Relies on a module-level ``nlp``
    spaCy model defined elsewhere in the file.
    """
    wanted_labels = {'NORP', 'PERSON', 'FAC', 'ORG', 'GPE', 'LOC', 'PRODUCT', 'EVENT'}
    results = []
    for doc in nlp.pipe(texts, batch_size=20):
        entities = []
        for ent in doc.ents:
            if ent.label_ in wanted_labels:
                entities.append(ent.text)
        results.append(entities)
    return results
def preprocess_parallel(texts, chunksize=100):
    """Extract entities from *texts* in parallel using joblib.

    Splits ``texts`` into chunks of ``chunksize``, dispatches each chunk
    to ``process_chunk`` across worker processes, and returns the
    flattened per-text entity lists in input order.

    Parameters
    ----------
    texts : sequence of str
        The documents to process.
    chunksize : int, optional
        Number of texts per parallel task (default 100).

    Returns
    -------
    list
        One entity list per input text.
    """
    executor = Parallel(n_jobs=7, backend='multiprocessing', prefer="processes")
    do = delayed(process_chunk)
    # BUG FIX: the original sized the chunker with len(df) — a global
    # DataFrame not in scope here. The chunker must be sized by the
    # input itself, or the function truncates/over-reads whenever
    # len(df) != len(texts).
    tasks = (do(chunk) for chunk in chunker(texts, len(texts), chunksize=chunksize))
    result = executor(tasks)
    return flatten(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment