@TimQuelch
Created July 20, 2021 07:23
Simple framework to run parallel jobs on a distributed system with Julia
# distributedwork.jl
using Distributed

# Add worker processes. If running under Slurm on the HPC, launch one worker
# per Slurm task; otherwise add regular local worker processes
if "SLURM_JOB_ID" in keys(ENV)
    @info "Adding slurm procs"
    using ClusterManagers
    addprocs_slurm(parse(Int, ENV["SLURM_NTASKS"]), unbuffered="")
else
    @info "Adding local procs"
    addprocs()
end
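# Quick sanity check (a minimal sketch): nworkers() should now equal
# SLURM_NTASKS on the HPC, or roughly the local CPU count otherwise
#     @info "Worker count" nworkers()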
# Create queues for jobs and results
const jobqueue = RemoteChannel(() -> Channel{Tuple}(32))
const resultsqueue = RemoteChannel(() -> Channel{NamedTuple}(32))
# This is the main event loop for the workers. Each worker takes a job from the
# queue, runs it, and puts the result in the results queue. take! operations
# block if there are no jobs available; put! operations block if the results
# queue is full.
@everywhere function dowork(jobs, results)
    while true
        # fn, args... = take!(jobs) # this destructuring is only available in Julia 1.6+
        job = take!(jobs)                # Take the next available job
        fn, args = job[1], job[2:end]    # First element names the function, the rest are its arguments
        result = eval(fn)(args...)       # Look up the function by name and run the job
        put!(results, result)            # Send the result back
    end
end
# Starts event loop on all workers
start_workers() = foreach(pid -> remote_do(dowork, pid, jobqueue, resultsqueue), workers())
# Gracefully stops workers
stop_workers() = rmprocs(workers())
# Submits a single job. Blocks if jobqueue is full
submit_job(job) = put!(jobqueue, job)
# Launches an async task to submit many jobs, so the caller is not blocked while
# the queue fills
submit_jobs(jobs) = @async foreach(submit_job, jobs)
include("distributedwork.jl")
# For results processing
using CSV
using DataFrames
# Define the function that will do the main computation. This needs to be
# defined on all workers. It can be defined as a function like this or live in
# an included module.
@everywhere function work_function(a, b, c)
    exec_time = a * rand()
    sleep(exec_time)
    # Return a named tuple
    return (val=a * c, string="a, b, c = $a, $b, $c", exec_time=exec_time)
end
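# For example, work_function(1, 2, 3) returns a named tuple along the lines of
#     (val = 3, string = "a, b, c = 1, 2, 3", exec_time = 0.42)
# where exec_time is a random sleep duration.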
# Start event loop on all workers. This should only be run after all required
# functions are defined on the workers (in this case, after work_function is
# defined)
start_workers()
# Define the jobs to be executed. The first field of each tuple is the name of
# the function to call (which must be defined on all workers with @everywhere);
# the remaining fields are the arguments passed to that function. This list can
# also be generated programmatically (see the sketch after the list). The
# function doesn't need to be the same work function for every job, although
# with the current setup every result needs to share the same NamedTuple
# schema.
jobs = [
    (:work_function, 1, 2, 3),
    (:work_function, 2, 4, 8),
    (:work_function, 8, 4, 2),
    (:work_function, 3, 2, 1),
]
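# A sketch of programmatic job generation: sweep a (hypothetical) parameter
# grid and build one job tuple per combination.
#     jobs = vec([(:work_function, a, b, c) for a in 1:3, b in (2, 4), c in (1, 8)])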
# Submit the jobs to the jobqueue. This launches an async task to submit each
# tuple to the queue channel
submit_jobs(jobs)
# Write each result to a CSV file as it becomes available. Alternatively, each
# result row can be pushed into a DataFrame for further processing (see the
# sketch after the loop).
n = length(jobs)
csvfile = "results.csv"
for i in 1:n
    result = take!(resultsqueue)
    @info "Obtained result ($i/$n)" pairs(result)
    CSV.write(csvfile, [result]; append=isfile(csvfile))
end
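# A sketch of the DataFrame alternative: collect every result row first, then
# analyse or write the full table once all jobs have finished.
#     rows = [take!(resultsqueue) for _ in 1:n]
#     df = DataFrame(rows)
#     CSV.write(csvfile, df)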
# Stop all workers gracefully
stop_workers()
#!/bin/bash
#SBATCH --time=3:00
#SBATCH --ntasks=4
# This will depend on your HPC setup
module load julia/1.5
# Print all the SLURM environment variables
julia -e 'show(stdout, "text/plain", filter(kv -> occursin("SLURM", kv.first), ENV)); println();'
# Run the job with full optimisations. Add --project=. if you have a
# Project.toml/Manifest.toml with dependencies
julia --optimize=3 run.jl
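# To submit this batch script to the Slurm scheduler (assuming it is saved as
# run.sh):
#     sbatch run.sh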