ray_dataset_partition_pickling_issue
# The following code works on ray 1.10.0 and 1.9.2 but does not work on ray 2.0.0.dev0 [checked on 2022-02-16 4:45AM PST]
# mac
# pip install -U 'ray[default]@https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp37-cp37m-macosx_10_15_intel.whl'
# pip install pyarrow
# pip install pandas
# linux
# pip install -U 'ray[default]@https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl'
# pip install pyarrow
# pip install pandas
import ray
import pyarrow as pa
import pandas as pd
from pyarrow import dataset as ds
import pyarrow.parquet as pq
df = pd.DataFrame(
    {
        "year": [2021, 2021, 2022, 2022],
        "month": [1, 2, 1, 2],
        "name": ["name1", "name2", "name3", "name4"],
        "enabled": [True, False, False, False],
    }
)
print("df:", df)

table = pa.Table.from_pandas(df)
table_location = "ray_test_ds"
table_location_part = "ray_test_ds_part"

print("Writing to non-partitioned parquet: ", table_location)
pq.write_to_dataset(table, root_path=table_location)

print("Writing to partitioned parquet: ", table_location_part)
pq.write_to_dataset(
    table, root_path=table_location_part, partition_cols=["year", "month"]
)

print("Reading non-partitioned parquet: ", table_location)
ray_dataset = ray.data.read_parquet(table_location)
ray_dataset.show()

print("***************************************************************")
print(
    "Reading partitioned parquet without passing partition info: ", table_location_part
)
ray_dataset_part1 = ray.data.read_parquet(table_location_part)
ray_dataset_part1.show()

print("***************************************************************")
print("Reading partitioned parquet with partition info: ", table_location_part)
schema_columns_list = [("year", pa.int32()), ("month", pa.int32())]
partitioning = ds.partitioning(pa.schema(schema_columns_list), flavor="hive")
arrow_parquet_args = {"dataset_kwargs": {"partitioning": partitioning}}
ray_dataset_part2 = ray.data.read_parquet(table_location_part, **arrow_parquet_args)
ray_dataset_part2.show()
# pickling error in ray-2.0.0-dev
"""
Traceback (most recent call last):
  File "python/ray/_raylet.pyx", line 415, in ray._raylet.prepare_args_internal
  File "/usr/local/lib/python3.7/site-packages/ray/serialization.py", line 412, in serialize
    return self._serialize_to_msgpack(value)
  File "/usr/local/lib/python3.7/site-packages/ray/serialization.py", line 391, in _serialize_to_msgpack
    metadata, python_objects
  File "/usr/local/lib/python3.7/site-packages/ray/serialization.py", line 352, in _serialize_to_pickle5
    raise e
  File "/usr/local/lib/python3.7/site-packages/ray/serialization.py", line 348, in _serialize_to_pickle5
    value, protocol=5, buffer_callback=writer.buffer_callback
  File "/usr/local/lib/python3.7/site-packages/ray/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.7/site-packages/ray/cloudpickle/cloudpickle_fast.py", line 620, in dump
    return Pickler.dump(self, obj)
  File "stringsource", line 2, in pyarrow._dataset.HivePartitioning.__reduce_cython__
TypeError: self.hive_partitioning,self.partitioning,self.wrapped cannot be converted to a Python object for pickling
"""