Skip to content

Instantly share code, notes, and snippets.

@zsxwing
Last active February 29, 2016 20:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zsxwing/f46d4f9b3a864fbb6bb9 to your computer and use it in GitHub Desktop.
Save zsxwing/f46d4f9b3a864fbb6bb9 to your computer and use it in GitHub Desktop.
StreamingApp.scala
package streaming.app
import java.util.UUID
import scala.util.Random
import scala.util.control.NonFatal
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Demo word-count streaming job.
 *
 * A background thread drops small text files into [[testDir]]. A checkpointed
 * `StreamingContext` watches that directory, counts words per 5-second batch and
 * prints each batch's counts as a DataFrame. The job runs for about one minute.
 */
object StreamingApp {
  /** Checkpoint directory; `main` uses it to recover the StreamingContext if present. */
  val checkpointDir = "dbfs:/checkpoint"

  /** Directory watched by `textFileStream`; [[pushData]] writes new files here. */
  val testDir = "dbfs:/test"

  /** Upper bound on how long [[stopPushingData]] waits for the producer thread to exit. */
  private val StopTimeoutMs = 10000L

  /**
   * Writes `content` as a new, uniquely named (random UUID) file in [[testDir]],
   * creating the directory first if it does not exist.
   *
   * The data is first written under a hidden name (leading ".") and then renamed to
   * its final name. File streams expect files to appear in the monitored directory
   * atomically, and by default they skip names starting with ".", so a batch never
   * sees a half-written file.
   *
   * @param content text to write, encoded as UTF-8
   * @throws java.io.IOException if the file cannot be written or renamed
   */
  def pushData(content: String): Unit = {
    import java.io.IOException
    import java.nio.charset.StandardCharsets

    val fileName = UUID.randomUUID().toString
    val fs = FileSystem.get(SparkContext.getOrCreate(new SparkConf()).hadoopConfiguration)
    val dir = new Path(testDir)
    if (!fs.exists(dir)) {
      fs.mkdirs(dir)
    }
    val tmpFile = new Path(dir, "." + fileName)
    val finalFile = new Path(dir, fileName)

    val output = fs.create(tmpFile)
    try {
      IOUtils.write(content, output, StandardCharsets.UTF_8)
    } finally {
      // Always release the stream, even if the write fails.
      output.close()
    }

    if (!fs.rename(tmpFile, finalFile)) {
      // Don't leave an orphaned temp file behind when the move into place fails.
      fs.delete(tmpFile, false)
      throw new IOException(s"Failed to rename $tmpFile to $finalFile")
    }
  }

  /** When `true`, the producer thread exits before starting its next iteration. */
  @volatile var pushingDataStopped = false

  /** Producer thread started by the most recent [[startPushingData]] call, if any. */
  @volatile private var pushingThread: Option[Thread] = None

  /**
   * Starts a daemon thread that pushes a file roughly once per second.
   *
   * Each file holds two 4-character "words" (each a random printable char repeated
   * 4 times, separated by a space). Pushing continues until [[stopPushingData]] is
   * called. An exception from [[pushData]] ends the thread.
   */
  def startPushingData(): Unit = {
    val thread = new Thread() {
      setDaemon(true)

      override def run(): Unit = {
        while (!pushingDataStopped) {
          val first = Random.nextPrintableChar().toString * 4
          val second = Random.nextPrintableChar().toString * 4
          pushData(s"$first $second")
          Thread.sleep(1000)
        }
      }
    }
    pushingThread = Some(thread)
    thread.start()
  }

  /**
   * Tells the producer thread to stop, then waits up to [[StopTimeoutMs]] for it to
   * exit. If the timeout elapses, this returns with the thread possibly still alive.
   *
   * Waiting matters because [[pushData]] calls `SparkContext.getOrCreate`. If a push
   * were still in flight after the caller stopped Spark, that call could start a
   * new SparkContext.
   */
  def stopPushingData(): Unit = {
    pushingDataStopped = true
    pushingThread.foreach(_.join(StopTimeoutMs))
    pushingThread = None
  }

  /**
   * Runs the word-count stream for about one minute while feeding it generated files.
   *
   * The StreamingContext is recovered from [[checkpointDir]] when possible (via
   * `getActiveOrCreate`); otherwise `createFn` builds it.
   */
  def main(args: Array[String]): Unit = {
    /** Builds the 5-second-batch word-count pipeline over files appearing in [[testDir]]. */
    def createFn(): StreamingContext = {
      val sc = SparkContext.getOrCreate(new SparkConf())
      val ssc = new StreamingContext(sc, Seconds(5))
      ssc.checkpoint(checkpointDir)
      val lines = ssc.textFileStream(testDir)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
      wordCounts.foreachRDD { rdd =>
        // getOrCreate (rather than a captured instance) keeps this closure usable
        // after recovery from the checkpoint.
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        import sqlContext.implicits._
        val df = rdd.toDF()
        df.show()
      }
      ssc
    }

    val ssc = StreamingContext.getActiveOrCreate(checkpointDir, createFn)
    ssc.start()
    startPushingData()
    try {
      ssc.awaitTerminationOrTimeout(60000) // Run 1 minute
    } catch {
      case NonFatal(e) => e.printStackTrace()
    } finally {
      // Stop the producer BEFORE stopping streaming. By default ssc.stop() also stops
      // the SparkContext (per the Spark Streaming guide), and a late pushData call
      // would then spin up a fresh one via SparkContext.getOrCreate.
      // The nested finally still stops ssc if the join is interrupted.
      try {
        stopPushingData()
      } finally {
        ssc.stop()
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment