Skip to content

Instantly share code, notes, and snippets.

@jsams
Created May 13, 2018 22:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jsams/b367973df744f28f73e473fb9686cb2d to your computer and use it in GitHub Desktop.
Save jsams/b367973df744f28f73e473fb9686cb2d to your computer and use it in GitHub Desktop.
"""
    ChanWrap(iter_chan::RemoteChannel)

Thin wrapper around a `RemoteChannel`, giving the channel its own type so that
methods (such as `next`) can be defined for it.

The concrete channel type is carried as the type parameter `C`, so the
`iter_chan` field is concretely typed rather than the abstract `RemoteChannel`.
Construction (`ChanWrap(rchan)`) and dispatch on `::ChanWrap` are unchanged.
"""
struct ChanWrap{C<:RemoteChannel}
    iter_chan::C
end
"""
    next(d::ChanWrap)

Return the next item from the wrapped channel by calling `take!` on
`d.iter_chan`.
"""
function Base.next(d::ChanWrap)
    return take!(d.iter_chan)
end
# Remote channel of `StructCompData` items; the underlying `Channel` is created
# with size `args["max_channel_size"]`.
const rchan = RemoteChannel() do
    Channel{StructCompData}(args["max_channel_size"])
end
# produce_data(iter_chan, dbfile, nworkers, batch_size, niters; kwargs...)
#
# Producer routine, defined on every process via `@everywhere`.
# Opens `dbfile`, builds a `MNLSocial.StructIter` on it and pushes this
# process's share of items into `iter_chan`.
#
# Arguments
#   iter_chan   channel that every produced item is `put!` into
#   dbfile      path handed to `MNLSocial.SQLite.DB`
#   nworkers    number of producers the total workload is divided by
#   batch_size  multiplied by `niters + 2` to get the total number of items
#   niters      see `batch_size`
#   kwargs...   forwarded unchanged to `MNLSocial.StructIter`
@everywhere function produce_data(iter_chan, dbfile, nworkers, batch_size, niters;
                                  kwargs...)
    # Per-process share of the batch_size * (niters + 2) total items, rounded up.
    # NOTE(review): the `+ 2` looks like slack beyond `niters` batches; confirm.
    share = ceil(Int, (batch_size * (niters + 2)) / nworkers)

    conn = MNLSocial.SQLite.DB(dbfile) # create sql file / connect to it if it exists
    # Make `rand01()` (backed by Julia's `rand()`) available on this connection.
    MNLSocial.SQLite.@register conn rand01() = rand()

    source = MNLSocial.StructIter(db=conn, queue_size=share; kwargs...)
    for _ in 1:share
        put!(iter_chan, next(source))
    end
end
# Start one producer per worker process. The task returned by `@async` is
# discarded, so nothing here waits on the producers.
foreach(workers()) do pid
    @async remote_do(produce_data, pid, rchan, SQL_FILE,
                     args["nworkers"], args["batch_size"], args["niters"];
                     iter_args...)
end

# Wrap the shared channel for the consumer.
D = ChanWrap(rchan)
#=
Next, a downstream function (not shown here) consumes the ChanWrap `D` and
runs the computation on the items it yields.
=#
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment