Skip to content

Instantly share code, notes, and snippets.

@lancearlaus
Last active September 22, 2015 15:19
Show Gist options
  • Save lancearlaus/e6e52fc8c7ca534cb026 to your computer and use it in GitHub Desktop.
Save lancearlaus/e6e52fc8c7ca534cb026 to your computer and use it in GitHub Desktop.
Akka user list examples
#!/usr/bin/env scalas
// NOTE: This is a self-encapsulated Scala script meant to be run with scalas
// See http://www.scala-sbt.org/0.13/docs/Scripts.html
/***
scalaVersion := "2.11.6"
resolvers += Resolver.url("typesafe-ivy-repo", url("http://typesafe.artifactoryonline.com/typesafe/releases"))(Resolver.ivyStylePatterns)
libraryDependencies ++= Seq(
"com.typesafe.scala-logging" %% "scala-logging" % "3.+",
"com.typesafe.akka" %% "akka-agent" % "2.3.12",
"com.typesafe.akka" %% "akka-stream-experimental" % "1.0",
"com.typesafe.akka" %% "akka-http-experimental" % "1.0",
"org.scalatest" %% "scalatest" % "2.2.4"
)
*/
import com.typesafe.scalalogging.StrictLogging
import akka.actor.{ActorSystem, Cancellable}
import akka.agent.Agent
import akka.pattern.after
import akka.stream._
import akka.stream.io._
import akka.stream.stage._
import akka.stream.scaladsl.FlowGraph.Implicits._
import akka.stream.scaladsl._
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken}
import akka.util.ByteString
import org.scalatest._
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.scalactic.Tolerance._
import org.scalactic.TripleEqualsSupport.Spread
import scala.collection.immutable._
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Random, Try, Success, Failure}
import java.util.concurrent.atomic.AtomicBoolean
import java.io.ByteArrayOutputStream
import java.net.URL
case class Message[T](id: Long, body: T)
trait Queue {
def acknowledge(id: Long): Unit
}
type Handler[T] = Flow[Message[T], Try[_], _]
type AckSink = Sink[(Message[_], Try[_]), Future[_]]
def ackSink(queue: Queue) =
Sink.foreach[(Message[_], Try[_])] {
case (msg, result) => result match {
case Success(_) => queue.acknowledge(msg.id)
case Failure(t) => {
// Do something on failure
println(t)
}
}
}
def handleAndAckSink[T](handler: Handler[T], ackSink: AckSink) =
Sink(handler, ackSink, Broadcast[Message[T]](2), Zip[Message[T], Try[_]])((_, mat, _, _) => mat) {
implicit b => (handler, ackSink, bcast, zip) =>
bcast ~> zip.in0
bcast ~> handler ~> zip.in1
zip.out ~> ackSink
(bcast.in)
}
// Create/destroy an actor system for testing
trait AkkaStreamsImplicits extends BeforeAndAfterAll { this: Suite =>
implicit var system: ActorSystem = _
implicit var materializer: Materializer = _
override def beforeAll = {
super.beforeAll
system = ActorSystem(this.getClass.getSimpleName.replace("$", "_"))
materializer = ActorMaterializer()
}
override def afterAll = {
system.shutdown
super.afterAll
}
}
class AckSpec extends FlatSpec with AkkaStreamsImplicits with Matchers with ScalaFutures {
def testSource(n: Int) = Source((0 to n)).map(n => Message(n, s"message $n"))
val testQueue = new Queue {
def acknowledge(id: Long) = println(s"acknowledging message $id")
}
val testHandler = Flow[Message[String]].map { msg =>
// Randomly fail
if (Random.nextBoolean) Failure(new Exception(s"failure processing message $msg"))
else Success(s"success processing message $msg")
}
"Acknowledge" should "ack messages" in {
val future = testSource(10).runWith(handleAndAckSink(testHandler, ackSink(testQueue)))
whenReady(future) { result =>
}
}
}
// Run the test case
run(new AckSpec)
#!/usr/bin/env scalas
// NOTE: This is a self-encapsulated Scala script meant to be run with scalas
// See http://www.scala-sbt.org/0.13/docs/Scripts.html
/***
scalaVersion := "2.11.6"
resolvers += Resolver.url("typesafe-ivy-repo", url("http://typesafe.artifactoryonline.com/typesafe/releases"))(Resolver.ivyStylePatterns)
libraryDependencies ++= Seq(
"com.typesafe.scala-logging" %% "scala-logging" % "3.+",
"com.typesafe.akka" % "akka-stream-experimental_2.11" % "1.0",
"org.scalatest" %% "scalatest" % "2.2.4"
)
*/
import com.typesafe.scalalogging.StrictLogging
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.io._
import akka.stream.stage._
import akka.stream.scaladsl.FlowGraph.Implicits._
import akka.stream.scaladsl._
import akka.util.ByteString
import org.scalatest._
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.scalactic.Tolerance._
import org.scalactic.TripleEqualsSupport.Spread
import scala.collection.immutable._
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random
import java.io.ByteArrayOutputStream
// Creates an unbounded random alphanumeric source with a known seed (for repeatability)
def randomAlphaSource(seed: Int) = Source(() => new Random(seed).alphanumeric.iterator)
// Split input stream and write to a set of input-specific output streams
def splitWrite(in: Source[Char, Unit])(implicit m: Materializer): RunnableGraph[Future[Map[Char, ByteArrayOutputStream]]] = {
implicit val ec: ExecutionContext = m.executionContext
val outputMap = mutable.Map[Char, ByteArrayOutputStream]()
// Creates a new output stream sink for the given character
// Each stream is backed by a byte array output stream, but these could just as easily
// be file output streams to stream data to disk instead
def charSink(c: Char) = OutputStreamSink(() => outputMap.getOrElseUpdate(c, new ByteArrayOutputStream()))
// Split the input stream into a set of character-specific streams
// Note that that resulting flows must be run to ensure consumption (see groupBy docs)
val groupBy = in.groupBy(c => c).map {
case (c, source) => source.map(c => ByteString(c.toByte)).toMat(charSink(c))(Keep.right).run()
}
// Return a runnable graph that materializes into a future containing the output map
groupBy.toMat(Sink.ignore.mapMaterializedValue(_.map(_ => outputMap.toMap)))(Keep.right)
}
// Create/destroy an actor system for testing
trait AkkaStreamsImplicits extends BeforeAndAfterAll { this: Suite =>
implicit var system: ActorSystem = _
implicit var materializer: Materializer = _
override def beforeAll = {
super.beforeAll
system = ActorSystem(this.getClass.getSimpleName.replace("$", "_"))
materializer = ActorMaterializer()
}
override def afterAll = {
system.shutdown
super.afterAll
}
}
class SplitWriteSpec extends FlatSpec with AkkaStreamsImplicits with Matchers with ScalaFutures {
val seed = 42
"Split" should "handle random alphanumeric stream" in {
val size = 1000
val randoms = randomAlphaSource(seed).take(size)
val future = splitWrite(randoms).run
// Split takes more than 150 ms to complete
import org.scalatest.time.{Millis, Seconds, Span}
implicit def patienceConfig = PatienceConfig(timeout = Span(1, Seconds), interval = Span(50, Millis))
whenReady(future) { map =>
// println(s"map: $map")
var length = 0
map.foreach {
case (k, v) => {
val s = v.toString
// Each string should only contain the same character
s.filter(_ != k) should have length 0
length += s.length
}
}
// All characters should be accounted for
length shouldBe size
}
}
}
// Run the test case
run(new SplitWriteSpec)
#!/usr/bin/env scalas
// NOTE: This is a self-encapsulated Scala script meant to be run with scalas
// See http://www.scala-sbt.org/0.13/docs/Scripts.html
/***
scalaVersion := "2.11.6"
resolvers += Resolver.url("typesafe-ivy-repo", url("http://typesafe.artifactoryonline.com/typesafe/releases"))(Resolver.ivyStylePatterns)
libraryDependencies ++= Seq(
"com.typesafe.scala-logging" %% "scala-logging" % "3.+",
"com.typesafe.akka" %% "akka-agent" % "2.3.12",
"com.typesafe.akka" %% "akka-stream-experimental" % "1.0",
"com.typesafe.akka" %% "akka-http-experimental" % "1.0",
"org.scalatest" %% "scalatest" % "2.2.4"
)
*/
import com.typesafe.scalalogging.StrictLogging
import akka.actor.{ActorSystem, Cancellable}
import akka.agent.Agent
import akka.pattern.after
import akka.stream._
import akka.stream.io._
import akka.stream.stage._
import akka.stream.scaladsl.FlowGraph.Implicits._
import akka.stream.scaladsl._
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken}
import akka.util.ByteString
import org.scalatest._
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.scalactic.Tolerance._
import org.scalactic.TripleEqualsSupport.Spread
import scala.collection.immutable._
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Random
import scala.concurrent.duration._
import java.util.concurrent.atomic.AtomicBoolean
import java.io.ByteArrayOutputStream
import java.net.URL
// A simple implementation of the Akka Cancellable trait
class AtomicCancellable extends Cancellable {
private val cancelled = new AtomicBoolean()
override def cancel() = cancelled.compareAndSet(false, true)
override def isCancelled() = cancelled.get
}
object AtomicCancellable {
def apply() = new AtomicCancellable
}
abstract class Token(prefix: String, sequence: Long) {
val code: String = prefix + sequence
}
case class RefreshToken(sequence: Long) extends Token("R", sequence) {
def this() = this(0)
def next = this.copy(sequence = sequence + 1)
}
case class AccessToken private (sequence: Long, expiresIn: FiniteDuration) extends Token("A", sequence) {
def this(token: RefreshToken, expiresIn: FiniteDuration) = this(token.sequence, expiresIn)
}
case class RefreshResponse(access: AccessToken, refresh: Option[RefreshToken])
// The basic OAuth refresh token request/response flow (section 6 of the OAuth 2.0 spec)
// This flow is the fundamental source of access and refresh tokens
// and would be a real HTTP flow in a working example
def refreshRequest(endpoint: URL, clientId: String, clientSecret: String): Flow[RefreshToken, Future[RefreshResponse], Unit] = {
// val expiresIn = 10.minutes
// This would be the real flow that makes the request to the OAuth refresh endpoint
// using Http().singleRequest(...), for example
Flow[RefreshToken].map(refresh => Future.successful(RefreshResponse(new AccessToken(refresh, 10.minutes), Some(refresh.next))))
}
// Creates a sink that materializes an Agent for obtaining the current Access Token
// The sink accepts a single element, the initial Refresh Token, and
// initiates an internal flow that automatically requests new Access Tokens
// and updates the materialized Agent accordingly
// The paired Cancellable can be used to stop the automatic update process
// Callers should provide a valid persist sink, typically backed by a database
// or other persistent store, to save the latest refresh token value
// The lead argument is the lead time to request the next Access Token prior to
// the current token's expiration
def autoRefresh(
request: Flow[RefreshToken, Future[RefreshResponse], _],
persist: Sink[RefreshToken, _],
lead: FiniteDuration = 30.seconds
)(implicit materializer: Materializer, system: ActorSystem) : Sink[RefreshToken, (Agent[Future[AccessToken]], Cancellable)] = {
Sink.head[RefreshToken].mapMaterializedValue { futureInitial =>
implicit val executionContext = materializer.executionContext
val first = Promise[AccessToken]
val agent = Agent(first.future)
val cancellable = AtomicCancellable()
// Create the auto refresh flow that will run independently
// to periodically update the agent with the current Access Token
val auto = Sink(
Flow[(RefreshToken, Promise[AccessToken])],
Merge[(RefreshToken, Promise[AccessToken])](2),
request,
Unzip[RefreshToken, Promise[AccessToken]],
Zip[Future[RefreshResponse], Promise[AccessToken]],
Broadcast[(RefreshResponse, Promise[AccessToken])](3)
)((mat, _, _, _, _, _) => mat) {
implicit b => (initial, merge, request, unzip, zip, bcast) =>
// Detect and handle cancellation
// The splitWhen diverts the flow of elements upon cancellation,
// ending the refresh process and allowing us to handle cleanup
// so we don't leave a dangling, uncompleted promise
val cancel = b.add(Flow[(RefreshToken, Promise[AccessToken])]
.splitWhen(_ => cancellable.isCancelled)
.prefixAndTail(1).map {
case (prefix, tail) => {
tail.map(_.map {
case (_, promise) => promise.failure(new Exception("auto refresh cancelled"))
})
prefix.head
}
}.flatten(FlattenStrategy.concat))
// Complete current promise and create next promise
val promise = b.add(Flow[(Future[RefreshResponse], Promise[AccessToken])]
.map {
case (fresponse, cur) => {
cur.completeWith(fresponse.map(_.access))
val next = Promise[AccessToken]
// Update agent upon promise completion
// Note that this a side effect, hence the andThen
next.future.andThen { case _ => agent.send(next.future) }
(fresponse, next)
}
})
// Unwrap the completed response future
val response = b.add(Flow[(Future[RefreshResponse], Promise[AccessToken])]
.map { case (fr, p) => Source(fr.map(r => (r, p))) }
.flatten(FlattenStrategy.concat))
// Save the updated refresh token, if supplied
val save = b.add(Flow[(RefreshResponse, Promise[AccessToken])]
.collect { case (RefreshResponse(_, Some(refresh)), _) => refresh }
.to(persist))
// Send the next future to the agent upon expiration of current to
// prevent users of the agent from using expired tokens
// Uses the Akka after pattern to schedule the send
val expiration = b.add(Sink.foreach[(RefreshResponse, Promise[AccessToken])] {
case (RefreshResponse(access, _), promise) => {
after(access.expiresIn, system.scheduler)(Future {
if (!promise.isCompleted) {
agent.send(promise.future)
}
})
}
})
// Feed back the refresh token after delay to initiate the next access token refresh
// Uses the Akka after pattern to schedule the Future with configured lead time
// prior to expiration
val refresh = b.add(Flow[(RefreshResponse, Promise[AccessToken])]
.collect {
case (RefreshResponse(access, Some(refresh)), promise) => {
val delay = access.expiresIn.minus(lead)
val future = after(delay, system.scheduler)(Future.successful((refresh, promise)))
Source(future)
}
}.flatten(FlattenStrategy.concat))
// The rolling request flow, initiated with the initial refresh token
initial ~> merge ~> cancel ~> unzip.in
unzip.out0 ~> request ~> zip.in0
unzip.out1 ~> zip.in1
zip.out ~> promise ~> response ~> bcast ~> save
bcast ~> expiration
bcast ~> refresh
merge <~ refresh
initial.inlet
}
// Run the auto refresh flow
val initial = futureInitial.map(refresh => (refresh, first))
auto.runWith(Source(initial))
// Return the (Agent, Cancellable) pair
(agent, cancellable)
}
}
// Create an Access Token source that always retrieves the latest value from an agent
def accessTokenSource(agent: Agent[Future[AccessToken]]): Source[AccessToken, Unit] =
Source.repeat().map(_ => Source(agent())).flatten(FlattenStrategy.concat)
// Flow that decorates requests with the current access token
def addAccessToken(tokens: Source[AccessToken, Unit]): Flow[HttpRequest, HttpRequest, Unit] =
Flow(Flow[HttpRequest], tokens, Zip[HttpRequest, AccessToken])((mat, _, _) => mat) {
implicit b => (requests, tokens, zip) =>
val addHeader = b.add(Flow[(HttpRequest, AccessToken)].map {
case (request, access) => request.withHeaders(new Authorization(new OAuth2BearerToken(access.code)))
})
requests ~> zip.in0
tokens ~> zip.in1
zip.out ~> addHeader
(requests.inlet, addHeader.outlet)
}
// Create/destroy an actor system for testing
trait AkkaStreamsImplicits extends BeforeAndAfterAll { this: Suite =>
implicit var system: ActorSystem = _
implicit var materializer: Materializer = _
override def beforeAll = {
super.beforeAll
system = ActorSystem(this.getClass.getSimpleName.replace("$", "_"))
materializer = ActorMaterializer()
}
override def afterAll = {
system.shutdown
super.afterAll
}
}
class OAuthRefreshSpec extends FlatSpec with AkkaStreamsImplicits with Matchers with ScalaFutures {
def createMockResponse(refresh: RefreshToken, expiresIn: FiniteDuration) =
RefreshResponse(new AccessToken(refresh, expiresIn), Some(refresh.next))
"Auto refresh" should "generate access token" in {
val initialRefresh = new RefreshToken()
val expiresIn = 3.seconds
val mockRequestFlow = Flow[RefreshToken].map(r => Future.successful(createMockResponse(r, expiresIn)))
val persist = Sink.foreach[RefreshToken](r => println(s"Saving refresh token: $r"))
val autoRefreshSink = autoRefresh(mockRequestFlow, persist, 1.second)
val (agent, cancellable) = Source.single(initialRefresh).runWith(autoRefreshSink)
whenReady(agent()) { token =>
val expected = new AccessToken(initialRefresh, expiresIn)
token shouldBe expected
cancellable.cancel()
}
}
}
// Run the test case
run(new OAuthRefreshSpec)
#!/usr/bin/env scalas
// NOTE: This is a self-encapsulated Scala script meant to be run with scalas
// See http://www.scala-sbt.org/0.13/docs/Scripts.html
/***
scalaVersion := "2.11.6"
resolvers += Resolver.url("typesafe-ivy-repo", url("http://typesafe.artifactoryonline.com/typesafe/releases"))(Resolver.ivyStylePatterns)
libraryDependencies ++= Seq(
"com.typesafe.scala-logging" %% "scala-logging" % "3.+",
"com.typesafe.akka" % "akka-stream-experimental_2.11" % "1.0",
"org.scalatest" %% "scalatest" % "2.2.4"
)
*/
import com.typesafe.scalalogging.StrictLogging
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.stage._
import akka.stream.scaladsl.FlowGraph.Implicits._
import akka.stream.scaladsl._
import org.scalatest._
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.scalactic.Tolerance._
import org.scalactic.TripleEqualsSupport.Spread
import scala.collection.immutable._
import scala.util.Random
// Creates an unbounded source of random ints with a known seed (for repeatability)
def randomSource(seed: Int) = Source(() => {
val random = new Random(seed)
Iterator.continually(random.nextInt)
})
// Transform a source of integers into a normalized source of doubles where
// each element emitted is in the range of 0 to 1
// Note that the incoming source must be both finite and support multiple subscribers
def normalize(in: Source[Int, Unit]): Source[Double, Unit] = {
// Fold over the input source to create a new source that emits a single element
// which is the range of integers over the entire stream
val fold = in.fold((Int.MaxValue, Int.MinValue)) {
(range, n) => range match {
case (l, u) => (l.min(n), u.max(n))
}
}
// Transform the single element range source into an unbounded source
// that continually emits the same element
val range = fold.map(r => Source.repeat(r)).flatten(FlattenStrategy.concat)
// Create a stage that normalizes each value
val normalize = Flow[(Int, (Int, Int))].map {
case (n, (min, max)) if (min == max) => 1.0
case (n, (min, max)) => (n.toDouble - min.toDouble) / (max.toDouble - min.toDouble)
}
// Create the final source using a flow that combines the prior constructs
Source(in, range, Zip[Int, (Int, Int)], normalize)((mat, _, _, _) => mat) {
implicit b => (in, range, zip, normalize) =>
in ~> zip.in0
range ~> zip.in1
zip.out ~> normalize
normalize.outlet
}
}
// Create/destroy an actor system for testing
trait AkkaStreamsImplicits extends BeforeAndAfterAll { this: Suite =>
implicit var system: ActorSystem = _
implicit var materializer: Materializer = _
override def beforeAll = {
super.beforeAll
system = ActorSystem(this.getClass.getSimpleName.replace("$", "_"))
materializer = ActorMaterializer()
}
override def afterAll = {
system.shutdown
super.afterAll
}
}
class NormalizeSpec extends FlatSpec with AkkaStreamsImplicits with Matchers with ScalaFutures {
val seed = 42
"Normalize" should "properly calculate for constant stream" in {
val value = 5
val size = 100
val expected = Seq.fill(size)(1.0)
val constants = Source.repeat(value).take(size)
val normalized = normalize(constants)
val future = normalized.runWith(Sink.fold(List[Double]())(_ :+ _))
whenReady(future) { result =>
//println(s"result: $result")
result should have size expected.size
result.zip(expected).foreach { case (actual, expected) =>
actual shouldBe expected
}
}
}
it should "properly calculate for random stream" in {
val size = 100
val randoms = randomSource(seed).take(size)
val normalized = normalize(randoms)
val future = normalized.runWith(Sink.fold(List[Double]())(_ :+ _))
whenReady(future) { result =>
//println(s"result: $result")
result should have size size
result should contain (0.0)
result should contain (1.0)
result.exists(_ < 0.0) shouldBe false
result.exists(_ > 1.0) shouldBe false
}
}
}
// Run the test case
run(new NormalizeSpec)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment