@bveeramani
Created September 11, 2022 06:26
read_tf_records bug
============================= test session starts ==============================
platform darwin -- Python 3.9.12, pytest-7.1.2, pluggy-1.0.0
rootdir: /Users/bveeramani/GitHub/ray/python
plugins: anyio-3.6.1, lazy-fixture-0.6.3
collected 1 item
python/ray/data/tests/test_dataset_formats.py F [100%]
=================================== FAILURES ===================================
_____________________________ test_read_tf_records _____________________________
ray_start_regular_shared = RayContext(dashboard_url='127.0.0.1:8265', python_version='3.9.12', ray_version='3.0.0.dev0', ray_commit='{{RAY_COMMIT...1:63110', 'dashboard_agent_listen_port': 52365, 'node_id': 'b54ebfb4d748c8ad87515e023baf6fd414fca4cd9d6ef2e9b3413c8a'})
tmp_path = PosixPath('/private/var/folders/j8/553rq3812fv573lghv2m98n80000gn/T/pytest-of-bveeramani/pytest-120/test_read_tf_records0')
    def test_read_tf_records(ray_start_regular_shared, tmp_path):
        import tensorflow as tf

        features = tf.train.Features(
            feature={
                "int64": tf.train.Feature(int64_list=tf.train.Int64List(value=[1])),
                "int64_list": tf.train.Feature(
                    int64_list=tf.train.Int64List(value=[1, 2, 3, 4])
                ),
                "float": tf.train.Feature(float_list=tf.train.FloatList(value=[1.0])),
                "float_list": tf.train.Feature(
                    float_list=tf.train.FloatList(value=[1.0, 2.0, 3.0, 4.0])
                ),
                "bytes": tf.train.Feature(bytes_list=tf.train.BytesList(value=[b"abc"])),
                "bytes_list": tf.train.Feature(
                    bytes_list=tf.train.BytesList(value=[b"abc", b"1234"])
                ),
            }
        )
        example = tf.train.Example(features=features)
        path = os.path.join(tmp_path, "data.tfrecords")
        with tf.io.TFRecordWriter(path=path) as writer:
            writer.write(example.SerializeToString())
>       ds = ray.data.read_tf_records(path)
python/ray/data/tests/test_dataset_formats.py:3302:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
python/ray/data/read_api.py:776: in read_tf_records
    return read_datasource(
python/ray/data/read_api.py:315: in read_datasource
    block_list.ensure_metadata_for_first_block()
python/ray/data/_internal/lazy_block_list.py:373: in ensure_metadata_for_first_block
    metadata = ray.get(metadata_ref)
python/ray/_private/client_mode_hook.py:105: in wrapper
    return func(*args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
object_refs = [ObjectRef(32d950ec0ccf9d2affffffffffffffffffffffff0100000002000000)]
    @PublicAPI
    @client_mode_hook(auto_init=True)
    def get(
        object_refs: Union[ray.ObjectRef, Sequence[ray.ObjectRef]],
        *,
        timeout: Optional[float] = None,
    ) -> Union[Any, List[Any]]:
        """Get a remote object or a list of remote objects from the object store.

        This method blocks until the object corresponding to the object ref is
        available in the local object store. If this object is not in the local
        object store, it will be shipped from an object store that has it (once the
        object has been created). If object_refs is a list, then the objects
        corresponding to each object in the list will be returned.

        Ordering for an input list of object refs is preserved for each object
        returned. That is, if an object ref to A precedes an object ref to B in the
        input list, then A will precede B in the returned list.

        This method will issue a warning if it's running inside async context,
        you can use ``await object_ref`` instead of ``ray.get(object_ref)``. For
        a list of object refs, you can use ``await asyncio.gather(*object_refs)``.

        Args:
            object_refs: Object ref of the object to get or a list of object refs
                to get.
            timeout (Optional[float]): The maximum amount of time in seconds to
                wait before returning.

        Returns:
            A Python object or a list of Python objects.

        Raises:
            GetTimeoutError: A GetTimeoutError is raised if a timeout is set and
                the get takes longer than timeout to return.
            Exception: An exception is raised if the task that created the object
                or that created one of the objects raised an exception.
        """
        worker = global_worker
        worker.check_connected()
        if hasattr(worker, "core_worker") and worker.core_worker.current_actor_is_asyncio():
            global blocking_get_inside_async_warned
            if not blocking_get_inside_async_warned:
                logger.warning(
                    "Using blocking ray.get inside async actor. "
                    "This blocks the event loop. Please use `await` "
                    "on object ref with asyncio.gather if you want to "
                    "yield execution to the event loop instead."
                )
                blocking_get_inside_async_warned = True
        with profiling.profile("ray.get"):
            is_individual_id = isinstance(object_refs, ray.ObjectRef)
            if is_individual_id:
                object_refs = [object_refs]
            if not isinstance(object_refs, list):
                raise ValueError(
                    "'object_refs' must either be an object ref "
                    "or a list of object refs."
                )
            # TODO(ujvl): Consider how to allow user to retrieve the ready objects.
            values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
            for i, value in enumerate(values):
                if isinstance(value, RayError):
                    if isinstance(value, ray.exceptions.ObjectLostError):
                        worker.core_worker.dump_object_store_memory_usage()
                    if isinstance(value, RayTaskError):
>                       raise value.as_instanceof_cause()
E                       ray.exceptions.RayTaskError(ReferenceError): ray::_execute_read_task() (pid=40369, ip=127.0.0.1)
E                         File "/Users/bveeramani/GitHub/ray/python/ray/cloudpickle/cloudpickle_fast.py", line 73, in dumps
E                           cp.dump(obj)
E                         File "/Users/bveeramani/GitHub/ray/python/ray/cloudpickle/cloudpickle_fast.py", line 620, in dump
E                           return Pickler.dump(self, obj)
E                         File "/Users/bveeramani/GitHub/ray/python/ray/cloudpickle/cloudpickle_fast.py", line 714, in reducer_override
E                           elif isinstance(obj, types.FunctionType):
E                       ReferenceError: weakly-referenced object no longer exists
python/ray/_private/worker.py:2279: RayTaskError(ReferenceError)
---------------------------- Captured stderr setup -----------------------------
2022-09-10 23:26:14,221 INFO worker.py:1508 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
----------------------------- Captured stdout call -----------------------------
(_execute_read_task pid=40369) Metal device set to: Apple M1 Pro
----------------------------- Captured stderr call -----------------------------
(_execute_read_task pid=40369) 2022-09-10 23:26:16.360923: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
(_execute_read_task pid=40369) 2022-09-10 23:26:16.361063: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)
=========================== short test summary info ============================
FAILED python/ray/data/tests/test_dataset_formats.py::test_read_tf_records - ray.exceptions.RayTaskError(ReferenceError): ray::_execute_read_task() (pid=40369, ip=127.0.0.1)
======================= 1 failed, 107 warnings in 8.75s ========================
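
For anyone trying to reproduce this outside pytest, here is a minimal standalone sketch distilled from the failing test above. It assumes a Ray build that includes ray.data.read_tf_records (the 3.0.0.dev0 nightly shown in the log) and a working TensorFlow install; the temporary-directory handling is illustrative and not part of the original test.

import os
import tempfile

import tensorflow as tf

import ray

ray.init()

# Same Example payload as the test, trimmed to two features for brevity.
features = tf.train.Features(
    feature={
        "int64": tf.train.Feature(int64_list=tf.train.Int64List(value=[1])),
        "bytes": tf.train.Feature(bytes_list=tf.train.BytesList(value=[b"abc"])),
    }
)
example = tf.train.Example(features=features)

with tempfile.TemporaryDirectory() as tmp_dir:  # stand-in for pytest's tmp_path
    path = os.path.join(tmp_dir, "data.tfrecords")
    with tf.io.TFRecordWriter(path=path) as writer:
        writer.write(example.SerializeToString())

    # Per the log above, this raises RayTaskError(ReferenceError): cloudpickle
    # fails with "weakly-referenced object no longer exists" while serializing
    # the read task.
    ds = ray.data.read_tf_records(path)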
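
As an aside, the ray.get docstring quoted in the traceback warns against calling blocking ray.get inside an async actor. A minimal sketch of the await-based alternative it recommends follows; the task and actor names here are made up for illustration.

import asyncio

import ray


@ray.remote
def add_one(x):
    return x + 1


@ray.remote
class AsyncActor:  # hypothetical actor, just to illustrate the pattern
    async def run(self):
        refs = [add_one.remote(i) for i in range(3)]
        # Awaiting the refs yields the event loop instead of blocking it,
        # which is what the warning in ray._private.worker.get asks for.
        return await asyncio.gather(*refs)


ray.init()
actor = AsyncActor.remote()
print(ray.get(actor.run.remote()))  # [1, 2, 3]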