Tim Van Wassenhove (timvw)

@timvw
timvw / sparkgen.sc
Last active February 18, 2019 13:55
Generate --dependencies and --repositories arguments for Spark from the (effective) POM file
import java.io._
import scala.xml.XML
import ammonite.ops._
import ammonite.ops.ImplicitWd._
// write the effective pom to a temporary file via maven
val tempFile = File.createTempFile("tmp", "tmp")
tempFile.deleteOnExit()
val f = tempFile.getAbsolutePath
val res = %%('mvn, "help:effective-pom", s"-Doutput=${f}")
// parse the generated effective pom
val xml = XML.loadFile(f)
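The preview stops after loading the effective POM. A minimal sketch of how the parsed XML could be turned into the two argument lists; the coordinate format and the --packages spelling are assumptions, not taken from the gist:
// collect groupId:artifactId:version coordinates for every dependency element
val dependencies = (xml \\ "dependency").map { d =>
  s"${(d \ "groupId").text}:${(d \ "artifactId").text}:${(d \ "version").text}"
}
// collect the repository urls
val repositories = (xml \\ "repository").map(r => (r \ "url").text).distinct
// print them in the comma-separated form spark-submit expects
println(s"--packages ${dependencies.mkString(",")}")
println(s"--repositories ${repositories.mkString(",")}")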
@timvw
timvw / Cake.scala
Created February 7, 2019 09:29
Demonstrating the cake pattern
trait IHaveLakeName {
def lakeName: String
}
trait IHaveStreamName {
def streamName: String
}
trait IHaveLakeAndStreamName extends IHaveLakeName with IHaveStreamName
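The preview shows only the requirement traits. A minimal sketch of how such traits are typically wired together in the cake pattern; the concrete trait and component names below are illustrative, not from the gist:
// a concrete provider of both values
trait StaticNames extends IHaveLakeAndStreamName {
  val lakeName: String = "datalake"
  val streamName: String = "events"
}
// a component that declares what it needs via a self-type
trait Pipeline { self: IHaveLakeAndStreamName =>
  def describe: String = s"writing stream $streamName into lake $lakeName"
}
// the cake: the dependency is mixed in where the component is instantiated
object Demo extends Pipeline with StaticNames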
@timvw
timvw / App.scala
Created January 14, 2019 20:19
Demonstrate how to connect to Cassandra with a username/password from Spark
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector.cql.CassandraConnectorConf.ConnectionHostParam
import com.datastax.spark.connector.cql.DefaultAuthConfFactory.{PasswordParam, UserNameParam}
val spark = {
SparkSession
.builder()
.master("local[*]")
.appName("test")
@timvw
timvw / GetLatestMessageFromKafka.scala
Created October 30, 2018 08:42
Get the latest message from a Kafka topic with Spark Structured Streaming (batch)
import spark.implicits._
import org.apache.spark.sql.functions._
val ds = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.load()
.orderBy(desc("timestamp"))
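The shown chain only orders the batch read by timestamp descending. A minimal sketch of how the newest record could then be taken and decoded, assuming the rest of the gist does something along these lines:
// keep only the most recent record and decode its value as a string
val latest = ds
  .select(col("value").cast("string").as("value"))
  .limit(1)
  .as[String]
  .collect()
  .headOption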
@timvw
timvw / mvn-deploy.sh
Created October 15, 2018 09:23
Manually deploy an artifact to Nexus
mvn deploy:deploy-file -DgroupId=com.sksamuel.avro4s \
-DartifactId=avro4s-core_2.11 \
-Dversion=2.0.1 \
-Dpackaging=jar \
-Dfile=/Users/timvw/Desktop/avro4s-core_2.11-2.0.1.jar \
-DgeneratePom=true \
-DrepositoryId=icteam \
-Durl=http://nexus.icteam.be/nexus/content/repositories/custom/
@timvw
timvw / App.scala
Last active September 28, 2022 23:36
Spark Structured Streaming: commit source offsets to Kafka on QueryProgress
val groupId = "demo-consumer"
val bootstrapServers = "localhost:9092"
val spark = SparkSession
.builder()
.master("local[*]")
.appName("demo")
//.config("spark.sql.streaming.stateStore.maintenanceInterval", "10s")
//.config("spark.sql.streaming.metricsEnabled", true)
.config("spark.sql.streaming.minBatchesToRetain", 2)
@timvw
timvw / fetch_dependencies.sh
Last active August 28, 2018 08:31
Download kafka-connect-hdfs jars
# download jars from nexus
mvn dependency:copy-dependencies
# copy jars into a folder under plugin.path (connect-distributed.properties)
# in this example plugin.path=./data/kafka/jars
cp -R ./target/dependency ./data/kafka/jars/kafka-connect-hdfs
@timvw
timvw / demo.scala
Created June 18, 2018 07:45
Explore Spark SQL (Avro -> JSON)
val spark = SparkSession
.builder()
.config("spark.sql.avro.compression.codec", "snappy")
.appName("test")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val file = "/Users/timvw/src/docker/docker-streaming/data/kafka/hdfs/topics/test/year=2018/month=06/day=06/hour=20/test+0+0000000000+0000000029.avro"
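The preview ends at the input path. A minimal sketch of the Avro-to-JSON round trip the title describes; the output path is a placeholder, and on Spark versions before 2.4 the format name would be com.databricks.spark.avro rather than the built-in avro:
// read the avro file and write it back out as json
val df = spark.read.format("avro").load(file)
df.printSchema()
df.write.mode("overwrite").json("/tmp/test-json")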
@timvw
timvw / gist:36758863d77670c821b89c3d0f715b69
Created May 16, 2018 14:10
Avro encode/decode of a single record
val entityMetadataSchema = SchemaBuilder
.record("entityMetadata")
.fields()
.name("relatedLookupRecords").`type`().array().items(Schema.create(Schema.Type.STRING)).noDefault()
.endRecord()
def serialize(x: GenericRecord, s: Schema): Array[Byte] = {
val baos = new ByteArrayOutputStream()
val fw = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord]())
fw.create(s, baos)
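// (sketch) the preview is cut off inside serialize; completing it and adding a
// matching deserialize is an assumption -- it also needs DataFileStream,
// GenericDatumReader and ByteArrayInputStream on top of the imports in the gist
fw.append(x)
fw.close()
baos.toByteArray
}

def deserialize(bytes: Array[Byte], s: Schema): GenericRecord = {
  val reader = new DataFileStream[GenericRecord](
    new ByteArrayInputStream(bytes), new GenericDatumReader[GenericRecord](s))
  try reader.next() finally reader.close()
}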
package be.icteam.playground
import be.icteam.playground.App.ParseError
import cats._
import cats.instances._
import cats.implicits._
import cats.syntax.apply._
object App {