// A (slow) Kafka + Spark Structured Streaming example that parses CSV-style (caret-delimited) records.
import scalaz._, Scalaz._
import reflect.runtime.universe._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
object MyJob {

  /** Entry point: reads `my-topic` from the Kafka broker at `localhost:9092`, parses each
    * record's `^`-delimited `value` with [[ReadAsCSV]] using [[schema]], and writes the
    * parsed rows to the console sink until the query terminates.
    */
  def main(args: Array[String]): Unit = {
    implicit val spark: SparkSession =
      SparkSession.builder.appName("Simple Application").getOrCreate()
    // NOTE(review): `spark.streaming.backpressure.enabled` is a DStream (Spark Streaming)
    // setting and does not throttle Structured Streaming; for the Kafka source use the
    // `maxOffsetsPerTrigger` option instead. Kept to preserve the original configuration.
    spark.conf.set("spark.streaming.backpressure.enabled", true)

    // `load()` turns the configured DataStreamReader into a streaming DataFrame; without it
    // the reader cannot be passed to ReadAsCSV.
    val df: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "my-topic")
      .load()

    // ReadAsCSV takes two arguments, so partially apply it with the schema before piping.
    val query = df |> (ReadAsCSV(_, schema)) |> {
      _.writeStream
        .format("console")
        // .trigger(Trigger.Continuous("1 second")) // Continuous Processing
        // .option("truncate", "false")
        .start()
    }
    // spark.streams.addListener(streamingListener)

    // Block the driver; otherwise main returns right after start() and the job exits.
    query.awaitTermination()
  }

  /** Parses records whose `value` column holds `^`-delimited text into typed columns.
    *
    * The `value` column is cast to a string and split on a literal `^`. The first
    * `schema.size` tokens become columns, named positionally from `schema.fieldNames`, and
    * non-string fields are cast from their string form to the declared type.
    * NOTE(review): with ANSI mode off, Spark yields null for tokens that fail to cast or
    * are missing; confirm that is acceptable for this data.
    *
    * @param df     a DataFrame with a `value` column (e.g. the Kafka source's binary payload)
    * @param schema target column names, types and order; order must match token order
    * @return a DataFrame whose columns match `schema` in name, type and order
    */
  def ReadAsCSV(df: DataFrame, schema: StructType): DataFrame = {
    // Building the projection directly from `schema` keeps its column order, so no separate
    // re-ordering select is needed. String fields are already strings and are left as-is.
    val typedColumns: Seq[Column] = schema.map { field =>
      if (field.dataType == StringType) col(field.name)
      else col(field.name).cast(field.dataType).as(field.name)
    }

    df.select(col("value").cast(StringType).as("value")) // Kafka delivers `value` as binary
      .select(split(col("value"), """\^""").as("raw")) // `^` is a regex metachar: escape it
      .select(ColumnExplode(col("raw"), schema.size): _*) // one column per array element
      .toDF(schema.fieldNames: _*) // apply column names positionally
      .select(typedColumns: _*) // cast column types from string, in schema order
  }

  /** Returns one column per index in `0 until len`, each extracting that element of the
    * array column `col` (i.e. `col.getItem(i)`). Returns an empty sequence when `len <= 0`.
    */
  def ColumnExplode(col: Column, len: Int): IndexedSeq[Column] =
    (0 until len).map(i => col.getItem(i))

  /** Adds a `select` overload to [[DataFrame]] that accepts a collection of column names or
    * of [[Column]]s instead of varargs.
    */
  implicit class DataFrameSupport(df: DataFrame) {

    /** Selects `cols`, which must be either `String` column names or `Column`s.
      *
      * An empty collection selects no columns.
      *
      * @throws IllegalArgumentException if `T` is neither `String` nor a subtype of `Column`
      */
    def select[T: TypeTag](cols: Traversable[T]): DataFrame = typeOf[T] match {
      case t if t =:= typeOf[String] =>
        // Mapping every name through `col` avoids the `head`/`tail` split, which would fail
        // on an empty collection.
        df.select(cols.toSeq.map(name => col(name.asInstanceOf[String])): _*)
      case t if t <:< typeOf[Column] =>
        df.select(cols.toSeq.map(_.asInstanceOf[Column]): _*)
      case t =>
        throw new IllegalArgumentException(
          s"select expects String column names or Columns, got $t")
    }
  }

  /** The record layout that `main` passes to [[ReadAsCSV]], built from [[fieldTypes]]. */
  def schema: StructType = StructType(fieldTypes)

  /** Column names and types of a parsed record, in the order the tokens appear in each
    * record (ReadAsCSV assigns names positionally). Every field is nullable.
    */
  def fieldTypes: Seq[StructField] = Seq[(String, DataType)](
    "timestamp" -> StringType,
    "rid" -> IntegerType,
    "conn_conn_state" -> StringType,
    "conn_duration" -> FloatType
    // and many more...
  ).map { case (name, dataType) =>
    StructField(name, dataType, nullable = true)
  }
}
