Skip to content

Instantly share code, notes, and snippets.

@mojavelinux
Last active August 29, 2015 14:07
Show Gist options
  • Select an option

  • Save mojavelinux/3078fbc1e02435ecb3b8 to your computer and use it in GitHub Desktop.

Select an option

Save mojavelinux/3078fbc1e02435ecb3b8 to your computer and use it in GitHub Desktop.
Building Reactive Apps by James Ward

Building Reactive Apps

Users Want

  • In-Sync Data

  • Real-time Collaboration

  • Instant Feedback

  • To Not Wait

Users Want Reactive Apps

Going Reactive

  • Reactive Web

  • Reactive Actors

  • Reactive Streams

Reactive Web

Async + Non-blocking

  • Reactive Requests

  • Reactive Composition

  • Reactive Push

  • 2-Way Reactive

Reactive Requests

reactive requests

Scala Code Examples

!

Blocking Request
def foo = Action {
  Ok("foo")
}
Async Request
def foo = Action.async {
  Future.successful(Ok("foo"))
}

!

Reactive Request (Async + Non-Blocking)
def pause(duration: Int) = Action.async {
  Promise.timeout(Ok(duration.toString), duration seconds)
}
Reactive WS Client
val f: Future[Response] = WS.url("http://www.foo.com").get

Reactive Composition

def foo = Action.async {
  val futureResponse = WS.url("http://www.foo.com").get
  futureResponse.map { response =>
    Ok(response.body)
  }
}

Reactive Composition (Alt)

def foo = Action.async {
  val futureJW = WS.url("http://www.jamesward.com").get
  val futureTwitter = WS.url("http://www.twitter.com").get
  for {
    jw <- futureJW
    twitter <- futureTwitter
  } yield Ok(jw.response.body + twitter.response.body)
}

Reactive Push with SSE

Server-side
def events = Action {
  val (enumerator, channel) = Concurrent.broadcast[String]
  Promise.timeout(channel.push("hello"), 1.second)
  Ok.feed(enumerator &> EventSource()).as(EVENT_STREAM)
}
Client-side
$ ->
  events = new EventSource("/events")
  events.onmessage = (e) ->
    console.log(e.data)

2-Way Reactive with WebSockets

Server-side
def echoWs = WebSocket.using[String] { request =>
 val (enumerator, channel) = Concurrent.broadcast[String]
 val in = Iteratee.foreach[String](channel.push)
 (in, enumerator)
}
Client-side
$ ->
  ws = new WebSocket("ws://localhost:9000/echo")
  ws.onopen = () ->
    ws.send("foo")
  ws.onmessage = (message) ->
    console.log(message.data)

Resilient Futures

future.recover {
  case Exception =>
    Logger.error("Failed!")
    InternalServerError("Boum!")
}

Java Code Examples

!

Blocking Request
public static Result index() {
  return ok(views.html.index.render("hello"));
}
Async Request
public static F.Promise<Result> index() {
  return F.Promise.promise(new F.Function0<Result>() {
    public Result apply() {
      return ok(views.html.index.render("hello"));
    }
  });
}

Reactive Request (Async + Non-Blocking)

public static F.Promise<Result> index() {
  return F.Promise.delayed(new F.Function0<Result>() {
    public Result apply() throws Throwable {
      return ok(views.html.index.render("hello"));
    }
  }, 5, TimeUnit.SECONDS);
}

Reactive Requests

public static F.Promise<Result> index() {
  F.Promise<WS.Response> jw = WS.url("http://www.jamesward.com").get();
  return jw.map(new F.Function<WS.Response, Result>() {
    public Result apply(WS.Response response) throws Throwable {
      return ok(response.getBody());
    }
  });
}

Reactive Composition

public static F.Promise<Result> index() {
  final F.Promise<WS.Response> jw = WS.url("http://www.jamesward.com").get();
  final F.Promise<WS.Response> tw = WS.url("http://www.twitter.com").get();
  return jw.flatMap(new F.Function<WS.Response, F.Promise<Result>>() {
    public F.Promise<Result> apply(final WS.Response jwR) throws Throwable {
      return tw.map(new F.Function<WS.Response, Result>() {
        public Result apply(WS.Response twR) throws Throwable {
          return ok(twR.getBody() + jwR.getBody());
        }
      });
    }
  });
}

Reactive Push with SSE

public static Result events() {
  EventSource eventSource = new EventSource() {
    public void onConnected() {
      sendData("hello");
    }
  };
  return ok(eventSource);
}

$ ->
  events = new EventSource("/events")
    events.onmessage = (e) ->
      console.log(e.data)

2-Way Reactive with WebSockets

public static WebSocket<String> echo() {
  return new WebSocket<String>() {
    public void onReady(final In<String> in, final Out<String> out) {
      in.onMessage(new F.Callback<String>() {
        public void invoke(String message) throws Throwable {
          out.write(message);
        }
      });
    }
  };
}

$ ->
  ws = new WebSocket("ws://localhost:9000/echo")
  ws.onopen = () ->
    ws.send("foo")
  ws.onmessage = (message) ->
    console.log(message.data)

Java 8 Code Examples

!

Blocking Request
public static Result index() {
  return ok("foo");
}
Async Request
public static F.Promise<Result> foo() {
  return F.Promise.promise(() -> ok("foo"));
}

Reactive Request (Async + Non-Blocking)

public static F.Promise<Result> foo() {
  return F.Promise.delayed(() -> ok("foo"), 5, TimeUnit.SECONDS);
}

Reactive Requests

public static F.Promise<Result> foo() {
  F.Promise<WS.Response> jw = WS.url("http://www.jamesward.com").get();
  return jw.map(response -> ok(response.getBody()));
}

Reactive Composition

public static F.Promise<Result> foo() {
  F.Promise<WS.Response> jw = WS.url("http://www.jamesward.com").get();
  F.Promise<WS.Response> tw = WS.url("http://www.twitter.com").get();
  return jw.flatMap(jwr ->
    tw.map(twr ->
      ok(twr.getBody() + jwr.getBody())));
}

Reactive Push with SSE

public static Result events() {
  EventSource eventSource = new EventSource() {
    public void onConnected() {
      sendData("hello");
    }
  };
  return ok(eventSource);
}

$ ->
  events = new EventSource("/events")
    events.onmessage = (e) ->
      console.log(e.data)

2-Way Reactive with WebSockets

public static WebSocket<String> echo() {
  return new WebSocket<String>() {
    public void onReady(final In<String> in, final Out<String> out) {
      in.onMessage(out::write);
    }
  };
}

$ ->
  ws = new WebSocket("ws://localhost:9000/echo")
  ws.onopen = () ->
    ws.send("foo")
  ws.onmessage = (message) ->
    console.log(message.data)

Reactive Actors

Event-Driven Workers

akka actors
  • Managed Concurrency

  • Isolated Failure Handling (Supervision)

  • Scales Up & Out

Actors & Requests

public class FooActor extends UntypedActor {
  @Override
  public void onReceive(Object message) throws Exception {
    F.Promise<WS.Response> jw = WS.url("http://www.jamesward.com").get();
    Patterns.pipe(jw.wrapped(), getContext().dispatcher()).to(getSender());
  }
}

public static F.Promise<Result> foo() {
  ActorRef fooActor = Akka.system().actorOf(Props.create(FooActor.class));
  F.Promise<Object> p = F.Promise.wrap(Patterns.ask(fooActor, "foo", 5000));
  return p.map(r -> ok(((WS.Response) r).getBody()));
}

Reactive Streams

A JVM standard for asynchronous stream processing with non-blocking back pressure.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment