Skip to content

Instantly share code, notes, and snippets.

@jonnylaw
Created June 23, 2017 10:00
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save jonnylaw/b74642f6421fdb1959ac600f67c4b378 to your computer and use it in GitHub Desktop.
Save jonnylaw/b74642f6421fdb1959ac600f67c4b378 to your computer and use it in GitHub Desktop.
scalaVersion := "2.12.1"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream" % "2.4.17"
)
import akka.stream.scaladsl._
import akka.actor.ActorSystem
import akka.NotUsed
import akka.stream._
import akka.util.ByteString
import java.nio.file.Paths
import scala.concurrent.Future
object DescriptiveStats extends App {
case class PebbleData(
time: Double,
mainAvgCurrent: Double,
mainAvgPower: Double,
mainAvgVoltage: Double)
def readFile(file: String) = FileIO.fromPath(Paths.get(file)).
via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8192, allowTruncation = true)).
map(_.utf8String)
def marshall = Flow[String].
map(_.split(",")).
map(s => PebbleData(s.head.toDouble, s(1).toDouble, s(2).toDouble, s(3).toDouble))
def mapBatch[T](batchSize: Int)(f: Seq[PebbleData] => Future[T]) =
Flow[PebbleData].
grouped(batchSize).
mapAsync(4)(f)
def mean(xs: Seq[Double]) = xs.sum / xs.size
implicit val system = ActorSystem("Descriptivestats")
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
readFile("170622_PebbleSteel_firstTestRun.csv").
drop(1).
via(marshall).
via(mapBatch(1000)((p: Seq[PebbleData]) =>
Future.successful(PebbleData(
p.head.time,
mean(p.map(_.mainAvgCurrent)),
mean(p.map(_.mainAvgPower)),
mean(p.map(_.mainAvgVoltage))
))
)).
runWith(Sink.seq).
onComplete { s =>
println(s)
system.terminate()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment