Skip to content

Instantly share code, notes, and snippets.

@mrpotes
Created July 31, 2013 12:30
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save mrpotes/6121580 to your computer and use it in GitHub Desktop.
Save mrpotes/6121580 to your computer and use it in GitHub Desktop.
/**
 * Spark network receiver that hands every raw text message received on the price
 * web socket, unparsed, to this receiver's block generator (stored MEMORY_ONLY_SER).
 */
class PriceReceiver extends NetworkReceiver[String] with PriceWebSocketClient {
  lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY_SER)

  protected override def onStart() {
    blockGenerator.start
    createSocket(m => blockGenerator += m)
  }

  protected override def onStop() {
    // Close the socket before stopping the generator, so that no message is pushed into
    // a generator that has already stopped. `websocket` is still null if createSocket
    // never completed (e.g. opening the socket failed), so guard instead of throwing.
    Option(websocket).foreach(_.shutdown)
    blockGenerator.stop
  }
}
import scala.collection.immutable.List
import spark.SparkContext._
import spark.streaming._
import spark.streaming.StreamingContext._
import spark.streaming.dstream._
/**
 * Spark Streaming job that prints, every 15 seconds, the top five sectors that trended
 * upwards over the last five minutes.
 *
 * Each price update contributes its price change and a signed move (+1 for a rise,
 * -1 otherwise) to its sector. Sectors with more rises than falls in the window are
 * ranked by (total price change * net rises), highest first.
 *
 * This object does not extend `App`. `App` (via `DelayedInit`) runs the object body only
 * from inside `App.main`, and overriding `main` left vals such as `reportHeader` null.
 */
object DataStream {
  // NOTE(review): not used by `main`, which prints its own header; kept for callers.
  val reportHeader = """----------------------------------------------
Positive Trending
=================
""".stripMargin

  def main(args: Array[String]) {
    import Listings._
    import System._

    val ssc = new StreamingContext("local", "datastream", Seconds(15),
      "C:/software/spark-0.7.3",
      List("target/scala-2.9.3/spark-data-stream_2.9.3-1.0.jar"))
    object stream extends NetworkInputDStream[PriceUpdate](ssc) {
      override def getReceiver(): NetworkReceiver[PriceUpdate] = {
        new PriceReceiver()
      }
    }
    // A windowed reduce with an inverse function requires checkpointing.
    ssc.checkpoint("spark")
    ssc.registerInputStream(stream)

    // Spark may combine partially-reduced values in any grouping and order, so `reduce`
    // must be associative and commutative, and `invReduce` must exactly undo it when a
    // batch slides out of the window. Plain component-wise sum/difference satisfies both;
    // the sign of each move is therefore encoded in the count up front (see below).
    val reduce = (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2)
    val invReduce = (a: (Double, Int), b: (Double, Int)) => (a._1 - b._1, a._2 - b._2)

    // (sector, (price change, +1 for a rise / -1 otherwise)); a zero change counts as a fall.
    val sectorPriceChanges = stream.map { pu =>
      val change = pu.price - pu.lastPrice
      (listingSectors(pu.id), (change, if (change > 0) 1 else -1))
    }
    val windowedPriceChanges =
      sectorPriceChanges.reduceByKeyAndWindow(reduce, invReduce, Seconds(5*60), Seconds(15))
    // Keep only sectors with more rises than falls in the window.
    val positivePriceChanges = windowedPriceChanges.filter { case (_, (_, count)) => count > 0 }
    val priceChangesToSector = positivePriceChanges.map {
      case (sector, (value, count)) => (value * count, sector)
    }
    val sortedSectors = priceChangesToSector.transform(rdd => rdd.sortByKey(false)).map(_._2)
    sortedSectors.foreach { rdd =>
      // Only the header is a format string; the sector list is appended afterwards so
      // that `%d` receives the Long timestamp rather than a String.
      val header = """|----------------------------------------------
                      |Positive Trending (Time: %d ms)
                      |----------------------------------------------
                      |""".stripMargin.format(currentTimeMillis)
      println(header + rdd.take(5).map(sectorCodes(_)).mkString("\n"))
    }
    ssc.start()
  }
}
override def main(args: Array[String]) {
  import Listings._
  val ssc = new StreamingContext("local", "datastream", Seconds(15),
    "C:/software/spark-0.7.3",
    List("target/scala-2.9.3/spark-data-stream_2.9.3-1.0.jar"))

  // Input stream whose receiver produces PriceUpdate values.
  object stream extends NetworkInputDStream[PriceUpdate](ssc) {
    override def getReceiver(): NetworkReceiver[PriceUpdate] = new PriceReceiver()
  }
  ssc.registerInputStream(stream)

  // Render each update as "<listing name> - <last price> - <price>".
  val describe = (update: PriceUpdate) =>
    listingNames(update.id) + " - " + update.lastPrice + " - " + update.price
  stream.map(describe).print()

  ssc.start()
}
/** Debug client: opens the price socket on construction and prints each text message. */
class PriceEcho extends PriceWebSocketClient {
  createSocket(message => println(message))
}
import spark.streaming.dstream.NetworkReceiver
import spark.storage.StorageLevel
/**
 * Spark network receiver that parses each text message from the price web socket into
 * a [[PriceUpdate]] and hands it to this receiver's block generator (MEMORY_ONLY_SER).
 */
class PriceReceiver extends NetworkReceiver[PriceUpdate] with PriceWebSocketClient {
  lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY_SER)

  protected override def onStart() {
    blockGenerator.start
    createSocket(m => blockGenerator += PriceUpdate(m))
  }

  protected override def onStop() {
    // Close the socket before stopping the generator, so that no update is pushed into
    // a generator that has already stopped. `websocket` is still null if createSocket
    // never completed (e.g. opening the socket failed), so guard instead of throwing.
    Option(websocket).foreach(_.shutdown)
    blockGenerator.stop
  }
}
import scala.util.parsing.json.JSON
import scala.collection.JavaConversions
import java.util.TreeMap
/**
 * One price tick for a single listing.
 *
 * @param id        listing id extracted from the incoming JSON message
 * @param price     price carried by this message
 * @param lastPrice price from the previous message seen for the same `id`; equal to
 *                  `price` for the first message of that listing
 */
case class PriceUpdate(id: String, price: Double, lastPrice: Double)

/** Builds [[PriceUpdate]]s from raw socket text, remembering the last price per listing. */
object PriceUpdate {
  // Most recent price seen for each listing id. No ordering is required, so a plain
  // Scala mutable map replaces the java.util.TreeMap that was wrapped through the
  // deprecated JavaConversions.asMap.
  // NOTE(review): unsynchronized; assumes `apply` is only called from one thread at a time.
  val lastPrices: scala.collection.mutable.Map[String, Double] =
    scala.collection.mutable.Map.empty[String, Double]

  /** Parses `text` and pairs its price with the previously recorded price for its id. */
  def apply(text: String): PriceUpdate = {
    val (id, price) = getIdAndPriceFromJSON(text)
    // `put` records the new price and returns the value it replaced, if any.
    val lastPrice: Double = lastPrices.put(id, price).getOrElse(price)
    PriceUpdate(id, price, lastPrice)
  }

  // Body elided in this excerpt; `apply` expects it to return an (id, price) pair.
  def getIdAndPriceFromJSON(text: String) = // snip - simple JSON processing
}
import scalawebsocket.WebSocket
/**
 * Mixin for classes that consume the market-data web socket.
 *
 * The open socket is kept in `websocket`, which stays null until `createSocket` has run.
 */
trait PriceWebSocketClient {
  import Listings._

  /**
   * Opens the price socket, forwards every text message to `handleMessage`, then sends
   * a `{"subscribe":{...}}` request for each entry of `subscriptions` (from `Listings`).
   */
  def createSocket(handleMessage: String => Unit) = {
    val socket = WebSocket().open("ws://localhost:8080/1.0/marketDataWs")
    websocket = socket.onTextMessage(message => {
      handleMessage(message)
    })
    for (listing <- subscriptions) {
      websocket.sendText("{\"subscribe\":{" + listing + "}}")
    }
  }

  var websocket: WebSocket = _
}
// Folds one (price change, count) pair into the running window value. The price change
// is summed; the count is added when that change is positive and subtracted otherwise.
// NOTE(review): the sign is read only from `pair`, so this is not associative when Spark
// combines partially-reduced values. Emitting a signed count (+1 / -1) per update and
// summing component-wise would avoid that.
val reduce = (reduced: (Double,Int), pair: (Double,Int)) => {
  if (pair._1 > 0) (reduced._1 + pair._1, reduced._2 + pair._2)
  else (reduced._1 + pair._1, reduced._2 - pair._2)
}
// Inverse of `reduce`, applied to a value sliding out of the window. It must exactly undo
// `reduce`: the price change is subtracted (adding it would count every expiring change
// twice), and the count adjustment is reversed.
val invReduce = (reduced: (Double,Int), pair: (Double,Int)) => {
  if (pair._1 > 0) (reduced._1 - pair._1, reduced._2 - pair._2)
  else (reduced._1 - pair._1, reduced._2 + pair._2)
}
// Per-sector values over a 5-minute window, recomputed every 15 seconds: new batches are
// folded in with `reduce`, and batches leaving the window are removed with `invReduce`.
val windowedPriceChanges = sectorPriceChanges.reduceByKeyAndWindow(reduce, invReduce, Seconds(5*60), Seconds(15))
// Input stream whose receiver is a PriceReceiver.
// NOTE(review): this refers to `ssc`, which is declared below. It relies on the object
// being initialised lazily on first use (at registerInputStream), after `ssc` is set.
object stream extends NetworkInputDStream[PriceUpdate](ssc) {
override def getReceiver(): NetworkReceiver[PriceUpdate] = {
new PriceReceiver()
}
}
// Local-mode streaming context with 15-second batches; Spark home and jar paths are hard-coded.
val ssc = new StreamingContext("local", "datastream", Seconds(15), "C:/software/spark-0.7.3", List("target/scala-2.9.3/spark-data-stream_2.9.3-1.0.jar"))
// create InputDStream: register it with the context before starting.
ssc.registerInputStream(stream)
// interact with stream: transformations and output operations on `stream` go here.
ssc.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment