Functions for parallelizing things
# Functions for parallelizing things
def init_spark(nproc=-1, appname="sparksession"):
    """Start a local Spark session and return its SparkContext."""
    from pyspark.sql import SparkSession
    if nproc == -1:
        # Use all CPUs
        spark = SparkSession.builder.master(
            "local[*]").appName(appname).getOrCreate()
    else:
        spark = SparkSession.builder.master(
            "local[{}]".format(nproc)).appName(appname).getOrCreate()
    sc = spark.sparkContext
    return sc
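
# Example usage (a minimal sketch; assumes pyspark is installed, and "demo" is
# an arbitrary app name chosen for illustration):
#   sc = init_spark(nproc=4, appname="demo")
#   sc.parallelize(range(10)).map(lambda x: x * x).collect()  # [0, 1, 4, ..., 81]
#   sc.stop()
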
def map_parallel(iterable, func, nproc=-1):
    """A normal map using multiple processes."""
    import multiprocessing
    if nproc == -1:
        n_jobs = multiprocessing.cpu_count()
    else:
        n_jobs = nproc
    with multiprocessing.Pool(n_jobs) as pool:
        results = pool.map(func, iterable)
    return results
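
# Example usage (a minimal sketch; `square` is a hypothetical function, and it
# must be defined at module level so multiprocessing can pickle it):
#   def square(x):
#       return x * x
#   map_parallel([1, 2, 3, 4], square, nproc=2)  # -> [1, 4, 9, 16]
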
def flatmap_parallel(iterable, func, nproc=-1):
    """A parallel map whose per-item results are flattened into one list."""
    import itertools
    import multiprocessing
    if nproc == -1:
        n_jobs = multiprocessing.cpu_count()
    else:
        n_jobs = nproc
    with multiprocessing.Pool(n_jobs) as pool:
        results = pool.map(func, iterable)
    return list(itertools.chain(*results))
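
# Example usage (a minimal sketch; `pair` is a hypothetical module-level function
# that returns a list per item, which is what flatmap_parallel expects):
#   def pair(x):
#       return [x, -x]
#   flatmap_parallel([1, 2, 3], pair, nproc=2)  # -> [1, -1, 2, -2, 3, -3]
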
def map_threads(iterable, func, nproc=-1):
    """Map a function over an iterable using threads."""
    import multiprocessing
    from multiprocessing.pool import ThreadPool
    if nproc == -1:
        n_jobs = multiprocessing.cpu_count()
    else:
        n_jobs = nproc
    with ThreadPool(n_jobs) as pool:
        results = pool.map(func, iterable)
    return results
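
# Example usage (a minimal sketch): threads share memory and skip pickling, so
# lambdas work here; this variant suits I/O-bound tasks such as fetching URLs.
#   map_threads(["a", "bb", "ccc"], lambda s: len(s), nproc=2)  # -> [1, 2, 3]
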
def apply_parallel(df, func, nproc=-1):
    """Split a pandas Series or DataFrame into n chunks, apply a given
    function to each chunk in parallel, and return the concatenated output.

    Args:
        df: A pandas Series or DataFrame.
        func: The function passed to .apply on each chunk.
        nproc: Number of worker processes (-1 uses all CPUs).
    Returns:
        A pandas Series or DataFrame of the combined results.
    """
    import multiprocessing
    from joblib import Parallel, delayed
    import pandas as pd
    import numpy as np
    if nproc == -1:
        n_jobs = multiprocessing.cpu_count()
    else:
        n_jobs = nproc
    groups = np.array_split(df, n_jobs)
    results = Parallel(n_jobs=n_jobs)(delayed(lambda g: g.apply(func))(group)
                                      for group in groups)
    return pd.concat(results)
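
# Example usage (a minimal sketch; the Series contents are made up):
#   import pandas as pd
#   s = pd.Series(range(8))
#   apply_parallel(s, lambda x: x ** 2, nproc=4)  # Series of squares, index preserved
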
def chunk_dataframe(df, chunk_size=1):
    """Split a DataFrame into row chunks of at most chunk_size rows."""
    chunks = [df.iloc[i:i + chunk_size]
              for i in range(0, df.shape[0], chunk_size)]
    return chunks
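
# Example usage (a minimal sketch; the DataFrame contents are made up):
#   import pandas as pd
#   df = pd.DataFrame({"x": range(5)})
#   chunk_dataframe(df, chunk_size=2)  # -> three DataFrames of 2, 2, and 1 rows
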
def skip_chunk(iterable, n_chunks):
    """Quick and easy way to chunk a list."""
    return [iterable[i::n_chunks] for i in range(n_chunks)]
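
# Example usage: elements are dealt out round-robin, so the chunks stay balanced
# in size but the original order is interleaved.
#   skip_chunk([1, 2, 3, 4, 5], 2)  # -> [[1, 3, 5], [2, 4]]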