@toddleo
Last active August 17, 2018 06:03
A slow Kafka + Spark Structured Streaming example with CSV
import scalaz._, Scalaz._
import reflect.runtime.universe._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger // needed if the Continuous trigger below is enabled
import org.apache.spark.sql.types._
object MyJob {
  def main(args: Array[String]): Unit = {
    implicit val spark: SparkSession = SparkSession.builder.appName("Simple Application").getOrCreate()
    // NOTE: this flag belongs to the legacy DStream API and has no effect on
    // Structured Streaming; to rate-limit the Kafka source, use the
    // maxOffsetsPerTrigger option instead.
    spark.conf.set("spark.streaming.backpressure.enabled", true)

    val df = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "my-topic")
      .load()

    // ReadAsCSV takes two arguments, so partially apply it before piping
    // the DataFrame through scalaz's |>.
    val query = df |> { ReadAsCSV(_: DataFrame, schema) } |>
      { _.writeStream
        // .trigger(Trigger.Continuous("1 second")) // Continuous Processing
        .format("console")
        // .option("truncate", "false")
        .start()
      }
    // spark.streams.addListener(streamingListener)
    query.awaitTermination()
  }
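
  // To run against a real broker, the Kafka source must be on the classpath.
  // One assumed invocation (adjust Scala/Spark versions to your build):
  //   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 \
  //     --class MyJob my-job.jar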
  def ReadAsCSV(df: DataFrame, schema: StructType): DataFrame = {
    import df.sqlContext.implicits._
    // After the split below every column is a string, so cast the non-string
    // fields back to the types declared in the schema; string fields pass
    // through unchanged.
    val (stringTypes, otherTypes) = schema.partition(_.dataType == StringType)
    val fieldsWithTypeFix = otherTypes.map { field =>
      col(field.name) cast field.dataType.simpleString as field.name
    } ++ stringTypes.map(_.name).map(col)

    df.select('value cast "string")
      .select(split('value, """\^""") as "raw")
      .select(ColumnExplode('raw, schema.size): _*) // flatten WrappedArray
      .toDF(schema.fieldNames: _*)                  // apply column names
      .select(fieldsWithTypeFix: _*)                // cast column types from string
      .select(schema.fieldNames.map(col): _*)       // re-order columns, as defined in schema
  }
  def ColumnExplode(col: Column, len: Int): Seq[Column] =
    0 until len map col.apply
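
  // Illustration: ColumnExplode('raw, 3) yields Seq('raw(0), 'raw(1), 'raw(2)),
  // i.e. one Column per slot of the array produced by split.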
  implicit class DataFrameSupport(df: DataFrame) {
    def select[T: TypeTag](cols: Traversable[T]): DataFrame = {
      cols match {
        case c if typeOf[T] =:= typeOf[String] =>
          val cols = c.asInstanceOf[Traversable[String]]
          df.select(cols.head, cols.toSeq.tail: _*)
        case c if typeOf[T] <:< typeOf[Column] =>
          val cols = c.asInstanceOf[Traversable[Column]]
          df.select(cols.toSeq: _*)
        case _ =>
          // Fail fast instead of throwing a bare MatchError
          throw new IllegalArgumentException(s"select supports String or Column, got ${typeOf[T]}")
      }
    }
  }
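
  // Illustration: with this implicit in scope, a whole collection can be
  // selected without the `: _*` splat, e.g.
  //   df.select(Seq("timestamp", "rid"))           // by name
  //   df.select(schema.fieldNames.map(col).toSeq)  // by Column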
  def schema = StructType(fieldTypes)

  def fieldTypes: Seq[StructField] = Seq(
    "timestamp"       -> StringType,
    "rid"             -> IntegerType,
    "conn_conn_state" -> StringType,
    "conn_duration"   -> FloatType
    // and many more...
  ).map { case (field, datatype) =>
    StructField(field, datatype, nullable = true)
  }
}
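
A quick way to sanity-check the parsing pipeline without a Kafka broker is to run ReadAsCSV over a static DataFrame. The snippet below is a minimal sketch, not part of the original gist: the sample record and the local[*] master are assumptions for illustration, and it assumes it lives in the same file as MyJob so the imports above apply.

object ReadAsCSVSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("ReadAsCSV smoke test")
      .master("local[*]") // assumption: run locally for the check
      .getOrCreate()
    import spark.implicits._
    // One caret-delimited record shaped like MyJob.fieldTypes expects
    val sample = Seq("2018-08-17 06:03:00^42^SF^1.5").toDF("value")
    val parsed = MyJob.ReadAsCSV(sample, MyJob.schema)
    parsed.printSchema() // should report string / int / string / float
    parsed.show(false)
    spark.stop()
  }
}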