Notes about Spark's ClosureCleaner and Ammonite commands

Consider the following three code blocks (code cells) executed in the Ammonite REPL with CodeClassWrapper:

cmd0:

val x = 123; val y = 456

cmd1:

val z = 789
def foo(a: Int) = { x }

cmd2:

val w = new NonSerializable
val lambda = (a: Int) => a + foo(a) + z

In this example, cmd1 references the variable x defined in cmd0, and cmd2 uses variables from both cmd1 and cmd0 (through the transitive dependency cmd1#foo -> cmd0#x). Ammonite translates these code blocks into the following generated code (slightly simplified for readability):

cmd0:

object cmd0{
  val wrapper = new cmd0; val instance = new wrapper.Helper
}
final class cmd0 extends java.io.Serializable {
  final class Helper extends java.io.Serializable{
    val x = 123; val y = 456
  }
}

cmd1:

object cmd1{
  val wrapper = new cmd1; val instance = new wrapper.Helper
}
final class cmd1 extends java.io.Serializable {
  val cmd0: ammonite.$sess.cmd0.instance.type = ammonite.$sess.cmd0.instance
  import cmd0.{y, x}
  final class Helper extends java.io.Serializable{
    def foo(a: Int) = { x }; 
    val z = 789;
  }
}

cmd2:

object cmd2{
  val wrapper = new cmd2; val instance = new wrapper.Helper
}
final class cmd2 extends java.io.Serializable {
 val cmd0: ammonite.$sess.cmd0.instance.type = null
 val cmd1: ammonite.$sess.cmd1.instance.type = ammonite.$sess.cmd1.instance
 import cmd1.{z, foo}
 import cmd0.{y, x}

 final class Helper extends java.io.Serializable{
  val w = new NonSerializable
  val lambda = (a: Int) => a + foo(a) + z
 }
}

Note: the real generated command code is more complex. In the example above, cmd2#cmd0 == null because cmd0 is not directly referenced from cmd2; the actual Ammonite codegen figures this out at runtime rather than at codegen time. This doesn't affect the ideas described here.

The picture below shows the reference graph for this code example. Dashed arrows are references that are never actually accessed (and are therefore safe to null out). Note that there is also a circular dependency between the lambda and cmd2$Helper objects, which may lead to serialization issues if not handled properly.
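
To see why this reachability matters, here is a minimal sketch of what plain Java serialization (which Spark uses for closures) does with the uncleaned lambda. The helper name is illustrative and not part of Spark or Ammonite:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Serializing the cmd2 lambda walks its enclosing cmd2$Helper and, through it,
// cmd2, cmd1$Helper, cmd1 and (transitively) cmd0 -- even though the closure body
// only needs foo and z. With the non-serializable field w reachable, this throws
// a NotSerializableException; without it, it still inflates the payload.
def serializedSize(obj: AnyRef): Int = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(obj)
  out.close()
  bytes.size()
}

// In a live session this would be the lambda defined in cmd2:
// serializedSize(ammonite.$sess.cmd2.instance.lambda)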

Please refer to this gist to understand how Spark's ClosureCleaner works for normal Scala closures and for closures defined in the native Scala REPL.

Notes about Spark's ClosureCleaner and Ammonite commands

Spark Connect uses the Ammonite REPL by default. Ammonite has its own code cell wrapping mechanism; Spark Connect uses CodeClassWrapper by default. For a given block of code, this wrapper generates:

  1. A command object holding references to other commands' variables
  2. A command Helper object (an inner object of the command) containing the actual cell code and holding references to the variables it defines
  3. A command companion object, instantiating both the command and Helper objects.

For a detailed example, please refer to ammonite_code_wrapper.md.

This means that a closure defined in an Ammonite command#X will capture:

  • the enclosing cmdX$Helper object
  • the outer cmdX object, via the cmdX$Helper.$outer reference
  • the cmdY$Helper and cmdY objects, if this command references variables defined in command#Y
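
One way to see this capture chain concretely is to look at the closure's SerializedLambda form, which is also what Spark's ClosureCleaner relies on for Scala 2.12+ lambdas. A hedged sketch (the helper name is illustrative, not Spark's API):

import java.lang.invoke.SerializedLambda

// Serializable Scala 2.12+ lambdas expose a writeReplace method returning a
// SerializedLambda that describes the implementation method and the captured arguments.
def capturedArgs(closure: AnyRef): Seq[AnyRef] = {
  val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
  writeReplace.setAccessible(true)
  val sl = writeReplace.invoke(closure).asInstanceOf[SerializedLambda]
  (0 until sl.getCapturedArgCount).map(sl.getCapturedArg)
}

// For a lambda defined in cmdX, the only captured argument is expected to be the
// enclosing cmdX$Helper instance, which transitively references cmdX and any
// cmdY / cmdY$Helper objects it depends on:
// capturedArgs(lambda).foreach(arg => println(arg.getClass.getName))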

As a result, a closure may capture, via transitive references, objects that it doesn't actually need. This can lead to serialization issues and an increased payload size. Spark implements a ClosureCleaner which is capable of cleaning Scala closures by nulling out fields that are not accessed by the closure code. However, the current implementation has a couple of limitations:

  • It's somewhat specific to the native Scala REPL (e.g. it hardcodes that the closure's capturing class name must start with $line and end with $iw)
  • It doesn't support traversing transitive (non-nested) dependencies

To properly support Scala UDFs in Spark Connect with the Ammonite REPL, we need to modify the ClosureCleaner to overcome these limitations.

New Ammonite ClosureCleaner codepath

The current ClosureCleaner implementation traverses the closure's enclosing class in the IndylambdaScalaClosures.findAccessedFields method, keeping track of its accessed fields and recursively visiting nested classes and closures. The changes described below apply only to closures defined in Ammonite (i.e. whose capturing class name starts with ammonite.$sess.cmdX).
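
For orientation, the essence of that traversal can be sketched with ASM. This is a simplified illustration of the technique, not Spark's actual code: Spark uses its shaded copy of ASM, and the real pass starts from the lambda's implementation method and follows the methods it calls rather than scanning a whole class:

import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Opcodes}
import scala.collection.mutable

// Simplified findAccessedFields-style pass: for a given class, record every
// (owner, field) pair that its methods read or write via GETFIELD / PUTFIELD.
def accessedFields(cls: Class[_]): Set[(String, String)] = {
  val accessed = mutable.Set.empty[(String, String)]
  val reader = new ClassReader(cls.getName)
  reader.accept(new ClassVisitor(Opcodes.ASM9) {
    override def visitMethod(access: Int, name: String, descriptor: String,
                             signature: String, exceptions: Array[String]): MethodVisitor =
      new MethodVisitor(Opcodes.ASM9) {
        override def visitFieldInsn(opcode: Int, owner: String,
                                    fieldName: String, fieldDescriptor: String): Unit =
          accessed += (owner -> fieldName)
      }
  }, ClassReader.SKIP_DEBUG)
  accessed.toSet
}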

To support Ammonite closures, this traversal needs to be extended to:

  • recursively traverse any cmdX and cmdX$Helper object it encounters
  • keep track of instances of Ammonite cmdX and cmdX$Helper objects, as they'll be modified later

Note that the ClosureCleaner should not try to traverse arbitrary user-defined objects: they can be absolutely anything, and it's safer not to try to modify them.
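
A possible guard for deciding which classes the cleaner is allowed to traverse and later clone, sketched under the naming scheme shown above (the exact pattern is an assumption, not the final implementation):

// Only Ammonite-generated command wrappers (ammonite.$sess.cmdN) and their Helper
// classes are considered safe to traverse and clean; arbitrary user-defined classes,
// including user classes nested inside a Helper, are left untouched.
val ammoniteCommandClass = """^ammonite\.\$sess\.cmd\d+(\$Helper)?$""".r

def isAmmoniteCommandOrHelper(cls: Class[_]): Boolean =
  ammoniteCommandClass.pattern.matcher(cls.getName).matches()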

After the closure traversal, we need to (see the sketch after this list):

  1. Clone each cmdX and cmdX$Helper object reachable from this closure
  2. Null out the cmdX$Helper fields that are not accessed, and initialize the cmdX$Helper $outer field with the clone of cmdX
  3. Initialize the cmdX clone's references to cmdY$Helper objects with the corresponding clone references
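
A reflection-based sketch of steps 1 and 2. Allocating the empty clone without running a constructor is elided (Spark's ClosureCleaner already contains a utility for that), and the helper name is illustrative:

// Copy into the freshly allocated clone only the fields the closure actually accesses,
// plus $outer (which step 2 then points at the cleaned cmdX clone). All other fields
// keep their default null / zero values, i.e. they are effectively nulled out.
def cleanHelperClone(original: AnyRef, emptyClone: AnyRef, accessed: Set[String]): AnyRef = {
  original.getClass.getDeclaredFields.foreach { field =>
    if (accessed.contains(field.getName) || field.getName == "$outer") {
      field.setAccessible(true)
      field.set(emptyClone, field.get(original))
    }
  }
  emptyClone
}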

Corner cases

  1. Sometimes the outer class of a closure defined in the Ammonite REPL is not an Ammonite-generated object. For example, in this code:
val x = 123
class Test extends Serializable {
  // capturing class is cmd$Helper$Test
  val myUdf = udf((i: Int) => i + x)
}

The myUdf closure captures the Test instance, which is an instance of an inner class of the cmd$Helper object. In this case, we still visit the outer cmd object, as well as its transitive dependencies.

  2. Sometimes a closure may be enclosed by another closure. Consider this code in cmd:
val x = 123
spark.range(0, 10).
  // for this call, UdfUtils will create a new lambda, and that lambda becomes the enclosing closure
  map(i => i + x).
  agg(sum("value")).
  collect()(0)(0).asInstanceOf[Long]

Dataset:

  def map[U: Encoder](f: T => U): Dataset[U] = {
    mapPartitions(UdfUtils.mapFuncToMapPartitionsAdaptor(f))
  }

UdfUtils:

def mapFuncToMapPartitionsAdaptor[T, U](f: T => U): Iterator[T] => Iterator[U] = _.map(f(_))

The actual lambda passed to the ClosureCleaner will be of type org.apache.spark.sql.connect.common.UdfUtils$$$Lambda$.... Its first captured argument (the enclosing object) will be of type ammonite.$sess.cmd3$Helper$$Lambda$..., which is our i => i + x lambda. In this case, we try to clean the enclosing closure instead.
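
One way to handle this, as a hedged sketch: if the closure handed to the cleaner captures another Ammonite-defined lambda as its first argument, recurse and clean that inner lambda instead (capturedArgs is the SerializedLambda helper sketched earlier; the name-based check is a heuristic, not Spark's actual API):

// When the outermost closure is only an adaptor (e.g. created by UdfUtils) whose first
// captured argument is itself a lambda defined in an Ammonite command, clean that
// inner lambda rather than the adaptor.
def closureToClean(closure: AnyRef): AnyRef =
  capturedArgs(closure).headOption match {
    case Some(inner)
        if inner.getClass.getName.startsWith("ammonite.$sess.cmd") &&
           inner.getClass.getName.contains("$$Lambda") =>
      closureToClean(inner)
    case _ =>
      closure
  }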
