import java.nio.charset.Charset
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

import org.apache.kafka.clients._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, RequestFutureListener}

import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

case class Test(i1: Int, i2: Int, i3: Int, i4: Int, i5: Int, i6: Int, i7: Int, i8: Int)

object EncoderBenchmark {
  val runCount = 10000000
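  // The preview stops at this point. Below is a minimal sketch of the kind of
  // loop such a benchmark runs, assuming Spark 1.6/2.x, where
  // ExpressionEncoder[T]().toRow turns a case class instance into an
  // UnsafeRow; the method name and timing scheme are assumptions, not code
  // taken from the gist itself.
  def benchmarkEncoder(): Unit = {
    val encoder = ExpressionEncoder[Test]()
    val value = Test(1, 2, 3, 4, 5, 6, 7, 8)
    val start = System.nanoTime()
    var i = 0
    while (i < runCount) {
      encoder.toRow(value) // serialize into an UnsafeRow
      i += 1
    }
    println(s"ExpressionEncoder: $runCount rows in ${(System.nanoTime() - start) / 1e6} ms")
  }
}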
marmbrus / extraStrategies.md
Created September 10, 2015 21:35
Example of injecting custom planning strategies into Spark SQL.

First, a disclaimer: this is an experimental API that exposes internals which are likely to change between Spark releases. As a result, most data sources should be written against the stable public API in org.apache.spark.sql.sources. We expose this mostly to gather feedback on which optimizations we should add to the stable API so that data sources can get the best possible performance.

We'll start with a simple artificial data source that just returns ranges of consecutive integers.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

/** A data source that returns ranges of consecutive integers in a column named `a`. */
case class SimpleRelation(
    start: Int,
    end: Int)(
    @transient val sqlContext: SQLContext)
  extends BaseRelation {
  // Single integer column `a`, per the doc comment above; this completion is
  // a sketch, since the preview cuts the class off at the constructor.
  override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)
}
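A relation like this is then wired into the planner through the experimental hook the description mentions. A minimal sketch, assuming sqlContext.experimental.extraStrategies; the strategy body below is a placeholder, not the gist's actual planning rule:

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// A skeletal planning strategy: returning Nil for plans it does not
// recognize lets the built-in strategies handle them instead.
object SimpleStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case _ => Nil // match scans of SimpleRelation here and emit a physical plan
  }
}

// The injection point this gist demonstrates:
sqlContext.experimental.extraStrategies = SimpleStrategy :: Nil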
package com.databricks.spark.jira
import scala.io.Source
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.sources.{TableScan, BaseRelation, RelationProvider}
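The preview ends at the imports. A relation built from them would typically take roughly the following shape; this is a sketch under assumptions (JiraRelation, its two-column schema, and the URL handling are illustrative, not the gist's actual implementation):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Sketch: a minimal TableScan relation plus the RelationProvider entry point.
case class JiraRelation(url: String)(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  override def schema: StructType =
    StructType(StructField("url", StringType) :: StructField("payload", StringType) :: Nil)

  override def buildScan(): RDD[Row] = {
    val payload = Source.fromURL(url).mkString // fetch the JIRA REST response
    sqlContext.sparkContext.parallelize(Seq(Row(url, payload)))
  }
}

class DefaultSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    JiraRelation(parameters("url"))(sqlContext)
}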
marmbrus / gist:15e72f7bc22337cf6653
Created November 27, 2014 03:10
Parallel list files on S3 with Spark
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration

case class S3File(path: String, isDir: Boolean, size: Long) {
  def children: Seq[S3File] = listFiles(path)
}

/** Lists the immediate children of `path` via the Hadoop FileSystem API. */
def listFiles(path: String): Seq[S3File] = {
  val fs = FileSystem.get(new java.net.URI(path), new Configuration())
  fs.listStatus(new Path(path)).map(s => S3File(s.getPath.toString, s.isDir, s.getLen))
}
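The gist's title promises a parallel listing, but the preview cuts off before that part. A sketch of the idea, assuming a SparkContext named sc (the helper name is an invention for illustration):

// Distribute per-directory listings across the cluster. Each task opens its
// own FileSystem handle inside listFiles, since Hadoop's FileSystem and
// Configuration objects are not serializable.
def parallelListFiles(sc: org.apache.spark.SparkContext, dirs: Seq[String]): Seq[S3File] =
  sc.parallelize(dirs, numSlices = dirs.size)
    .flatMap(dir => listFiles(dir))
    .collect()
    .toSeq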
marmbrus / gist:fff0b058f134fa7752fe
Last active September 17, 2015 14:27
Spark Hadoop Filesystem Textfile Iterator
import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.GZIPInputStream
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration

/**
 * Returns an iterator over the lines in the file using the hadoop file system.
 *
 * Comparison to sparkContext.textFile:
 */
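A sketch of what the iterator itself might look like, assuming gzip files are detected by their extension (the function name and that heuristic are assumptions, not necessarily the gist's):

def textFileIterator(path: String): Iterator[String] = {
  val fs = FileSystem.get(new java.net.URI(path), new Configuration())
  val raw = fs.open(new Path(path))
  // Transparently decompress .gz files; otherwise read the stream as-is.
  val in = if (path.endsWith(".gz")) new GZIPInputStream(raw) else raw
  val reader = new BufferedReader(new InputStreamReader(in))
  Iterator.continually(reader.readLine()).takeWhile(_ != null)
}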
marmbrus / GenerateAdd.scala
Last active August 29, 2015 14:02
Spark SQL Code Generation Example
(() => {
  final class $anon extends org.apache.spark.sql.catalyst.expressions.MutableProjection {
    def <init>() = {
      super.<init>();
      ()
    };
    private[this] var mutableRow: org.apache.spark.sql.catalyst.expressions.MutableRow =
      new org.apache.spark.sql.catalyst.expressions.GenericMutableRow(1);
    def target(row: org.apache.spark.sql.catalyst.expressions.MutableRow): org.apache.spark.sql.catalyst.expressions.MutableProjection = {
      mutableRow = row;
      this
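The generated class above is driven through the MutableProjection interface it implements. Roughly, usage looks like this sketch, where proj stands for an instance of the generated $anon class and input for a row with the columns the expression expects (both names are placeholders):

// Point the projection's output at a reusable row, then apply it to inputs.
val out = new org.apache.spark.sql.catalyst.expressions.GenericMutableRow(1)
proj.target(out) // redirects writes into `out`, via target() shown above
proj(input)      // evaluates the generated expression and writes out(0)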
marmbrus / gist:14fd5ef364ea0ccdf15f
Last active August 29, 2015 14:01
Parquet Dependency Changes
akka-actor_2.10-2.2.3-shaded-protobuf.jar
akka-remote_2.10-2.2.3-shaded-protobuf.jar
akka-slf4j_2.10-2.2.3-shaded-protobuf.jar
ant-1.9.0.jar
ant-launcher-1.9.0.jar
chill-java-0.3.6.jar
chill_2.10-0.3.6.jar
colt-1.2.0.jar
commons-beanutils-1.7.0.jar
commons-beanutils-core-1.8.0.jar
### Keybase proof
I hereby claim:
* I am marmbrus on github.
* I am marmbrus (https://keybase.io/marmbrus) on keybase.
* I have a public key whose fingerprint is 93D3 F9A6 9928 D87A 1398 2069 7D5F A00B 4BAE C60C
To claim this, I am signing this object:
marmbrus / ViewsExample
Last active December 31, 2015 10:09
An example of using catalyst's TreeNode transform functionality to replace UnresolvedRelations with other logical plans.
import catalyst.analysis.UnresolvedRelation
import catalyst.plans.Inner
import catalyst.plans.logical._

/* Implicit Conversions */
import dsl._
import shark2.TestShark._ // For .toRdd execution using a locally running test Shark.

object ViewsExample {
  def main(args: Array[String]): Unit = {
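    // The preview stops inside main(). A sketch of the transform the
    // description refers to: replace unresolved table references with stored
    // view definitions. `views` and the tableName field access are
    // assumptions for illustration, not the gist's actual code.
    val views: Map[String, LogicalPlan] = Map.empty
    def expandViews(plan: LogicalPlan): LogicalPlan = plan transform {
      case r: UnresolvedRelation if views.contains(r.tableName) => views(r.tableName)
    }
  }
}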