@costrouc · Last active September 5, 2023 10:57
# client script: enqueue an arbitrary (cloudpickled) function and wait for its result
from rq import Queue
from redis import Redis
import tasks
import time
import contextlib


@contextlib.contextmanager
def timer(message: str):
    start_time = time.time()
    yield
    print(f'{message} took {time.time() - start_time:.2f} [s]')


redis_conn = Redis()
queue = Queue(connection=redis_conn)

with timer('submitting tasks'):
    def special_function(a: int, b: int, c: str):
        return str(a + b) + c

    job = tasks.task_enqueue(queue, special_function, 1, 2, 'a')

# result = job.latest_result(timeout=60)
while job.get_status() != 'finished':
    time.sleep(0.1)

print('result', job.result)
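Running this requires a worker consuming the queue (see worker.py below). Since special_function(1, 2, 'a') returns str(1 + 2) + 'a', the last line printed should be:

result 3a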
# client script: build a dask.delayed graph and execute it via rq
import time

from rq import Queue
from redis import Redis
from dask import delayed

import tasks


@delayed
def add(a, b):
    print('add', a, b)
    return a + b


@delayed
def multiply(a, b):
    import time  # local import keeps the pickled function self-contained on the worker
    print('multiply', a, b)
    time.sleep(1)
    return a * b


f = add(multiply(1, 2), add(3, multiply(4, 5)))

redis_conn = Redis()
queue = Queue(connection=redis_conn)
job = tasks.convert_dask_delayed_to_rq(f, queue)

while job.get_status() != 'finished':
    time.sleep(0.1)

print('result', job.result)
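The conversion in tasks.py below walks f.dask, which for this expression is essentially a mapping from generated task ids to (function, *args) tuples, with other task ids appearing as arguments. Roughly (the hashes here are illustrative, not the real ones):

# {
#   'multiply-<hash1>': (multiply, 1, 2),
#   'multiply-<hash2>': (multiply, 4, 5),
#   'add-<hash3>':      (add, 3, 'multiply-<hash2>'),
#   'add-<hash4>':      (add, 'multiply-<hash1>', 'add-<hash3>'),
# }

Once all four jobs have run, the final job's result should be add(multiply(1, 2), add(3, multiply(4, 5))) = 2 + (3 + 20) = 25.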
# docker-compose configuration: a Redis 7 service backing the rq queue
version: '3'
services:
  redis:
    image: "redis:7"
    command:
      - redis-server
    ports:
      - "6379:6379"
# conda environment specification
name: rqcloud
channels:
  - conda-forge
dependencies:
  - rq
  - redis-py
  - cloudpickle
# tasks.py
import cloudpickle
from rq import get_current_job


def convert_dask_delayed_to_rq(f, queue):
    """Walk a dask.delayed graph in topological order, enqueuing one rq job per task."""
    last_job = None
    jobs = {}
    for task_id in f.dask._toposort_layers():
        # Every upstream task becomes an rq job dependency.
        depends_on = []
        for dependency in f.dask.dependencies[task_id]:
            depends_on.append(jobs[dependency])

        # Replace references to other tasks with 'task-<id>' placeholders;
        # run_task resolves them to results on the worker.
        _args = []
        for arg in f.dask[task_id][1:]:
            if arg in jobs:
                _args.append(f'task-{arg}')
            else:
                _args.append(arg)

        job = task_enqueue(
            queue,
            f.dask[task_id][0],
            *_args,
            depends_on=depends_on,
            job_id=task_id,
        )
        jobs[task_id] = job
        last_job = job
    return last_job


def run_task(pickled_function: bytes, *args, **kwargs):
    """Run on the worker: resolve 'task-<id>' placeholders, then call the unpickled function."""
    current_job = get_current_job()
    dependencies = {}
    for dependency in current_job.fetch_dependencies():
        dependencies[dependency.id] = dependency
    print(dependencies)

    _args = []
    for arg in args:
        if isinstance(arg, str) and arg.startswith('task-'):
            # Substitute the finished dependency's result for the placeholder.
            _args.append(dependencies[arg[5:]].result)
        else:
            _args.append(arg)

    f = cloudpickle.loads(pickled_function)
    return f(*_args, **kwargs)


def task_enqueue(queue, f, *args, **kwargs):
    """Enqueue an arbitrary callable by shipping it as a cloudpickle payload."""
    pickled_function = cloudpickle.dumps(f)
    job = queue.enqueue(run_task, pickled_function, *args, **kwargs)
    return job
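The 'task-' prefix is the handshake between the two halves: convert_dask_delayed_to_rq rewrites graph-key arguments as 'task-<id>' placeholder strings (and passes job_id=task_id so the ids line up), while run_task swaps each placeholder for the finished dependency's result. Illustratively, with hypothetical ids:

# enqueue side: task_enqueue(queue, add, 'task-multiply-1', 3,
#                            depends_on=[multiply_job], job_id='add-2')
# worker side:  run_task receives ('task-multiply-1', 3); fetch_dependencies()
#               yields the finished 'multiply-1' job, so add(<its result>, 3) is called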
# worker.py
from rq import Connection, Queue, Worker

if __name__ == '__main__':
    # Connection() defaults to Redis on localhost:6379.
    with Connection():
        q = Queue()
        Worker(q).work()
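With the Redis service from the compose file listening on localhost:6379, this worker can be started in a separate shell (e.g. python worker.py, assuming the file names from the diff below) before running either client script. It must be able to import the module that defines run_task, which is exactly the caveat raised in the comment below.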
pmeier commented Sep 5, 2023

Let's make this a little more honest, since this is for demonstration purposes:

  1. Although we enqueue run_task here, internally rq just extracts the dotted path, i.e. "tasks.run_task", and puts that string into the queue.
  2. The run_task function therefore needs to be importable on the worker, while task_enqueue needs to be available on the node that submits to the queue. Unless we operate on a single node, it is unlikely that both have access to the same local file.
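For example, a minimal check of point 1 (assuming a running Redis and the gist's tasks.py importable):

from redis import Redis
from rq import Queue
import tasks

queue = Queue(connection=Redis())
job = tasks.task_enqueue(queue, lambda: None)
print(job.func_name)  # 'tasks.run_task': only the dotted path is stored, not run_task itself

Hence the patch below moves run_task into worker.py and enqueues it by string reference.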
diff --git a/tasks.py b/tasks.py
index 98f029c..123c6a8 100644
--- a/tasks.py
+++ b/tasks.py
@@ -1,5 +1,4 @@
 import cloudpickle
-from rq import get_current_job
 
 
 def convert_dask_delayed_to_rq(f, queue):
@@ -30,25 +29,7 @@ def convert_dask_delayed_to_rq(f, queue):
     return last_job
 
 
-def run_task(pickled_function: bytes, *args, **kwargs):
-    current_job = get_current_job()
-    dependencies = {}
-    for dependency in current_job.fetch_dependencies():
-        dependencies[dependency.id] = dependency
-    print(dependencies)
-
-    _args = []
-    for arg in args:
-        if isinstance(arg, str) and arg.startswith('task-'):
-            _args.append(dependencies[arg[5:]].result)
-        else:
-            _args.append(arg)
-
-    f = cloudpickle.loads(pickled_function)
-    return f(*_args, **kwargs)
-
-
 def task_enqueue(queue, f, *args, **kwargs):
     pickled_function = cloudpickle.dumps(f)
-    job = queue.enqueue(run_task, pickled_function, *args, **kwargs)
+    job = queue.enqueue("worker.run_task", pickled_function, *args, **kwargs)
     return job
diff --git a/worker.py b/worker.py
index d5d1fb6..77f78d4 100644
--- a/worker.py
+++ b/worker.py
@@ -1,6 +1,26 @@
-from rq import Connection, Queue, Worker
+from rq import Connection, Queue, Worker, get_current_job
+import cloudpickle
 
-if __name__ == '__main__':
+
+def run_task(pickled_function: bytes, *args, **kwargs):
+    current_job = get_current_job()
+    dependencies = {}
+    for dependency in current_job.fetch_dependencies():
+        dependencies[dependency.id] = dependency
+    print(dependencies)
+
+    _args = []
+    for arg in args:
+        if isinstance(arg, str) and arg.startswith("task-"):
+            _args.append(dependencies[arg[5:]].result)
+        else:
+            _args.append(arg)
+
+    f = cloudpickle.loads(pickled_function)
+    return f(*_args, **kwargs)
+
+
+if __name__ == "__main__":
     with Connection():
         q = Queue()
         Worker(q).work()
