@mmuru · Created February 16, 2022 16:26
ray_dataset_partition_pickling_issue
# The following code works on ray 1.10.0 and 1.9.2 but does not work on ray 2.0.0.dev0 [checked on 2022-02-16 4:45AM PST]
# macOS
# pip install -U 'ray[default]@https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp37-cp37m-macosx_10_15_intel.whl'
# pip install pyarrow
# pip install pandas
# Linux
# pip install -U 'ray[default]@https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl'
# pip install pyarrow
# pip install pandas
import ray
import pyarrow as pa
import pandas as pd
from pyarrow import dataset as ds
import pyarrow.parquet as pq
df = pd.DataFrame(
    {
        "year": [2021, 2021, 2022, 2022],
        "month": [1, 2, 1, 2],
        "name": ["name1", "name2", "name3", "name4"],
        "enabled": [True, False, False, False],
    }
)
print("df:", df)
table = pa.Table.from_pandas(df)
table_location = "ray_test_ds"
table_location_part = "ray_test_ds_part"
print("Writing to non-partitioned parquet: ", table_location)
pq.write_to_dataset(table, root_path=table_location)
print("Writing to partitioned parquet: ", table_location_part)
pq.write_to_dataset(
    table, root_path=table_location_part, partition_cols=["year", "month"]
)
print("Reading non-partitioned parquet: ", table_location)
ray_dataset = ray.data.read_parquet(table_location)
ray_dataset.show()
print("***************************************************************")
print(
    "Reading partitioned parquet without passing partition info: ", table_location_part
)
ray_dataset_part1 = ray.data.read_parquet(table_location_part)
ray_dataset_part1.show()
print("***************************************************************")
print("Reading partitioned parquet with partition info: ", table_location_part)
schema_columns_list = [("year", pa.int32()), ("month", pa.int32())]
partitioning = ds.partitioning(pa.schema(schema_columns_list), flavor="hive")
arrow_parquet_args = {"dataset_kwargs": {"partitioning": partitioning}}
ray_dataset_part2 = ray.data.read_parquet(table_location_part, **arrow_parquet_args)
ray_dataset_part2.show()
# pickling error in ray-2.0.0-dev
"""
Traceback (most recent call last):
File "python/ray/_raylet.pyx", line 415, in ray._raylet.prepare_args_internal
File "/usr/local/lib/python3.7/site-packages/ray/serialization.py", line 412, in serialize
return self._serialize_to_msgpack(value)
File "/usr/local/lib/python3.7/site-packages/ray/serialization.py", line 391, in _serialize_to_msgpack
metadata, python_objects
File "/usr/local/lib/python3.7/site-packages/ray/serialization.py", line 352, in _serialize_to_pickle5
raise e
File "/usr/local/lib/python3.7/site-packages/ray/serialization.py", line 348, in _serialize_to_pickle5
value, protocol=5, buffer_callback=writer.buffer_callback
File "/usr/local/lib/python3.7/site-packages/ray/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/usr/local/lib/python3.7/site-packages/ray/cloudpickle/cloudpickle_fast.py", line 620, in dump
return Pickler.dump(self, obj)
File "stringsource", line 2, in pyarrow._dataset.HivePartitioning.__reduce_cython__
TypeError: self.hive_partitioning,self.partitioning,self.wrapped cannot be converted to a Python object for pickling
"""