
Tim Van Wassenhove timvw

timvw /
Last active Feb 18, 2019
generate --dependencies and --repositories arguments for spark from (effective) pom file
import java.io.File
import scala.xml.XML
import ammonite.ops._
import ammonite.ops.ImplicitWd._

val tempFile = File.createTempFile("tmp", "tmp")
val f = tempFile.getAbsolutePath
val res = %%('mvn, "help:effective-pom", s"-Doutput=${f}")
val xml = XML.loadFile(f)
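The listing stops after loading the effective pom. A dependency-free sketch of the final step, with hard-coded example coordinates standing in for the values the gist extracts from the loaded XML:

```scala
// Hard-coded example coordinates; the gist derives these from the
// <dependencies> and <repositories> sections of the effective pom.
val deps = Seq(
  ("org.apache.kafka", "kafka-clients", "2.1.0"),
  ("com.datastax.spark", "spark-cassandra-connector_2.11", "2.4.0")
)
val repos = Seq("https://repo1.maven.org/maven2")

// Maven coordinates are rendered as groupId:artifactId:version, comma-separated
// (spark-submit itself calls the coordinate flag --packages).
val dependenciesArg = "--dependencies " + deps.map { case (g, a, v) => s"$g:$a:$v" }.mkString(",")
val repositoriesArg = "--repositories " + repos.mkString(",")

println(dependenciesArg)
println(repositoriesArg)
```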
timvw / Cake.scala
Created Feb 7, 2019
Demonstrating cake pattern
trait IHaveLakeName {
  def lakeName: String
}

trait IHaveStreamName {
  def streamName: String
}

trait IHaveLakeAndStreamName extends IHaveLakeName with IHaveStreamName
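A minimal usage sketch of the pattern above (EventWriter and ProdModule are illustrative names, not part of the gist); the traits are repeated so the block is self-contained:

```scala
trait IHaveLakeName { def lakeName: String }
trait IHaveStreamName { def streamName: String }
trait IHaveLakeAndStreamName extends IHaveLakeName with IHaveStreamName

// A component states its dependencies with a self-type; it can use
// lakeName/streamName without knowing where they come from.
trait EventWriter { this: IHaveLakeAndStreamName =>
  def target: String = s"$lakeName/$streamName"
}

// A concrete "module" mixes in the actual values at construction time.
object ProdModule extends EventWriter with IHaveLakeAndStreamName {
  val lakeName = "datalake"
  val streamName = "events"
}

println(ProdModule.target) // datalake/events
```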
timvw / App.scala
Created Jan 14, 2019
demonstrate how to connect with username/password to cassandra
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector.cql.CassandraConnectorConf.ConnectionHostParam
import com.datastax.spark.connector.cql.DefaultAuthConfFactory.{PasswordParam, UserNameParam}
val spark = SparkSession.builder()
  // placeholder host and credentials for illustration
  .config(ConnectionHostParam.name, "localhost")
  .config(UserNameParam.name, "cassandra")
  .config(PasswordParam.name, "cassandra")
  .getOrCreate()
timvw / GetLatestMessageFromKafka.scala
Created Oct 30, 2018
Get latest message from kafka topic with spark structured streaming (batch)
import spark.implicits._
import org.apache.spark.sql.functions._
val ds = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load() // batch read; endingOffsets defaults to "latest"
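The excerpt ends before the actual selection: the "latest message" is the row with the highest offset within each partition. That selection logic, sketched on plain collections with illustrative types (in Spark it would be a groupBy or window over partition with max(offset)):

```scala
// Illustrative stand-in for rows of the Kafka source: (partition, offset, value).
final case class Msg(partition: Int, offset: Long, value: String)

val msgs = Seq(
  Msg(0, 0L, "a"), Msg(0, 1L, "b"),
  Msg(1, 0L, "c"), Msg(1, 2L, "d")
)

// Latest message per partition = the max offset within each partition.
val latestPerPartition: Map[Int, Msg] =
  msgs.groupBy(_.partition).map { case (p, ms) => p -> ms.maxBy(_.offset) }

println(latestPerPartition(0).value) // b
println(latestPerPartition(1).value) // d
```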
timvw /
Created Oct 15, 2018
Manually deploy an artifact to nexus
mvn deploy:deploy-file -DgroupId=com.sksamuel.avro4s \
-DartifactId=avro4s-core_2.11 \
-Dversion=2.0.1 \
-Dpackaging=jar \
-Dfile=/Users/timvw/Desktop/avro4s-core_2.11-2.0.1.jar \
-DgeneratePom=true \
-DrepositoryId=icteam \
timvw / App.scala
Last active Oct 4, 2018
Spark structured streaming: Commit source offsets to Kafka on QueryProgress
val groupId = "demo-consumer"
val bootstrapServers = "localhost:9092"
val spark = SparkSession.builder()
  //.config("spark.sql.streaming.stateStore.maintenanceInterval", "10s")
  //.config("spark.sql.streaming.metricsEnabled", true)
  .config("spark.sql.streaming.minBatchesToRetain", 2)
  .getOrCreate()
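The listing cuts off before the listener itself. A sketch of the technique in the title, assuming spark-sql and kafka-clients on the classpath: on each QueryProgress event, commit the offsets Spark reports back to Kafka under an externally visible consumer group. Class and method signatures follow the Spark and Kafka APIs, but parsing the progress event's endOffset JSON into concrete offsets is deliberately left out (the gist presumably does that with a JSON library), so this is a skeleton, not the gist's implementation:

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Commits the offsets from each progress event to Kafka under `groupId`,
// so external tools (lag monitors, etc.) can see the query's position.
class CommitOffsetsListener(bootstrapServers: String, groupId: String)
    extends StreamingQueryListener {

  private def newConsumer(): KafkaConsumer[Array[Byte], Array[Byte]] = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("group.id", groupId)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    new KafkaConsumer[Array[Byte], Array[Byte]](props)
  }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // event.progress.sources.head.endOffset is a JSON string like
    // {"topic":{"0":42}}; turning it into TopicPartition -> offset
    // pairs is elided here.
    val offsets: Map[TopicPartition, OffsetAndMetadata] = Map.empty
    val consumer = newConsumer()
    try consumer.commitSync(offsets.asJava) finally consumer.close()
  }

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

// spark.streams.addListener(new CommitOffsetsListener(bootstrapServers, groupId))
```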
timvw /
Last active Aug 28, 2018
Download kafka-connect-hdfs jars
# download jars from nexus
mvn dependency:copy-dependencies
# copy jars into a folder under plugin.path
# (in this example plugin.path=./data/kafka/jars)
cp -R ./target/dependency ./data/kafka/jars/kafka-connect-hdfs
timvw / demo.scala
Created Jun 18, 2018
Explore spark sql (avro -> json)
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.avro.compression.codec", "snappy")
  .getOrCreate()

import spark.implicits._
val file = "/Users/timvw/src/docker/docker-streaming/data/kafka/hdfs/topics/test/year=2018/month=06/day=06/hour=20/test+0+0000000000+0000000029.avro"
timvw / gist:36758863d77670c821b89c3d0f715b69
Created May 16, 2018
avro encode/decode single record
val entityMetadataSchema = SchemaBuilder
def serialize(x: GenericRecord, s: Schema): Array[Byte] = {
  val baos = new ByteArrayOutputStream()
  val fw = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord]())
  fw.create(s, baos)
  fw.append(x)
  fw.close()
  baos.toByteArray
}
timvw / App.scala
Multiple lookup calls with cats
package be.icteam.playground
import be.icteam.playground.App.ParseError
import cats._
import cats.instances._
import cats.implicits._
import cats.syntax.apply._
object App {
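The listing ends at the object header. A dependency-free sketch of what "multiple lookup calls" might combine (ParseError's shape and the lookup functions are illustrative, not the gist's): with cats, several Either-returning lookups are typically combined with mapN from cats.syntax.apply; the same fail-fast shape in plain Scala is a for-comprehension:

```scala
// Illustrative stand-ins for the gist's types: each lookup may fail
// with a ParseError.
final case class ParseError(msg: String)

def lookupHost(cfg: Map[String, String]): Either[ParseError, String] =
  cfg.get("host").toRight(ParseError("missing host"))

def lookupPort(cfg: Map[String, String]): Either[ParseError, Int] =
  cfg.get("port").toRight(ParseError("missing port")).flatMap { s =>
    try Right(s.toInt)
    catch { case _: NumberFormatException => Left(ParseError(s"bad port: $s")) }
  }

// Fail-fast combination of multiple lookup calls; cats' (a, b).mapN(...)
// expresses the same thing, and Validated would accumulate errors instead.
def endpoint(cfg: Map[String, String]): Either[ParseError, String] =
  for {
    host <- lookupHost(cfg)
    port <- lookupPort(cfg)
  } yield s"$host:$port"

println(endpoint(Map("host" -> "localhost", "port" -> "9042"))) // Right(localhost:9042)
println(endpoint(Map("host" -> "localhost")))                   // Left(ParseError(missing port))
```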