@wildart
Created August 8, 2015 03:37
Parallel access to LMDB database
module ParHelperFuncs

using LMDB

export getSamplesFromDb, miniBatchSum

# Pull the samples stored under the given integer keys from the database.
function getSamplesFromDb(env, idxs::Array{Int})
    txn = start(env)
    dbi = open(txn)
    xs = Int[]
    println(idxs)
    for idx in idxs
        key = string(idx)
        val = get(txn, dbi, key, String)
        println("k:$key, v:$val")
        val = parse(Int, val)
        push!(xs, val)
    end
    abort(txn)      # read-only transaction, nothing to commit
    close(env, dbi)
    return xs
end

# Sum the values stored under `idxs` in the database `dbname`.
function miniBatchSum(idxs, dbname::String)
    # open the database
    println("Opening ", dbname)
    env = create()
    open(env, dbname)
    xs = getSamplesFromDb(env, idxs)
    close(env)
    # the cost is the sum of all the values we get from the db
    cost = 0.0
    for x in xs
        cost += x
    end
    return cost
end

end # module ParHelperFuncs

# Restart processes.
if nprocs() > 1
    rmprocs(workers())  # remove all worker processes
end
wpids = addprocs(2)     # add two worker processes
println("Spawned ", nprocs(), " processes, ", nworkers(), " workers")
println("Proc IDs: ", procs())

# Load LMDB on all processes.
@everywhere using LMDB

# Create a sample database.
nsamples = 100
dbname = "simpleseq.db"
isdir(dbname) && rm(dbname, recursive=true)
!isdir(dbname) && mkdir(dbname)
# The stored data are just {"1" => "1", "2" => "2", ...}.
create() do env
    open(env, dbname)
    start(env) do txn
        open(txn) do dbi
            for i = 1:nsamples
                put!(txn, dbi, string(i), string(i))
            end
            commit(txn)
        end
    end
end
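
# Optional sanity check (a sketch, not part of the original run): read one key
# back on the master process, using only the LMDB.jl calls already used above,
# to confirm the writes landed before the workers touch the database.
let env = create()
    open(env, dbname)
    txn = start(env)
    dbi = open(txn)
    println("sample 1 => ", get(txn, dbi, "1", String))
    abort(txn)
    close(env, dbi)
    close(env)
end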
# Load up the helper functions from the ParHelperFuncs module defined above.
@everywhere using ParHelperFuncs

# The following single-process call works:
println("miniBatchSum([1,2,3], dbname)")
println(miniBatchSum([1,2,3], dbname))
# The following, which does the same thing in parallel, does not work.
# Generate some ids to split across the workers; each worker will process
# `sample_size` values, and the per-worker id chunks are collected in `proc_idxs`.
sample_size = 10
idxs = randperm(nsamples)
idxs = idxs[1:(nworkers()*sample_size)]
proc_idxs = Any[]
st_idx = 1
en_idx = sample_size
for i = 1:nworkers()
    push!(proc_idxs, idxs[st_idx:en_idx])
    st_idx = en_idx + 1
    en_idx = en_idx + sample_size
end
println(proc_idxs)
# Spawn the work across all worker processes
# (remotecall(id, f, args...) is the Julia 0.4 argument order).
k = 1
remrefs = Array(Any, nworkers())
for proc in workers()
    println("Remote call to: ", proc)
    remrefs[k] = remotecall(proc, miniBatchSum, proc_idxs[k], dbname)
    k += 1
end

# Collect the results.
results = Array(Any, nworkers())
for k = 1:length(remrefs)
    wait(remrefs[k])
    results[k] = fetch(remrefs[k])
end
println(results)
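
# A possible alternative sketch (not from the original gist): in Julia 0.4, pmap
# handles the scheduling and result collection itself, so the same per-worker
# chunks could be summed with a single call. It goes through the same
# getSamplesFromDb path, so it may well hit the same problem as the loop above.
results_pmap = pmap(miniBatchSum, proc_idxs, fill(dbname, length(proc_idxs)))
println(results_pmap)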
wildart commented Aug 8, 2015

Run the above code with Julia 0.4:

 julia ptest.jl
