Skip to content

Instantly share code, notes, and snippets.

View adamw's full-sized avatar

Adam Warski adamw

View GitHub Profile
val routes: Route =
logDuration {
get {
path("test") {
val s = Source.tick(0.seconds, 1.second, "x").take(5).map(ByteString(_))
complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, s))
}
}
}
}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.RouteResult.Complete
import akka.http.scaladsl.server.{Directive0, RouteResult}
import akka.stream.scaladsl.Flow
import akka.util.ByteString
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}
def timeRequest(request: HttpRequest): Try[RouteResult] => Unit = {
val start = System.currentTimeMillis()
{
case Success(Complete(resp)) =>
val d = System.currentTimeMillis() - start
logger.info(s"[${resp.status.intValue()}] ${request.method.name} " +
s"${request.uri} took: ${d}ms")
case Success(Rejected(_)) =>
case Failure(_) =>
import io.prometheus.client.{Gauge, Histogram}
def reportRequestMetrics(request: HttpRequest): Try[RouteResult] => Unit = {
requestsInProgress.inc()
val requestTimer = requestLatencySeconds.startTimer()
result => {
requestsInProgress.dec()
result match {
case Success(Complete(_)) => requestTimer.observeDuration()
KStreamBuilder builder = new KStreamBuilder();
builder.stream(keySerde, valueSerde, "my_entity_events")
.groupByKey(keySerde, valueSerde)
// the folding function: should return the new state
.reduce((currentState, event) -> ..., "my_entity_store");
.toStream(); // yields a stream of intermediate states
return builder;
streams
.store("my_entity_store", QueryableStoreTypes.keyValueStore());
.get(entityId);
metadataService
.streamsMetadataForStoreAndKey("my_entity_store", entityId, keySerde)
package test;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
public class Compare1 {
// implementations are omitted
class User {
String getProfileId() { return null; }
String getEmail() { return null; }
package test;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
/*
* Let's say we want to fetch the user's profile from one API call,
* and the user's friends from another API call, in parallel.
*/
public class Compare2Synchronous {
package test;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
public class Compare2Wrappers {
// I/O operations: non-blocking, asynchronous
CompletableFuture<String> sendHttpGet(String url) { return null; }
// the business logic: asynchronous, as the type suggests