Skip to content

Instantly share code, notes, and snippets.

@internaut internaut/parallelized.py
Last active Feb 2, 2018

Embed
What would you like to do?
Runtime optimization through vectorization and parallelization
"""
Runtime optimization through vectorization and parallelization.
Script 3: Parallel and vectorized calculation of haversine distance.
Please note that this might be slower than the single-core vectorized version because of the overhead that is caused
by multiprocessing.
January 2018
Markus Konrad <markus.konrad@wzb.eu>
"""
import sys
import os
import multiprocessing as mp
import numpy as np
import pandas as pd
from vectorized import vec_haversine # we use the same vectorized haversine function
operation = 'haversine' # haversine or hash
def process_chunk(proc_chunk):
"""
Process the partial array/dataframe `proc_chunk` passed to this process worker.
Return an array/dataframe of the same length and same indices.
"""
# pid = os.getpid()
# print('process %d: got chunk of size %d with rows %d to %d'
# % (pid, len(proc_chunk), proc_chunk.index[0], proc_chunk.index[-1]))
if operation == 'haversine':
chunk_res = vec_haversine(proc_chunk.origin_lat, proc_chunk.origin_lng,
proc_chunk.destination_lat, proc_chunk.destination_lng)
chunk_res.index = proc_chunk.index
else: # hash
chunk_res = proc_chunk.apply(lambda row: hash(tuple(row)), axis=1)
return chunk_res
if __name__ == '__main__':
script_signature = '%s <haversine|hash> <num. test rows>' % sys.argv[0]
if len(sys.argv) != 3:
print('run as: ', script_signature, file=sys.stderr)
exit(1)
operation = sys.argv[1]
assert operation in ('haversine', 'hash')
n_values = int(sys.argv[2])
coords = np.array([np.random.uniform(-90, 90, n_values), # random origin latitude
np.random.uniform(-180, 180, n_values), # random origin longitude
np.random.uniform(-90, 90, n_values), # random destination latitude
np.random.uniform(-180, 180, n_values)]).T # random destination longitude
df_coords = pd.DataFrame(coords, columns=['origin_lat', 'origin_lng', 'destination_lat', 'destination_lng'])
# set the number of processes
n_proc = mp.cpu_count()
# this often can't be devided evenly (handle this in the for-loop below)
chunksize = len(coords) // n_proc
# devide into chunks
proc_chunks = []
for i_proc in range(n_proc):
chunkstart = i_proc * chunksize
# make sure to include the division remainder for the last process
chunkend = (i_proc + 1) * chunksize if i_proc < n_proc - 1 else None
proc_chunks.append(df_coords.iloc[slice(chunkstart, chunkend)])
assert sum(map(len, proc_chunks)) == len(df_coords) # make sure all data is in the chunks
# distribute work to the worker processes
with mp.Pool(processes=n_proc) as pool:
# starts the sub-processes without blocking
# pass the chunk to each worker process
proc_results = [pool.apply_async(process_chunk, args=(chunk,)) for chunk in proc_chunks]
# blocks until all results are fetched
result_chunks = [r.get() for r in proc_results]
# concatenate results from worker processes
results = pd.concat(result_chunks)
results = pd.concat((df_coords, results), axis=1)
assert len(results) == len(df_coords) # make sure we got a result for each coordinate pair
# print('results:')
# print(results)
"""
haversine is not faster with parallel processing:
In [10]: %timeit %run vectorized.py dataframe 1000000
230 ms ± 10.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [11]: %timeit %run parallelized.py haversine 1000000
388 ms ± 70.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
hashing is faster:
In [8]: %timeit %run unoptimized.py hash dataframe 1000000
16.4 s ± 270 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [9]: %timeit %run parallelized.py hash 1000000
8.37 s ± 115 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
"""
"""
Runtime optimization through vectorization and parallelization.
Script 1: Unoptimized script with row-wise apply of haversine calculation.
January 2018
Markus Konrad <markus.konrad@wzb.eu>
# example with real places:
coords = np.array([
[52.516667, 13.388889, 51.507222, -0.1275], # Berlin-London
[52.516667, 13.388889, 55.75, 37.616667], # Berlin-Moscow
[55.75, 37.616667, 51.507222, -0.1275], # Moscow-London
])
np.apply_along_axis(haversine, 1, coords)
####
labels = [
'Berlin-London',
'Berlin-Moscow',
'Moscow-London',
]
df_coords = pd.DataFrame(coords, index=labels,
columns=['origin_lat', 'origin_lng',
'destination_lat', 'destination_lng'])
df_coords.apply(haversine, axis=1)
"""
import math
import sys
import pandas as pd
import numpy as np
def haversine(row):
"""
haversine: calculate great circle distance between two points on earth in km
"""
a_lat, a_lng, b_lat, b_lng = row
R = 6371 # earth radius in km
a_lat = math.radians(a_lat)
a_lng = math.radians(a_lng)
b_lat = math.radians(b_lat)
b_lng = math.radians(b_lng)
d_lat = b_lat - a_lat
d_lng = b_lng - a_lng
a = math.pow(math.sin(d_lat / 2), 2) + math.cos(a_lat) * math.cos(b_lat) * math.pow(math.sin(d_lng / 2), 2)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
return R * c
if __name__ == '__main__':
script_signature = '%s <haversine|hash> <array|dataframe> <num. test rows>' % sys.argv[0]
if len(sys.argv) != 4:
print('run as: ', script_signature, file=sys.stderr)
exit(1)
operation = sys.argv[1]
assert operation in ('haversine', 'hash')
datastructure = sys.argv[2]
assert datastructure in ('array', 'dataframe')
n_values = int(sys.argv[3])
coords = np.array([np.random.uniform(-90, 90, n_values), # random origin latitude
np.random.uniform(-180, 180, n_values), # random origin longitude
np.random.uniform(-90, 90, n_values), # random destination latitude
np.random.uniform(-180, 180, n_values)]).T # random destination longitude
if operation == 'haversine':
fn = haversine
else:
fn = lambda row: hash(tuple(row))
if datastructure == 'array':
result = np.apply_along_axis(fn, 1, coords)
else:
df_coords = pd.DataFrame(coords)
result = df_coords.apply(fn, axis=1)
"""
In [3]: %timeit %run unoptimized.py array 10000
53.8 ms ± 269 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
In [4]: %timeit %run unoptimized.py dataframe 10000
174 ms ± 6.14 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
"""
"""
Runtime optimization through vectorization and parallelization.
Script 2: Vectorized calculation of haversine distance.
January 2018
Markus Konrad <markus.konrad@wzb.eu>
# example with real places:
coords = np.array([
[52.516667, 13.388889, 51.507222, -0.1275], # Berlin-London
[52.516667, 13.388889, 55.75, 37.616667], # Berlin-Moscow
[55.75, 37.616667, 51.507222, -0.1275], # Moscow-London
])
vec_haversine(coords[:, 0], coords[:, 1], coords[:, 2], coords[:, 3])
####
labels = [
'Berlin-London',
'Berlin-Moscow',
'Moscow-London',
]
df_coords = pd.DataFrame(coords, index=labels,
columns=['origin_lat', 'origin_lng',
'destination_lat', 'destination_lng'])
vec_haversine(df_coords.origin_lat, df_coords.origin_lng,
df_coords.destination_lat, df_coords.destination_lng)
"""
import sys
import numpy as np
import pandas as pd
def vec_haversine(a_lat, a_lng, b_lat, b_lng):
"""
Vectorized version of haversine / great circle distance for point to point distances on earth. Calculates the
haversine distance in kilometers for a bunch of geocoordinate pairs between the points defined by
`a_lat`, `a_lng` and `b_lat`, `b_lng`.
"""
R = 6371 # earth radius in km
a_lat = np.radians(a_lat)
a_lng = np.radians(a_lng)
b_lat = np.radians(b_lat)
b_lng = np.radians(b_lng)
d_lat = b_lat - a_lat
d_lng = b_lng - a_lng
d_lat_sq = np.sin(d_lat / 2) ** 2
d_lng_sq = np.sin(d_lng / 2) ** 2
a = d_lat_sq + np.cos(a_lat) * np.cos(b_lat) * d_lng_sq
c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1-a))
return R * c # returns distance between a and b in km
if __name__ == '__main__':
script_signature = '%s <array|dataframe> <num. test rows>' % sys.argv[0]
if len(sys.argv) != 3:
print('run as: ', script_signature, file=sys.stderr)
exit(1)
datastructure = sys.argv[1]
assert datastructure in ('array', 'dataframe')
n_values = int(sys.argv[2])
coords = np.array([np.random.uniform(-90, 90, n_values), # random origin latitude
np.random.uniform(-180, 180, n_values), # random origin longitude
np.random.uniform(-90, 90, n_values), # random destination latitude
np.random.uniform(-180, 180, n_values)]).T # random destination longitude
if datastructure == 'array':
result = vec_haversine(coords[:, 0], coords[:, 1], coords[:, 2], coords[:, 3])
else:
df_coords = pd.DataFrame(coords, columns=['origin_lat', 'origin_lng', 'destination_lat', 'destination_lng'])
result = vec_haversine(df_coords.origin_lat, df_coords.origin_lng,
df_coords.destination_lat, df_coords.destination_lng)
"""
In [8]: %timeit %run vectorized.py array 10000
2.9 ms ± 72.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
In [9]: %timeit %run vectorized.py dataframe 10000
4.83 ms ± 134 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.