Skip to content

Instantly share code, notes, and snippets.

@stephanie-wang
Created June 9, 2020 23:41
Show Gist options
  • Save stephanie-wang/d4533877c806be4cdf93ef716608d97b to your computer and use it in GitHub Desktop.
Save stephanie-wang/d4533877c806be4cdf93ef716608d97b to your computer and use it in GitHub Desktop.
Dependency resolution bug
@ray.remote
def f():
return 1
@ray.remote
def g(x):
return x + 1
x_id = f.remote() # ({'CPU': 1}, args=[])
g.remote(x_id) # ({'CPU': 1}, args=[x]) --> ({'CPU': 1}, args=[])
Currently:
- f gets sent to worker A, g pending dependency resolution
- receive f's reply ({x: 1})
- OnWorkerIdle(worker A) - no tasks in the queue
- task_finisher_->CompletePendingTask
- add {x: 1} into a in-process memory store
- dependency resolver inlines x into task g
- dependency resolver calls the callback for g, which adds g to the task queue
- request worker for g again
What we want:
- f gets sent to worker A, g pending dependency resolution
- receive f's reply ({x: 1})
- task_finisher_->CompletePendingTask
- add {x: 1} into a in-process memory store
- dependency resolver inlines x into task g
- dependency resolver calls the callback for g, which adds g to the task queue
- OnWorkerIdle(worker A) - g is in the task queue, so we can reuse worker A
Code:
- Main class for task submission: https://github.com/ray-project/ray/blob/master/src/ray/core_worker/transport/direct_task_transport.cc
- Dependency resolution: https://github.com/ray-project/ray/blob/master/src/ray/core_worker/transport/dependency_resolver.h
- Processing task reply and return values: https://github.com/ray-project/ray/blob/master/src/ray/core_worker/task_manager.h
- Unit tests: https://github.com/ray-project/ray/blob/master/src/ray/core_worker/test/direct_task_transport_test.cc
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment