Tim Van Wassenhove timvw

timvw / sparkgen.sc
Last active Feb 18, 2019
Generate --dependencies and --repositories arguments for Spark from an (effective) pom file
import java.io._
import scala.xml.XML
import ammonite.ops._
import ammonite.ops.ImplicitWd._

// write the effective pom to a temp file, then parse it
val tempFile = File.createTempFile("tmp", "tmp")
tempFile.deleteOnExit()
val f = tempFile.getAbsolutePath
val res = %%('mvn, "help:effective-pom", s"-Doutput=${f}")
val xml = XML.loadFile(f)
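The preview stops after loading the effective pom. A minimal sketch of how the parsed XML could be turned into the comma-separated dependency and repository lists the description mentions (the inline pom fragment and coordinate values below are hypothetical stand-ins for the file loaded above):

```scala
import scala.xml.XML

// Hypothetical effective-pom fragment, standing in for the parsed file.
val pom = XML.loadString(
  """<project>
    |  <dependencies>
    |    <dependency>
    |      <groupId>org.apache.spark</groupId>
    |      <artifactId>spark-sql_2.11</artifactId>
    |      <version>2.4.0</version>
    |    </dependency>
    |  </dependencies>
    |  <repositories>
    |    <repository><url>https://repo1.maven.org/maven2</url></repository>
    |  </repositories>
    |</project>""".stripMargin)

// collect groupId:artifactId:version coordinates and repository urls
val packages = (pom \ "dependencies" \ "dependency")
  .map(d => s"${(d \ "groupId").text}:${(d \ "artifactId").text}:${(d \ "version").text}")
  .mkString(",")
val repositories = (pom \ "repositories" \ "repository" \ "url")
  .map(_.text)
  .mkString(",")

println(packages)     // org.apache.spark:spark-sql_2.11:2.4.0
println(repositories) // https://repo1.maven.org/maven2
```

These strings can then be pasted after the corresponding spark command-line flags.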
timvw / Cake.scala
Created Feb 7, 2019
Demonstrating the cake pattern
trait IHaveLakeName {
  def lakeName: String
}

trait IHaveStreamName {
  def streamName: String
}

trait IHaveLakeAndStreamName extends IHaveLakeName with IHaveStreamName
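To show how these traits compose, here is a self-contained sketch (the `PathBuilder` component and the concrete names are hypothetical, not from the gist): a component declares its configuration needs through a self-type, and concrete values are mixed in at instantiation.

```scala
// The traits from the gist, repeated so this sketch compiles on its own.
trait IHaveLakeName { def lakeName: String }
trait IHaveStreamName { def streamName: String }
trait IHaveLakeAndStreamName extends IHaveLakeName with IHaveStreamName

// A component that declares its dependencies via a self-type on the traits.
trait PathBuilder { this: IHaveLakeAndStreamName =>
  def streamPath: String = s"$lakeName/$streamName"
}

// Wiring happens at mix-in time with concrete values.
object DemoPaths extends PathBuilder with IHaveLakeAndStreamName {
  val lakeName = "datalake"
  val streamName = "clicks"
}

println(DemoPaths.streamPath) // datalake/clicks
```

The compiler enforces that every `PathBuilder` instance supplies both names, which is the point of the pattern.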
timvw / App.scala
Created Jan 14, 2019
Demonstrate how to connect to Cassandra with a username/password
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector.cql.CassandraConnectorConf.ConnectionHostParam
import com.datastax.spark.connector.cql.DefaultAuthConfFactory.{PasswordParam, UserNameParam}

val spark = {
  SparkSession
    .builder()
    .master("local[*]")
    .appName("test")
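The preview is cut off inside the builder. A hedged completion, assuming the imported `*Param` objects are used for their spark-cassandra-connector setting names (host and credentials below are placeholders):

```scala
// Sketch: the imported ConfigParameter objects expose the connector's
// setting names (e.g. "spark.cassandra.connection.host"), so credentials
// can be passed as plain Spark config entries.
val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("test")
  .config(ConnectionHostParam.name, "127.0.0.1") // placeholder host
  .config(UserNameParam.name, "cassandra")       // placeholder username
  .config(PasswordParam.name, "cassandra")       // placeholder password
  .getOrCreate()
```

With this in place, `spark.read.cassandraFormat(...)` calls authenticate against the cluster without any further per-query configuration.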
timvw / GetLatestMessageFromKafka.scala
Created Oct 30, 2018
Get the latest message from a Kafka topic with Spark Structured Streaming (batch)
import spark.implicits._
import org.apache.spark.sql.functions._

val ds = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()
  .orderBy(desc("timestamp"))
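Since the batch read is ordered by record timestamp descending, the newest message is the first row. A hedged continuation (assumes string-serialized payloads; `ds` is the Dataset built above):

```scala
// Keep only the newest row and decode its value as a string.
val latest = ds
  .limit(1)
  .selectExpr("CAST(value AS STRING) AS value")
  .as[String]
  .head()

println(latest)
```

Note that a batch "earliest" read scans the whole topic before ordering, so this is only practical on topics of modest size or with aggressive retention.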
timvw / mvn-deploy.sh
Created Oct 15, 2018
Manually deploy an artifact to Nexus
mvn deploy:deploy-file -DgroupId=com.sksamuel.avro4s \
  -DartifactId=avro4s-core_2.11 \
  -Dversion=2.0.1 \
  -Dpackaging=jar \
  -Dfile=/Users/timvw/Desktop/avro4s-core_2.11-2.0.1.jar \
  -DgeneratePom=true \
  -DrepositoryId=icteam \
  -Durl=http://nexus.icteam.be/nexus/content/repositories/custom/
timvw / App.scala
Last active Oct 4, 2018
Spark Structured Streaming: commit source offsets to Kafka on QueryProgress
val groupId = "demo-consumer"
val bootstrapServers = "localhost:9092"

val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("demo")
  //.config("spark.sql.streaming.stateStore.maintenanceInterval", "10s")
  //.config("spark.sql.streaming.metricsEnabled", true)
  .config("spark.sql.streaming.minBatchesToRetain", 2)
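Structured Streaming tracks its own offsets in the checkpoint and does not commit them to Kafka, so external lag monitoring sees nothing. A sketch of the pattern the description names (class name and the JSON-parsing helper are my assumptions; the gist's actual parsing code is not shown in the preview):

```scala
import java.util.Properties
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// On every progress event, commit the batch's end offsets to Kafka with a
// plain consumer, so tools like kafka-consumer-groups see the group's lag.
class CommitOffsetsListener(bootstrapServers: String, groupId: String)
    extends StreamingQueryListener {

  private lazy val consumer = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("group.id", groupId)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    new KafkaConsumer[Array[Byte], Array[Byte]](props)
  }

  override def onQueryStarted(e: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(e: QueryTerminatedEvent): Unit = ()

  override def onQueryProgress(e: QueryProgressEvent): Unit = {
    // e.progress.sources.head.endOffset is a JSON string like {"topic":{"0":42}}
    val offsets = parseEndOffsets(e.progress.sources.head.endOffset)
    consumer.commitSync(offsets)
  }

  // Hypothetical helper: turn the endOffset JSON into the map commitSync expects.
  private def parseEndOffsets(json: String): java.util.Map[TopicPartition, OffsetAndMetadata] = ???
}
```

The listener would be registered with `spark.streams.addListener(new CommitOffsetsListener(bootstrapServers, groupId))` before starting the query.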
timvw / fetch_dependencies.sh
Last active Aug 28, 2018
Download kafka-connect-hdfs jars
# download jars from nexus
mvn dependency:copy-dependencies

# copy jars into a folder under plugin.path (connect-distributed.properties)
# in this example plugin.path=./data/kafka/jars
cp -R ./target/dependency ./data/kafka/jars/kafka-connect-hdfs
timvw / demo.scala
Created Jun 18, 2018
Explore Spark SQL (Avro -> JSON)
val spark = SparkSession
  .builder()
  .config("spark.sql.avro.compression.codec", "snappy")
  .appName("test")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

val file = "/Users/timvw/src/docker/docker-streaming/data/kafka/hdfs/topics/test/year=2018/month=06/day=06/hour=20/test+0+0000000000+0000000029.avro"
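A hedged continuation of the demo: load the Avro part file written by kafka-connect-hdfs and rewrite it as JSON. At the time of the gist (Spark 2.3) this relied on the `com.databricks:spark-avro` package; from Spark 2.4 onward the built-in `avro` format from `org.apache.spark:spark-avro` serves the same purpose.

```scala
// Read the Avro file, inspect the inferred schema, and write JSON out.
val df = spark.read.format("com.databricks.spark.avro").load(file)
df.printSchema()
df.write.json("/tmp/test-json") // output path is an assumption
```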
timvw / gist:36758863d77670c821b89c3d0f715b69
Created May 16, 2018
Avro encode/decode a single record
val entityMetadataSchema = SchemaBuilder
  .record("entityMetadata")
  .fields()
  .name("relatedLookupRecords").`type`().array().items(Schema.create(Schema.Type.STRING)).noDefault()
  .endRecord()

def serialize(x: GenericRecord, s: Schema): Array[Byte] = {
  val baos = new ByteArrayOutputStream()
  val fw = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord]())
  fw.create(s, baos)
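The preview cuts off inside `serialize`. A hedged completion of the round trip; the `deserialize` counterpart is my addition and is not shown in the gist's preview:

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.file.{DataFileReader, DataFileWriter, SeekableByteArrayInput}
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}

// Write a single record in Avro container-file format to a byte array.
def serialize(x: GenericRecord, s: Schema): Array[Byte] = {
  val baos = new ByteArrayOutputStream()
  val fw = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord]())
  fw.create(s, baos)
  fw.append(x)
  fw.close()
  baos.toByteArray
}

// Read the single record back out of the byte array.
def deserialize(bytes: Array[Byte], s: Schema): GenericRecord = {
  val reader = new DataFileReader[GenericRecord](
    new SeekableByteArrayInput(bytes), new GenericDatumReader[GenericRecord](s))
  val record = reader.next()
  reader.close()
  record
}
```

Because the container-file format embeds the writer schema, `deserialize` could also be written without passing the schema explicitly; it is kept here for symmetry with `serialize`.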
timvw / App.scala (multiple lookup calls)
package be.icteam.playground
import be.icteam.playground.App.ParseError
import cats._
import cats.instances._
import cats.implicits._
import cats.syntax.apply._
object App {
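The preview ends at the object declaration. A sketch of what "multiple lookup calls" combined with cats can look like: two `Either`-returning lookups validated together with `mapN` (the lookup functions, config keys, and `ParseError` shape below are my assumptions, not the gist's code):

```scala
import cats.implicits._

final case class ParseError(msg: String)

// Two independent lookups that can each fail.
def lookupHost(cfg: Map[String, String]): Either[ParseError, String] =
  cfg.get("host").toRight(ParseError("missing host"))

def lookupPort(cfg: Map[String, String]): Either[ParseError, Int] =
  cfg.get("port").toRight(ParseError("missing port")).map(_.toInt)

val cfg = Map("host" -> "localhost", "port" -> "9092")

// mapN combines the two lookups; the first failure short-circuits for Either.
val endpoint = (lookupHost(cfg), lookupPort(cfg)).mapN((h, p) => s"$h:$p")
// endpoint == Right("localhost:9092")
```

Swapping `Either` for `ValidatedNel` would accumulate all lookup errors instead of stopping at the first one.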