Skip to content

Instantly share code, notes, and snippets.

@payoto
Last active May 4, 2020 15:29
Show Gist options
  • Save payoto/1688b4dd1737262870704f67d7981a10 to your computer and use it in GitHub Desktop.
Save payoto/1688b4dd1737262870704f67d7981a10 to your computer and use it in GitHub Desktop.
import numpy as np
from multiprocessing import Pool
from itertools import repeat
import time
import pandas as pd
# Little test of https://stackoverflow.com/questions/49367278/any-way-to-speed-up-numpy-cumsum
# NOTE: passing the output slices to the workers does not work with a process
# pool: each worker receives a pickled copy, so the parent's array is never
# written. The per-slice results are therefore copied back in the parent.
def parallel_cumsum(arr_in, arr_out, axis=0):
    """Compute the cumulative sum of ``arr_in`` along ``axis`` into ``arr_out``.

    The work is split into slices along the leading axis of
    ``np.moveaxis(arr_in, axis, 1)`` and dispatched to a pool of two worker
    processes. Each worker returns the cumulative sum of its slice, and the
    parent process copies those results into the matching slices of
    ``arr_out``.

    Args:
        arr_in: Input array with at least two dimensions (``np.moveaxis`` is
            called with destination axis 1).
        arr_out: Array of the same shape as ``arr_in`` that receives the
            result. It may be the same object as ``arr_in``; the workers
            operate on copies, so the input is fully read before any slice
            is overwritten.
        axis: Axis of ``arr_in`` along which to accumulate.

    Returns:
        None. The result is written into ``arr_out``.
    """
    # After moving ``axis`` to position 1, iterating over the leading axis
    # yields slices whose own axis 0 is the original ``axis``.
    in_slices = np.moveaxis(arr_in, axis, 1)
    out_slices = np.moveaxis(arr_out, axis, 1)

    with Pool(2) as pool:
        # Use the module-level np.cumsum so no custom worker has to be defined.
        partial_sums = pool.starmap(np.cumsum, zip(in_slices, repeat(0)))

    # ``out_slices`` iterates over views of ``arr_out``; assigning through
    # them is what actually populates the caller's array.
    for target, partial in zip(out_slices, partial_sums):
        target[...] = partial
def parallel_with_alloc(start_array, times, axis):
    """Time ``parallel_cumsum`` writing into a freshly allocated output array.

    Args:
        start_array: Input array handed to ``parallel_cumsum``.
        times: Dict that receives the timing entries ``"start_parallel"``,
            ``"parallel_alloc"``, ``"parallel_total"`` and
            ``"parallel_comp"`` (all from ``time.perf_counter``).
        axis: Axis forwarded to ``parallel_cumsum``.

    Returns:
        The newly allocated output array passed to ``parallel_cumsum``.
    """
    began = time.perf_counter()
    times["start_parallel"] = began

    # Allocation is timed separately from the computation.
    result = np.zeros_like(start_array)
    times["parallel_alloc"] = time.perf_counter() - began

    parallel_cumsum(start_array, result, axis=axis)
    elapsed = time.perf_counter() - began
    times["parallel_total"] = elapsed
    times["parallel_comp"] = elapsed - times["parallel_alloc"]
    return result
def parallel_no_alloc(start_array, times, axis):
    """Time ``parallel_cumsum`` using ``start_array`` as both input and output.

    Args:
        start_array: Array passed to ``parallel_cumsum`` as both the input
            and the output argument.
        times: Dict that receives the timing entries ``"start_parallel"``,
            ``"parallel_alloc"``, ``"parallel_total"`` and
            ``"parallel_comp"`` (all from ``time.perf_counter``).
        axis: Axis forwarded to ``parallel_cumsum``.

    Returns:
        ``start_array`` itself.
    """
    began = time.perf_counter()
    times["start_parallel"] = began

    # Nothing is allocated in this variant; the entry is still recorded so
    # that the same timing keys are written as in the allocating variant.
    times["parallel_alloc"] = time.perf_counter() - began

    parallel_cumsum(start_array, start_array, axis=axis)
    elapsed = time.perf_counter() - began
    times["parallel_total"] = elapsed
    times["parallel_comp"] = elapsed - times["parallel_alloc"]
    return start_array
def test_time(parallel_func=parallel_with_alloc, n=5000, axis=1):
    """Time a parallel cumsum variant against the serial ``ndarray.cumsum``.

    Builds a random ``(n, n)`` array, runs ``parallel_func`` on it, runs the
    serial cumulative sum on an independent copy, and checks that both
    results are identical.

    Args:
        parallel_func: Callable invoked as
            ``parallel_func(start_array, times, axis)``; it must return the
            array holding the parallel result and record its own timings in
            ``times``.
        n: Side length of the square test array.
        axis: Axis along which the cumulative sum is taken.

    Returns:
        Dict of timing information: ``"size"``, ``"start"``,
        ``"start_serial"``, ``"serial"``, plus whatever ``parallel_func``
        stores.

    Raises:
        AssertionError: If the parallel and serial results differ.
    """
    times = {
        "size": (n, n),
    }
    times["start"] = time.perf_counter()
    start_array = np.random.rand(n * n).reshape((n, n))
    # Independent copy for the serial reference: ``parallel_func`` may
    # overwrite ``start_array`` in place.
    start_array2 = np.array(start_array)

    # Run the variant that was actually requested by the caller.
    out_parallel = parallel_func(start_array, times, axis)

    # Serial execution
    times["start_serial"] = time.perf_counter()
    out_serial = start_array2.cumsum(axis=axis)
    times["serial"] = time.perf_counter() - times["start_serial"]

    if not (out_parallel == out_serial).all():
        # Dump both results, plus the cumsum along the other axis, to help
        # spot an axis mix-up.
        print(out_parallel)
        print("")
        print(out_serial)
        print("")
        out_serial2 = start_array2.cumsum(axis=1-axis)
        print(out_serial2)
        # Raised explicitly so the check also fires under ``python -O``.
        raise AssertionError(
            f"Not all values in cumsum were equal for n={n}"
        )
    return times
def main():
    """Run the timing comparison for both parallel variants and print tables.

    For each size in a fixed list, ``test_time`` is run once with
    ``parallel_with_alloc`` and once with ``parallel_no_alloc`` (both along
    axis 1); each set of timing dicts is printed as a pandas DataFrame.
    """
    sizes = [5, 1000, 2000]

    timings_alloc = [
        test_time(parallel_with_alloc, n, 1) for n in sizes
    ]
    print("Timings with allocation")
    print(pd.DataFrame(timings_alloc))

    timings_noalloc = [
        test_time(parallel_no_alloc, n, 1) for n in sizes
    ]
    # Label fixed: this table holds the no-allocation timings.
    print("Timings without allocation")
    print(pd.DataFrame(timings_noalloc))
# Run the benchmark only when executed as a script. The guard also matters
# for multiprocessing: start methods that re-import the main module in worker
# processes must not re-run the benchmark on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment