Skip to content

Instantly share code, notes, and snippets.

@TimRepke
Last active June 29, 2022 08:39
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TimRepke/58c8dc99e3c8788cd8102bd4ad91192d to your computer and use it in GitHub Desktop.
Save TimRepke/58c8dc99e3c8788cd8102bd4ad91192d to your computer and use it in GitHub Desktop.
Spark vs Python doc2vec

Spark vs Single-core python

Question: can parallelising the pre- and post-processing with Spark speed up a Gensim Doc2Vec pipeline?

  • Spark: 349s
  • Vanilla: 373s

(only one run, so not a very scientific comparison)

Run on a single machine with 16GB RAM and Intel i7-8550U CPU @ 1.80GHz

This gist includes the first three lines of the input file; the full file used for this example has 200k lines and is 457 MB in size.

from pyspark import SparkContext, SparkConf
import ujson as json
from gensim.models.doc2vec import TaggedDocument
from gensim.parsing import preprocessing as pp
from gensim import utils
from gensim.models.doc2vec import Doc2Vec
import logging
import timeit
import os
def _to_lowercase(text):
    """Return *text* converted to lower case."""
    return text.lower()


def _strip_short_tokens(text):
    """Run gensim's ``strip_short`` on *text* with a minimum size of 3."""
    return pp.strip_short(text, 3)


# Ordered chain of string -> string filters; each one receives the output of
# the previous one.  Stemming (``pp.stem_text``) is not part of the chain.
filter_funcs = [
    _to_lowercase,
    pp.strip_tags,
    pp.strip_punctuation,
    pp.strip_multiple_whitespaces,
    pp.strip_numeric,
    pp.remove_stopwords,
    _strip_short_tokens,
    pp.strip_non_alphanum,
]
def norm_doc(doc):
    """Normalise the abstract of *doc* and store it under ``'normed'``.

    The value of ``doc['paperAbstract']`` is converted to unicode and then
    passed through every function in ``filter_funcs``, in order.

    Args:
        doc: Mapping with a ``'paperAbstract'`` entry. It is modified in
            place.

    Returns:
        The same *doc* object, now carrying the ``'normed'`` string.
    """
    text = utils.to_unicode(doc['paperAbstract'])
    for apply_filter in filter_funcs:
        text = apply_filter(text)
    doc['normed'] = text
    return doc
def lookup_infer(doc):
    """Attach a Doc2Vec vector to *doc* under the ``'d2v'`` key.

    The vector stored in the broadcast model for ``doc['id']`` is used when
    the lookup succeeds; on ``KeyError`` a vector is inferred from the
    document's normalised tokens instead.

    Args:
        doc: Mapping with ``'id'`` and ``'normed'`` entries. It is modified
            in place.

    Returns:
        The same *doc* object with ``'d2v'`` set to a plain list.
    """
    try:
        embedding = bc_model.value.docvecs[doc['id']]
    except KeyError:
        # No stored vector for this id: infer one from the tokens.
        tokens = to_tagged_doc(doc).words
        embedding = bc_model.value.infer_vector(tokens, epochs=5)
    doc['d2v'] = embedding.tolist()
    return doc
def filter_fields(doc):
    """Return a new dict holding only the ``id``, ``d2v`` and ``normed`` entries.

    Args:
        doc: Mapping that contains at least those three keys.

    Returns:
        A fresh dict with exactly those keys, in that order.

    Raises:
        KeyError: If one of the three keys is missing from *doc*.
    """
    kept_keys = ('id', 'd2v', 'normed')
    return {key: doc[key] for key in kept_keys}
def to_tagged_doc(doc):
    """Build a ``TaggedDocument`` from a normalised document.

    Args:
        doc: Mapping with a ``'normed'`` string and an ``'id'``.

    Returns:
        A ``TaggedDocument`` whose words are the whitespace-split tokens of
        ``doc['normed']`` and whose only tag is ``doc['id']``.
    """
    tokens = doc['normed'].split()
    return TaggedDocument(words=tokens, tags=[doc['id']])
class TDs:
    """Re-iterable corpus that turns the records of an RDD into tagged documents.

    Each call to ``__iter__`` requests a new ``toLocalIterator()`` from the
    wrapped object, so the corpus can be walked more than once.
    """

    def __init__(self, data):
        # Object exposing ``toLocalIterator()`` (an RDD in this script).
        self.data = data

    def __iter__(self):
        print('restart localIterator')
        # Records are converted lazily, one at a time, as they are pulled.
        yield from map(to_tagged_doc, self.data.toLocalIterator())
        print('finished iterating localIterator')
# --- Spark pipeline: normalise -> train/load Doc2Vec -> vectorise -> write ---
time0 = timeit.default_timer()

conf = SparkConf() \
    .setAppName('pipeline') \
    .setMaster('local[*]') \
    .set("spark.files.overwrite", "true") \
    .set('spark.hive.mapred.supports.subdirectories', 'true') \
    .set('spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive', 'true') \
    .set('spark.driver.memory', '2g') \
    .set('spark.executor.memory', '1g') \
    .set('spark.executor.cores', '2') \
    .set('spark.executor.instances', '2')
ctx = SparkContext(conf=conf)

data_source = '../data/s2-200000.json'
model_file = '../data/pipeline/tmp/d2v.pickle'
data_sink = '../data/pipeline/tmp/d2v'

try:
    # One JSON document per input line, then text normalisation.
    rdd = ctx.textFile(data_source, minPartitions=8).map(json.loads)
    rdd = rdd.map(norm_doc)

    if os.path.isfile(model_file):
        # Reuse a previously trained model instead of retraining.
        model = Doc2Vec.load(model_file)
    else:
        # Make sure the model can be written *before* spending time on
        # training; ``model.save`` does not create missing directories.
        os.makedirs(os.path.dirname(model_file), exist_ok=True)
        # NOTE(review): ``rdd`` is not persisted, so every pass over
        # ``t_docs`` presumably re-reads and re-normalises the input --
        # consider ``rdd.cache()`` if memory allows.
        t_docs = TDs(rdd)
        model = Doc2Vec(documents=t_docs, min_count=2, max_vocab_size=10000,
                        vector_size=48, epochs=5, workers=8)
        model.save(model_file)

    # ``lookup_infer`` reads the model through this module-level broadcast.
    bc_model = ctx.broadcast(model)
    rdd = rdd.map(lookup_infer)
    rdd = rdd.map(filter_fields)
    rdd = rdd.map(json.dumps)
    rdd.saveAsTextFile(data_sink)

    print(f'Time: {timeit.default_timer() - time0:.1f}s')
finally:
    # Always release the Spark context, even when a stage fails.
    ctx.stop()
import ujson as json
from gensim.models.doc2vec import TaggedDocument
from gensim.parsing import preprocessing as pp
from gensim import utils
from gensim.models.doc2vec import Doc2Vec
import logging
import os
import timeit
class NormData:
    """Re-iterable stream of normalised documents read from a JSON-lines file.

    Each iteration re-opens ``filename``, parses one JSON document per line,
    runs the abstract through ``FILTER_FUNCS`` and yields the document with
    the result stored under ``'normed'``.
    """

    # Ordered chain of string -> string filters applied to every abstract.
    # Stemming (``pp.stem_text``) is not part of the chain.
    FILTER_FUNCS = [
        lambda s: s.lower(),
        pp.strip_tags,
        pp.strip_punctuation,
        pp.strip_multiple_whitespaces,
        pp.strip_numeric,
        pp.remove_stopwords,
        lambda s: pp.strip_short(s, 3),
        pp.strip_non_alphanum,
    ]

    def __init__(self, filename):
        # Path of the input file; opened lazily on every iteration.
        self.filename = filename

    def __iter__(self):
        print('start norm')
        # Explicit UTF-8 so decoding does not depend on the platform's
        # default locale encoding.
        with open(self.filename, 'r', encoding='utf-8') as f:
            for line in f:
                doc = json.loads(line)
                s = utils.to_unicode(doc['paperAbstract'])
                for func in self.FILTER_FUNCS:
                    s = func(s)
                doc['normed'] = s
                yield doc
class TDs:
    """Re-iterable corpus that wraps normalised documents as tagged documents.

    Every iteration walks ``data`` again from the start and yields one
    ``TaggedDocument`` per document.
    """

    def __init__(self, data):
        # Iterable of dicts carrying 'normed' and 'id' entries.
        self.data = data

    def __iter__(self):
        print('start td')
        for record in self.data:
            tokens = record['normed'].split()
            yield TaggedDocument(tokens, [record['id']])
class Vectoring:
    """Iterable that adds a Doc2Vec vector to every document of ``data``.

    Documents are modified in place: the vector is stored as a plain list
    under the ``'d2v'`` key and the same dict is yielded.
    """

    def __init__(self, d2v_model, data):
        self.model = d2v_model
        self.data = data

    def __iter__(self):
        print('start vectoring')
        for record in self.data:
            record['d2v'] = self._vector_for(record).tolist()
            yield record

    def _vector_for(self, record):
        """Return the stored vector for the record, or infer one on KeyError."""
        try:
            return self.model.docvecs[record['id']]
        except KeyError:
            # No stored vector for this id: infer one from the tokens.
            tagged = TaggedDocument(record['normed'].split(), [record['id']])
            return self.model.infer_vector(tagged.words, epochs=5)
class Filter:
    """Iterable that reduces every document to its ``id``, ``d2v`` and ``normed`` entries."""

    def __init__(self, data):
        # Iterable of dicts that contain at least the three kept keys.
        self.data = data

    def __iter__(self):
        print('start filter')
        kept_keys = ('id', 'd2v', 'normed')
        for record in self.data:
            # A fresh dict is built; the incoming document is left untouched.
            yield {key: record[key] for key in kept_keys}
def save(filename, data):
    """Write every document of *data* to *filename*, one JSON object per line.

    Args:
        filename: Path of the output file; it is opened in ``'w'`` mode.
        data: Iterable of JSON-serialisable documents, consumed once.
    """
    print('saving')
    with open(filename, 'w') as sink:
        sink.writelines(json.dumps(record) + '\n' for record in data)
# --- Single-process pipeline: normalise -> train/load Doc2Vec -> vectorise -> write ---
data_source = '../data/s2-200000.json'
model_file = '../data/pipeline/tmp/d2v.pickle'
data_sink = '../data/pipeline/tmp/d2v'

time0 = timeit.default_timer()

# Create the output directories up front: neither ``model.save`` nor
# ``open(..., 'w')`` creates missing parents, and failing after training
# would throw the whole run away.
for _out_path in (model_file, data_sink):
    os.makedirs(os.path.dirname(_out_path), exist_ok=True)

# Lazy, re-iterable source; the file is re-read on every pass over it.
normed = NormData(data_source)

if os.path.isfile(model_file):
    # Reuse a previously trained model instead of retraining.
    model = Doc2Vec.load(model_file)
else:
    t_docs = TDs(normed)
    model = Doc2Vec(documents=t_docs, min_count=2, max_vocab_size=10000,
                    vector_size=48, epochs=5, workers=8)
    model.save(model_file)

# The stages below are chained lazily; nothing runs until ``save`` iterates.
vec = Vectoring(model, normed)
filtered = Filter(vec)
save(data_sink, filtered)

