Skip to content

Instantly share code, notes, and snippets.

@adgaudio
Last active August 29, 2015 14:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adgaudio/a2318f42288a1ff5df14 to your computer and use it in GitHub Desktop.
Save adgaudio/a2318f42288a1ff5df14 to your computer and use it in GitHub Desktop.
Distributed Percentile and Distributed Median - a proof of concept and example
"""
This example demonstrates a distributed algorithm to identify the
percentile of a distributed data set.
Because this is a toy implementation, the data isn't actually
distributed across multiple machines.
"""
import numpy as np
def make_distributed_sample_data(start=0, stop=100, step=1):
    """
    Build a shuffled, square 2-D array that stands in for distributed data.

    The values of np.arange(start, stop, step) are shuffled and reshaped
    into a (side, side) array; each row plays the role of one "node".

    start, stop, step - forwarded to np.arange.  The number of generated
        values must be a perfect square.

    Returns a 2-D numpy array of shape (side, side).
    Raises ValueError if the number of values is not a perfect square.

    Note: this reseeds numpy's global random state from the current time,
    so the layout differs between calls made in different seconds.

    With the defaults the values are 0..99, so for reference:
        np.median(arrs) == 49.5  (100 itself is not included)
        np.percentile(np.concatenate(arrs), [0, 25, 50, 75, 100])
            == [0.0, 24.75, 49.5, 74.25, 99.0]
    """
    import time
    np.random.seed(int(time.time()))
    values = np.arange(start=start, stop=stop, step=step)
    np.random.shuffle(values)
    # Derive the side length from the real element count (not stop/step, which
    # ignores `start`) and make it an int: reshape rejects float dimensions.
    side = int(round(np.sqrt(values.size)))
    if side * side != values.size:
        raise ValueError(
            "number of samples (%d) is not a perfect square" % values.size)
    return values.reshape((side, side))
def distributed_percentile(arrs, percentile):
    """
    This example demonstrates a distributed algorithm to identify the
    percentile of a data set (a randomized quickselect).

    Each round a random pivot is "broadcast" to every node, each node
    reports how many of its elements are greater than / equal to the pivot,
    and every node then drops the side that cannot contain the answer.

    arrs - a "distributed" list of arrays. Because this is a poc example,
        "distributed" in this case just means in memory
    percentile - number in the open range (0, 100)

    Returns an element of the data: the one at position k in descending
    order, where k = int(n * (100 - percentile) / 100) and n is the total
    number of elements.  No interpolation between elements is performed.

    Raises AssertionError if percentile is not strictly between 0 and 100,
    and ValueError if the data set contains no elements.
    """
    # Raised explicitly (instead of via `assert`) so the check also runs
    # under `python -O`; the exception type is unchanged.
    if not 0 < percentile < 100:
        raise AssertionError("Use min() or max()")

    nodes = [np.asarray(node_data).ravel() for node_data in arrs]

    # get total number of elements, n
    n = sum(node.size for node in nodes)
    if n == 0:
        raise ValueError("cannot compute a percentile of an empty data set")

    # k is the number of elements that must rank above the answer
    k = int(n * (100.0 - percentile) / 100.0)

    while True:
        # select a random element, pivot, and "broadcast" it to each
        # distributed dataset.  since this example doesn't actually use
        # distributed nodes, the data is simply concatenated in memory
        pivot = np.random.choice(np.concatenate(nodes))

        # on each node, count the elements greater than and equal to the
        # pivot, then sum the per-node counts
        n_greater = sum(int(np.count_nonzero(node > pivot)) for node in nodes)
        n_equal = sum(int(np.count_nonzero(node == pivot)) for node in nodes)

        if k < n_greater:
            # the answer is larger than the pivot: keep only the elements
            # above it.  k still counts from the top, so it is unchanged
            nodes = [node[node > pivot] for node in nodes]
        elif k >= n_greater + n_equal:
            # the answer is smaller than the pivot: keep only the elements
            # below it and shift k past everything that was dropped
            nodes = [node[node < pivot] for node in nodes]
            k -= n_greater + n_equal
        else:
            # position k falls on the pivot (or one of its duplicates).
            # Counting the duplicates is what guarantees termination when
            # the data contains repeated values.
            return pivot
def discard_elements(arrs, pivot, less_than_pivot=True):
    """
    Drop, on every node, the elements lying strictly on one side of pivot.

    arrs - sequence of numpy arrays, one per "node"
    pivot - value the elements are compared against
    less_than_pivot - if true, drop elements < pivot; otherwise drop
        elements > pivot.  Elements equal to the pivot are always kept.

    Returns a new list with one filtered array per node.
    """
    # choose the comparison that marks the elements to throw away
    marks_discarded = np.less if less_than_pivot else np.greater
    return [
        node[np.logical_not(marks_discarded(node, pivot))]
        for node in arrs
    ]
def distributed_median(arrs, **kwargs):
    """
    Return the median of a "distributed" data set.

    Convenience wrapper: asks distributed_percentile for the 50th
    percentile of arrs, forwarding any extra keyword arguments unchanged.
    """
    median_percentile = 50
    return distributed_percentile(arrs, median_percentile, **kwargs)
def test_distributed_median():
    """The median of the default sample data (values 0..99) must be 49."""
    median = distributed_median(make_distributed_sample_data())
    np.testing.assert_equal(median, 49)
def test_distributed_percentile():
    """
    Check distributed_percentile on integer and fractional sample data.

    For the default integer data the expectation is numpy's own percentile
    truncated to an int; for the quarter-step data it is a fixed value.
    """
    # default data: compare against numpy's percentile, truncated
    sample = make_distributed_sample_data()
    for pct in (25, 75):
        actual = distributed_percentile(sample, pct)
        np.testing.assert_equal(actual, int(np.percentile(sample, pct)))

    # quarter-step data: compare against known values
    sample = make_distributed_sample_data(0, 25, .25)
    for pct, expected in ((50, 12.25), (20, 4.75)):
        np.testing.assert_equal(distributed_percentile(sample, pct), expected)
# Script entry point: build the sample data and report its 50th percentile.
if __name__ == '__main__':
    arrs = make_distributed_sample_data()
    # Parenthesised single-argument print gives identical output on Python 2
    # and Python 3 (the bare print statement is a SyntaxError on Python 3).
    print("50th percentile: %s" % distributed_percentile(arrs, 50))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment