Skip to content

Instantly share code, notes, and snippets.

@TeemuKoivisto
Last active November 27, 2018 20:07
Show Gist options
  • Save TeemuKoivisto/468c1a1607703db5b154edfe44d4a1c9 to your computer and use it in GitHub Desktop.
Save TeemuKoivisto/468c1a1607703db5b154edfe44d4a1c9 to your computer and use it in GitHub Desktop.
from pyspark import SparkContext, SparkConf
import os
import sys
# Dataset paths in the shared group directory on Ukko2.
# Exactly one `dataset` assignment should be active: uncomment the one to work on.
dataset = "/proj/group/distributed-data-infra/data-1-sample.txt"
#dataset = "/proj/group/distributed-data-infra/data-1.txt"
#dataset = "/proj/group/distributed-data-infra/data-2-sample.txt"
#dataset = "/proj/group/distributed-data-infra/data-2.txt"

# Spark connection and resource settings for this job.
conf = (
    SparkConf()
    .setAppName("teekoivi")  # change the app name to your own username
    .setMaster("spark://128.214.48.227:7077")
    .set("spark.cores.max", "10")  # keep the core limit modest on the shared cluster
    .set("spark.rdd.compress", "true")
    .set("spark.broadcast.compress", "true")
)
sc = SparkContext(conf=conf)

# Every line of the input file is parsed as one floating-point number.
data = sc.textFile(dataset).map(float)

count = data.count()
# Named `total` rather than `sum` so the builtin sum() is not shadowed.
total = data.sum()

# A parenthesised single-argument print produces the same output whether it
# is parsed as a Python 2 print statement or a Python 3 function call.
print("Count = %.8f" % count)
print("Sum = %.8f" % total)
import numpy as np
from numpy import floor
import time
def quantile(rdd, p, sample=None, seed=None):
    """Return the p-quantile of a numeric RDD using linear interpolation.

    The (optionally sampled) RDD is sorted and each element is paired with
    its rank. With ``n`` elements and ``h = (n - 1) * p``, the result
    interpolates between the elements at ranks ``floor(h)`` and
    ``floor(h) + 1``.

    Args:
        rdd: RDD of mutually comparable numeric values.
        p: Quantile to compute, in the closed range [0, 1].
        sample: Optional fraction in (0, 1]; when given, the quantile is
            computed on a sample drawn without replacement.
        seed: Optional seed for the sampling step. Only used when
            ``sample`` is given; defaults to the current time.

    Returns:
        The interpolated quantile value.

    Raises:
        AssertionError: If ``p`` or ``sample`` is outside its valid range.
        ValueError: If the (sampled) RDD contains no elements.
    """
    assert 0 <= p <= 1
    assert sample is None or 0 < sample <= 1

    if sample is not None:
        # time.time() returns a float; truncate it so sample() always gets
        # an integer seed (the sampler is expected to need one -- verify
        # against the installed PySpark version).
        if seed is None:
            seed = int(time.time())
        rdd = rdd.sample(False, sample, seed)

    # (rank, value) pairs, so that a value can be fetched by its rank.
    ranked = (
        rdd.sortBy(lambda x: x)
        .zipWithIndex()
        .map(lambda pair: (pair[1], pair[0]))
        .cache()
    )
    try:
        n = ranked.count()
        if n == 0:
            raise ValueError("cannot compute a quantile of an empty RDD")

        h = (n - 1) * p
        lower = int(floor(h))
        # For p == 1 (or a single element) rank lower + 1 does not exist;
        # clamp to the last rank instead of looking up a missing key.
        upper = min(lower + 1, n - 1)

        lower_value = ranked.lookup(lower)[0]
        if upper == lower:
            upper_value = lower_value
        else:
            upper_value = ranked.lookup(upper)[0]
    finally:
        # Release the cached partitions once the lookups are done.
        ranked.unpersist()

    return lower_value + (h - floor(h)) * (upper_value - lower_value)
# Median of the dataset. The value is printed because a bare expression
# statement in a script would compute it and then silently discard it.
median = quantile(data, 0.5)
print("Median = %.8f" % median)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment