Skip to content

Instantly share code, notes, and snippets.

@lancearlaus
Last active August 29, 2015 14:26
Show Gist options
  • Save lancearlaus/b43b7acb8a3aada51701 to your computer and use it in GitHub Desktop.
Save lancearlaus/b43b7acb8a3aada51701 to your computer and use it in GitHub Desktop.
#!/usr/bin/env scalas
// NOTE: This is a self-encapsulated Scala script meant to be run with scalas
// See http://www.scala-sbt.org/0.13/docs/Scripts.html
/***
scalaVersion := "2.11.6"
resolvers += Resolver.url("typesafe-ivy-repo", url("http://typesafe.artifactoryonline.com/typesafe/releases"))(Resolver.ivyStylePatterns)
libraryDependencies ++= Seq(
"com.typesafe.scala-logging" %% "scala-logging" % "3.+",
"com.typesafe.akka" % "akka-stream-experimental_2.11" % "1.0",
"org.scalatest" %% "scalatest" % "2.2.4"
)
*/
import com.typesafe.scalalogging.StrictLogging
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.stage._
import akka.stream.scaladsl.FlowGraph.Implicits._
import akka.stream.scaladsl._
import org.scalatest._
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.scalactic.Tolerance._
import org.scalactic.TripleEqualsSupport.Spread
import scala.collection.immutable._
import scala.util.Random
// Creates an unbounded source of random ints with a known seed (for repeatability)
def randomSource(seed: Int) = Source(() => {
val random = new Random(seed)
Iterator.continually(random.nextInt)
})
// Transform a source of integers into a normalized source of doubles where
// each element emitted is in the range of 0 to 1
// Note that the incoming source must be both finite and support multiple subscribers
def normalize(in: Source[Int, Unit]): Source[Double, Unit] = {
// Fold over the input source to create a new source that emits a single element
// which is the range of integers over the entire stream
val fold = in.fold((Int.MaxValue, Int.MinValue)) {
(range, n) => range match {
case (l, u) => (l.min(n), u.max(n))
}
}
// Transform the single element range source into an unbounded source
// that continually emits the same element
val range = fold.map(r => Source.repeat(r)).flatten(FlattenStrategy.concat)
// Create a stage that normalizes each value
val normalize = Flow[(Int, (Int, Int))].map {
case (n, (min, max)) if (min == max) => 1.0
case (n, (min, max)) => (n.toDouble - min.toDouble) / (max.toDouble - min.toDouble)
}
// Create the final source using a flow that combines the prior constructs
Source(in, range, Zip[Int, (Int, Int)], normalize)((mat, _, _, _) => mat) {
implicit b => (in, range, zip, normalize) =>
in ~> zip.in0
range ~> zip.in1
zip.out ~> normalize
normalize.outlet
}
}
// Create/destroy an actor system for testing
trait AkkaStreamsImplicits extends BeforeAndAfterAll { this: Suite =>
implicit var system: ActorSystem = _
implicit var materializer: Materializer = _
override def beforeAll = {
super.beforeAll
system = ActorSystem(this.getClass.getSimpleName.replace("$", "_"))
materializer = ActorMaterializer()
}
override def afterAll = {
system.shutdown
super.afterAll
}
}
class NormalizeSpec extends FlatSpec with AkkaStreamsImplicits with Matchers with ScalaFutures {
val seed = 42
"Normalize" should "properly calculate for constant stream" in {
val value = 5
val size = 100
val expected = Seq.fill(size)(1.0)
val constants = Source.repeat(value).take(size)
val normalized = normalize(constants)
val future = normalized.runWith(Sink.fold(List[Double]())(_ :+ _))
whenReady(future) { result =>
//println(s"result: $result")
result should have size expected.size
result.zip(expected).foreach { case (actual, expected) =>
actual shouldBe expected
}
}
}
it should "properly calculate for random stream" in {
val size = 100
val randoms = randomSource(seed).take(size)
val normalized = normalize(randoms)
val future = normalized.runWith(Sink.fold(List[Double]())(_ :+ _))
whenReady(future) { result =>
//println(s"result: $result")
result should have size size
result should contain (0.0)
result should contain (1.0)
result.exists(_ < 0.0) shouldBe false
result.exists(_ > 1.0) shouldBe false
}
}
}
// Run the test case
run(new NormalizeSpec)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment