import com.typesafe.config.ConfigFactory
import star.io.{JdbcReader, Reader}
import star.{Loader, StarConfig}
/**
* Created by markmo on 12/03/2016.
*/
class StarSpec extends UnitSpec {
val starConf = new StarConfig(ConfigFactory.load("star.conf"))
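
The preview is truncated here. For reference, the standard Typesafe Config pattern for reading values back out of star.conf looks like the sketch below; the keys shown are hypothetical, since the actual star.conf schema is not part of the gist.

import com.typesafe.config.ConfigFactory

object StarConfigSketch extends App {
  // load("star.conf") resolves star.conf from the classpath and layers it
  // over reference.conf defaults.
  val conf = ConfigFactory.load("star.conf")

  // Hypothetical keys -- not confirmed by the gist.
  val jdbcUrl = conf.getString("star.jdbc.url")
  val batchSize = conf.getInt("star.loader.batch-size")

  println(s"url=$jdbcUrl, batchSize=$batchSize")
}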

markmo / gist:7653ba15bd054887286e (Created March 12, 2016 01:26)
Transformation DSL Example

/* WIP - NOT READY
"DSL notation" should "execute a pipeline" in {
import diamond.transformation._
// import helper functions such as fieldLocator
import RowTransformation._
// create a new context object that can pass state between transformations
val ctx = new TransformationContext
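
TransformationContext itself is not shown in the preview. Going only by the comment above, it is presumably a key/value bag shared across transformations; a self-contained stand-in (illustrative names, not the gist's actual API) could look like:

import scala.collection.mutable

// Illustrative stand-in: a mutable key/value bag that transformations
// read from and write to as the pipeline runs.
class SketchContext {
  private val state = mutable.Map.empty[String, Any]
  def set(key: String, value: Any): Unit = state(key) = value
  def get[T](key: String): Option[T] = state.get(key).map(_.asInstanceOf[T])
}

// One transformation records a value; a later one reads it back.
val ctx2 = new SketchContext
ctx2.set("processDate", "2016-03-12")
val processDate = ctx2.get[String]("processDate") // Some("2016-03-12")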
import java.util.Calendar
import diamond.io.{CSVSink, CSVSource}
import diamond.models.{AttributeType, Event, Feature}
import diamond.store.{FeatureStore, FeatureStoreRepository}
import diamond.transform.TransformationContext
import diamond.transform.row.{AppendColumnRowTransformation, RowTransformation}
import diamond.transform.sql.NamedSQLTransformation
import diamond.transform.table.RowTransformationPipeline
import diamond.utility.dateFunctions._
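
These imports outline the moving parts: CSV sources and sinks, a feature store, and row-, SQL- and table-level transformations. None of the diamond APIs are visible in the preview, so the sketch below only illustrates the general shape of a row pipeline, using plain Spark types and made-up names.

import org.apache.spark.sql.Row

object PipelineSketch {
  // A row transformation is just Row => Row; a pipeline is their composition.
  type RowTransform = Row => Row

  def pipeline(stages: RowTransform*): RowTransform =
    stages.reduce(_ andThen _)

  // Two no-op stages composed into a single transformation.
  val combined: RowTransform = pipeline(identity, identity)
}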

markmo / gist:2c775674307bff284f2b (Created March 12, 2016 01:15)
EventAnalysisSpec

import java.util.Calendar
import diamond.models.Event
import diamond.transform.eventFunctions._
import diamond.utility.dateFunctions._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.streaming.Duration
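
The imports suggest a spec that builds per-entity event timelines and reasons about durations between events. The eventFunctions helpers are not shown, so the following is only a self-contained illustration of that first step, with a stand-in event type.

import java.util.Date

// Stand-in for diamond.models.Event, reduced to the fields needed here.
case class SimpleEvent(entity: String, eventType: String, ts: Date)

// Group events by entity and order each timeline chronologically --
// the usual first step before computing inter-event durations.
def timelines(events: Seq[SimpleEvent]): Map[String, Seq[SimpleEvent]] =
  events.groupBy(_.entity).map { case (k, evs) => k -> evs.sortBy(_.ts.getTime) }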
/**
* Created by markmo on 20/01/2016.
*/
class DemoSpec extends UnitSpec {
"Data" should "be processed" in {
val demo = sqlContext.read.load(s"$BASE_URI/$LAYER_RAW/Customer_Demographics.parquet")
val tx = sqlContext.read.load(s"$BASE_URI/$LAYER_RAW/Customer_Transactions.parquet")
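
The preview cuts off after the two loads. A plausible next step is joining demographics to transactions; the join key below is a guess, not a column confirmed by the gist.

// Hypothetical continuation -- "customer_id" is an assumed join key.
val joined = demo.join(tx, "customer_id")
joined.groupBy("customer_id").count().show(5)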
/**
* Alternative implementation using a Custom Interpolator.
*
* @param sc StringContext
*/
implicit class TemplateHelper(val sc: StringContext) extends AnyVal {
  def t(args: Any*): String = {
    val strings = sc.parts.iterator
    val expressions = args.iterator
    // Interleave the literal parts with the evaluated expressions --
    // the standard custom-interpolator skeleton.
    val buf = new StringBuilder(strings.next())
    while (strings.hasNext) buf.append(expressions.next()).append(strings.next())
    buf.toString
  }
}
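
With the body completed as the standard skeleton above, t behaves like the built-in s interpolator; presumably the real gist adds template-specific handling at this point. Usage:

val table = "Customer_Demographics"
val path = t"$BASE_URI/$LAYER_RAW/$table.parquet"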
package diamond.transform
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType}
/**
* Created by markmo on 12/12/2015.
*/
object schemas {
val inputSchema = StructType(Seq(
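
The field list is truncated in the preview. A hypothetical completion using the imported types (the field names are guesses, not confirmed by the gist) might be:

val inputSchema = StructType(Seq(
  StructField("entity", StringType),
  StructField("attribute", StringType),
  StructField("value", StringType),
  StructField("version", IntegerType),
  StructField("ts", TimestampType)
))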

markmo / gist:2f79e46397567bca7acf (Created March 12, 2016 00:23)
Spark Transformation DSL

package diamond.transform
import org.apache.spark.sql.{Row, DataFrame}
import org.apache.spark.sql.types.DataType
import scala.language.implicitConversions
/**
* All this is WIP - NOT READY
*/
trait FeatureType[V] {
val name: String
}
trait FeatureX[V] extends FeatureType[V] {
val value: V
}
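
FeatureType and FeatureX above describe a named feature and a named feature carrying a value; a concrete instance (illustrative only) would be:

// Illustrative concrete feature: a named Int-valued feature.
case class AgeFeature(value: Int) extends FeatureX[Int] {
  val name = "age"
}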
case class FeatureCollection(id: Int,
package diamond.models
import java.util.Date
/**
* @param entity String entity id (usually hashed)
* @param eventType String attribute name
* @param ts Date event timestamp
* @param namespace String logical grouping
* @param session Option[String] session id
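*/

The scaladoc and the case class it documents are cut off at this point. Reconstructed from the visible @param tags (the original may carry further fields), the class would be roughly:

case class Event(entity: String,
                 eventType: String,
                 ts: Date,
                 namespace: String,
                 session: Option[String])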