Twitter link graph
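Builds a Twitter follower graph as a sparse adjacency matrix, either locally or via a HadoopBlocks map-reduce over HDFS blocks on a YARN cluster (through Elly), and ranks nodes by the principal eigenvector of the graph.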
using Elly
using HadoopBlocks
@everywhere begin

#const INP = "hdfs://root@" * string(getipaddr()) * ":9000/twitter_rv.net"
#const COLSEP = '\t'
#const INP = "hdfs://tan@localhost:9000/twitter_small.csv"
const INP = "hdfs://root@" * string(getipaddr()) * ":9000/twitter_small.csv"
const COLSEP = ','
const MAXNODE = 11316811    # adjacency matrix dimension (node ids are assumed to be <= MAXNODE)
#const YARNHOST = string(getipaddr())
const YARNHOST = "localhost"
# input reader: extract rows of COLSEP-separated values from an HDFS block
function findrow(r::HadoopBlocks.HdfsBlockReader, iter_status)
    rec = HadoopBlocks.find_rec(r, iter_status, Matrix, '\n', COLSEP)
    #HadoopBlocks.logmsg("findrow found rec:$rec")
    rec
end
# parse a node id; blank fields map to 0, which marks an invalid id
function to_id(v::AbstractString)
    v = strip(v)
    isempty(v) ? Int32(0) : parse(Int32, v)
end
to_id(v::Number) = Int32(v)
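# Example behavior (a hypothetical illustration, not in the original):
#   to_id(" 42 ") == Int32(42);  to_id("") == Int32(0)
# The 0 sentinel is what the (i > 0) && (j > 0) check in maprow filters out.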
# map: turn a block of parsed rows into a sparse adjacency matrix
function maprow(rec)
    HadoopBlocks.logmsg("map starting...")
    L = size(rec, 1)
    I = Int32[]
    J = Int32[]
    for idx in 1:L
        i = to_id(rec[idx,2])
        j = to_id(rec[idx,1])
        if (i > 0) && (j > 0)   # skip rows with missing/invalid ids
            push!(I, i)
            push!(J, j)
        end
    end
    L = length(I)
    S = sparse(I, J, ones(L), MAXNODE, MAXNODE)
    HadoopBlocks.logmsg("map finished.")
    rets = Any[]
    push!(rets, S)
    rets
end
# collect: accumulate map outputs on a worker by elementwise addition
function collectrow(results, rec)
    HadoopBlocks.logmsg("collect starting...")
    isempty(rec) && (return results)
    if results === nothing
        results = rec
    else
        results = results .+ rec
    end
    HadoopBlocks.logmsg("collect finished.")
    results
end
# reduce: merge the per-worker results on the master
function reducerow(reduced, results...)
    HadoopBlocks.logmsg("reduce starting...")
    for res in results
        if reduced === nothing
            reduced = res
        else
            reduced = reduced .+ res
        end
    end
    HadoopBlocks.logmsg("reduce finished.")
    reduced
end

end # @everywhere begin
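# Note: collectrow folds maprow outputs within each worker, and reducerow then
# folds the per-worker results; both are plain elementwise additions, so the
# final result is the adjacency matrix summed across all HDFS blocks.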
# poll the job monitor until the job completes or errors
function wait_results(j_mon)
    loopstatus = true
    while loopstatus
        sleep(5)
        jstatus, jstatusinfo = status(j_mon, true)
        ((jstatus == "error") || (jstatus == "complete")) && (loopstatus = false)
        (jstatus == "running") && println("$(j_mon): $(jstatusinfo)% complete...")
    end
    wait(j_mon)
    println("time taken (total time, wait time, run time): $(times(j_mon))")
    println("")
end
# provision YARN workers and run the distributed map-reduce over the HDFS input
function submit_job()
    yarncm = YarnManager(yarnhost=YARNHOST, rmport=8032, schedport=8030, launch_timeout=120)
    addprocs(yarncm; np=10, env=Dict("JULIA_PKGDIR"=>Pkg.dir()), mem=4096)
    println("nworkers: $(nworkers())")
    @everywhere require("demo.jl")
    HadoopBlocks.Scheduler.prep_remotes(true)
    j = dmapreduce(MRHdfsFileInput([INP], findrow), maprow, collectrow, reducerow)
    wait_results(j)
    R = results(j)
    println(R)
end
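# Hypothetical driver sketch (assumes this file is saved as demo.jl, as the
# require("demo.jl") call above implies, with HDFS and YARN reachable):
#   require("demo.jl")
#   submit_job()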
# dataset: http://parsa.epfl.ch/cloudsuite/graph.html
# 1,468,365,182 rows in total; preprocess() below handles roughly 1,000,000 rows every 5 secs
# serialize an object to <N>.ser
function store(N, A)
    open("$N.ser", "w") do f
        serialize(f, A)
    end
    nothing
end
# deserialize an object from <N>.ser
function load(N)
    open("$N.ser") do f
        deserialize(f)
    end
end
# read the raw edge list once, parse the two id columns, and serialize them
function preprocess()
    f = open("twitter_rv.net")
    lno = 0
    t1 = time()
    I = Int32[]
    J = Int32[]
    while !eof(f)
        lno += 1
        l = readline(f)
        id1, id2 = map((x)->parse(Int32, strip(x)), split(l))
        if (lno % 1000000) == 0
            t2 = time()
            println("$lno : $id1 -> $id2 in $(t2-t1) secs")
            t1 = t2
        end
        push!(I, id1)
        push!(J, id2)
    end
    close(f)
    println("storing...")
    store("I", I)
    store("J", J)
    I, J
end
# compute and store the leading eigenpair of the adjacency matrix
function process(S)
    E = eigs(S; nev=1)
    store("E", E)
    E
end
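# eigs returns a tuple: E[1] holds the eigenvalue(s) and E[2] the matrix of
# eigenvectors, which is why find_extreme_pos below reads the vector from E[2].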
load_sparse() = load_sparse(load("I"), load("J"))
# build the sparse adjacency matrix from the id vectors
function load_sparse(I, J)
    println("creating sparse matrix...")
    V = ones(length(I))
    S = sparse(I, J, V, MAXNODE, MAXNODE)   # square dims, matching the map-reduce path, so eigs can be applied
    println("done")
    S
end
load_eig() = load("E")

# project the principal eigenvector through the graph and rank nodes by
# magnitude: the largest entries (infl) are the most influential nodes,
# the smallest (newb) the least connected
function find_extreme_pos(S, E)
    EV = convert(Array{Float64}, E[2])
    P = S * EV
    P = abs(P[:,1])
    sp = sortperm(P)
    infl = sp[end-9:end]   # top 10 node ids
    newb = sp[1:10]        # bottom 10 node ids
    infl, newb
end
# print the number of nonzero entries in each given node's column
function count_connections(S, ids)
    for id in ids
        println("id:$id connections:$(nnz(S[:,id]))")
    end
end
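# End-to-end local analysis sketch (hypothetical usage; assumes twitter_rv.net
# has been downloaded, or that I.ser/J.ser exist from an earlier run):
#   I, J = preprocess()                  # one-time parse of the raw edge list
#   S = load_sparse(I, J)                # or load_sparse() to reuse I.ser/J.ser
#   E = process(S)                       # leading eigenpair, stored as E.ser
#   infl, newb = find_extreme_pos(S, E)  # top-10 and bottom-10 node ids
#   count_connections(S, infl)           # connection counts for the top ids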