print(f'Time: {timeit.default_timer() - time0:.1f}s')
{"entities":["Jack Device Component"],"journalVolume":"109 49-50","journalPages":"26","pmid":"24568020v1","year":2013,"outCitations":[],"s2Url":"https://semanticscholar.org/paper/f2320c08c7d95bbf8bb72e4d6deaa6845ea4cf27","s2PdfUrl":"","id":"f2320c08c7d95bbf8bb72e4d6deaa6845ea4cf27","authors":[{"name":"Kate Jack","ids":["38280253"]}],"journalName":"Nursing times","paperAbstract":"","inCitations":[],"pdfUrls":[],"title":"60 seconds with Kate Jack.","doi":"","sources":["Medline"],"doiUrl":"","venue":"Nursing times"}
{"entities":["Decision Making","Laboratory Certification Document","Organization administrative structures"],"journalVolume":"31 2","journalPages":"127-30","pmid":"3514907v1","year":1986,"outCitations":[],"s2Url":"https://semanticscholar.org/paper/5432a99cdd9f8b248c50274cd3d2a6016f3d081e","s2PdfUrl":"","id":"5432a99cdd9f8b248c50274cd3d2a6016f3d081e","authors":[{"name":"W N Spellacy","ids":["5862934"]},{"name":"J Burger","ids":["1757639"]}],"journalName":"The Journal of reproductive medicine","paperAbstract":"The search for new administrators in complex systems is an important activity. The special requirements of academic organizations, particularly those with health centers, present some unique considerations that can confound this important and difficult process. Typically, national searches attract a sizable candidate list composed of persons with diverse backgrounds and experiences, and a committee is empowered to sort through their qualifications. A critical step in the planning of each search is the development of a process that allows participatory decision making while not requiring too much time. Too often the search becomes an unmanageable activity that confuses the searchers and frustrates the administration. A seven-step process has proven successful for use by committees to attract and sort through written candidate applications, to agree upon a preliminary ranking of candidates and to reach a consensus on a final list of recommendations. The process could be applied in almost any organizational setting.","inCitations":[],"pdfUrls":[],"title":"Organizing a search for an academic administrator.","doi":"","sources":["Medline"],"doiUrl":"","venue":"The Journal of reproductive medicine"}
{"entities":["Annexin A1","Annexins","Bacterial Infections","Chemotaxis","EPHA3 protein, human","FPR1 protein, human","FPR3 gene","Leukocytes","Ligands","Parathyroid Hormone Receptor","Peptide Receptor","Subfamily","fMet-Leu-Phe receptor"],"journalVolume":"172 12","journalPages":"7669-76","pmid":"15187149v1","year":2004,"outCitations":["c2b53b26c004fe57e85424df6ad101d283150648","d30e703f3d6d74bfc6845c9469d681e491bac5fe","6494f3ba9340ff0bf04bf6fcaee639a16c273d12","81e8afe731c3fceac456ef28027f96b40439e5e8","56101af10d0b74526d0365f850877a51c0e0866b","32ef6bd7082c817934c0163abb3e6d3e818c64d7","2f97ef8f51637774f47dcfdc5fc05ef728ed24b6","d21899d415d539ce3fad09032ad27afba84bcfd2","3929e76314143450e905f954c6226da5ced68954","5e69462299d07cac473aa1a864dd51cae398af9c","315a775b3219ae31367df439080806a1666b1624","e658ebc0cbb4443d1f79a1d6bc92111d9f536c98","e2164cb09414781bd46cb0d403a72a0ea5793682","43ef3abacf752f00d35ecdb4b08e32f0370d7abb","3123c56dadd34503128e73d556d0308748932c56","ec3405c7b34b3b9def5f87fe1ca0c9f0b37d4644","97ba43f452c3eaf3e994f39e426f86b3594369d1","4c23424e376c16e7fd501fc8742f51881e57d25b","497b0cfb888f36f32c4b4843aee01c5509b02123","f5ec2d092c6969299eeb2842060d6cab98613f73","c0f339442b3f84567f1f9ea05805480d5295c4a8","af02aed0e5bff33617f96d840e36ff065cf469ed","8559addff0abfeefbefeb7e7cff8667891adc0a3","941e38df2a1198b14dfc82aaa4cf6db12704004d","ad7b4b77fe7f98c6799d5a569db0380a6d352928","c5cd33ac650b3b7bbd6a4e8afc4ab89681ef16b0","136c57be9e4c36f8ba36e1966647724b27577ab9","6bc51ccd63cb18d5b950f0067d60d000c653696f","cbefdfe5df7696f92cd02ca8f27e768ab8597bed","bb2412da92ddae7063227e61c9e5cff080557e13","ed55dded3d9cb58e5b630840144548fe4c9b30f7","9aa2ae2573c701e5855977d7f354ee1b957e0fc3","050d7c0cc34d6dbd2485fca72e53498725f583fb","3e9a27b239b8b33259332b41b5541ee12e8fc8b1","614653777ab95341368b82744e40d079cce1599e","425ed8ee2d3c490ce1e2253d28fb6d5d447f2487","36063ad87d5de2293e63b26df1bb766ad10d19b0","32b823d9c492a2c1455b9cd2f292aa5d4002198f","c23f24bf3c29e2e48ea832809cd
6da04293783d6","6d62795c42bc4397801968dd430303b56968d4b3","0fc68bb34163586d66d526c5e69792b038b01b8c","6a9dc9e2d9d3224d33dcc49d48c4cd60a4fd0a6a","bc0ab72643f01c9f3bbf957a9738c682da314d8a","32ef2f7cd4d56642e1991a39379738ae711fd231"],"s2Url":"https://semanticscholar.org/paper/155663331ea93379e99997bd43340eb54ab41a73","s2PdfUrl":"http://pdfs.semanticscholar.org/cb73/147dc0bf1b2eabc5513c5e7952ef4f36df16.pdf","id":"155663331ea93379e99997bd43340eb54ab41a73","authors":[{"name":"Stefanie Ernst","ids":["39900230"]},{"name":"Carsten Lange","ids":["38285107"]},{"name":"Andreas Wilbers","ids":["34505589"]},{"name":"Verena Goebeler","ids":["6302762"]},{"name":"Volker Gerke","ids":["6509600"]},{"name":"Ursula Rescher","ids":["4702003"]}],"journalName":"Journal of immunology","paperAbstract":"The human N-formyl peptide receptor (FPR) is a key modulator of chemotaxis directing granulocytes toward sites of bacterial infections. FPR is the founding member of a subfamily of G protein-coupled receptors thought to function in inflammatory processes. The other two members, FPR-like (FPRL)1 and FPRL2, have a greatly reduced affinity for bacterial peptides or do not bind them at all, with FPRL2 being considered an orphan receptor so far. In this study we show that a peptide derived from the N-terminal domain of the anti-inflammatory protein annexin 1 (lipocortin 1) can activate all three FPR family members at similar concentrations. The annexin 1 peptide initiates chemotactic responses in human monocytes that express all three FPR family members and also desensitizes the cells toward subsequent stimulation with bacterial peptide agonists. Experiments using HEK 293 cells stably expressing a single FPR family member reveal that all three receptors can be activated and desensitized by the N-terminal annexin 1 peptide. 
These observations identify the annexin 1 peptide as the first endogenous ligand of FPRL2 and indicate that annexin 1 participates in regulating leukocyte emigration into inflamed tissue by activating and desensitizing different receptors of the FPR family.","inCitations":["3738fad17126054f03cfe736b7156b6d6eef0481","927e690d772252cc95f7818a16a4cf43dab0edba","12a2c7031f12df3d00bfe5eaa9ae25851b825841","2f49fc254663c9b5b6e2fbc2b3e707eee5817c11","05b55180032103e7b31de248e227cb8d239d8acd","3f07ff1ee2813dd2c38b5bc28439769226bd1236","6c7ee2062b49226bfee08114f12cb1cd07509d9d","d1d3f4cc5671bbb339b43e63c065477c7c1f38c8","a986984b4e368907c36e0e078a935adcfd055076","bafabb8269cafefd06e039fed0833bab13f34afd","2d3bcb942422b792fa6242b4c79a7c5f2bc898e9","35767947edf698fbe772e0fdb545fe850b5b8d15","dbca6c002f78b9926459ce9b5bc87fc4738ad996","d32f93ced0ba660cf39c200b0186882ab1a149d1","b153a0589f5d2dbb8f21016cc202817bb1f937b4","5cbf95c2ff01c69e1be9da8a12324cd57ac73d82","17b36fa12edad261e163b414528b3edd01bddd28","24e442087f3533f64551f1a77b075e0be59b1f40","6057c5e6eba3b636711396ec2bb5e1d93c44632c","febd5b9457b8cfef884fa06b263d9117e23cf7be","031b6cb1f0f7a5c2d6a3aa08d83350b1579a2a08","2c282d7e1a95c41a0e6ea87f2ea44925fff0cc26","d18197d7be7433b5c253eac0d232ea00eac02c49","a647ec16597bedfbc94897042a45981b2694b32b","f9965f7617c5caea715fb749c4a964b141fe1ef2","10335c92eaf3130d9524170c2dfa1539590c3a30","b368b1c975450a779fa9ec9c75259914304f8928","2f01ccdec197a25d3a8840f7ac097d1f1c58c9ae","b677b11c5fe125e2d6b9a23f2f1c71e8f7253866","56a98d415a52893a9311de921f3332a4d5983f86","2759f0793ac28fcb6144a232d74a176ca0001a72","cfc4ccc8d6d7f541199324e80ff737ec32e90a04","210ae1d7c6386b4ffe07f6ae1065f567844da7d3","1910925eb67b1178271f889ed7ce3706ce51de01","e4d49a8d7d477165428e6c08b1066dc6d7251a4c","73ab48562b828beecd18d0b8b85eb8eac08b7f46","6bd3a7dfcd8b5582c6f3a37f6b21ee966d11d0a1","e10fd96b2c66623159ca50a126e6c398e9681f93","61d89c64d595992624b8d6e281174df893ec0311","06c2a40ad3b98bc414dff20c882867116082e7ac","38449
740092b71d204a7f27aa68cf1f7a0a08b4b","c8f8030cadd63bfe6e24918aa12391fee171aa45","697729db06b9b4c70867cc6c8bce6416a7c5621d","0dfec70f31ae2294602602eba6066ac11e404a68","7b95f3b889c0428e3bfa16afa95d97d1f9d7c0f9","227944c357595eab0bb4bf8f73ddb0c8db35e2fa"],"pdfUrls":["http://www.jimmunol.org/content/jimmunol/172/12/7669.full.pdf","http://www.jimmunol.org/content/172/12/7669.full.pdf"],"title":"An annexin 1 N-terminal peptide activates leukocytes by triggering different members of the formyl peptide receptor family.","doi":"","sources":["Medline"],"doiUrl":"","venue":"Journal of immunology"}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment