Skip to content

Instantly share code, notes, and snippets.

@brandondube
Created January 29, 2022 19:08
Show Gist options
  • Save brandondube/302f7e43f57d1e2d9d99347d29385fc1 to your computer and use it in GitHub Desktop.
Save brandondube/302f7e43f57d1e2d9d99347d29385fc1 to your computer and use it in GitHub Desktop.
import time
import numpy as np
from matplotlib import pyplot as plt
# replace with np.fft or scipy.fft if you don't have it installed,
# not central to the demonstration, but better multi-threading
# "the point" is to examine parallelization over N "whole tasks"
# using ray or another lib, vs using low-level parallelization
# by threading each array op
# ray's initialization forces MKL into single-threaded mode, but it
# is still faster than numpy or scipy fft in that condition (only minutely)
import mkl_fft
import ray
# Start a local Ray runtime limited to 8 CPUs; the benchmark below sizes its
# workload relative to this (ntrials = 8*10).
ray.init(num_cpus=8)
class State:
    """Bundle of the five input arrays for one benchmark task.

    Grouping the arrays in a class lets the same data be wrapped as a Ray
    actor (``ray.remote(State)``) or stored as a single object in the Ray
    object store (``ray.put(State(...))``).
    """

    def __init__(self, a, b, c, d, e):
        # Inputs read by ``process``; the script in this file passes five
        # np.random.rand(4096, 4096) arrays here.
        self.a = a
        self.b = b
        self.c = c
        self.d = d
        self.e = e

    def process(self):
        """Run the benchmark pipeline on the stored arrays and return the result.

        Computes ``exp(ifft2(fft2(a) / a.size * b / c)) * d + e``.

        The forward transform is divided by ``a.size``, exactly as in the
        free functions ``process`` and ``process2``. Without this, the
        "for loop" and "actor" timings (which call this method) would measure
        a different computation than the "individual state" and "cls state"
        timings.
        """
        # Implemented as a method (and not only as a free function) so that
        # Ray actors created from this class can call it.
        a, b, c, d, e = self.a, self.b, self.c, self.d, self.e
        aa = mkl_fft.fft2(a) / a.size
        bb = aa * b
        cc = mkl_fft.ifft2(bb / c)
        dd = np.exp(cc) * d
        return dd + e
def process(s):
    """Run the benchmark pipeline on a ``State`` and return the result.

    Intended to be wrapped with ``ray.remote`` and invoked as
    ``rP.remote(ref)``, where ``ref = ray.put(state)``.

    Ray resolves an ObjectRef passed as a top-level task argument before the
    task runs, so ``s`` normally arrives as the ``State`` itself. Calling
    ``ray.get`` on it unconditionally therefore crashed. The object is now
    dereferenced only when it really is a ``ray.ObjectRef`` (for example, when
    this function is called directly rather than as a Ray task). A plain
    ``State`` is used as-is.

    Parameters
    ----------
    s : State or ray.ObjectRef
        Object exposing the array attributes ``a`` through ``e``.

    Returns
    -------
    array
        ``exp(ifft2(fft2(a) / a.size * b / c)) * d + e``.
    """
    if isinstance(s, ray.ObjectRef):
        s = ray.get(s)
    a, b, c, d, e = s.a, s.b, s.c, s.d, s.e
    # Normalize the forward transform by the number of elements.
    aa = mkl_fft.fft2(a) / a.size
    bb = aa * b
    cc = mkl_fft.ifft2(bb / c)
    dd = np.exp(cc) * d
    return dd + e
def process2(a, b, c, d, e):
    """Run the benchmark pipeline on five arrays passed individually.

    Performs the same computation as the top-level ``process``, but each
    input is a separate argument. This lets Ray ship the arrays as separate
    objects (the script passes five ``ray.put`` refs).

    Returns ``exp(ifft2(fft2(a) / a.size * b / c)) * d + e``.
    """
    # Normalize the forward transform by the number of elements.
    spectrum = mkl_fft.fft2(a) / a.size
    # Keep the original evaluation order: multiply by b, then divide by c.
    field = mkl_fft.ifft2((spectrum * b) / c)
    return np.exp(field) * d + e
def loop_put(*args):
    """Store each argument in the Ray object store and return the refs in order."""
    return list(map(ray.put, args))
def _time_it(label, run):
    """Call ``run()`` once and print ``label`` with the elapsed wall time in seconds."""
    start = time.perf_counter()
    run()
    print(label, time.perf_counter() - start)


# Ray-wrapped versions of the class and the two free functions.
rState = ray.remote(State)
rP = ray.remote(process)
rP2 = ray.remote(process2)

# Five random 4096x4096 inputs, handed to Ray in several ways: as individual
# object-store refs, inside a single actor, as a local State, and as one
# object-store ref holding a whole State.
a, b, c, d, e = [np.random.rand(4096, 4096) for _ in range(5)]
aa, bb, cc, dd, ee = loop_put(a, b, c, d, e)
rs = rState.remote(a, b, c, d, e)
s = State(a, b, c, d, e)
rs2 = ray.put(s)

# 10 tasks per CPU (ray.init is called with num_cpus=8).
ntrials = 8*10


def _serial_loop():
    """Baseline: run every trial serially in this process, discarding results."""
    for _ in range(ntrials):
        s.process()


_time_it('for loop', _serial_loop)

# NOTE(review): every call targets the single actor ``rs``. Ray documents that
# methods on one actor run serially, so this presumably measures no task-level
# parallelism. Confirm this is intended.
_time_it('actor', lambda: ray.get([rs.process.remote() for _ in range(ntrials)]))

_time_it('individual state',
         lambda: ray.get([rP2.remote(aa, bb, cc, dd, ee) for _ in range(ntrials)]))

# Ray passes the resolved State (not the ObjectRef ``rs2``) into ``process``,
# so ``process`` must not call ``ray.get`` unconditionally on its argument;
# doing so crashes this case.
_time_it('cls state', lambda: ray.get([rP.remote(rs2) for _ in range(ntrials)]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment