@rjzamora
Created December 20, 2023 18:01
Experimenting with simpler ``blocksize`` logic for ``read_parquet``
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "e25aa48f-d4c4-498d-b30a-0ff0704a7fce",
"metadata": {},
"outputs": [],
"source": [
"import math\n",
"import dask_expr as dx\n",
"import pyarrow.dataset as ds\n",
"from fsspec.core import get_fs_token_paths\n",
"from dask.utils import parse_bytes\n",
"import pandas as pd"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "53fe730c-b21e-4c43-a792-73f7576c2b9b",
"metadata": {},
"outputs": [],
"source": [
"###\n",
"### Helper functions\n",
"###\n",
"\n",
"# Main function to construct a single DataFrame partition\n",
"def read_partition(\n",
" partition_info,\n",
" columns=None,\n",
" filters=None,\n",
" filesystem=None,\n",
" dataset_kwargs=None,\n",
" convert_kwargs=None,\n",
"):\n",
" # MAIN IDEA:\n",
" # We shouldn't need to parse parquet metadata ahead of time to\n",
" # aggregate files or split files when defining the mapping\n",
" # between files and output partitions. To aggregate files,\n",
" # we can simply assing a list of paths to a given partitions.\n",
" # To split files, we can the same path to multiple partitions.\n",
" # This is fine, as long as `read_partition` knows how many\n",
" # other partitions are being generated from the same path\n",
" # (`num_ranks`), and the index of THIS partition (`rank`).\n",
" # This way, the necessary metadata can be parsed at IO time.\n",
" # (where the metadata is already parsed anyway)\n",
" paths, rank, num_ranks = partition_info\n",
"\n",
" if num_ranks > 1:\n",
" # Probably no good reason to support this.\n",
" # We should either split a file or map multiple files to a partition\n",
" assert len(paths) == 1\n",
"\n",
" dataset = ds.dataset(paths, filesystem=filesystem, **(dataset_kwargs or {}))\n",
" if num_ranks == 1:\n",
" # Simple case: This rank is reading all of dataset\n",
" table = dataset.to_table(\n",
" columns=columns,\n",
" filter=filters,\n",
" )\n",
" else:\n",
" # Other case: This rank is responsible for a subset of dataset\n",
" table = _get_partial_table(\n",
" dataset,\n",
" rank,\n",
" num_ranks,\n",
" columns,\n",
" filters,\n",
" )\n",
"\n",
" # Convert to pandas\n",
" return table.to_pandas(**(convert_kwargs or {}))\n",
"\n",
"\n",
"def _get_partial_table(\n",
" dataset,\n",
" rank,\n",
" num_ranks,\n",
" columns,\n",
" filters,\n",
"):\n",
" # This is used by `read_partition` (above)\n",
" # NOTE: dataset must be a single file (for now)\n",
" file_fragment = next(dataset.get_fragments())\n",
" num_row_groups = file_fragment.num_row_groups\n",
" rg_stride = math.ceil(num_row_groups / num_ranks)\n",
" rg_start = rg_stride * rank\n",
" rg_end = min(rg_start + rg_stride, num_row_groups)\n",
" if rg_end > rg_start:\n",
" fragment = file_fragment.format.make_fragment(\n",
" # TODO: Do these args cover everything?\n",
" file_fragment.path,\n",
" file_fragment.filesystem,\n",
" file_fragment.partition_expression,\n",
" row_groups=range(rg_start, rg_end),\n",
" )\n",
" table = fragment.to_table(columns=columns, filter=filters)\n",
" else:\n",
" # Nothing to read for this task?\n",
" table = dataset.schema.empty_table()\n",
" return table\n",
"\n",
"\n",
"# Aggregate small files\n",
"def aggregate_small_partitions(partitions, blocksize, columns, column_factors, file_sizes):\n",
" if blocksize is None:\n",
" return partitions\n",
"\n",
" size_factor = column_factors[columns].sum() if columns else column_factors.sum()\n",
" aggregated_partitions = []\n",
" blocksize = parse_bytes(blocksize)\n",
" group = None\n",
" group_size = 0\n",
" for part in partitions:\n",
" paths = part[0]\n",
" assert len(paths) > 0\n",
" size = 0\n",
" for path in paths:\n",
" size += size_factor * file_sizes.get(path, blocksize)\n",
" if group is None:\n",
" group = paths.copy()\n",
" group_size = size\n",
" elif group_size + size > blocksize:\n",
" aggregated_partitions.append((group, 0, 1))\n",
" group = paths.copy()\n",
" group_size = size\n",
" else:\n",
" # TODO: May need to enforce other \"aggregation rules\" here\n",
" group += paths.copy()\n",
" group_size += size\n",
" aggregated_partitions.append((group, 0, 1))\n",
" return aggregated_partitions\n",
"\n",
"\n",
"# Split large files\n",
"def split_large_partitions(partitions, blocksize, columns, column_factors, file_sizes):\n",
" if blocksize is None:\n",
" return partitions\n",
" size_factor = column_factors[columns].sum() if columns else column_factors.sum()\n",
" split_partitions = []\n",
" blocksize = parse_bytes(blocksize)\n",
" for part in partitions:\n",
" paths = part[0]\n",
" if len(paths) > 1:\n",
" split_partitions.append(part)\n",
" continue\n",
" assert len(paths) == 1\n",
" size = size_factor * file_sizes.get(paths[0], blocksize)\n",
" if size > blocksize:\n",
" # This is a large file. Split it into multiple partitions\n",
" num_ranks = round(size / blocksize)\n",
" for rank in range(num_ranks):\n",
" split_partitions.append((paths, rank, num_ranks))\n",
" else:\n",
" split_partitions.append(part)\n",
" return split_partitions"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "3beb6900-fa8f-4619-afaa-f987f3142458",
"metadata": {},
"outputs": [],
"source": [
"# Define rough `read_parquet` API\n",
"def read_parquet(\n",
" urlpath,\n",
" blocksize=\"128 MiB\",\n",
" columns=None,\n",
" storage_options=None,\n",
" dataset_kwargs=None,\n",
"):\n",
" ###\n",
" ### STEP 1: Extract filesystem and PyArrow Dataset\n",
" ###\n",
" storage_options = storage_options or {}\n",
" # TODO: Handle PyArrow filesystem\n",
" fs, _, paths = get_fs_token_paths(\n",
" urlpath, mode=\"rb\", storage_options=storage_options\n",
" )\n",
" # TODO: Make sure we correctly handle dir/path/list/etc.\n",
" dataset = ds.dataset(paths[0], filesystem=fs, **(dataset_kwargs or {}))\n",
" meta = dataset.schema.empty_table().to_pandas()\n",
" \n",
" ###\n",
" ### STEP 2: Perform default partitioning\n",
" ###\n",
" # NOTE: When this is written as a dedicated Expr,\n",
" # this step won't be necessary until graph construction\n",
" partitions = [([path], 0, 1) for path in dataset.files]\n",
" \n",
" ###~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n",
" ### Handle blocksize optimizations\n",
" ###~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n",
" if blocksize is not None:\n",
" # NOTE: When this is written as a dedicated Expr,\n",
" # this step won't be necessary until graph construction\n",
"\n",
" # Get file sizes from the filesystem\n",
" # (this is MUCH faster than parsing parquet metadata)\n",
" file_sizes = {path: fs.info(path)[\"size\"] for path in dataset.files}\n",
" fn, du = next(iter(file_sizes.items()))\n",
"\n",
" # Use a single file to calucluate a rough relationship\n",
" # between columns and uncompressed storage size\n",
" for file_frag in dataset.get_fragments():\n",
" total_uncompressed_file_size = 0\n",
" column_factors = {name: 0 for name in dataset.schema.names}\n",
" for row_group in file_frag.row_groups:\n",
" for cc in range(row_group.metadata.num_columns):\n",
" col_chunk = row_group.metadata.column(cc)\n",
" name = col_chunk.path_in_schema\n",
" col_chunk_size = col_chunk.total_uncompressed_size\n",
" column_factors[name] += col_chunk_size\n",
" total_uncompressed_file_size += col_chunk_size\n",
" break # Only want to sample the first file\n",
" expansion_factor = total_uncompressed_file_size / du\n",
" for column_name in list(column_factors.keys()):\n",
" column_factors[column_name] /= total_uncompressed_file_size\n",
" column_factors[column_name] *= expansion_factor\n",
" # `column_factors` now provides a reasonable estimate for how\n",
" # much each file will expand in memory from each column.\n",
" # For example, if we want to know how much a 1MiB file is\n",
" # likely to expand in memory, we can do the following:\n",
" # `int(parse_bytes(\"1MiB\") * column_factors[columns].sum())`\n",
" column_factors = pd.Series(column_factors)\n",
"\n",
" # Apply optimizations to modify the default `partitions`\n",
" partitions = aggregate_small_partitions(partitions, blocksize, columns, column_factors, file_sizes)\n",
" partitions = split_large_partitions(partitions, blocksize, columns, column_factors, file_sizes)\n",
"\n",
" # Using `from_map` for now, but this would need to be a new `Expr`\n",
" # to delay the `blocksize` optimizations above from running until\n",
" # after column projection\n",
" return dx.from_map(read_partition, partitions, columns=columns, filesystem=fs, meta=meta, enforce_metadata=False)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "9d0f0784-363b-4588-af51-4c1d218d45c2",
"metadata": {},
"outputs": [],
"source": [
"if True:\n",
" # Remote data\n",
" path = \"s3://coiled-data/tpch/scale-1000/customer\"\n",
" storage_options = {\"anon\": True}\n",
" columns = [\"c_custkey\", \"c_address\"]\n",
" blocksize = \"32MiB\"\n",
"else:\n",
" # Local data\n",
" path = \"/datasets/rzamora/crit_pq_int\"\n",
" storage_options = None\n",
" columns = [\"C1\", \"C2\"]\n",
" blocksize = \"128MiB\""
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "88533ddf-c002-4ef9-965d-65532f740d9b",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"partition size: 150000\n",
"CPU times: user 1.28 s, sys: 351 ms, total: 1.63 s\n",
"Wall time: 4.49 s\n"
]
}
],
"source": [
"%%time\n",
"\n",
"df = read_parquet(path, blocksize=blocksize, storage_options=storage_options)\n",
"print(f\"partition size: {len(df.partitions[0].compute())}\")"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "f1133022-7eb3-4311-90c8-c5f427565ed7",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"partition size: 900000\n",
"CPU times: user 931 ms, sys: 222 ms, total: 1.15 s\n",
"Wall time: 3.17 s\n"
]
}
],
"source": [
"%%time\n",
"\n",
"# When columns are projected, we can increase the partiton size\n",
"df2 = read_parquet(path, columns=columns, blocksize=blocksize, storage_options=storage_options)\n",
"print(f\"partition size: {len(df2.partitions[0].compute())}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6c6e961c-89a7-43a3-b016-28c2ceaf51d7",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.13"
}
},
"nbformat": 4,
"nbformat_minor": 5
}