pandas DataFrame apply multiprocessing
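Snippets for parallelizing pandas work with the multiprocessing module: splitting a DataFrame across a Pool for apply, dispatching vectorizers with apply_async, and processing a large file in chunks.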
import multiprocessing

import numpy as np
import pandas as pd


def _apply_df(args):
    df, func, kwargs = args
    return df.apply(func, **kwargs)


def apply_by_multiprocessing(df, func, **kwargs):
    workers = kwargs.pop('workers')
    pool = multiprocessing.Pool(processes=workers)
    result = pool.map(_apply_df, [(d, func, kwargs)
                                  for d in np.array_split(df, workers)])
    pool.close()
    return pd.concat(list(result))
def square(x):
    return x ** 2
if __name__ == '__main__':
    df = pd.DataFrame({'a': range(10), 'b': range(10)})
    apply_by_multiprocessing(df, square, axis=1, workers=4)  # run with 4 worker processes
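    # Sanity check (a minimal sketch): the parallel result should match an
    # ordinary single-process apply, since pd.concat preserves the row order
    # produced by np.array_split.
    expected = df.apply(square, axis=1)
    parallel = apply_by_multiprocessing(df, square, axis=1, workers=4)
    assert expected.equals(parallel)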
%%time
import re
from multiprocessing import Pool

num_cores = 2

def tgty(x):
    # Pick a brand name for a row: keep the existing brand_name when it is a
    # string, otherwise try to extract one from the item name with a regex
    # (brand_name_re is assumed to be defined elsewhere).
    _brand_name = []
    if isinstance(x['brand_name'], str):
        _brand_name = x['brand_name']
    else:
        #_brand_name = set(x['name'].split(" ")).intersection(brand_name)
        _brand_name = re.findall(brand_name_re, x['name'])
        if ('A' in _brand_name) or (_brand_name == []):
            #print(x['name'], _brand_name)
            _brand_name = x['brand_name']
        else:
            _brand_name = _brand_name[0]
            #print(x['name'], _brand_name)
    return _brand_name
def _apply_df(args):
    _df, func, kwargs = args
    return _df.apply(lambda x: func(x), axis=1)

def apply_by_multiprocessing(_df, func, **kwargs):
    workers = kwargs.pop('workers')
    with Pool(processes=workers) as pool:
        result = pool.map(_apply_df, [(d, func, kwargs) for d in np.array_split(_df, workers)])
    return pd.concat(list(result))

#name['name'] = name['name'].astype(str)
#name['brand_name'] = name.apply(lambda x: tgty(x), axis=1)
#df['brand_name']
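# A minimal sketch of how brand_name_re (used in tgty above but not defined in
# this gist) might be built, assuming df is the product table with the
# 'brand_name' column that tgty expects; `known_brands` is an illustrative name.
known_brands = df['brand_name'].dropna().unique().tolist()
brand_name_re = re.compile(
    '|'.join(re.escape(b) for b in sorted(known_brands, key=len, reverse=True)))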
df['brand_name'] = apply_by_multiprocessing(df, tgty, workers=num_cores)
def applydf2(c, remove_stopwords=True):
    l = c.tolist()
    #data = c.apply(lambda x: KaggleWord2VecUtility.review_to_sentences(x, remove_stopwords))
    data = []
    for x in l:
        # each review (paragraph) -> list of words
        data += KaggleWord2VecUtility.review_to_wordlist(x, remove_stopwords)
    return data

def parallelize_dataframe2(func, c):
    df_split = np.array_split(c, num_cores)
    pool = Pool(num_cores)
    #df = pd.concat(pool.map(func, df_split))
    #df = sum(pool.map(func, df_split), [])
    df = pool.map(func, df_split)
    pool.close()
    pool.join()
    return df

sentences1 = parallelize_dataframe2(applydf2, train["review"])
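# parallelize_dataframe2 returns one word list per chunk; the commented-out
# sum(..., []) line above flattens them. itertools.chain does the same
# flattening more efficiently (a sketch, assuming flattening is the intent;
# `sentences_flat` is an illustrative name).
from itertools import chain
sentences_flat = list(chain.from_iterable(sentences1))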
import gc
from multiprocessing import Process, Pool, Queue
from sklearn.feature_extraction.text import TfidfVectorizer

def Myfunc2(s):
    paramsCount = {'min_df': 100, 'stop_words': 'english', 'binary': False,
                   'ngram_range': (1, 2), 'max_features': None, 'use_idf': False}
    print("TfidfVectorizer: {}".format(s))
    return {s + '_tf': TfidfVectorizer(**paramsCount).fit_transform(df[s])}

if __name__ == '__main__':
    embedding = {}
    proc = {}
    p = Pool(4)
    # Myfunc1 (the count-vectorizer counterpart of Myfunc2) is assumed to be
    # defined elsewhere in the notebook.
    proc['name_count'] = p.apply_async(Myfunc1, args=("name",))
    proc['name_desciption_count'] = p.apply_async(Myfunc1, args=("name_desciption",))
    proc['item_description_count'] = p.apply_async(Myfunc1, args=("item_description",))
    proc['name_tf'] = p.apply_async(Myfunc2, args=("name",))
    proc['name_description_tf'] = p.apply_async(Myfunc2, args=("name_desciption",))
    proc['item_description_tf'] = p.apply_async(Myfunc2, args=("item_description",))
    for pp in proc:
        embedding.update(proc[pp].get())  # blocks until each result is ready
    p.close()
    X_train, X_valid, y_train, y_valid = hstackEmbedding(embedding, y)
    del embedding; gc.collect()
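# A minimal sketch of what hstackEmbedding (called above but not defined in this
# gist) might do: stack the sparse matrices collected in `embedding` side by
# side and split for validation. The 80/20 split and random_state are assumptions.
from scipy.sparse import hstack
from sklearn.model_selection import train_test_split

def hstackEmbedding(embedding, y):
    X = hstack(list(embedding.values())).tocsr()
    return train_test_split(X, y, test_size=0.2, random_state=42)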
# http://gouthamanbalaraman.com/blog/distributed-processing-pandas.html
import pandas as pd
import multiprocessing as mp

LARGE_FILE = "D:\\my_large_file.txt"
CHUNKSIZE = 100000  # process 100,000 rows at a time

def process_frame(df):
    # process one chunk; here we just count its rows
    return len(df)

if __name__ == '__main__':
    reader = pd.read_table(LARGE_FILE, chunksize=CHUNKSIZE)
    pool = mp.Pool(4)  # use 4 processes

    funclist = []
    for df in reader:
        # submit each chunk to the pool asynchronously
        f = pool.apply_async(process_frame, [df])
        funclist.append(f)

    result = 0
    for f in funclist:
        result += f.get(timeout=10)  # timeout in 10 seconds

    print("There are %d rows of data" % result)