@ian-r-rose
Last active October 21, 2022 19:22
{
"cells": [
{
"cell_type": "markdown",
"id": "a1d7c324-3e74-4ea3-8760-f680d8ab027d",
"metadata": {
"tags": []
},
"source": [
"# PyArrow String Array Serialization\n",
"\n",
"Traditionally, pandas has used Python strings for storing string data in DataFrames/Series\n",
"due to NumPy's lack of support for string dtypes.\n",
"However, recent versions of pandas now support Series backed by PyArrow strings.\n",
"PyArrow strings are intended to be much faster and have a much lighter memory footprint than Python strings,\n",
"especially for array-based data.\n",
"\n",
"However, there is a somewhat long-standing [bug](https://issues.apache.org/jira/browse/ARROW-10739) in PyArrow\n",
"which prevents efficient slicing of PyArrow string arrays, and limits their usefulness in tools like Dask.\n",
"By default, a string array slice serializes the entre buffer, making it quite expensive to distribute.\n",
"\n",
"Hopefully this PyArrow bug will be rectified some time soon,\n",
"but in the meantime a number of downstream libraries have attempted to work around it, including:\n",
"\n",
"* `dask`: [registers a copyreg serializer for `ArrowStringArrays`](https://github.com/dask/dask/pull/9024/files)\n",
"* `vaex`: [uses pyarrow's IPC](https://github.com/vaexio/vaex/blob/caed2cf106007c6a0141a02a5dfdf823fa38799e/packages/vaex-core/vaex/arrow/convert.py#L220-L262) to trim the buffers\n",
"* `pandas`: [Sets a custom reducer for `ArrowStringArrays`](https://github.com/pandas-dev/pandas/pull/49078/files) (merged this morning!)\n",
"\n",
"Given the changing landscape of PyArrow strings, we should check on the current state of things,\n",
"and what approach we should take in the future. A few questions I would like to evaluate:\n",
"\n",
"1. Does the current Dask approach work as intended? Do we see the correct slicing and scaling performance, including at scale?\n",
"1. The brand-new pandas fix should be in the next major release (2.0). Does it work well for Dask?\n",
"1. If the pandas fix works well, should we remove Dask's copyreg? Do they conflict at all?"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "9808574d-b86e-4195-a37a-9f3c26f82261",
"metadata": {},
"outputs": [],
"source": [
"%%capture\n",
"\n",
"# Ensure we start with 1.5.0\n",
"%pip install pandas==1.5.0\n",
"\n",
"import IPython\n",
"IPython.get_ipython().kernel.do_shutdown(restart=True)"
]
},
{
"cell_type": "markdown",
"id": "3e86d944-d9be-4f72-b75a-6bb369b4f1fe",
"metadata": {},
"source": [
"## Reproduce the broken PyArrow State and confirm copyreg approach"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "c50b16b3-8d55-42d3-ba1f-70c5ccaf087e",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'1.5.0'"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import copyreg\n",
"import contextlib\n",
"\n",
"import dask\n",
"import dask.dataframe as dd # Trigger copyreg\n",
"import pandas\n",
"\n",
"pandas.__version__"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "73248c0b-65c7-42f3-9f1b-b2a2e5a23ca4",
"metadata": {},
"outputs": [],
"source": [
"# Remove the dask copyreg\n",
"\n",
"@contextlib.contextmanager\n",
"def disable_copyreg(klass):\n",
" disp = copyreg.dispatch_table.pop(klass, None)\n",
" try:\n",
" yield\n",
" finally:\n",
" copyreg.dispatch_table[klass] = disp"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "568e9882-a4bd-40a5-ab3f-8b225817d356",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"0 ACTwdgXFQdhYmJyRpstdSaQoLMfRFBdBadSFUdjnMgLoFT...\n",
"1 ZOKJxDwgVxUmcckhmBnRmFPhmgXFtPXoSGOuJqHXjuJoxo...\n",
"2 enWPALborbSrBXSascFPuXcFaZtqimqMhUOeeaJUsBLvpA...\n",
"3 YvkriIFHHOJfYaLpiZJEbrKmhLEFtKxBOUmWGbUZCjnhWw...\n",
"4 NqjlVbUfNxJRWBjtHZVCOIIcgwnsCwJbEzYnUsPHEiBBKP...\n",
"Name: data, dtype: string"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import pickle\n",
"import random\n",
"import string\n",
"\n",
"# Create ~50 MiB of sample string data\n",
"s = pandas.Series(\n",
" [\n",
" \"\".join(random.choices(string.ascii_letters, k=random.randint(100, 1000)))\n",
" for _ in range(100_000)\n",
" ],\n",
" dtype=\"string[pyarrow]\",\n",
" name=\"data\",\n",
")\n",
"s.head()"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "380e35a9-0e1b-4e1c-bb97-09924d78a5bd",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"52\n",
"52\n",
"5\n"
]
}
],
"source": [
"print(len(pickle.dumps(s)) // 1024**2) # 52 MiB\n",
"\n",
"with disable_copyreg(pandas.arrays.ArrowStringArray):\n",
" print(len(pickle.dumps(s[:10_000])) // 1024**2) # 52 MiB, oops!\n",
" \n",
"print(len(pickle.dumps(s[:10_000])) // 1024**2) # 5 MiB, good"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "0735961f-a29f-4ad5-8f7c-f284fd68d767",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[52, 52, 52, 52]\n",
"[13, 13, 13, 13]\n"
]
}
],
"source": [
"# Now use Dask to partition the data. Each partition will get the full dataset!\n",
"@dask.delayed\n",
"def get_partition_serialized_size(df):\n",
" return len(pickle.dumps(df)) // 1024**2\n",
"\n",
"with disable_copyreg(pandas.arrays.ArrowStringArray):\n",
" result, = dask.compute(\n",
" [get_partition_serialized_size(p) for p in dd.from_pandas(s, npartitions=4).to_delayed()],\n",
" scheduler=\"processes\",\n",
" )\n",
" print(result) # [52, 52, 52, 52]\n",
"\n",
"result, = dask.compute(\n",
" [get_partition_serialized_size(p) for p in dd.from_pandas(s, npartitions=4).to_delayed()],\n",
" scheduler=\"processes\",\n",
")\n",
"print(result) # [13, 13, 13, 13]"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "145e4fef-1b1f-4095-a6fb-87db10ad061d",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Do we get the same data back after round-tripping?\n",
"dd.assert_eq(s, dd.from_pandas(s, npartitions=4), scheduler=\"processes\")\n",
"dd.assert_eq(\n",
" s.str.upper(), \n",
" dd.from_pandas(s, npartitions=4).str.upper(),\n",
" scheduler=\"processes\",\n",
")\n",
"\n",
"dd.assert_eq(\n",
" s[50_000:].str.slice(0,10), \n",
" dd.from_pandas(s, npartitions=4)[50_000:].str.slice(0,10),\n",
" scheduler=\"processes\",\n",
")"
]
},
{
"cell_type": "markdown",
"id": "f29b6710-bc75-46ed-95e9-733cbbbe266b",
"metadata": {
"tags": []
},
"source": [
"## How does this affect things in a distributed context?\n",
"\n",
"Let's try a shuffling workload, which should involve a lot of sharding of the arrays."
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "00dce09f-45db-4c97-8cfe-20a09b8b70a8",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'1.5.0'"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import dask.dataframe as dd\n",
"import distributed\n",
"import pandas\n",
"import numpy\n",
"\n",
"pandas.__version__"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "b5d36579-c141-4bda-94c7-237108df8fa2",
"metadata": {},
"outputs": [],
"source": [
"client = distributed.Client()"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "0b9e3a2b-2931-422a-900c-cb5bb216be15",
"metadata": {},
"outputs": [],
"source": [
"def disable_copyreg():\n",
" import copyreg\n",
" import dask.dataframe\n",
" del copyreg.dispatch_table[pandas.arrays.ArrowStringArray]\n",
" return copyreg.dispatch_table.get(pandas.arrays.ArrowStringArray)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "88b57b50-7693-419d-b1d5-f8e3d5b9a566",
"metadata": {},
"outputs": [],
"source": [
"import random\n",
"import string\n",
"\n",
"def make_partition(_=None):\n",
" # Create ~50 MiB of sample string data\n",
" s1 = pandas.Series(\n",
" [\n",
" \"\".join(random.choices(string.ascii_letters, k=random.randint(100, 1000)))\n",
" for _ in range(1_000_000)\n",
" ],\n",
" dtype=\"string[pyarrow]\",\n",
" name=\"label\",\n",
" )\n",
" df = pandas.DataFrame(numpy.random.randint(0, 100, size=(len(s1), 10)))\n",
" df.insert(0, \"label\", s1)\n",
" return df\n",
"meta = make_partition()"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "88aaa6a5-78a2-4c59-8536-dae189ac0d90",
"metadata": {},
"outputs": [],
"source": [
"ddf = dd.from_pandas(meta, npartitions=100)\n",
"#ddf = dd.from_map(make_partition, range(200), meta=meta)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "091d4537-aea1-4a3a-b60d-c844a3bed468",
"metadata": {},
"outputs": [],
"source": [
"sampler = distributed.diagnostics.MemorySampler()\n",
"\n",
"with sampler.sample(\"copyreg\"):\n",
" distributed.wait(ddf.set_index(\"label\").persist())"
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "42a8d9e6-65e5-41c6-97ad-557d328e26f7",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'tcp://127.0.0.1:35063': None,\n",
" 'tcp://127.0.0.1:39933': None,\n",
" 'tcp://127.0.0.1:40263': None,\n",
" 'tcp://127.0.0.1:41969': None}"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"client.restart()\n",
"client.run(disable_copyreg)"
]
},
{
"cell_type": "code",
"execution_count": 14,
"id": "35342c0e-3820-4c25-8637-fd4504eef1fa",
"metadata": {},
"outputs": [],
"source": [
"with sampler.sample(\"nocopyreg\"):\n",
" distributed.wait(ddf.set_index(\"label\").persist())"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "809100b4-7aa3-43f2-805e-2834ca81c130",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<AxesSubplot:xlabel='0'>"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
},
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 640x480 with 1 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"sampler.to_pandas().plot()"
]
},
{
"cell_type": "markdown",
"id": "ac77cec5-a4d0-41cb-8bdc-d293485ce417",
"metadata": {
"tags": []
},
"source": [
"## Try with pandas 2.0dev"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "21847024-33f5-4b33-aca8-1296081ff228",
"metadata": {},
"outputs": [],
"source": [
"%%capture\n",
"%pip install https://anaconda.org/scipy-wheels-nightly/pandas/2.0.0.dev0%2B404.g890d097534/download/pandas-2.0.0.dev0%2B404.g890d097534-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl\n",
"\n",
"import IPython\n",
"IPython.get_ipython().kernel.do_shutdown(restart=True)"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "0bf0f8fe-970b-4fb6-88e8-d8427be48cba",
"metadata": {},
"outputs": [],
"source": [
"import copyreg\n",
"import contextlib\n",
"\n",
"import dask\n",
"import dask.dataframe as dd # Trigger copyreg\n",
"import pandas"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "7ebe26e5-f10f-4546-a413-e21e44a86769",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'2.0.0.dev0+404.g890d097534'"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"pandas.__version__"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "fe1c677f-4570-4681-aad7-99e88083c276",
"metadata": {},
"outputs": [],
"source": [
"# Remove the dask copyreg\n",
"\n",
"@contextlib.contextmanager\n",
"def disable_copyreg(klass):\n",
" disp = copyreg.dispatch_table.pop(klass, None)\n",
" try:\n",
" yield\n",
" finally:\n",
" copyreg.dispatch_table[klass] = disp"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "72cc02fb-2642-4cb9-8caa-5a136da4e23e",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"0 hgyKNYMNNjZYniPdgnBqhTkUYTsdsCBQFFdOaklVZlGAXt...\n",
"1 zzddGphWWZtbImSGkUyhcHRkYGyUXlSlGKLKireQqynVir...\n",
"2 LgYaqaYOVhnldETsnSeGEuWkUZqRDLSENMCLydBulxcIFi...\n",
"3 pnrvaWgLuaqrIUzrqiVwBImxuRKdYKocwgTfXgkmZWJTVT...\n",
"4 jqfurqjahQNEhlZEijOmeOauUCNJbnOXLFjfiBFpYmSABU...\n",
"Name: data, dtype: string"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import pickle\n",
"import random\n",
"import string\n",
"\n",
"# Create ~50 MiB of sample string data\n",
"s = pandas.Series(\n",
" [\n",
" \"\".join(random.choices(string.ascii_letters, k=random.randint(100, 1000)))\n",
" for _ in range(100_000)\n",
" ],\n",
" dtype=\"string[pyarrow]\",\n",
" name=\"data\",\n",
")\n",
"s.head()"
]
},
{
"cell_type": "markdown",
"id": "c372c5df-4adc-44a2-abd8-ab0894130aee",
"metadata": {},
"source": [
"If we disable the copyreg approach with pandas `main`, things seem to work correctly!"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "e8de7d90-a578-4a10-bdb9-913893e27030",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"52\n",
"5\n"
]
}
],
"source": [
"with disable_copyreg(pandas.arrays.ArrowStringArray):\n",
" print(len(pickle.dumps(s)) // 1024**2) # 52 MiB \n",
" print(len(pickle.dumps(s[:10_000])) // 1024**2) # 5 MiB, good"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "ffce15af-30c0-42d0-a634-4658e47c2558",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[13, 13, 13, 13]\n"
]
}
],
"source": [
"# Now use Dask to partition the data.\n",
"@dask.delayed\n",
"def get_partition_serialized_size(df):\n",
" return len(pickle.dumps(df)) // 1024**2\n",
"\n",
"with disable_copyreg(pandas.arrays.ArrowStringArray):\n",
" result, = dask.compute(\n",
" [get_partition_serialized_size(p) for p in dd.from_pandas(s, npartitions=4).to_delayed()],\n",
" scheduler=\"processes\",\n",
" )\n",
" print(result) # [13, 13, 13, 13]"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "b8c8fc7f-5abf-449c-b250-d56c636a5a23",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"pickling using dask\n",
"5\n"
]
}
],
"source": [
"# Which takes priority?\n",
"disp = copyreg.dispatch_table.pop(pandas.arrays.ArrowStringArray, None)\n",
"try:\n",
" def wrap(*args, **kwargs):\n",
" print(\"pickling using dask\")\n",
" return disp(*args, **kwargs)\n",
" copyreg.dispatch_table[pandas.arrays.ArrowStringArray] = wrap\n",
" \n",
" print(len(pickle.dumps(s[:10_000])) // 1024**2) # 5 MiB, good\n",
"\n",
"finally:\n",
" copyreg.dispatch_table[pandas.arrays.ArrowStringArray] = disp\n"
]
},
{
"cell_type": "markdown",
"id": "e030a568-42af-4fb5-afe9-1358361ff493",
"metadata": {},
"source": [
"The `dask` copyreg takes priority over the pandas reducer: as soon as this is released we should probably disable the dask version in favor of the upstream one."
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "c4f4e034-2095-49f7-aa15-732bbcf94720",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Do the dask and pandas approaches get the same answer?\n",
"\n",
"with disable_copyreg(pandas.arrays.ArrowStringArray):\n",
" a = dd.from_pandas(s, npartitions=4).str.upper().compute(scheduler=\"processes\")\n",
"b = dd.from_pandas(s, npartitions=4).str.upper().compute(scheduler=\"processes\")\n",
"\n",
"dd.assert_eq(a, b)\n",
"\n",
"with disable_copyreg(pandas.arrays.ArrowStringArray):\n",
" a = dd.from_pandas(s, npartitions=4)[50_000:].str.slice(0,10).compute(scheduler=\"processes\")\n",
"b = dd.from_pandas(s, npartitions=4)[50_000:].str.slice(0,10).compute(scheduler=\"processes\")\n",
"\n",
"dd.assert_eq(a, b)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "53691870-53ef-4d61-8ae7-4ae1dbaa79d1",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"15 ms ± 322 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)\n",
"9.92 ms ± 361 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)\n"
]
}
],
"source": [
"# Which approach is faster?\n",
"with disable_copyreg(pandas.arrays.ArrowStringArray):\n",
" %timeit pickle.dumps(s)\n",
"%timeit pickle.dumps(s)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "e90db760-9b90-4c6c-9056-b0826dcc3685",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Pandas 55275647\n",
"Dask 55275489\n"
]
}
],
"source": [
"# Which approach is more space efficient?\n",
"with disable_copyreg(pandas.arrays.ArrowStringArray):\n",
" print(\"Pandas \", len(pickle.dumps(s)))\n",
"print(\"Dask \", len(pickle.dumps(s)))"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.8"
}
},
"nbformat": 4,
"nbformat_minor": 5
}