Skip to content

Instantly share code, notes, and snippets.

@maasg
Created May 30, 2017 07:38
Show Gist options
  • Save maasg/9d9a410c8e7ddad23625ae293bd8def7 to your computer and use it in GitHub Desktop.
Spark Streaming job that uses a file stream
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Try
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Try
// Create a StreamingContext with a 5-second micro-batch interval.
// NOTE(review): `sparkContext` is supplied by the notebook environment, not defined here.
val ssc = new StreamingContext(sparkContext, Seconds(5))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@32d037e1
// Schema for the parsed stream: three nullable string columns.
val customSchema = StructType(
  Seq(
    StructField("column0", StringType, nullable = true),
    StructField("column1", StringType, nullable = true),
    StructField("column2", StringType, nullable = true)
  )
)
customSchema: org.apache.spark.sql.types.StructType = StructType(StructField(column0,StringType,true), StructField(column1,StringType,true), StructField(column2,StringType,true))
// Watch the directory for newly created text files; each line is one record.
val dStream = ssc.textFileStream("/tmp/streaming/test")

// Parse each ">"-delimited line into a three-column Row. Any field that
// cannot be extracted (missing segment, short split) falls back to "".
val rowDStream = dStream.map { line =>
  val parts = line.split(">")
  // Extract one field from segment i; Try covers both the segment lookup
  // and the extraction itself, exactly as in the original per-field Trys.
  def field(i: Int)(extract: String => String): String =
    Try(extract(parts(i).trim)).getOrElse("")
  val first  = field(0)(_.split(" ")(0))
  val second = field(1)(_.split(" ")(6))
  val third  = field(2)(_.split(" ")(0).replace(":", ""))
  Row.fromSeq(Seq(first, second, third))
}
dStream: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@3b04513c
rowDStream: org.apache.spark.streaming.dstream.DStream[org.apache.spark.sql.Row] = org.apache.spark.streaming.dstream.MappedDStream@5f03e1f2
// Notebook widget: an HTML list showing the last 10 appended entries.
// NOTE(review): `ul` comes from spark-notebook's widget library.
// @transient keeps the widget out of closures serialized to executors.
@transient val rawDstreamViz = ul(10)
// Evaluating the val renders the widget in the notebook cell output.
rawDstreamViz
rawDstreamViz: notebook.front.widgets.HtmlList = 
res87: notebook.front.widgets.HtmlList = 
// For each micro-batch, pull the raw lines to the driver and append them to
// the widget. NOTE(review): collect() is acceptable for a small demo stream
// but materializes the whole batch on the driver.
dStream.foreachRDD(rdd => rdd.collect.foreach(e => rawDstreamViz.append(e.toString)))
// Second notebook widget: displays the grouped/counted DataFrame rows.
@transient val dfViz = ul(10)
// Evaluating the val renders the widget in the notebook cell output.
dfViz
dfViz: notebook.front.widgets.HtmlList = 
res91: notebook.front.widgets.HtmlList = 
// For every micro-batch: build a DataFrame from the parsed rows, count the
// occurrences of each (column1, column2) pair, and show the result in the
// widget. NOTE(review): `sparkSession` is supplied by the notebook environment.
rowDStream.foreachRDD { rdd =>
  val trainingDF = sparkSession.createDataFrame(rdd, customSchema)
  // collect() is driver-side on purpose: the counts feed a notebook widget.
  val res = trainingDF.groupBy("column1", "column2").count().collect().map(_.toString)
  dfViz.appendAll(res)
  // FIX(review): removed dead code — the original constructed a fresh
  // StreamingLinearRegressionWithSGD (zero initial weights) here on every
  // micro-batch without ever calling trainOn/predictOn or referencing it.
  // A streaming model must be created once, outside foreachRDD, and trained
  // with model.trainOn(labeledPointDStream).
}
// Start the streaming computation. NOTE(review): in the notebook these were
// separate cells; run as a plain script, stop() would halt the job immediately.
ssc.start()
// Stop the StreamingContext; `false` keeps the underlying SparkContext alive.
ssc.stop(false)
@maasg
Copy link
Author

maasg commented May 30, 2017

Raw Data

image

Grouped DataFrame

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment