Skip to content

Instantly share code, notes, and snippets.

@janakagamini
Last active January 15, 2016 11:22
Show Gist options
  • Save janakagamini/09b1195a43bc91a5a670 to your computer and use it in GitHub Desktop.
Save janakagamini/09b1195a43bc91a5a670 to your computer and use it in GitHub Desktop.
A simple websocket server to process incoming ecg readings and write to InfluxDB using akka reactive streams. Client to produce ecg data can be found at: https://github.com/janakagamini/ecg_ws_source
package com.example
import java.util.concurrent.TimeoutException
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import com.paulgoldbaum.influxdbclient.Parameter.Precision
import com.paulgoldbaum.influxdbclient.{InfluxDB, Point}
import org.json4s._
import org.json4s.native.JsonMethods._
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}
case class Reading(timestamp: Long, lead1: Double, lead2: Double)
object WSRequest {
def unapply(req: HttpRequest): Option[HttpRequest] = {
if (req.header[UpgradeToWebsocket].isDefined) {
req.header[UpgradeToWebsocket] match {
case Some(upgrade) => Some(req)
case None => None
}
} else None
}
}
object ApplicationMain extends App {
implicit val actorSystem = ActorSystem()
implicit val flowMaterializer = ActorMaterializer()
val binding = Http().bindAndHandleSync({
case WSRequest(req@HttpRequest(GET, Uri.Path("/stream"), _, _, _)) => handleWith(req, dataFlow)
case _: HttpRequest => HttpResponse(400, entity = "Invalid websocket request")
}, interface = "localhost", port = 9003)
def handleWith(req: HttpRequest, flow: Flow[Message, Message, Unit]) = req.header[UpgradeToWebsocket].get.handleMessages(flow)
// binding is a future, we assume it's ready within a second or timeout
try {
Await.result(binding, 1 second)
println("Server online at http://localhost:9003")
} catch {
case exc: TimeoutException =>
println("Server took to long to startup, shutting down")
actorSystem.shutdown()
}
// Pipeline of the data
def dataFlow: Flow[Message, Message, Unit] = Flow[Message].map {
case TextMessage.Strict(msg) => msg
}
.via(parseJson)
.via(convertToPoint)
.via(collectPoints)
.via(writeToInflux)
// Add more processing blocks here?
.map {
// No need to return anything to the websocket
_ => TextMessage(Source.empty)
}
def parseJson = Flow[String].map {
s =>
implicit val formats = DefaultFormats
parse(s).extract[Reading]
}
// CHANGE HERE
val stream_id = "janakaOptiPlex9020"
def convertToPoint = Flow[Reading].map(p => Point("ecg", p.timestamp).addTag("stream_id", stream_id).addField("lead1", p.lead1).addField("lead2", p.lead2))
def collectPoints = Flow[Point].grouped(2000)
// CHANGE HERE
val influx_db = InfluxDB.connect("localhost", 8086)
val database = influx_db.selectDatabase("vsigns")
def writeToInflux = Flow[Seq[Point]].map {
points => {
val r = database.bulkWrite(points.toList, precision = Precision.MILLISECONDS)
r.onComplete {
case Success(_) => println("Successfully posted data")
case Failure(e) => println(e.getMessage)
}
}
}
}
name := "minimal-akka-scala-seed"
version := "1.0"
scalaVersion := "2.11.7"
resolvers += "Gamlor-Repo" at "https://github.com/gamlerhart/gamlor-mvn/raw/master/snapshots"
libraryDependencies ++= Seq(
"com.typesafe.akka" % "akka-stream-experimental_2.11" % "2.0.1",
"com.typesafe.akka" % "akka-http-core-experimental_2.11" % "2.0.1",
"com.typesafe.akka" % "akka-http-experimental_2.11" % "2.0.1",
"org.json4s" %% "json4s-native" % "3.3.0",
"com.paulgoldbaum" %% "scala-influxdb-client" % "0.4.1"
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment