Skip to content

Instantly share code, notes, and snippets.

View rxin's full-sized avatar

Reynold Xin rxin

View GitHub Profile
@rxin
rxin / NaNTesting.java
Created July 21, 2015 00:39
NaN double vs float testing
package com.databricks.unsafe.util.benchmark;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
@rxin
rxin / CodegenTest.scala
Created August 20, 2015 05:54
code gen test
package org.apache.spark.sql
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._
object CodegenTest {
def main(args: Array[String]): Unit = {
val sc = SparkContext.getOrCreate()
val sqlContext = new SQLContext(sc)
@rxin
rxin / benchmark.scala
Last active September 10, 2015 06:09
Spark Parquet benchmark
// Launch spark-shell
MASTER=local[4] bin/spark-shell --driver-memory 4G --conf spark.shuffle.memoryFraction=0.5 --packages com.databricks:spark-csv_2.10:1.2.0
// Read the DF in
val pdf = sqlContext.read.parquet("d_small_key.parquet")
sqlContext.setConf("spark.sql.shuffle.partitions", "8")
// Data reading
val start = System.currentTimeMillis
@rxin
rxin / ampcamp-ecnu-2013-data.sh
Last active December 14, 2015 10:49
scripts to help set up ampcamp @ ECNU March 2013
################################################################################
# Step 1. Download wiki traffic log.
# from
# https://s3.amazonaws.com/ampcamp/ampcamp-ecnu-2013/wikistats/part-00095.gz
# to
# https://s3.amazonaws.com/ampcamp/ampcamp-ecnu-2013/wikistats/part-00168.gz
# Note that 095 and 168 are both 0 bytes. The sole purpose of their existence is
# to verify the downloads.
# NOTE THAT THE FOLLOWING SCRIPT STARTS wget AS BACKGROUND PROCESSES.
def testWrite(path: String): Long = {
val startTime = System.currentTimeMillis()
val out = new java.io.FileWriter(path)
var i = 1
val bytes = " " * (1024 * 1024)
while (i < 1000) {
out.write(bytes)
i += 1
}
out.close
package spark.util
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.collection.mutable
import org.objectweb.asm.{ClassReader, MethodVisitor}
import org.objectweb.asm.commons.EmptyVisitor
import org.objectweb.asm.Opcodes._
@rxin
rxin / InsertPerf.scala
Last active December 19, 2015 03:49
Scala collection insert performance (2.9.3)
// 1001 381 384 384 383 384 407 404 409 407
object ArrayBufferBenchmark extends scala.testing.Benchmark {
def run = {
val len = 10 * 1000 * 1000
val a = new scala.collection.mutable.ArrayBuffer[Int](len)
@rxin
rxin / update.sh
Last active December 21, 2015 01:09
Update Spark/Shark on EC2 AMI
set -e
set -o pipefail
/root/spark/bin/stop-all.sh
rm -rf ~/.ivy2/local/org.spark*
rm -rf ~/.ivy2/cache/org.spark*
cd /root/spark
git checkout master
@rxin
rxin / gist:6896688
Last active December 25, 2015 01:39
take async
def takeAsync(num: Int): FutureAction[Seq[T]] = {
val promise = new CancellablePromise[Seq[T]]
promise.run {
val buf = new ArrayBuffer[T](num)
val totalParts = self.partitions.length
var partsScanned = 0
while (buf.size < num && partsScanned < totalParts && !promise.cancelled) {
// The number of partitions to try in this iteration. It is ok for this number to be
@rxin
rxin / df.py
Last active January 26, 2017 00:44
DataFrame simple aggregation performance benchmark
data = sqlContext.load("/home/rxin/ints.parquet")
data.groupBy("a").agg(col("a"), avg("num")).collect()