Last active
September 5, 2023 10:57
-
-
Save costrouc/8126f6a8eef8be29de780bd964bc4ab0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from rq import Queue | |
from redis import Redis | |
import tasks | |
import time | |
import contextlib | |
@contextlib.contextmanager | |
def timer(message: str): | |
start_time = time.time() | |
yield | |
print(f'{message} took {time.time() - start_time:.2f} [s]') | |
redis_conn = Redis() | |
queue = Queue(connection=redis_conn) | |
with timer('submitting tasks'): | |
def special_function(a: int, b: int, c: str): | |
return str(a + b) + c | |
job = tasks.task_enqueue(queue, special_function, 1, 2, 'a') | |
# result = job.latest_result(timeout=60) | |
while job.get_status() != 'finished': | |
time.sleep(0.1) | |
print('result', job.result) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from rq import Queue | |
from redis import Redis | |
from dask import delayed | |
import tasks | |
@delayed | |
def add(a, b): | |
print('add', a, b) | |
return a + b | |
@delayed | |
def multiply(a, b): | |
import time | |
print('multiply', a, b) | |
time.sleep(1) | |
return a * b | |
f = add(multiply(1, 2), add(3, multiply(4, 5))) | |
redis_conn = Redis() | |
queue = Queue(connection=redis_conn) | |
job = tasks.convert_dask_delayed_to_rq(f, queue) | |
while job.get_status() != 'finished': | |
time.sleep(0.1) | |
print('result', job.result) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
version: '3' | |
services: | |
redis: | |
image: "redis:7" | |
command: | |
- redis-server | |
ports: | |
- "6379:6379" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
name: rqcloud | |
channels: | |
- conda-forge | |
dependencies: | |
- rq | |
- redis-py | |
- cloudpickle |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cloudpickle | |
from rq import get_current_job | |
def convert_dask_delayed_to_rq(f, queue): | |
last_job = None | |
jobs = {} | |
for task_id in f.dask._toposort_layers(): | |
depends_on = [] | |
for dependency in f.dask.dependencies[task_id]: | |
depends_on.append(jobs[dependency]) | |
_args = [] | |
for arg in f.dask[task_id][1:]: | |
if arg in jobs: | |
_args.append(f'task-{arg}') | |
else: | |
_args.append(arg) | |
job = task_enqueue( | |
queue, | |
f.dask[task_id][0], | |
*_args, | |
depends_on=depends_on, | |
job_id=task_id | |
) | |
jobs[task_id] = job | |
last_job = job | |
return last_job | |
def run_task(pickled_function: bytes, *args, **kwargs): | |
current_job = get_current_job() | |
dependencies = {} | |
for dependency in current_job.fetch_dependencies(): | |
dependencies[dependency.id] = dependency | |
print(dependencies) | |
_args = [] | |
for arg in args: | |
if isinstance(arg, str) and arg.startswith('task-'): | |
_args.append(dependencies[arg[5:]].result) | |
else: | |
_args.append(arg) | |
f = cloudpickle.loads(pickled_function) | |
return f(*_args, **kwargs) | |
def task_enqueue(queue, f, *args, **kwargs): | |
pickled_function = cloudpickle.dumps(f) | |
job = queue.enqueue(run_task, pickled_function, *args, **kwargs) | |
return job |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from rq import Connection, Queue, Worker | |
if __name__ == '__main__': | |
with Connection(): | |
q = Queue() | |
Worker(q).work() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Let's make this a little more honest, since this is for demonstration purposes
run_task
here, internallyrq
just extracts the "path", i.e."tasks.run_task"
and puts that into the queue.run_task
function needs to be available on the worker, whiletask_enqueue
needs to be available on the node that handles the queue. Unless we operate on a single node, it is unlikely that both have access to same local file.