Skip to content

Instantly share code, notes, and snippets.

@RalphAS
Created October 15, 2016 03:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RalphAS/2a37b1c631923efa30ac4a7f02a2ee9d to your computer and use it in GitHub Desktop.
Save RalphAS/2a37b1c631923efa30ac4a7f02a2ee9d to your computer and use it in GitHub Desktop.
# Multi-process resource control in Julia
# Julia doesn't have cross-process semaphores/mutexes.
# Here is a work-around using Tasks and Channels.
# The example controls access to a file written by workers.
#
# Design:
# - Each worker shares a request (Req) channel and a reply (Rep) channel with the master.
# - Worker: puts a token on Req when it wants access, then blocks on Rep.
#   Once granted, it writes to the file, posts a release token on Req
#   (2 = more requests will follow, 3 = done), and goes back to work.
# - Master: one monitor task per worker blocks on that worker's Req channel.
#   On a request it (waits to) lock the mutex, replies on Rep, and blocks on Req again.
#   When the release token arrives it unlocks the mutex; it quits if the token was
#   "done", otherwise it waits for the next request.
# Caveat:
# Tested w/ Julia 0.5.0 (Linux), but no guarantees.
# Author has not encountered deadlock or loss of sync during testing.
# released under MIT license
# Spawn three worker processes.
addprocs(3)
# One request channel and one reply channel per worker. Every channel is hosted on
# process 1 (the master) and buffers a single Int token.
reqchans = map(_ -> RemoteChannel(() -> Channel{Int}(1), 1), 1:nworkers())
repchans = map(_ -> RemoteChannel(() -> Channel{Int}(1), 1), 1:nworkers())
# work(req, rep)
#
# Worker-side loop: alternate between "working" (a short random sleep) and appending
# one line to the shared file "tmpx" while holding the master-side mutex.
#
# Arguments:
#   req - channel on which this worker posts tokens to the master:
#         1 = request access, 2 = release (more requests will follow),
#         3 = release and done (no further requests).
#   rep - channel on which the master grants access; the worker blocks on it.
#
# The file handle is always closed and a release token is always posted once access
# has been granted, even if writing fails. On failure the "done" token (3) is posted
# so the master stops waiting on this worker, and the exception then propagates.
@everywhere function work(req,rep)
    junk = "f"^264 # stuffing to help check sync
    niter = 100
    # "a" opens write-only in append mode (and creates the file if it is missing)
    fid = open("tmpx","a")
    try
        for it in 1:niter
            # pretend to do something
            sleep(0.02+0.02*rand())
            # [acquire "semaphore"]
            put!(req,1)
            take!(rep) # implicit wait until the master grants access
            written = false
            try
                # other processes may have appended since our last write
                seekend(fid)
                @printf(fid,"%i %s ",myid(),string(now()))
                write(fid,junk)
                println(fid)
                flush(fid)
                written = true
            finally
                # [release "semaphore"] -- must happen even on error, otherwise the
                # master keeps the mutex locked and every other worker stalls
                put!(req, (written && it < niter) ? 2 : 3)
            end
        end
    finally
        close(fid)
    end
end
# run()
#
# Master-side driver. Truncates "tmpx" and writes a header line, then for every
# (request, reply) channel pair in the globals `reqchans` / `repchans` starts
#   * a task that runs `work` on a worker process, and
#   * a monitor task that serializes that worker's file access through one lock.
# Blocks until all of those tasks have finished; returns `nothing`.
#
# Tasks are collected and waited on explicitly rather than relying on `@sync`:
# an `@async` inside a helper function is not lexically enclosed by the caller's
# `@sync`, and tasks started via `schedule` are not tracked by it either.
# Waiting explicitly also surfaces exceptions from any of the tasks.
function run()
    # start from a fresh file; the do-block closes it even if the write fails
    open("tmpx","w") do fid
        @printf(fid,"# starting\n")
    end

    # the actual mutex lives in the main process
    mon = ReentrantLock()

    # Serve one worker: grant access under the lock, hold it until the worker
    # posts its release token, and stop once that token is 3 ("done").
    function monitor(req,rep)
        while true
            take!(req)          # wait for an access request
            lock(mon)
            token = try
                put!(rep,1)     # grant access
                take!(req)      # wait for the release token
            finally
                unlock(mon)     # never leave the mutex locked on error
            end
            if token == 3
                break
            end
        end
        nothing
    end

    println("kickoff")
    tasks = Task[]
    for (c1,c2) in zip(reqchans,repchans)
        push!(tasks, @async remote(work)(c1,c2))
        push!(tasks, @async monitor(c1,c2))
    end
    for t in tasks
        wait(t)
    end
    println("done.")
    nothing
end
# Script entry point: execute the demo defined by run() above.
run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment