Tim Van Wassenhove timvw

timvw /
Last active Oct 17, 2019
Connect to CloudKarafka with vanilla Kafka tools
# Export variables if not set already
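The gist exports connection settings as environment variables for the vanilla Kafka CLI tools. As a hedged companion, here is a minimal Scala sketch that turns such variables into consumer properties. The variable names and fallback values are assumptions (CloudKarafka's console shows the real ones), as is the SASL_SSL + SCRAM-SHA-256 setup CloudKarafka uses on its shared plans:

```scala
import java.util.Properties

// Hypothetical variable names and fallbacks; CloudKarafka's console lists the actual values.
val brokers  = sys.env.getOrElse("CLOUDKARAFKA_BROKERS", "broker1:9094,broker2:9094")
val username = sys.env.getOrElse("CLOUDKARAFKA_USERNAME", "demo-user")
val password = sys.env.getOrElse("CLOUDKARAFKA_PASSWORD", "demo-password")

val props = new Properties()
props.put("bootstrap.servers", brokers)
// CloudKarafka endpoints are SASL_SSL with SCRAM-SHA-256 authentication.
props.put("security.protocol", "SASL_SSL")
props.put("sasl.mechanism", "SCRAM-SHA-256")
props.put("sasl.jaas.config",
  s"""org.apache.kafka.common.security.scram.ScramLoginModule required username="$username" password="$password";""")
```

Written to a file, the same keys can be handed to the vanilla tools via `--consumer.config`.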
timvw /
Last active Feb 18, 2019
Generate --dependencies and --repositories arguments for Spark from the (effective) POM file
import java.io.File
import scala.xml.XML
import ammonite.ops._
import ammonite.ops.ImplicitWd._
val tempFile = File.createTempFile("tmp", "tmp")
val f = tempFile.getAbsolutePath
val res = %%('mvn, "help:effective-pom", s"-Doutput=${f}")
val xml = XML.loadFile(f)
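The excerpt stops after loading the effective POM. A sketch of the extraction step it is building toward, using an inline POM fragment in place of the `mvn help:effective-pom` output (the coordinates and repository URL here are made up for illustration):

```scala
import scala.xml.XML

// Stand-in for the effective POM that mvn help:effective-pom writes to the temp file.
val pom = XML.loadString(
  """<project>
    |  <dependencies>
    |    <dependency>
    |      <groupId>org.apache.kafka</groupId>
    |      <artifactId>kafka-clients</artifactId>
    |      <version>2.3.0</version>
    |    </dependency>
    |  </dependencies>
    |  <repositories>
    |    <repository><url>https://repo1.maven.org/maven2</url></repository>
    |  </repositories>
    |</project>""".stripMargin)

// Comma-separated group:artifact:version coordinates and repository URLs,
// the shape Spark's dependency/repository arguments expect.
val dependencies = (pom \ "dependencies" \ "dependency")
  .map(d => s"${(d \ "groupId").text}:${(d \ "artifactId").text}:${(d \ "version").text}")
  .mkString(",")
val repositories = (pom \ "repositories" \ "repository" \ "url").map(_.text).mkString(",")
```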
timvw / Cake.scala
Created Feb 7, 2019
Demonstrating the cake pattern
trait IHaveLakeName {
  def lakeName: String
}
trait IHaveStreamName {
  def streamName: String
}
trait IHaveLakeAndStreamName extends IHaveLakeName with IHaveStreamName
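To show what the pattern buys, here is a small self-contained sketch (the trait and object names beyond the gist's own are made up): a component declares its configuration needs via a self-type, and a concrete object does the wiring once at the outermost layer.

```scala
trait IHaveLakeName { def lakeName: String }
trait IHaveStreamName { def streamName: String }
trait IHaveLakeAndStreamName extends IHaveLakeName with IHaveStreamName

// The self-type says "I can only be mixed into something that provides the names",
// without committing to any particular implementation.
trait StreamPathBuilder { this: IHaveLakeAndStreamName =>
  def streamPath: String = s"$lakeName/$streamName"
}

// The concrete "cake": dependencies are satisfied by mixing traits together.
object Config extends StreamPathBuilder with IHaveLakeAndStreamName {
  val lakeName = "datalake"
  val streamName = "events"
}
```

`Config.streamPath` then resolves to `"datalake/events"`.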
timvw / App.scala
Created Jan 14, 2019
Demonstrate how to connect to Cassandra with a username/password
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector.cql.CassandraConnectorConf.ConnectionHostParam
import com.datastax.spark.connector.cql.DefaultAuthConfFactory.{PasswordParam, UserNameParam}
val spark = {
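The excerpt cuts off inside the session builder. A hedged sketch of how the imported parameter names can be wired in (the host and credentials are placeholders, the spark-cassandra-connector must be on the classpath, and this is untested here):

```scala
// Sketch only: ConnectionHostParam, UserNameParam and PasswordParam come from
// the spark-cassandra-connector imports above; .name yields the
// spark.cassandra.* configuration keys.
val spark = SparkSession.builder()
  .master("local[*]")
  .config(ConnectionHostParam.name, "localhost")
  .config(UserNameParam.name, "cassandra")
  .config(PasswordParam.name, "cassandra")
  .getOrCreate()
```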
timvw / GetLatestMessageFromKafka.scala
Created Oct 30, 2018
Get the latest message from a Kafka topic with Spark Structured Streaming (batch)
import spark.implicits._
import org.apache.spark.sql.functions._
val ds = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()
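Given the batch read above (finished with `.load()`), one hedged way to keep only the most recent record is to order by Kafka's message timestamp: offsets are only comparable within a single partition, while the broker timestamp works across partitions.

```scala
// Sketch: latest record across all partitions, by broker timestamp.
val latest = ds
  .orderBy(col("timestamp").desc)
  .limit(1)
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```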
timvw /
Created Oct 15, 2018
Manually deploy an artifact to Nexus
mvn deploy:deploy-file -DgroupId=com.sksamuel.avro4s \
-DartifactId=avro4s-core_2.11 \
-Dversion=2.0.1 \
-Dpackaging=jar \
-Dfile=/Users/timvw/Desktop/avro4s-core_2.11-2.0.1.jar \
-DgeneratePom=true \
-DrepositoryId=icteam \
timvw / App.scala
Last active Oct 16, 2019
Spark Structured Streaming: commit source offsets to Kafka on QueryProgress
import org.apache.spark.sql.SparkSession
val groupId = "demo-consumer"
val bootstrapServers = "localhost:9092"
val spark = SparkSession.builder
  //.config("spark.sql.streaming.stateStore.maintenanceInterval", "10s")
  //.config("spark.sql.streaming.metricsEnabled", true)
  .config("spark.sql.streaming.minBatchesToRetain", 2)
  .getOrCreate()
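The core of this gist is a `StreamingQueryListener` that commits each micro-batch's end offsets under `groupId`, so Kafka lag tooling can track the structured-streaming query. A hedged sketch of that idea; `parseEndOffset` is a hypothetical helper that turns the source's `endOffset` JSON string (shape `{"topic":{"0":123}}`) into an offsets map:

```scala
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Sketch: after every completed micro-batch, commit the Kafka source's end
// offsets with a plain consumer so external tools see the query's progress.
class CommitOffsetsListener(
    consumer: KafkaConsumer[Array[Byte], Array[Byte]],
    parseEndOffset: String => java.util.Map[TopicPartition, OffsetAndMetadata])
  extends StreamingQueryListener {

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    consumer.commitSync(parseEndOffset(event.progress.sources.head.endOffset))
}

// Registered on the session before the query starts:
// spark.streams.addListener(new CommitOffsetsListener(consumer, parseEndOffset))
```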
timvw /
Last active Aug 28, 2018
Download kafka-connect-hdfs jars
# download jars from nexus
mvn dependency:copy-dependencies
# copy jars into a folder under plugin.path
# (in this example plugin.path=./data/kafka/jars)
cp -R ./target/dependency ./data/kafka/jars/kafka-connect-hdfs
timvw / demo.scala
Created Jun 18, 2018
Explore Spark SQL (Avro -> JSON)
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
  .config("spark.sql.avro.compression.codec", "snappy")
  .getOrCreate()
import spark.implicits._
val file = "/Users/timvw/src/docker/docker-streaming/data/kafka/hdfs/topics/test/year=2018/month=06/day=06/hour=20/test+0+0000000000+0000000029.avro"
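The conversion itself can be sketched as a read-then-write (assuming a spark-avro package is on the classpath: the format name is `"avro"` from Spark 2.4 on, `"com.databricks.spark.avro"` before that; the output path below is a placeholder):

```scala
// Sketch: read the Avro files kafka-connect-hdfs wrote and re-emit them as JSON.
val df = spark.read.format("avro").load(file)
df.printSchema()
df.write.mode("overwrite").json("/tmp/avro-as-json")
```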
timvw / gist:36758863d77670c821b89c3d0f715b69
Created May 16, 2018
Avro encode/decode a single record
val entityMetadataSchema = SchemaBuilder
def serialize(x: GenericRecord, s: Schema): Array[Byte] = {
  val baos = new ByteArrayOutputStream()
  val fw = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord]())
  fw.create(s, baos)
  fw.append(x)
  fw.close()
  baos.toByteArray
}
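The decode half is symmetric. A hedged sketch of reading the single record back out of the container bytes produced by `serialize` (assumes the Avro library on the classpath; `DataFileReader` recovers the writer schema from the container header):

```scala
import org.apache.avro.Schema
import org.apache.avro.file.{DataFileReader, SeekableByteArrayInput}
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

// Counterpart to serialize: open the byte array as an Avro container file
// and return its first (and only) record.
def deserialize(bytes: Array[Byte], s: Schema): GenericRecord = {
  val reader = new DataFileReader[GenericRecord](
    new SeekableByteArrayInput(bytes),
    new GenericDatumReader[GenericRecord](s))
  try reader.next() finally reader.close()
}
```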