Skip to content

Instantly share code, notes, and snippets.

@tanmaykm
Last active March 21, 2016 08:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tanmaykm/ec0f34cd74813dd2547a to your computer and use it in GitHub Desktop.
Hadoop / Elly.jl demo
# # get the helper script
# git clone https://github.com/tanmaykm/JuliaDockerImages.git
# cd JuliaDockerImages/pkgdists/hadoop
# # start a cluster with 5 workers (total 6 nodes)
# ./cluster.sh start 5
# # if the cluster is started successfully, it will print out a ssh command that you can use to log on the cluster master node
# # execute the following script in the master node
# # once done, exit from the master node shell and stop cluster with:
# ./cluster.sh stop 5
using Elly
# create a hdfs connection and read some server configuration
# NOTE(review): assumes the HDFS namenode runs on this host at port 9000
# (the cluster.sh setup above) — confirm before reusing elsewhere.
h = HDFSClient(string(getipaddr()), 9000)
defs = hdfs_server_defaults(h)
fs_status = hdfs_status(h)
# create a folder, list and cd into it
# (these operate on the remote HDFS namespace, not the local filesystem)
mkdir(h, "test")
readdir(h)
cd(h, "test")
# work around for a bug in latest Julia 0.4
# NOTE(review): this adds a method to a Base function (type piracy) to
# forward non-Int integer sizes to the Int method — intended only as a
# temporary shim for Julia 0.4; drop on newer Julia versions.
Base.ensureroom(iob::Base.AbstractIOBuffer, n::Integer) = Base.ensureroom(iob, Int(n))
# write a file
hfile = HDFSFile(h, "testfile.txt")
open(hfile, "w") do fhandle
println(fhandle, "hello world")
end
# read the file
# (sized read: allocate a buffer matching the remote file size, fill it,
# then decode as a string — Julia 0.4 era API: Array(T,n) / bytestring)
open(hfile, "r") do fhandle
bytes = Array(UInt8, filesize(fhandle))
read!(fhandle, bytes)
println(bytestring(bytes))
end
# create a yarn client and query the number of nodes available
# NOTE(review): 8032 is the YARN resourcemanager port, 8030 the scheduler
# port in this cluster setup — verify against the Hadoop configuration.
yarnclnt = YarnClient(string(getipaddr()), 8032)
nnodes = nodecount(yarnclnt)
# yarn cluster manager: launch 6 Julia workers as YARN containers and
# print each worker's id to confirm they are all up
yarncm = YarnManager(yarnhost=string(getipaddr()), rmport=8032, schedport=8030, launch_timeout=60);
addprocs(yarncm; np=6, env=Dict("JULIA_PKGDIR"=>Pkg.dir()));
@everywhere println(myid())
# calculates colsums of a large matrix stored in HDFS
# julia hadoopblocksdemo.jl
using Elly
using HadoopBlocks
# HDFS URL of the input CSV (one row of N comma-separated floats per line)
const INP = "hdfs://root@" * string(getipaddr()) * ":9000/colsuminp.csv"
# Master-only setup: bring up the YARN-backed worker pool and generate the
# input data. Workers (myid() != 1) skip this entirely.
if myid() == 1
yarncm = YarnManager(yarnhost=string(getipaddr()), rmport=8032, schedport=8030, launch_timeout=60);
addprocs(yarncm; np=6, env=Dict("JULIA_PKGDIR"=>Pkg.dir()));
nprocs()
# Write an M x N matrix of uniform random Float64s to INP as CSV,
# one row per line.
function gendata(M, N)
open(HDFSFile(INP), "w") do f
for m in 1:M
write(f, join(map(string, rand(N)), ","), "\n")
end
end
end
gendata(10^5, 5)
#@everywhere include("hadoopblocksdemo.jl")
end
# split script here and uncomment below lines to run data generation and computation separately
#using Elly
#using HadoopBlocks
#const INP = "hdfs://tan@localhost:9000/colsuminp.csv"
# Reader step of the map-reduce job: pull the next delimited record out of
# an HDFS block as a Vector of string fields.
function findrow(blockreader::HdfsBlockReader, iter_status)
    return HadoopBlocks.find_rec(blockreader, iter_status, Vector)
end
# Map step: parse a record's string fields into Float64s and wrap them in a
# single tuple, returned as a one-element vector of map results.
function maprow(rec)
    values = map(field -> parse(Float64, field), rec)
    return [tuple(values...)]
end
# Collect step: fold one mapped record (a tuple of column values) into the
# running per-column sums for this block.
#
# `results` is `nothing` before the first record is seen; thereafter it is a
# tuple with the same arity as `rec`. Empty records are ignored.
function collectrow(results, rec)
    isempty(rec) && return results
    # Use `===` for the nothing check: identity comparison is the idiomatic,
    # type-stable form (`== nothing` relies on generic equality).
    results === nothing && return rec
    return tuple([results[x] + rec[x] for x in 1:length(results)]...)
end
# Reduce step: merge per-block partial column sums (`results...`, each a
# tuple of Float64 or `nothing` for empty blocks) into the global
# accumulator `reduced`, a Vector{Float64}.
#
# `reduced` is `nothing` on the first call; it is then initialized to a zero
# vector sized from the first partial result.
function reducerow(reduced, results...)
    # `===` instead of `==` for nothing checks: identity comparison is the
    # idiomatic, type-stable form.
    reduced === nothing && (reduced = zeros(Float64, length(results[1])))
    for res in results
        res === nothing && continue   # skip blocks that produced no data
        for x in 1:length(res)
            reduced[x] += res[x]
        end
    end
    return reduced
end
# Poll the job monitor every 5 seconds until the job errors or completes,
# printing percentage progress while it is running, then wait for final
# completion and report timing.
function wait_results(j_mon)
    while true
        sleep(5)
        jstate, jinfo = status(j_mon, true)
        if (jstate == "error") || (jstate == "complete")
            break
        elseif jstate == "running"
            println("$(j_mon): $(jinfo)% complete...")
        end
    end
    wait(j_mon)
    println("time taken (total time, wait time, run time): $(times(j_mon))")
    println("")
end
# Driver: only the master process submits the distributed map-reduce job
# (reader=findrow, map=maprow, collect=collectrow, reduce=reducerow) over
# the CSV at INP, waits for it, and prints the resulting column sums.
if myid() == 1
j = dmapreduce(MRHdfsFileInput([INP], findrow), maprow, collectrow, reducerow)
wait_results(j)
println(results(j))
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment