@dragos
Created March 11, 2016 20:40

Investigating memory leaks in the Spark Shell

Summary

Re-assigning a variable in the REPL leads to memory leaks.

Reproducible test-case

The original case had a few unnecessary features, but it boils down to re-assigning a fresh large array (100MB) to the same variable several times.

The existing test case can be further minimized to not depend on Spark:

object Min {
  def m(array: Array[Int]): Array[Int] = {
    Array.range(0, array.length)
  }
}
val n = 25000000
val array = (0 until n).toArray

var ary2: Array[Int] = null

// Repeat the rest until failure:
ary2 = Min.m(array)

This example fails both in spark-shell and the plain Scala REPL (2.11.7).

Methodology

I ran the test case in the spark-shell (1.6.1-rc1 with Scala 2.10.5) and noticed the heap going up. I used jvisualvm to monitor heap usage and took a heap dump once it was significantly higher (after 4-5 repeats of the last line). The profiler immediately identified the top consumers: several arrays of exactly 25 million elements, each one holding on to 100MB of heap.

JVisualVM can find the nearest GC root for each of the arrays. Here's the breakdown:

  • one of them was the expected ary2
  • the others were deeply nested fields rooted in a top-level object, itself contained inside the Spark class loader. The path to the object is something like $read$.$iw.$iw.$iw.$iw.$iw.$iw.$iresX, where X is a number, each time different.

It became clear that each invocation of m creates a new array that is not garbage-collectable. Array.range allocates a fresh array on every call, and the returned array is live as long as something references it. But the returned value is stored in a mutable variable, ary2, so each assignment should overwrite the reference to the previous array and leave it free to be collected.

So who holds on to the old array (an RDD in the original, Spark-based test case)?

REPL debug

The Spark REPL allows some of the regular Scala REPL command line arguments, so the next step was to run it with -Xprint:parse. I was particularly interested in what the assignment is compiled down to.

The Scala REPL is not really an interpreter: each line is actually compiled to a full Scala program by wrapping the line in a fairly involved mix of wrapper classes, objects and imports for the previously defined members.

Here's a lightly cleaned-up version of the output. It comes from a somewhat more complicated version of the test case that still involves Spark types, which is why m takes sc and n as extra arguments and ary2 is an RDD[Int]:

scala> ary2 = Min.m(sc, array, n)
[[syntax trees at end of                    parser]] // <console>
package $line25 {
  class $read extends Serializable {
    class $iwC extends Serializable {
      
      val $VAL21 = $line3.$read.INSTANCE;
      import $VAL21.$iw.$iw.sc;
      val $VAL22 = $line4.$read.INSTANCE;
      import $VAL22.$iw.$iw.sqlContext;
      
      class $iwC extends Serializable {
      
        import org.apache.spark.SparkContext._;
        class $iwC extends Serializable {
          class $iwC extends Serializable {

            import sqlContext.implicits._;
            class $iwC extends Serializable {

              import sqlContext.sql;
              class $iwC extends Serializable {

                import org.apache.spark.sql.functions._;

                  val $VAL23 = $line15.$read.INSTANCE;
                  import $VAL23.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.n;
                  val $VAL24 = $line19.$read.INSTANCE;
                  import $VAL24.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.array;
                  import org.apache.spark.rdd.RDD;
                  val $VAL25 = $line21.$read.INSTANCE;

                  import $VAL25.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.ary2;
                  import org.apache.spark.SparkContext;

                  class $iwC extends Serializable {

                    import org.apache.spark.rdd.RDD;
                    val $VAL26 = $line24.$read.INSTANCE;
                    import $VAL26.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.Min;
                    class $iwC extends Serializable {

                      org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this);
                      ary2 = Min.m(sc, array, n);
                      val $ires9 = ary2
                    };
                    val $iw = new $iwC()
                  };
                  val $iw = new $iwC()
                };
                val $iw = new $iwC()
              };
              val $iw = new $iwC()
            };
            val $iw = new $iwC()
          };
          val $iw = new $iwC()
        };
        val $iw = new $iwC()
      };
      val $iw = new $iwC()
    };
    val $iw = new $iwC()
  };
  object $read extends scala.AnyRef {
    val INSTANCE = new $read()
  }
}

[[syntax trees at end of                    parser]] // <console>
package $line25 {
  object $eval extends scala.AnyRef {
    lazy val $result = $line25.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$ires9;
    val $print: String = {
      $read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw;
      "".$plus("ary2: org.apache.spark.rdd.RDD[Int] = ").$plus($line25.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$ires9).$plus("\n")
    }
  }
}

The important bit is around the ary2 assignment. And to my surprise, there's the insidious $ires9 field:

org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this);
ary2 = Min.m(sc, array, n);
val $ires9 = ary2

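$ires9 is a field that holds on to the same object as ary2 (the RDD here, the array in the minimized case), except that it's never re-assigned! Every evaluated assignment gets its own wrapper instance with its own $iresN field, so every previously assigned value stays reachable. That's where our memory goes!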
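
The retention pattern can be modelled outside the REPL. What follows is a minimal sketch, not the REPL's generated code: LeakModel, AssignmentLine, lines, evalAssignment and staleArrays are hypothetical names, and the lines buffer is an assumption standing in for whatever keeps the REPL's wrapper objects reachable for the whole session.

```scala
import scala.collection.mutable.ArrayBuffer

object LeakModel {
  // Stand-in for the user's `var ary2`, defined on an earlier REPL line.
  var ary2: Array[Int] = null

  // Stand-in for the wrapper generated for one line `ary2 = <new array>`.
  final class AssignmentLine(size: Int) {
    ary2 = Array.range(0, size) // the user's assignment
    val ires: Array[Int] = ary2 // synthetic result field, never re-assigned
  }

  // Assumption: every evaluated line's wrapper stays reachable for the session.
  val lines = ArrayBuffer.empty[AssignmentLine]

  // Evaluate one more assignment line.
  def evalAssignment(size: Int): Unit = {
    lines += new AssignmentLine(size)
  }

  // Arrays still reachable through a wrapper although ary2 no longer points to them.
  def staleArrays: Int = lines.count(line => line.ires ne ary2)
}
```

After three calls to LeakModel.evalAssignment, staleArrays is 2: only the latest array is still referenced by ary2, yet the two earlier ones cannot be collected because their wrappers' ires fields still point to them.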

Judging from the Spark sources, this assignment is added by the REPL in SparkMemberHandlers.scala (for Scala 2.10), more precisely inside AssignmentHandler. The same code appears in the vanilla Scala REPL.

Why, oh why?!

The answer is in the last lines of the listing: because the REPL prints out the toString of the last expression! However, assignments have type Unit, and that's not interesting enough, so someone decided to store that variable into a field and print that field instead of the uninteresting (). Assignments are special-cased.

Conclusions

A proper bugfix needs to be applied to the Scala REPL. I don't see an easy way to fix this issue while keeping the special treatment of assignments, other than eagerly evaluating toString on the variable and storing that in the synthetic result. However, printing is more involved than just a call to toString (the REPL does a deep print on Arrays, truncates output after a certain number of elements, etc). Maybe assignments should not be special-cased after all? The Worksheet doesn't do it, and that's never been a problem. A lie has short legs, as they say in some parts of the world!
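
To make the eager-toString idea concrete, here is a rough sketch under stated assumptions. EagerModel, AssignmentLine, render, maxElems and iresText are all hypothetical names, and render is only a toy substitute for the REPL's real printing logic; it is not a proposed patch.

```scala
object EagerModel {
  // Toy renderer: prints arrays element-wise and truncates after maxElems.
  // The real REPL printing is more involved; this only illustrates the idea.
  def render(value: Any, maxElems: Int = 3): String = value match {
    case null => "null"
    case a: Array[_] =>
      val shown = a.take(maxElems).mkString(", ")
      val more = if (a.length > maxElems) ", ..." else ""
      s"Array($shown$more)"
    case other => other.toString
  }

  // Stand-in for the user's `var ary2`.
  var ary2: Array[Int] = null

  // Stand-in for one assignment line: the synthetic field keeps only the
  // rendered text, so it holds no reference to the assigned array.
  final class AssignmentLine(size: Int) {
    ary2 = Array.range(0, size)
    val iresText: String = render(ary2)
  }
}
```

With this shape, an old wrapper retains a short string instead of a 100MB array, at the cost of rendering the value even if nobody looks at the output.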

Workaround

Never assign a variable in the REPL. You can define a helper method that does it for you, but NEVER, ever, let an assignment be evaluated by the REPL.

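import org.apache.spark.rdd.RDD
// The assignment sits inside a method body, so the REPL never sees it as a top-level assignment.
def update(r1: RDD[Int]): Unit = {
  ary2 = r1
}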

This can update ary2 as many times as you want, without causing an OOM.
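
For the Spark-free test case from the beginning, the same workaround might look like the sketch below. The Session object is hypothetical and exists only so the snippet compiles as a file; in the REPL you would enter var ary2 and def update directly at the prompt.

```scala
object Min {
  def m(array: Array[Int]): Array[Int] = Array.range(0, array.length)
}

object Session {
  var ary2: Array[Int] = null

  // The assignment lives inside a method body, not on a top-level REPL line.
  def update(a: Array[Int]): Unit = {
    ary2 = a
  }
}
```

Instead of repeating ary2 = Min.m(array), you would then repeat update(Min.m(array)).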

@deanwampler

This is great. I forgot about the -Xprint:parse feature. I actually hacked the forked REPL in Spark to print the string before compilation!

Would the ires9 problem be avoided if you added another harmless expression after the assignment, as a workaround? For example, println(array.take(5))?

@retronym

A short-term workaround might be for the Spark shell to reflectively null out the $ires field in the previous line's $iw object before interpreting the next line.

Welcome to Scala 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_71).
Type in expressions for evaluation. Or try :help.

scala> var foo = ""
foo: String = ""

scala> foo = "2"
foo: String = 2

scala> val last = $intp.lastRequest
last: $intp.Request = Request(line=foo = "2", 1 trees)

scala> last.fullAccessPath
res0: String = $iw // ORLY?

scala> $intp.isettings.unwrapStrings = false
$intp.isettings.unwrapStrings: Boolean = false

scala> last.fullAccessPath
res1: String = $line4.$read.$iw.$iw

scala> val iw = $line4.$read.$iw.$iw
iw: $line4.$read.$iw.$iw.type = $line4.$read$$iw$$iw$@23444a36

scala> iw.getClass.getMethods.find(_.getName.startsWith("$ires")).head.invoke(iw)
res2: Object = 2

scala> def nullIRes(x: Any) = { val cls = x.getClass; cls.getDeclaredFields.filter(_.getName.startsWith("$ires")).foreach{fld => fld.setAccessible(true); fld.set(x, null)}}
nullIRes: (x: Any)Unit

scala> $intp.interpret(s"nullIRes(${last.fullAccessPath})")
res4: scala.tools.nsc.interpreter.IR.Result = Success

scala> iw.getClass.getMethods.find(_.getName.startsWith("$ires")).head.invoke(iw)
res6: Object = null
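
The reflective part of that session can also be tried outside the REPL. This is a small sketch, assuming a hand-written stand-in for a line wrapper: IresCleaner and FakeLine are hypothetical names, and the field is declared as a var so that the reflective write does not depend on how a given compiler version emits val fields.

```scala
object IresCleaner {
  // Hypothetical stand-in for a REPL line wrapper holding a synthetic result.
  final class FakeLine(initial: AnyRef) {
    var $ires0: AnyRef = initial
  }

  // Same approach as nullIRes in the session above: clear every declared
  // field whose name starts with "$ires".
  def nullIRes(x: AnyRef): Unit = {
    x.getClass.getDeclaredFields
      .filter(_.getName.startsWith("$ires"))
      .foreach { fld =>
        fld.setAccessible(true)
        fld.set(x, null)
      }
  }
}
```

Calling IresCleaner.nullIRes on a FakeLine leaves its $ires0 field set to null, which is what releases the retained value for garbage collection.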
