Skip to content

Instantly share code, notes, and snippets.

@eavidan
Last active December 17, 2018 13:29
Show Gist options
  • Save eavidan/d5acdebd9743ae6dba3387131103dd5a to your computer and use it in GitHub Desktop.
an implementation of distributed bucket sort for pandas using Ray
import ray
import pandas as pd
import numpy as np
# Start Ray in local mode: tasks/actors run serially in this process,
# which makes the example deterministic to step through and easy to debug.
ray.init(local_mode=True)
def ge(a, b, by):
    """Lexicographic "greater or equal" comparison of two rows.

    Walks the sort-key columns in order: the first column where the
    values differ decides the result; if every key column is equal the
    rows compare as equal, so the result is True.
    """
    for key in by:
        if a[key] == b[key]:
            continue  # tie on this key -> decided by a later column
        return bool(a[key] > b[key])
    return True
@ray.remote
class Partition:
    """A Ray actor holding one partition of the DataFrame.

    Owns an input chunk (`df`) and accumulates the rows routed to it
    during the shuffle in `sorted_df`, which is then sorted locally.
    """

    def __init__(self, pid, df):
        self.pid = pid
        self.df = df
        # Rows routed to this partition during the shuffle land here.
        self.sorted_df = pd.DataFrame()

    def sample(self, n):
        """Return n randomly sampled rows from this partition's chunk."""
        return self.df.sample(n)

    @staticmethod
    def _get_partition(row, cutoffs, by):
        """Return the (pid, actor) pair whose cutoff is >= row on the sort keys.

        `cutoffs` is a list of ((pid, actor), cutoff_row) pairs ordered by
        cutoff; returns (None, None) when the row exceeds every cutoff.
        """
        for part, cutoff in cutoffs:
            if ge(cutoff, row, by=by):
                return part
        return None, None

    def shuffle(self, cutoffs, max_partition, by):
        """Send each row of this chunk to the partition that owns its range.

        Rows larger than every cutoff go to `max_partition` (the last one).
        Blocks until every destination actor has received its rows.
        """
        oids = []
        for index, row in self.df.iterrows():
            pid, actor = self._get_partition(row, cutoffs, by=by)
            if actor is None:
                # Row exceeds all cutoffs -> belongs to the last partition.
                oid = max_partition[1].put.remote(row)
            else:
                oid = actor.put.remote(row)
            oids.append(oid)
        # sync: wait for all put() calls to complete before returning
        ray.get(oids)
        return 1

    def put(self, row):
        """Receive a single shuffled row, preserving its original index label.

        NOTE: DataFrame.append was removed in pandas 2.0; pd.concat with the
        row transposed to a 1-row frame is the drop-in equivalent.
        """
        self.sorted_df = pd.concat([self.sorted_df, row.to_frame().T])
        return 1

    def sort_local(self, by):
        """Sort this partition's received rows by the sort-key columns."""
        self.sorted_df = self.sorted_df.sort_values(by=by)
        return 1

    def get_sorted_df(self):
        """Return the locally sorted DataFrame for this partition."""
        return self.sorted_df

    def get_sorted_ix(self):
        """Return only the index of the locally sorted DataFrame."""
        return self.sorted_df.index
n = 100            # total number of rows to sort
sample_size = 10   # rows sampled per partition to estimate cutoffs
m = 4              # number of partitions (Ray actors)
columns = ['A', 'B', 'C', 'D']
by = ['B', 'C']    # sort-key columns, in priority order

# Generate the DataFrame and break it into m round-robin chunks
# (df[i::m] takes every m-th row starting at i, so chunks are equal-sized).
df = pd.DataFrame(np.random.randn(n, len(columns)), columns=columns)
chunks = [(i, df[i::m]) for i in range(m)]

# Create the DF partitions (Ray actors), keyed by partition id.
partitions = [(pid, Partition.remote(pid, c)) for (pid, c) in chunks]

# Estimate range cutoffs: pool a sample from every partition, sort it,
# and take m-1 evenly spaced rows as the bucket boundaries.
samples = ray.get([actor.sample.remote(n=sample_size) for pid, actor in partitions])
sorted_sample = pd.concat(samples).sort_values(by=by)
cutoffs = [sorted_sample.iloc[i * sample_size - 1] for i in range(1, m)]
# Pair the first m-1 partitions with their upper-bound cutoff rows.
partitions_cutoffs = list(zip(partitions, cutoffs))

# Shuffle: each actor routes every row to the partition owning its range;
# rows above all cutoffs go to the last partition.
t = [actor.shuffle.remote(partitions_cutoffs, partitions[-1], by=by) for pid, actor in partitions]
ray.get(t)

# Local sort: each partition sorts the rows it received.
t = [actor.sort_local.remote(by=by) for pid, actor in partitions]
ray.get(t)

# Collect the sorted index from each partition (partitions are already in
# range order) and reindex the original DF into the globally sorted order.
new_index = []
for pid, actor in partitions:
    new_index.extend(ray.get(actor.get_sorted_ix.remote()))
distributed_sort_df = df.reindex(index=new_index)

# Validate the distributed sort against a plain local sort.
local_sort_df = df.sort_values(by=by)
assert local_sort_df.equals(distributed_sort_df)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment