View sparkjobapi.scala
package spark.jobserver.api
import com.typesafe.config.Config
import org.scalactic._
import spark.jobserver.api._
trait ContextProvider[C] {
val ctx: C with ContextLike = null
def context: C with ContextLike = ctx
}
View gist:5690cd1b44ee7f2057a81b3b6cc100fc
:: problems summary ::
:::: WARNINGS
[FAILED ] org.scala-sbt#main;0.13.11!main.jar: The HTTP response code for https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/main/0.13.11/jars/main.jar did not indicate a success. See log for more detail. (323ms)
[FAILED ] org.scala-sbt#main;0.13.11!main.jar: The HTTP response code for https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/main/0.13.11/jars/main.jar did not indicate a success. See log for more detail. (323ms)
==== typesafe-ivy-releases: tried
https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/main/0.13.11/jars/main.jar
[FAILED ] org.scala-sbt#compiler-interface;0.13.11!compiler-interface.jar: The HTTP response code for https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/compiler-interface/0.13.11/jars/compiler-interface.jar did not indicate a success. See log for more detail. (308ms)
[FAILED ] org.scala-sbt#compiler-interface;0.13.11!compiler-interface.jar: The HTTP response code for https:
View spark-jobserver-ui-thoughts
* See all current contexts
* For each context see all the jobs that are running
* History of past jobs for a given context. Actually, this is not technically available per context via the API, but the overall history of past jobs is
* Be able to see configuration for each job present or past (this is available via a tiny "C" link in current UI)
* Job results
* Link to Spark UI for detailed analysis
View scala-call-func-type-param.md

Let's say you have a Scala function which takes a type parameter:

  def myFunc[K]: T

Let's say you have several functions like that. Right now, if K could be one of several different types, you'd need some code like the following:

  kType match {
View gist:909408dc6f053d6934be
// Primitive.scala - only change is asCql method
implicit object BlobIsPrimitive extends Primitive[ByteBuffer] {
override type PrimitiveType = java.nio.ByteBuffer
val cassandraType = CQLSyntax.Types.Blob
override def fromRow(column: String, row: Row): Try[ByteBuffer] = nullCheck(column, row) {
r => r.getBytes(column)
View gist:213b837c6e02c4982a9a

...to be turned into a blog post later. These are notes with references to commits; the blog post will have snippets of code so folks don't have to look things up.

How I tuned Filo for 50x speedup in 24 hours

Filo is an extreme serialization library for vector data. Think of it as the good parts of Parquet without the HDFS and file format garbage -- just the serdes and fast columnar storage.

I recently added a JMH benchmark for reading a Filo binary buffer containing 10,000 Ints using the simplest apply() method to sum up all the Ints.

Oh, and before we get started - avoid throwing exceptions in inner loops, especially Try(....).getOrElse(...) patterns. Even if they occur only occasionally they can be extremely expensive.
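To make the point about exceptions concrete, here is a minimal sketch in plain Scala. It is not Filo code: the object name, the method names, and the array-lookup scenario are all hypothetical, chosen only to contrast a `Try(...).getOrElse(...)` inside a loop with an explicit check that never throws.

```scala
import scala.util.Try

// Hypothetical example, not taken from Filo: sum the values at the given
// indices, treating out-of-range indices as 0.
object InnerLoopExceptions {
  // Each bad index throws ArrayIndexOutOfBoundsException, which Try catches.
  // Constructing and unwinding that exception is the expensive part.
  def sumWithTry(data: Array[Int], indices: Array[Int]): Long = {
    var total = 0L
    var i = 0
    while (i < indices.length) {
      total += Try(data(indices(i))).getOrElse(0)
      i += 1
    }
    total
  }

  // Same result, but the bounds check keeps the exception path out of the loop.
  def sumWithCheck(data: Array[Int], indices: Array[Int]): Long = {
    var total = 0L
    var i = 0
    while (i < indices.length) {
      val idx = indices(i)
      if (idx >= 0 && idx < data.length) total += data(idx)
      i += 1
    }
    total
  }
}
```

Both methods return the same sum; the difference would only show up in a benchmark, and how large it is depends on how often the bad case occurs.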

View gist:69ca1ab5e758d3b0ab13
import com.vividsolutions.jts.geom._
import com.vividsolutions.jts.geom.impl.PackedCoordinateSequence
import com.vividsolutions.jts.geom.util.GeometryTransformer
/**
* A custom CoordSequence based on byte arrays for compactness and speed
* Just 2 dimensions for now.
*
* It's an example of creating a custom CoordinateSequence.
* NOTE: This is much more memory efficient, but slower because of deserialization cost.
View gist:10447394
case StopContext(name) =>
if (contexts contains name) {
logger.info("Shutting down context {}", name)
context.watch(contexts(name)) // watch for termination event
contexts(name) ! PoisonPill
contexts.remove(name)
resultActors.remove(name)
sender ! ContextStopped
} else {
sender ! NoSuchContext
View gist:8967611
// We want a generic way to chunk large arrays into segments of byte arrays
// and to do so in a way that doesn't blow up memory for large objects, i.e. the ability to chunk / page / stream
trait ChunkingArraySerDe[T] {
def apply(data: Array[T], chunkSize: Int): Iterator[Array[Byte]]
def unapply(serialized: Iterator[Array[Byte]]): Array[T]
}
// For arrays with fixed-size elements
trait PrimitiveChunkingSerDe[@specialized(Int, Long, Boolean) T] extends ChunkingArraySerDe[T] {
View gist:8921238

Useful Scalac Flags

So, there have been some discussions and angry tweets recently about irritating Scala "features" (like value discarding and auto-tupling) that can actually be turned off by selecting the right compiler flags in conjunction with -Xfatal-warnings. I highly recommend a set of options like those below.

scalacOptions ++= Seq(
  "-deprecation",           
  "-encoding", "UTF-8",       // yes, this is 2 args
  "-feature",                
  "-language:existentials",