Josh Rosen (JoshRosen)
@JoshRosen
JoshRosen / jacoco-coverage.patch
Created January 10, 2013 00:01
Code coverage for Spark tests using JaCoCo
$ sbt/sbt "jacoco:cover"
$ open core/target/scala-2.9.2/jacoco/html/index.html
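
The patch itself is not shown in this capture. For sbt builds of that era, one common way to wire JaCoCo in (an assumption about the setup, with an illustrative plugin version, not the contents of the actual patch) was the jacoco4sbt plugin:

// project/plugins.sbt (illustrative sketch, not the actual patch)
addSbtPlugin("de.johoop" % "jacoco4sbt" % "2.1.6")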

Generating Flame Graphs for Apache Spark

Flame graphs are a nifty debugging tool for determining where CPU time is being spent. Using the Java Flight Recorder, you can collect these profiles for Java processes without adding significant runtime overhead.
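
For a Spark job, this mostly amounts to passing the right JVM flags to the executors. A minimal sketch, assuming the Oracle JDK 8 flags and an illustrative recording duration and output filename:

import org.apache.spark.SparkConf

// Start a Flight Recorder recording on each executor JVM (Oracle JDK 8
// flags; the duration and filename below are illustrative assumptions).
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions",
    "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder " +
    "-XX:StartFlightRecording=duration=60s,filename=executor.jfr")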

When are flame graphs useful?

Shivaram Venkataraman and I have found these flame recordings to be useful for diagnosing coarse-grained performance problems. We started using them at the suggestion of Josh Rosen, who quickly made one for the Spark scheduler when we were talking to him about why the scheduler caps out at a throughput of a few thousand tasks per second. Josh generated a graph similar to the one below, which illustrates that a significant amount of time is spent in serialization (if you click the search link in the top right-hand corner and search for "serialize", you can see that 78.6% of the sampled CPU time was spent in serialization). We used this insight to speed up the scheduler.

@JoshRosen
JoshRosen / out.diff
Created October 21, 2021 22:05
Bytecode diff related to https://github.com/apache/spark/pull/34351, showing what happens if I just change protected to private
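
The .class.asm files being compared look like textual ASM listings. One way to produce such dumps for diffing (an assumed workflow, not stated in the gist) is ASM's Textifier:

import java.io.PrintWriter
import java.nio.file.{Files, Paths}
import org.objectweb.asm.ClassReader
import org.objectweb.asm.util.{Textifier, TraceClassVisitor}

// Write a textual listing of a compiled class file, suitable for diffing.
def dumpAsm(classFile: String, outFile: String): Unit = {
  val writer = new PrintWriter(outFile)
  try {
    new ClassReader(Files.readAllBytes(Paths.get(classFile)))
      .accept(new TraceClassVisitor(null, new Textifier(), writer), 0)
  } finally {
    writer.close()
  }
}
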
diff --git a/OpenHashMap.class.asm b/OpenHashMap.class.asm
index aa713d0..b684cf3 100644
--- a/OpenHashMap.class.asm
+++ b/OpenHashMap.class.asm
@@ -1375,10 +1375,10 @@
MAXSTACK = 1
MAXLOCALS = 1
- // access flags 0x1
+ // access flags 0x2
diff --git a/OpenHashMap$mcD$sp.class.asm b/OpenHashMap$mcD$sp.class.asm
index 3989e91..ea49dbb 100644
--- a/OpenHashMap$mcD$sp.class.asm
+++ b/OpenHashMap$mcD$sp.class.asm
@@ -29,6 +29,54 @@ public class org/apache/spark/util/collection/OpenHashMap$mcD$sp extends org/apa
// access flags 0x1
public D nullValue$mcD$sp
+ // access flags 0x1019
+ public final static synthetic $anonfun$changeValue$3(Lorg/apache/spark/util/collection/OpenHashMap$mcD$sp;I)V
@JoshRosen
JoshRosen / bench.scala
Created October 21, 2021 21:34
toy benchmark of OpenHashMap
def timeAndRecordAllocations(
    numWarmups: Int,
    numTrials: Int
)(functionToBenchmark: => Unit): Unit = {
  import java.lang.management.ManagementFactory
  import com.sun.management.ThreadMXBean
  val threadMxBean = ManagementFactory.getThreadMXBean.asInstanceOf[ThreadMXBean]
  val threadId = Thread.currentThread.getId
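
The preview cuts off mid-definition. A minimal sketch of how the measurement loop might finish, and how the helper could be pointed at OpenHashMap (the loop body, trial counts, and key distribution are illustrative assumptions, not the gist's code):

  // Sketch of the rest of the helper: warm up, then time each trial and
  // report the bytes allocated on this thread during it.
  (1 to numWarmups).foreach(_ => functionToBenchmark)
  (1 to numTrials).foreach { _ =>
    val startBytes = threadMxBean.getThreadAllocatedBytes(threadId)
    val startNanos = System.nanoTime()
    functionToBenchmark
    val elapsedMillis = (System.nanoTime() - startNanos) / 1e6
    val allocated = threadMxBean.getThreadAllocatedBytes(threadId) - startBytes
    println(s"$elapsedMillis ms, $allocated bytes allocated")
  }
}

// Hypothetical invocation against Spark's OpenHashMap:
import org.apache.spark.util.collection.OpenHashMap
timeAndRecordAllocations(numWarmups = 5, numTrials = 10) {
  val map = new OpenHashMap[Long, Long](64)
  var i = 0L
  while (i < 1000000L) {
    map.changeValue(i % 1024L, 0L, _ + 1L)
    i += 1L
  }
}
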
@JoshRosen
JoshRosen / scala-lambda-serialization-with-lifted-local-defs.md
Last active June 12, 2021 16:35
Serialization of Scala closures that contain local defs

Serialization of Scala closures that contain local defs

Several Apache Spark APIs rely on the ability to serialize Scala closures. Closures may reference non-Serializable objects, preventing the closures themselves from being serialized. In some cases (SI-1419 and others), however, these references are unnecessary and can be nulled out, allowing otherwise-unserializable closures to be serialized (in Spark, this nulling is performed by the ClosureCleaner).

Scala 2.12's use of Java 8 lambdas for implementing closures appears to have broken our ability to serialize closures which contain local defs. If we cannot resolve this problem, Spark will be unable to support Scala 2.12 and will be stuck on 2.10 and 2.11 forever.

As an example that illustrates this problem, the following closure has a nested localDef and is defined inside a non-serializable class:

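The example itself is truncated in this capture; a minimal sketch of the shape being described (the class, method, and local def names here are illustrative placeholders, not the gist's original code):

import org.apache.spark.rdd.RDD

class NonSerializableHolder {  // does not extend Serializable
  def addOne(rdd: RDD[Int]): RDD[Int] = {
    rdd.map { x =>
      // The local def is lifted to a method on the enclosing class, so under
      // Scala 2.12 the lambda captures `this`, dragging the whole
      // non-serializable instance into the closure.
      def localDef(y: Int): Int = y + 1
      localDef(x)
    }
  }
}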

/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private boolean range_initRange_0;
/* 010 */ private long range_nextIndex_0;
[run]
branch = true
parallel = true
data_file = ${COVERAGE_DIR}/coverage_data/coverage
[html]
ignore_errors = true
@JoshRosen
JoshRosen / scapegoat-to-csv-spark.py
Created May 24, 2017 22:30
Scapegoat output to CSV converter for Spark
import xml.etree.ElementTree as ET
import glob
import fnmatch
import os
import csv

# Commit SHAs recording the Scapegoat and Spark revisions used.
SCAPEGOAT_VERSION = 'd9392e5072e3e408dd232e6fc799e0ac1640189b'
SPARK_VERSION = '4816c2ef5e04eb2dd70bed8b99882aa0b7fe7fd7'
SPARK_HOME = '/Users/joshrosen/Documents/spark/'
@JoshRosen
JoshRosen / apply-patch.sh
Created June 24, 2016 23:10 — forked from kfish/apply-patch.sh
Apply a patch file that was produced with "git format-patch" using the patch command, and commit it using the message from the original commit.
#!/bin/bash
apply () {
    # $1 is a patch file produced by git format-patch; any remaining
    # arguments are passed through to the patch command.
    filename=$1
    shift
    patch_args=$*
    gotSubject=no
    msg=""