Skip to content

Instantly share code, notes, and snippets.

// run with "--conf spark.cleaner.referenceTracking=false"
// spin up our full set of executors
// (100 tasks of ~1s each forces the scheduler to request the full executor set before we poke at internals)
sc.parallelize(1 to 100, 100).map { x => Thread.sleep(1000); x}.collect()
// Reflectively invokes the private static helper
// org.apache.spark.util.Utils.getConfiguredLocalDirs(conf) to read the
// configured local scratch directories on this node.
// Uses reflection because the method is private[spark] and not callable directly.
def getLocalDirs(): Array[String] = {
// look up the Utils object's class by name (it is private to the spark package)
val clz = Class.forName("org.apache.spark.util.Utils")
val conf = org.apache.spark.SparkEnv.get.conf
val method = clz.getMethod("getConfiguredLocalDirs", conf.getClass())
method.setAccessible(true)
// null receiver: presumably resolves to a static-style method on the Utils object — confirm against the full gist
method.invoke(null, conf).asInstanceOf[Array[String]]
// NOTE(review): the closing brace of getLocalDirs is missing here — the gist preview is truncated
@squito
squito / AccumulatorListener.scala
Last active March 15, 2019 06:34
Accumulator Examples
import scala.collection.mutable.Map
import org.apache.spark.{Accumulator, AccumulatorParam, SparkContext}
import org.apache.spark.scheduler.{SparkListenerStageCompleted, SparkListener}
import org.apache.spark.SparkContext._
/**
 * Just print out the values for all accumulators from the stage.
 * Note: you will only get updates from *named* accumulators.
@squito
squito / getPaths.scala
Last active March 15, 2019 06:31
get paths of a spark-sql query
import java.lang.reflect.Method
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.sources.{HadoopFsRelation, BaseRelation}
import org.apache.spark.sql.DataFrame
// Extracts the underlying filesystem paths backing a Spark SQL relation,
// e.g. to find which files a DataFrame reads from.
def getPaths(relation: BaseRelation): Iterator[String] = {
relation match {
// file-based relations expose their input paths directly
case hr: HadoopFsRelation =>
hr.paths.toIterator
// NOTE(review): remaining match cases and closing braces are cut off — the gist preview is truncated
@squito
squito / SlowIterator.scala
Last active September 14, 2018 20:41
SlowIterationLogger
// This is an example iterator that runs slowly, to demonstrate how SlowLoggingIterator works.
// It just iterates over a range of ints, but puts in occasional delays, to simulate an iterator that is
// actually doing something more complex, e.g. fetching records from a DB which is occasionally slow.
// Presumably sleeps `delay` ms once every `every` elements — next() body is truncated below, confirm against the full gist.
class SlowIterator(start: Int, end: Int, delay: Long, every: Int) extends java.util.Iterator[Integer] {
// the real data source being wrapped: a plain Int range
val underlying = (start until end).toIterator
def hasNext(): Boolean = underlying.hasNext
def next(): Integer = {
// NOTE(review): body of next() is cut off here — the gist preview is truncated
@squito
squito / tl.out
Last active May 1, 2018 16:13
InheritableThreadLocals
creating a new thread pool thread 0 : tl = null
creating a new thread pool thread 1 : tl = null
creating a new thread pool thread 2 : tl = null
creating a new thread pool thread 3 : tl = null
creating a new thread pool thread 4 : tl = null
creating a new thread pool thread 5 : tl = null
creating a new thread pool thread 6 : tl = null
creating a new thread pool thread 7 : tl = null
creating a new thread pool thread 8 : tl = null
creating a new thread pool thread 9 : tl = null
# filters out "apachespark" choice now
# notice what happens with
# * bad input ("floop")
# * real user that can't be assigned a jira ("fakeimran")
# * selection from list ("imran")
# * arbitrary user that can be assigned ("vanzin")
In [1]: from merge_spark_pr import *
@squito
squito / equality_parsing_psql_timestamp.md
Last active September 11, 2017 16:27
example of how postgres treats different timestamp types, both parsing and as the timezone is changed
@squito
squito / gist:ccd56fefefe4dfef808dc21196a89385
Created August 28, 2017 18:05
random example of exploring spark internals w/ reflection while debugging cluster config
// paste in
// https://gist.githubusercontent.com/squito/329d9cd82a21f645d592/raw/ca9217708293d6fe69ed6638f4feeb3038f8fd9c/reflector.scala
// REPL snippet: uses the reflector helpers (the `get`/`reflectMethod` calls below
// come from the gist linked above) to inspect the Hive metastore client config.
// grab the external catalog from the active SparkSession's internal state
val xCat = spark.sessionState.catalog.externalCatalog
// `get` is the reflection helper from reflector.scala; pulls the private `client` field
val catClient = get(xCat, "client")
// reflectively read the effective hive.metastore.uris setting ("" is the default if unset)
catClient.reflectMethod("getConf", Seq("hive.metastore.uris", ""))
import org.apache.hadoop.fs.{FileSystem, Path}
// also resolve the default FileSystem for this cluster's hadoop configuration
val fs = FileSystem.get(sc.hadoopConfiguration)
@squito
squito / LA_output.txt
Created August 24, 2017 16:47
Java timestamp mechanics
> scala -Duser.timezone=America/Los_Angeles timestamp.scala
Defaul TZ: America/Los_Angeles
hours in UTC: 8
TZ offset in hours: -8
@squito
squito / macro compiler error.scala
Last active August 23, 2017 05:45
The horrors of compiler errors when working with macros ...
scala> def classExpandMacroImpl(c: Context)(s: c.Expr[Any]) : c.Expr[Any] = {
| import c.universe._
|
| val cdef = s.tree match {
| case Block(List(x:ClassDef), _) => x
| case _ => c.abort(c.enclosingPosition, "Was expecting a block w/ a ClassDef")
| }
|
| val q"class $name { ..$body }" = cdef
| val newdefs = List[c.universe.Tree](q"def x: Int = z + 7", q"def y: Float = z + 3.2f")