Last active
September 11, 2019 18:40
-
-
Save ravila4/f47333fa3a0e9472d4f93a86d10085d6 to your computer and use it in GitHub Desktop.
Functions for parallelizing things
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Functions for parallelizing things | |
def init_spark(nproc=-1, appname="sparksession"):
    """Start (or reuse) a local Spark session and return its SparkContext.

    Args:
        nproc: Number of local worker threads; -1 means use all CPUs.
        appname: Name to give the Spark application.

    Returns:
        The SparkContext of the created (or already-running) SparkSession.
    """
    from pyspark.sql import SparkSession
    # "local[*]" asks Spark to use all available CPUs; otherwise pin the
    # worker-thread count. One builder chain instead of two duplicated ones.
    master = "local[*]" if nproc == -1 else "local[{}]".format(nproc)
    spark = SparkSession.builder.master(master).appName(appname).getOrCreate()
    return spark.sparkContext
def map_parallel(iterable, func, nproc=-1):
    """A normal map using multiple processes.

    Args:
        iterable: Items to map over.
        func: A picklable callable applied to each item.
        nproc: Number of worker processes; -1 means use all CPUs.

    Returns:
        A list of results, in input order.
    """
    import multiprocessing
    if nproc == -1:
        n_jobs = multiprocessing.cpu_count()
    else:
        n_jobs = nproc
    # Bug fix: the pool was built with `nproc`, so the -1 default raised
    # ValueError ("Number of processes must be at least 1") and n_jobs was
    # never used. The context manager also guarantees the pool is closed.
    with multiprocessing.Pool(n_jobs) as pool:
        return pool.map(func, iterable)
def flatmap_parallel(iterable, func, nproc=-1):
    """A parallel map whose per-item results are flattened into one list.

    Args:
        iterable: Items to map over.
        func: A picklable callable returning an iterable for each item.
        nproc: Number of worker processes; -1 means use all CPUs.

    Returns:
        A single flat list of all items from every result.
    """
    import multiprocessing
    import itertools
    if nproc == -1:
        n_jobs = multiprocessing.cpu_count()
    else:
        n_jobs = nproc
    # Bug fix: Pool(nproc) crashed on the -1 default; use the computed
    # n_jobs and close the pool via the context manager.
    with multiprocessing.Pool(n_jobs) as pool:
        results = pool.map(func, iterable)
    # chain.from_iterable avoids unpacking a potentially long argument list.
    return list(itertools.chain.from_iterable(results))
def map_threads(iterable, func, nproc=-1):
    """Map a function over an iterable using a pool of threads.

    Useful for I/O-bound work where processes are overkill and the
    callable need not be picklable.

    Args:
        iterable: Items to map over.
        func: Callable applied to each item.
        nproc: Number of worker threads; -1 means use all CPUs.

    Returns:
        A list of results, in input order.
    """
    # Bug fix: `multiprocessing` was referenced below but never imported,
    # so the -1 default raised NameError.
    import multiprocessing
    from multiprocessing.pool import ThreadPool
    if nproc == -1:
        n_jobs = multiprocessing.cpu_count()
    else:
        n_jobs = nproc
    # Bug fix: ThreadPool(nproc) ignored n_jobs and crashed on -1.
    with ThreadPool(n_jobs) as pool:
        return pool.map(func, iterable)
def apply_parallel(df, func, nproc=-1):
    """Apply ``func`` over a pandas object in parallel and reassemble it.

    The input is split into one chunk per worker, ``.apply(func)`` runs on
    each chunk in a separate joblib job, and the pieces are concatenated.

    Args:
        df: (Series) A 1D pandas Series or DataFrame.
        func: Function handed to ``.apply`` on each chunk.
        nproc: Number of parallel jobs; -1 means use all CPUs.

    Returns:
        (DataFrame) The concatenated per-chunk results.
    """
    import multiprocessing
    from joblib import Parallel, delayed
    import pandas as pd
    import numpy as np

    n_jobs = multiprocessing.cpu_count() if nproc == -1 else nproc
    apply_chunk = delayed(lambda chunk: chunk.apply(func))
    pieces = Parallel(n_jobs)(
        apply_chunk(chunk) for chunk in np.array_split(df, n_jobs)
    )
    return pd.concat(pieces)
def chunk_dataframe(df, chunk_size=1):
    """Split a DataFrame into consecutive row chunks.

    Args:
        df: The DataFrame (or Series) to split.
        chunk_size: Maximum number of rows per chunk.

    Returns:
        A list of DataFrames; the final chunk may be shorter than
        ``chunk_size`` when the row count is not an exact multiple.
    """
    # Bug fix: `.ix` was removed from pandas (1.0); positional `.iloc`
    # slicing is what the original index-label round trip amounted to.
    return [df.iloc[i:i + chunk_size]
            for i in range(0, df.shape[0], chunk_size)]
def skip_chunk(iterable, n_chunks):
    """Quick and easy way to chunk a list.

    Deals items round-robin: chunk k holds every n_chunks-th element
    starting at offset k, so chunk sizes differ by at most one.
    """
    chunks = []
    for offset in range(n_chunks):
        chunks.append(iterable[offset::n_chunks])
    return chunks
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment