Skip to content

Instantly share code, notes, and snippets.

@rjzamora
Created September 5, 2019 23:16
Show Gist options
  • Save rjzamora/7a7e836f278cfdac606716d054d86951 to your computer and use it in GitHub Desktop.
Save rjzamora/7a7e836f278cfdac606716d054d86951 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"from dask import delayed\n",
"import dask.dataframe as dd\n",
"import cudf\n",
"import cupy\n",
"import numpy as np\n",
"import pandas as pd\n",
"\n",
"ddf = dask.datasets.timeseries(\n",
" start = \"2000-01-01\",\n",
" end = \"2000-01-31\",\n",
" freq = \"1S\",\n",
" partition_freq=\"1D\",\n",
" seed = 42,\n",
" id_lam=30,\n",
")\n",
"gddf = ddf.map_partitions(cudf.from_pandas)\n",
"\n",
"def _quantile_series_np(df, npartitions):\n",
" a = df[\"id\"].values\n",
" q = np.linspace(0, 100, num=npartitions, dtype=\"float64\")\n",
" pct = np.percentile(a, q, interpolation=\"linear\")\n",
" return cudf.Series(pct)\n",
"\n",
"def _quantile_series_cp(df, npartitions):\n",
" a = cupy.asarray(df[\"id\"]._column._data.mem)\n",
" q = cupy.linspace(0, 100, num=npartitions, dtype=\"float64\")\n",
" pct = cupy.percentile(a, q, interpolation=\"linear\")\n",
" return cudf.Series(pct)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1.34 s ± 16.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"q = gddf.map_partitions(_quantile_series_np, gddf.npartitions)\n",
"%timeit q.compute()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1.25 s ± 12.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"q = gddf.map_partitions(_quantile_series_cp, gddf.npartitions)\n",
"%timeit q.compute()"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"4.27 ms ± 22.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)\n"
]
}
],
"source": [
"npartitions = 30\n",
"part_rows = 10000\n",
"\n",
"@delayed(pure=True)\n",
"def my_builder():\n",
" a = np.random.randint(1000, size=part_rows)\n",
" q = tuple(np.linspace(0, 100, num=npartitions, dtype=\"float64\").tolist())\n",
" pct = np.percentile(a, q, interpolation=\"linear\")\n",
" return pd.Series(pct)\n",
"\n",
"parts = []\n",
"for i in range(npartitions):\n",
" parts.append(my_builder())\n",
"df = dd.from_delayed(parts, meta=pd.Series([0]))\n",
"%timeit df.compute()"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"19.4 ms ± 119 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)\n"
]
}
],
"source": [
"@delayed(pure=True)\n",
"def my_builder():\n",
" a = cupy.random.randint(1000, size=part_rows)\n",
" q = cupy.linspace(0, 100, num=npartitions, dtype=\"float64\")\n",
" cupy.cuda.stream.get_current_stream().synchronize()\n",
" pct = cupy.percentile(a, q, interpolation=\"linear\")\n",
" cupy.cuda.stream.get_current_stream().synchronize()\n",
" return cudf.Series(pct)\n",
"\n",
"parts = []\n",
"for i in range(npartitions):\n",
" parts.append(my_builder())\n",
"df = dd.from_delayed(parts, meta=cudf.Series([0]))\n",
"%timeit df.compute()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment