Skip to content

Instantly share code, notes, and snippets.

View rocketpages's full-sized avatar

Kevin Webber rocketpages

View GitHub Profile

Keybase proof

I hereby claim:

  • I am rocketpages on github.
  • I am kvnwbbr (https://keybase.io/kvnwbbr) on keybase.
  • I have a public key whose fingerprint is 186C 1201 E03B 45FB ABB3 AEDA 3082 1DF1 9D92 7F5B

To claim this, I am signing this object:

def index = Action { implicit request =>
Ok(...)
}
def index = Action.async { implicit request =>
Future(Ok(...))
}
@rocketpages
rocketpages / AsyncShoppingCartAction.java
Created December 25, 2016 04:56
Async Shopping Cart Example
public CompletionStage<Result> index() {
Form<CheckoutForm> checkoutForm = formFactory.form(CheckoutForm.class);
CompletionStage<Cart> cartFuture = CompletableFuture.supplyAsync(() -> cartService.getCartForUser(), ec.current());
return cartFuture.thenApply(cart -> ok(index.render(cart, checkoutForm)));
}
Delays for carrier HA: 18 average mins, 18736 delayed flights
Delays for carrier DL: 27 average mins, 209018 delayed flights
Delays for carrier FL: 31 average mins, 117632 delayed flights
Delays for carrier 9E: 32 average mins, 90601 delayed flights
Delays for carrier OH: 34 average mins, 96154 delayed flights
Delays for carrier B6: 42 average mins, 83202 delayed flights
Delays for carrier EV: 35 average mins, 122751 delayed flights
Delays for carrier AQ: 12 average mins, 1908 delayed flights
Delays for carrier MQ: 35 average mins, 205765 delayed flights
Delays for carrier CO: 34 average mins, 141680 delayed flights
// @formatter:off
val g = RunnableGraph.fromGraph(GraphDSL.create() {
implicit builder =>
import GraphDSL.Implicits._
// Source
val A: Outlet[String] = builder.add(Source.fromIterator(() => flightDelayLines)).out
// Flows
val B: FlowShape[String, FlightEvent] = builder.add(csvToFlightEvent)
@rocketpages
rocketpages / LinearFlow.scala
Last active February 16, 2021 07:01
Demonstrating a basic linear data flow using Akka Streams
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
// implicit actor system
implicit val system = ActorSystem("Sys")
// implicit actor materializer
implicit val materializer = ActorMaterializer()
// return the freshest element when there's capacity downstream, drop everything else
val droppyStream: Flow[Message, Message] =
Flow[Message].conflate(seed = identity)((lastMessage, newMessage) => newMessage)
val decider: Supervision.Decider = exc => exc match {
case _: ArithmeticException => Supervision.Resume
case _ => Supervision.Stop
}
// ActorFlowMaterializer takes the list of transformations comprising a akka.stream.scaladsl.Flow
// and materializes them in the form of org.reactivestreams.Processor
implicit val mat = ActorFlowMaterializer(
ActorFlowMaterializerSettings(system).withSupervisionStrategy(decider))
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
implicit val actorSystem = ActorSystem("ReactiveKafka")
implicit val materializer = ActorMaterializer()
val kafka = new ReactiveKafka(host = "localhost:9092", zooKeeperHost = "localhost:2181")
val publisher = kafka.consume("lowercaseStrings", "groupName", new StringDecoder())
val subscriber = kafka.publish("uppercaseStrings", "groupName", new StringEncoder())
// consume lowercase strings from kafka and publish them transformed to uppercase
Source(publisher).map(_.toUpperCase).to(Sink(subscriber)).run()