Skip to content

Instantly share code, notes, and snippets.

@ayushdg
Created April 30, 2020 19:46
Show Gist options
  • Save ayushdg/caa21ac0af8eaae31d3a518a3ccad595 to your computer and use it in GitHub Desktop.
Save ayushdg/caa21ac0af8eaae31d3a518a3ccad595 to your computer and use it in GitHub Desktop.
Concatenate the partitions held by each worker into a single partition per worker
def concat_dfs(df_list):
    """Combine several cudf dataframes into one.

    Parameters
    ----------
    df_list : list of cudf dataframes
        The frames to stack, passed straight through to ``cudf.concat``.

    Returns
    -------
    The dataframe produced by ``cudf.concat(df_list)``.
    """
    combined = cudf.concat(df_list)
    return combined
def get_delayed_dict(ddf):
    """Index the Delayed partitions of ``ddf`` by their stringified task key.

    Parameters
    ----------
    ddf : collection exposing ``to_delayed()``
        Each element returned by ``to_delayed()`` must have a ``.key``.

    Returns
    -------
    dict
        ``{str(part.key): part}`` for every Delayed part of ``ddf``. If two
        parts stringify to the same key, the later one wins.
    """
    return {str(part.key): part for part in ddf.to_delayed()}
def concat_within_workers(client, ddf):
    """Collapse ``ddf`` to one partition per worker.

    The partitions of ``ddf`` that a worker already holds in memory are
    concatenated on that worker, and the per-worker results are pinned there
    with ``client.persist(..., workers=worker)``.

    Parameters
    ----------
    client :
        Presumably a ``dask.distributed.Client``. It must provide
        ``has_what()`` and ``persist(collections=..., workers=...)``.
    ddf :
        A dask_cudf dataframe whose partitions are already in worker memory.

    Returns
    -------
    The collection built by ``dask_cudf.from_delayed`` from the persisted
    per-worker concatenations.

    Raises
    ------
    ValueError
        If none of ``ddf``'s partitions appear in ``client.has_what()``.
    """
    df_delayed = get_delayed_dict(ddf)

    # Give each partition to exactly one worker that holds it. A partition
    # replicated on several workers must be concatenated only once, or its
    # rows would be duplicated in the result.
    partitions_by_worker = {}
    assigned = set()
    for worker, keys in client.has_what().items():
        for key in keys:
            # get_delayed_dict indexes by str(key). Normalise here too so the
            # lookup also works if has_what() reports non-string keys.
            key_str = str(key)
            if key_str in df_delayed and key_str not in assigned:
                assigned.add(key_str)
                partitions_by_worker.setdefault(worker, []).append(
                    df_delayed[key_str]
                )

    # NOTE(review): partitions of ddf that are not in any worker's memory when
    # has_what() is called are left out of the result. Presumably ddf must be
    # persisted and finished computing before calling this; confirm with callers.
    if not partitions_by_worker:
        raise ValueError(
            "None of ddf's partitions were found in worker memory "
            "(client.has_what()); persist ddf and wait for it before "
            "calling concat_within_workers"
        )

    # Workers holding none of ddf's partitions never get an entry above, so
    # concat_dfs is never asked to concatenate an empty list.
    result = []
    for worker, worker_partitions in partitions_by_worker.items():
        concat_task = delayed(concat_dfs)(worker_partitions)
        result.append(client.persist(collections=concat_task, workers=worker))
    return dask_cudf.from_delayed(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment