Skip to content

Instantly share code, notes, and snippets.

View adamw's full-sized avatar

Adam Warski adamw

View GitHub Profile
package com.softwaremill.kmq;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.Stores;
@Path("/hello")
public class Hello {
@GET
@Produces(MediaType.TEXT_PLAIN)
public String helloWorld() {
return "Hello World";
}
}
class MyApplicationBootstrap {
public static void main(String[] args) {
// 1. create the object graph. Manually, by using "new". Like in the stone age.
Hello h = new Hello();
// 2. create a list of all endpoints our application will expose
List<Endpoint> endpoints = new ArrayList<Endpoint>();
endpoints.addAll(new HelloEndpoints().endpoints(h));
endpoints.addAll(...);
@adamw
adamw / 1.java
Created October 12, 2017 12:30
@Path("user/{userId}")
public String getUser(@PathParam("userId") userId) {
...
}
import com.softwaremill.sttp._
val sort: Option[String] = None
val query = "http language:scala"
// the `query` parameter is automatically url-encoded
// `sort` is removed, as the value is not defined
val request = sttp.get(
uri"https://api.github.com/search/repositories?q=$query&sort=$sort")
<K, V> void addToMultimap(ConcurrentHashMap<K, Set<V>> map, K key, V value) {
map.compute(key, (k, v) -> {
if (v == null) {
v = ConcurrentHashMap.newKeySet();
}
v.add(value);
return v;
});
}
val logDuration: Directive0 = extractRequestContext.flatMap { ctx =>
val start = System.currentTimeMillis()
mapResponse { resp =>
val took = System.currentTimeMillis() - start
logger.info(s"[${resp.status.intValue()}] ${ctx.request.method.name} " +
s"${ctx.request.uri} took: ${d}ms")
resp
}
}
val routes: Route =
logDuration {
get {
path("test") {
val s = Source.tick(0.seconds, 1.second, "x").take(5).map(ByteString(_))
complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, s))
}
}
}
}
KStreamBuilder builder = new KStreamBuilder();
builder.stream(keySerde, valueSerde, "my_entity_events")
.groupByKey(keySerde, valueSerde)
// the folding function: should return the new state
.reduce((currentState, event) -> ..., "my_entity_store");
.toStream(); // yields a stream of intermediate states
return builder;
metadataService
.streamsMetadataForStoreAndKey("my_entity_store", entityId, keySerde)