-
-
Save tanmaykm/ec0f34cd74813dd2547a to your computer and use it in GitHub Desktop.
Hadoop / Elly.jl demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Hadoop / Elly.jl demo: HDFS file operations, a YARN query, and Julia workers on YARN.
#
# # get the helper script
# git clone https://github.com/tanmaykm/JuliaDockerImages.git
# cd JuliaDockerImages/pkgdists/hadoop
# # start a cluster with 5 workers (total 6 nodes)
# ./cluster.sh start 5
# # if the cluster starts successfully, it prints an ssh command that you can use to
# # log on to the cluster master node
# # execute the following script on the master node
# # once done, exit from the master node shell and stop the cluster with:
# ./cluster.sh stop 5
using Elly

# HDFS and YARN are contacted on this host's IP address (the script is meant to be
# run on the cluster master node).
const HOST = string(getipaddr())
const HDFS_PORT = 9000        # port passed to HDFSClient
const YARN_RM_PORT = 8032     # YARN resource manager port (YarnClient and YarnManager)
const YARN_SCHED_PORT = 8030  # YARN scheduler port passed to YarnManager

# create a hdfs connection and read some server configuration
h = HDFSClient(HOST, HDFS_PORT)
defs = hdfs_server_defaults(h)
fs_status = hdfs_status(h)

# create a folder, list and cd into it
mkdir(h, "test")
readdir(h)
cd(h, "test")

# work around for a bug in latest Julia 0.4: forward non-Int sizes to the Int method.
# NOTE(review): this adds a method to a Base function for a Base type (type piracy);
# it is only meant for the Julia 0.4 builds this demo targets — drop it elsewhere.
Base.ensureroom(iob::Base.AbstractIOBuffer, n::Integer) = Base.ensureroom(iob, Int(n))

# write a file
hfile = HDFSFile(h, "testfile.txt")
open(hfile, "w") do fhandle
    println(fhandle, "hello world")
end

# read the whole file back and print its contents
open(hfile, "r") do fhandle
    bytes = Array(UInt8, filesize(fhandle))
    read!(fhandle, bytes)
    println(bytestring(bytes))
end

# create a yarn client and query the number of nodes available
yarnclnt = YarnClient(HOST, YARN_RM_PORT)
nnodes = nodecount(yarnclnt)

# yarn cluster manager: add Julia worker processes (np=6) through YARN
yarncm = YarnManager(yarnhost=HOST, rmport=YARN_RM_PORT, schedport=YARN_SCHED_PORT,
                     launch_timeout=60);
addprocs(yarncm; np=6, env=Dict("JULIA_PKGDIR"=>Pkg.dir()));
# print the id of every process
@everywhere println(myid())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# calculates colsums of a large matrix stored in HDFS
# julia hadoopblocksdemo.jl
using Elly
using HadoopBlocks

# CSV input file on HDFS, addressed through this host's IP address
const INP = "hdfs://root@" * string(getipaddr()) * ":9000/colsuminp.csv"

if myid() == 1
    # master process only: add YARN-managed workers and generate the input data
    yarncm = YarnManager(yarnhost=string(getipaddr()), rmport=8032, schedport=8030,
                         launch_timeout=60);
    addprocs(yarncm; np=6, env=Dict("JULIA_PKGDIR"=>Pkg.dir()));
    # a bare `nprocs()` is discarded when run as a script; print it instead
    println("number of processes: ", nprocs())

    # Write an M x N matrix of uniform random Float64s to INP:
    # one line per row, values separated by commas.
    function gendata(M, N)
        open(HDFSFile(INP), "w") do f
            for _ in 1:M
                write(f, join(map(string, rand(N)), ","), "\n")
            end
        end
    end
    gendata(10^5, 5)
    # NOTE(review): the workers presumably also need findrow/maprow/collectrow/reducerow
    # defined; this include was left commented out — confirm how they reach the workers.
    #@everywhere include("hadoopblocksdemo.jl")
end
# split script here and uncomment below lines to run data generation and computation separately
# (adjust the user and host in INP to match your cluster)
#using Elly
#using HadoopBlocks
#const INP = "hdfs://tan@localhost:9000/colsuminp.csv"
"""
    findrow(r::HdfsBlockReader, iter_status)

Record locator handed to `MRHdfsFileInput`: delegates to `HadoopBlocks.find_rec`,
asking for each record as a `Vector`, and returns whatever that call returns.
"""
findrow(r::HdfsBlockReader, iter_status) = HadoopBlocks.find_rec(r, iter_status, Vector)
"""
    maprow(rec)

Map step: parse every field of the record `rec` as a `Float64` and return a
one-element vector whose single entry is a tuple of those values.
"""
function maprow(rec)
    parsed = map(field -> parse(Float64, field), rec)
    return [tuple(parsed...)]
end
"""
    collectrow(results, rec)

Collect step: fold one mapped record `rec` (a tuple of numbers) into the running
accumulator `results`.

- An empty `rec` (`maprow` yields `()` for a record with no fields) leaves
  `results` unchanged.
- When `results` is `nothing`, no record has been folded in yet, so `rec` itself
  becomes the accumulator.
- Otherwise, returns a new tuple holding the element-wise sums `results[i] + rec[i]`.
"""
function collectrow(results, rec)
    isempty(rec) && return results
    # identity check is the idiomatic (and dispatch-free) test for `nothing`
    results === nothing && return rec
    return tuple([results[x] + rec[x] for x in 1:length(results)]...)
end
"""
    reducerow(reduced, results...)

Reduce step: add every non-`nothing` tuple in `results` element-wise into `reduced`.

`reduced` is either `nothing` (no accumulator yet) or a numeric vector, which is
updated in place. When it is `nothing`, a `Float64` zero vector is created with the
length of the first non-`nothing` entry of `results`. Entries of `results` that are
`nothing` (presumably partitions that collected no records) are skipped.

Returns the accumulator. It is still `nothing` if `results` is empty or every entry
is `nothing`.
"""
function reducerow(reduced, results...)
    for res in results
        res === nothing && continue
        # Size the accumulator lazily from the first real result. Sizing it from
        # `results[1]` up front failed when that entry was `nothing` (length(nothing))
        # or when `results` was empty.
        reduced === nothing && (reduced = zeros(Float64, length(res)))
        for x in 1:length(res)
            reduced[x] += res[x]
        end
    end
    return reduced
end
"""
    wait_results(j_mon)

Block until the map-reduce job `j_mon` finishes.

Every 5 seconds, polls `status(j_mon, true)`. While the status is `"running"` it
prints a progress line. It stops polling once the status is `"error"` or
`"complete"`. It then calls `wait(j_mon)`, prints the job's `times(j_mon)` and
a blank line.
"""
function wait_results(j_mon)
    while true
        sleep(5)
        st, info = status(j_mon, true)
        if st == "error" || st == "complete"
            break
        elseif st == "running"
            # `info` is printed as a percentage; presumably a progress figure
            println("$(j_mon): $(info)% complete...")
        end
    end
    wait(j_mon)
    println("time taken (total time, wait time, run time): $(times(j_mon))")
    println("")
end
# On the master process only: run the column-sum map-reduce over INP, wait for it
# to finish, then print its results.
if myid() == 1
    hdfs_input = MRHdfsFileInput([INP], findrow)
    job = dmapreduce(hdfs_input, maprow, collectrow, reducerow)
    wait_results(job)
    colsums = results(job)
    println(colsums)
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment