Skip to content

Instantly share code, notes, and snippets.

@samklr
Created June 27, 2017 19:19
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save samklr/219c9f5a7c4a3f4853808e5a6d2326b0 to your computer and use it in GitHub Desktop.
Save samklr/219c9f5a7c4a3f4853808e5a6d2326b0 to your computer and use it in GitHub Desktop.
Spark Streaming Websocket Receiver
// Driver snippet: builds a StreamingContext, registers an input stream with it,
// and then starts the context.
// NOTE(review): the three arguments look like the master, the application name and
// a 15-second batch interval — confirm against the StreamingContext constructor.
val ssc = new StreamingContext("local", "datastream", Seconds(15))
// Create the InputDStream here (elided in this snippet).
// NOTE(review): `stream` is not defined in the visible code — presumably it is
// created at this point; confirm before running.
ssc.registerInputStream(stream)
// Interact with the stream here (elided in this snippet), before the context is started.
ssc.start()
/**
 * Websocket client that hands every text message it receives to `println`.
 *
 * The socket is opened from the class body, i.e. as part of constructing an instance.
 */
class PriceEcho extends PriceWebSocketClient {
  createSocket(message => println(message))
}
import spark.streaming.dstream.NetworkReceiver
import spark.storage.StorageLevel
/**
 * Network receiver that converts each text message from the price websocket into a
 * [[PriceUpdate]] and appends it to a block generator.
 */
class PriceReceiver extends NetworkReceiver[PriceUpdate] with PriceWebSocketClient {

  // Declared lazy, so the generator is only constructed on first access.
  lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY_SER)

  /**
   * Starts the block generator, then opens the websocket with a handler that parses
   * every incoming message into a [[PriceUpdate]] and adds it to the generator.
   */
  protected override def onStart(): Unit = {
    blockGenerator.start
    createSocket { message =>
      blockGenerator += PriceUpdate(message)
    }
  }

  /**
   * Stops the block generator and shuts the websocket down, if one was opened.
   */
  protected override def onStop(): Unit = {
    blockGenerator.stop
    // `websocket` is still null when createSocket never ran or failed before assigning
    // it; guard so that stopping such a receiver does not throw a NullPointerException.
    if (websocket != null) {
      websocket.shutdown
    }
  }
}
import scala.util.parsing.json.JSON
import scala.collection.JavaConversions
import java.util.TreeMap
/**
 * A single price tick.
 *
 * @param id        identifier the price belongs to
 * @param price     price carried by this update
 * @param lastPrice price recorded for the same id before this update
 */
case class PriceUpdate(
  id: String,
  price: Double,
  lastPrice: Double
)
/**
 * Companion that builds [[PriceUpdate]] values from raw message text and remembers
 * the most recent price seen for each id.
 */
object PriceUpdate {

  // Latest price per id, held in a java.util.TreeMap wrapped as a Scala map.
  // (Original author's note: no native Scala TreeMap was available, hence Java's.)
  val lastPrices = JavaConversions.asMap(new TreeMap[String,Double])

  /**
   * Parses `text` and returns an update whose `lastPrice` is the price previously
   * stored for the same id, or the new price itself when the id has not been seen.
   * The stored price for the id is then replaced by the new one.
   */
  def apply(text: String): PriceUpdate = {
    val (id, price) = getIdAndPriceFromJSON(text)
    // Read the previous price before overwriting it with the current one.
    val previousPrice: Double = lastPrices.get(id).getOrElse(price)
    lastPrices.put(id, price)
    PriceUpdate(id, price, previousPrice)
  }

  // Extracts the (id, price) pair from a JSON message; body elided in the original.
def getIdAndPriceFromJSON(text: String) = // snip - simple JSON processing
}
import scalawebsocket.WebSocket
/**
 * Mixin that opens a websocket to the market-data endpoint and subscribes to listings.
 */
trait PriceWebSocketClient {
  import Listings._

  // Left at its default (null) until createSocket assigns the opened socket.
  var websocket: WebSocket = _

  /**
   * Opens the websocket, routes every text message to `handleMessage`, and sends one
   * subscribe request per entry in `subscriptions`.
   *
   * @param handleMessage callback invoked with each text message received
   */
  def createSocket(handleMessage: String => Unit) = {
    val connection = WebSocket().open("ws://localhost:8080/1.0/marketDataWs")
    websocket = connection.onTextMessage(handleMessage)
    // NOTE(review): `subscriptions` presumably comes from the Listings import — confirm.
    for (listing <- subscriptions) {
      websocket.sendText("{\"subscribe\":{" + listing + "}}")
    }
  }
}
/**
 * Network input stream of [[PriceUpdate]] values, bound to the `ssc` context.
 */
object Stream extends NetworkInputDStream[PriceUpdate](ssc) {

  /** Supplies a new [[PriceReceiver]] each time a receiver is requested. */
  override def getReceiver(): NetworkReceiver[PriceUpdate] = new PriceReceiver
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment