{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/home/nfs/rzamora/miniconda3/envs/cudf_dev_10/lib/python3.7/site-packages/numba/cuda/envvars.py:16: NumbaDeprecationWarning: \n",
"Environment variables with the 'NUMBAPRO' prefix are deprecated, found use of NUMBAPRO_NVVM=/usr/local/cuda-9.2/nvvm/lib64/libnvvm.so.\n",
"\n",
"For more information visit http://numba.pydata.org/numba-doc/latest/reference/deprecation.html#deprecation-of-numbapro-environment-variables\n",
" warnings.warn(errors.NumbaDeprecationWarning(msg))\n",
"/home/nfs/rzamora/miniconda3/envs/cudf_dev_10/lib/python3.7/site-packages/numba/cuda/envvars.py:16: NumbaDeprecationWarning: \n",
"Environment variables with the 'NUMBAPRO' prefix are deprecated, found use of NUMBAPRO_LIBDEVICE=/usr/local/cuda-9.2/nvvm/libdevice.\n",
"\n",
"For more information visit http://numba.pydata.org/numba-doc/latest/reference/deprecation.html#deprecation-of-numbapro-environment-variables\n",
" warnings.warn(errors.NumbaDeprecationWarning(msg))\n"
]
}
],
"source": [
"import dask\n",
"import dask.dataframe\n",
"from dask.threaded import get\n",
"from dask.base import tokenize\n",
"from dask.utils import random_state_data\n",
"from toolz import merge\n",
"\n",
"import cudf\n",
"import cupy\n",
"import numpy as np\n",
"\n",
"\n",
"# Build a one-month time series and convert each partition to a cudf DataFrame\n",
"ddf = dask.datasets.timeseries(\n",
"    start=\"2000-01-01\",\n",
"    end=\"2000-01-31\",\n",
"    freq=\"1S\",\n",
"    partition_freq=\"1D\",\n",
"    seed=42,\n",
"    id_lam=30,\n",
")\n",
"gddf = ddf.map_partitions(cudf.from_pandas)\n",
"\n",
"\n",
"# Toggle cupy usage within `_percentiles_summary`\n",
"use_cupy = False\n",
"\n",
"\n",
"def _percentiles_summary(df, num_old, num_new, upsample, state):\n",
"    # Stub: optionally touch cupy, then return fixed values and weights\n",
"    if use_cupy:\n",
"        x = cupy.array([])  # unused; only exercises the cupy/CUDA path\n",
"    vals_and_weights = (np.array([1004.0, 1004.0]), np.array([50.0, 50.0]))\n",
"    return vals_and_weights\n",
"\n",
"\n",
"def _combine(sequence_of_data):\n",
"    # Stub: pass the per-partition results through unchanged\n",
"    return sequence_of_data\n",
"\n",
"\n",
"def _partition_quantiles(df, npartitions, upsample=1.0):\n",
"    # Build a raw task graph mimicking dask.dataframe's partition-quantiles logic\n",
"\n",
"    def _dtype_info(df):\n",
"        return df.dtype, None\n",
"\n",
"    return_type = dask.dataframe.Series  # unused in this benchmark\n",
"    qs = np.linspace(0, 1, npartitions + 1)\n",
"    token = tokenize(df, qs, upsample)\n",
"    random_state = int(token, 16) % np.iinfo(np.int32).max\n",
"    state_data = random_state_data(df.npartitions, random_state)\n",
"\n",
"    df_keys = df.__dask_keys__()\n",
"\n",
"    name0 = \"re-quantiles-0-\" + token\n",
"    dtype_dsk = {(name0, 0): (_dtype_info, df_keys[0])}\n",
"\n",
"    name1 = \"re-quantiles-1-\" + token\n",
"    val_dsk = {\n",
"        (name1, i): (\n",
"            _percentiles_summary,\n",
"            key,\n",
"            df.npartitions,\n",
"            npartitions,\n",
"            upsample,\n",
"            state,\n",
"        )\n",
"        for i, (state, key) in enumerate(zip(state_data, df_keys))\n",
"    }\n",
"\n",
"    val_dsk[\"combine\"] = (_combine, [(name1, i) for i in range(len(df_keys))])\n",
"    dsk = merge(df.dask, dtype_dsk, val_dsk)\n",
"    return dsk\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"925 ms ± 16 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"# Build the graph for the \"id\" column and time execution of the \"combine\" key\n",
"dsk = _partition_quantiles(gddf[\"id\"], gddf.npartitions)\n",
"%timeit get(dsk, \"combine\")"
]
},
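{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because the stubbed `_percentiles_summary` returns constants, the timing above mostly reflects graph construction and threaded-scheduler overhead rather than any quantile computation. As a rough sketch (not part of the original benchmark), the same graph can also be executed with dask's synchronous scheduler (`dask.local.get_sync`) to isolate thread-pool overhead:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask.local import get_sync\n",
"\n",
"# Same graph, single-threaded (synchronous) scheduler for comparison\n",
"%timeit get_sync(dsk, \"combine\")"
]
},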
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}