Simple framework to run parallel jobs on a distributed system with Julia
# distributedwork.jl
using Distributed

# Add worker processes. On an HPC cluster, use Slurm; otherwise add local
# worker processes (one per CPU thread by default).
if haskey(ENV, "SLURM_JOB_ID")
    @info "Adding slurm procs"
    using ClusterManagers
    addprocs_slurm(parse(Int, ENV["SLURM_NTASKS"]), unbuffered="")
else
    @info "Adding local procs"
    addprocs()
end

# Create queues for jobs and results
const jobqueue = RemoteChannel(() -> Channel{Tuple}(32))
const resultsqueue = RemoteChannel(() -> Channel{NamedTuple}(32))

# Main loop for the workers. Each worker takes a job from the queue, runs it,
# and puts the result in the results queue. take! blocks while no jobs are
# available; put! blocks while the results queue is full.
@everywhere function dowork(jobs, results)
    while true
        # fn, args... = take!(jobs) # requires Julia 1.6 or later
        job = take!(jobs) # Take the next available job
        fn, args = job[1], job[2:end] # Function name, then its arguments
        result = eval(fn)(args...) # Look up the function by name and run the job
        put!(results, result) # Send the result
    end
end
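
# A minimal sketch, not part of the original framework: a variant of dowork
# that catches exceptions so that one failing job does not end the worker's
# loop. The name `dowork_safe` and the `error`/`job` fields are hypothetical.
# The failure is reported as a NamedTuple so it still fits a
# Channel{NamedTuple}, but its fields differ from those of a normal result, so
# whatever consumes the results would need to check for them.
using Distributed

@everywhere function dowork_safe(jobs, results)
    while true
        job = take!(jobs)
        fn, args = job[1], job[2:end]
        result = try
            eval(fn)(args...)
        catch err
            # Describe the failure instead of letting it escape the loop
            (error=sprint(showerror, err), job=string(job))
        end
        put!(results, result)
    end
end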

# Starts event loop on all workers
start_workers() = foreach(pid -> remote_do(dowork, pid, jobqueue, resultsqueue), workers())

# Gracefully stops workers
stop_workers() = rmprocs(workers())

# Submits a single job. Blocks if jobqueue is full
submit_job(job) = put!(jobqueue, job)

# Launches an async task to submit many jobs. Does not block the main thread
submit_jobs(jobs) = @async foreach(submit_job, jobs)

# run.jl
include("distributedwork.jl")

# For results processing
using CSV
using DataFrames

# Define the function that does the main computation. It must be defined on
# all workers, either directly as a function or inside an included module.
@everywhere function work_function(a, b, c)
    exec_time = a * rand()
    sleep(exec_time)
    # Returns a named tuple
    return (val=a*c, string="a, b, c = $a, $b, $c", exec_time=exec_time)
end

# Start event loop on all workers. This should only be run after all required
# functions are defined on the workers (in this case, after work_function is
# defined)
start_workers()

# Define the jobs that need to be executed. The first field of each tuple is
# the name of the function to call (it must be defined on all workers with
# @everywhere). The remaining fields are the arguments to pass to the function.
# This list can be generated programmatically. The jobs do not all have to use
# the same work function, although as currently set up every result must have
# the same NamedTuple schema.
jobs = [
    (:work_function, 1, 2, 3),
    (:work_function, 2, 4, 8),
    (:work_function, 8, 4, 2),
    (:work_function, 3, 2, 1),
]
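
# A minimal sketch of generating such a list programmatically. The parameter
# ranges and the name `generated_jobs` are illustrative assumptions; nothing is
# submitted here. The comprehension builds one job tuple per combination of
# a, b and c, and vec flattens the resulting 3-dimensional array into a vector.
generated_jobs = vec([(:work_function, a, b, c) for a in 1:2, b in (2, 4), c in (1, 3)])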

# Submit the jobs to the jobqueue. This launches an async task to submit each
# tuple to the queue channel
submit_jobs(jobs)

# Write each result to a CSV file as it becomes available. Alternatively each
# result row can be pushed to a DataFrame and further processing can be done.
n = length(jobs)
csvfile = "results.csv"
for i in 1:n
    results = take!(resultsqueue)
    @info "Obtained result ($i/$n)" pairs(results)
    CSV.write(csvfile, [results]; append=isfile(csvfile))
end
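
# A minimal sketch of the alternative mentioned above: gather the result rows
# in memory instead of writing each one to disk. `collect_results` is a
# hypothetical helper, not part of the original script. It takes n results from
# a queue (a Channel or RemoteChannel) and returns them as a vector of
# NamedTuples, which could then be turned into a table for further processing,
# assuming all rows share the same fields.
function collect_results(queue, n)
    rows = NamedTuple[]
    for _ in 1:n
        push!(rows, take!(queue)) # Blocks until the next result is available
    end
    return rows
end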

# Stop all workers gracefully
stop_workers()

#!/bin/bash
#SBATCH --time=3:00
#SBATCH --ntasks=4

# This will depend on your HPC setup
module load julia/1.5

# Print all the SLURM environment variables
julia -e 'show(stdout, "text/plain", filter(kv -> occursin("SLURM", kv.first), ENV)); println();'

# Run the job with full optimisations. Add in --project="." if you have a
# Project/Manifest with dependencies
julia --optimize=3 run.jl