Created
August 8, 2015 03:37
-
-
Save wildart/767eb10fba1d0660f964 to your computer and use it in GitHub Desktop.
Parallel access to LMDB database
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Helpers for reading integer samples out of an LMDB database and summing them.
# They live in a module so they can be loaded on every worker process.
module ParHelperFuncs

using LMDB

export getSamplesFromDb, miniBatchSum

"""
    getSamplesFromDb(env, idxs::Array{Int})

Look up every index in `idxs` in the default database of the opened LMDB
environment `env` and return the stored values as a `Vector{Int}`.

Keys are the decimal string form of each index; each stored value is read as a
string and parsed as an `Int`. The indices and every key/value pair are printed
as they are read.

The transaction is only read from, so it is aborted rather than committed. The
transaction and the database handle are released even if a lookup or a parse
throws; the exception is then propagated to the caller.
"""
function getSamplesFromDb(env, idxs::Array{Int})
    txn = start(env)
    dbi = nothing   # stays `nothing` if opening the database handle fails
    xs = Int[]
    try
        dbi = open(txn)
        println(idxs)
        for idx in idxs
            key = string(idx)
            raw = get(txn, dbi, key, String)
            println("k:$key, v:$raw")
            push!(xs, parse(Int, raw))
        end
    finally
        # Same release order as before: end the transaction, then drop the handle.
        abort(txn)
        dbi === nothing || close(env, dbi)
    end
    return xs
end

"""
    miniBatchSum(idxs, dbname::String)

Open the LMDB environment stored at `dbname`, fetch the samples for `idxs` with
`getSamplesFromDb`, and return their sum as a `Float64` (`0.0` when no samples
are returned).

The environment is closed before returning, including when opening the
database or reading from it throws.
"""
function miniBatchSum(idxs, dbname::String)
    println("Opening ", dbname)
    env = create()
    xs = try
        open(env, dbname)
        getSamplesFromDb(env, idxs)
    finally
        close(env)
    end

    # the cost is the sum of all the values we get from the db
    cost = 0.0
    for x in xs
        cost += x
    end
    return cost
end

end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Driver script: build a small LMDB database, sum a few of its values on the
# current process, then repeat the summation in parallel across the workers.

# restart processes
if nprocs() > 1
    rmprocs(workers())  # remove all worker processes
end
wpids = addprocs(2)  # add processes
println("Spawned ", nprocs(), " processes, ", nworkers(), " workers")
println("Proc IDs: ", procs())

# load LMDB on all processes
@everywhere using LMDB

# create a sample database (start from an empty directory every run)
nsamples = 100
dbname = "simpleseq.db"
isdir(dbname) && rm(dbname, recursive=true)
!isdir(dbname) && mkdir(dbname)

# the data are just {1:1, 2:2 ... }, stored as strings
create() do env
    open(env, dbname)
    start(env) do txn
        open(txn) do dbi
            for i = 1:nsamples
                put!(txn, dbi, string(i), string(i))
            end
            commit(txn)
        end
    end
end

# load up the helper functions (the ParHelperFuncs module) on all processes
@everywhere using ParHelperFuncs

# the following single process call works
println("miniBatchSum([1,2,3], dbname)")
println(miniBatchSum([1,2,3], dbname))

# the following (which does it in parallel) does not work
# we generate some ids to split across the nodes
# each node will process sample_size values
# the ids are put into proc_idxs
sample_size = 10
idxs = randperm(nsamples)
idxs = idxs[1:(nworkers() * sample_size)]

# worker w gets the w-th consecutive slice of sample_size ids
proc_idxs = Any[idxs[((w - 1) * sample_size + 1):(w * sample_size)] for w in 1:nworkers()]
println(proc_idxs)

# spawn and run across all worker nodes, pairing each worker with its slice
remrefs = Any[]
for (proc, chunk) in zip(workers(), proc_idxs)
    println("Remote call to: ", proc)
    push!(remrefs, remotecall(proc, miniBatchSum, chunk, dbname))
end

# collect the results; fetch blocks until the remote value is available
results = Any[fetch(ref) for ref in remrefs]
println(results)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Run the above code with Julia 0.4: