Skip to content

Instantly share code, notes, and snippets.

@raphw
Created October 22, 2018 11:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save raphw/e6eaf4262b3c016a12c6207d51765127 to your computer and use it in GitHub Desktop.
Save raphw/e6eaf4262b3c016a12c6207d51765127 to your computer and use it in GitHub Desktop.
A reactive stream processor
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the reactive stream processor demo (ReactiveMain). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>reactivetest</groupId>
    <artifactId>reactivetest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Pin the source encoding so the build does not depend on the platform default charset. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Flux / Mono / Schedulers used to build the processing pipeline. -->
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.2.1.RELEASE</version>
        </dependency>
        <!-- Non-blocking HTTP client used for the enrichment calls. -->
        <dependency>
            <groupId>org.asynchttpclient</groupId>
            <artifactId>async-http-client</artifactId>
            <version>2.5.4</version>
        </dependency>
        <!-- Embedded stub HTTP server; compile scope because the main class starts it. -->
        <dependency>
            <groupId>com.github.tomakehurst</groupId>
            <artifactId>wiremock-standalone</artifactId>
            <version>2.19.0</version>
        </dependency>
        <!-- SLF4J binding so library log output reaches the console. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>
</project>
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
import static org.asynchttpclient.Dsl.asyncHttpClient;

import com.github.tomakehurst.wiremock.WireMockServer;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.Response;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
 * Demo of a reactive processing pipeline built with Reactor.
 *
 * <p>The program starts an embedded WireMock server with stubbed endpoints, generates an
 * endless sequence of numeric ids in batches, enriches small groups of ids with the bodies
 * of two concurrent HTTP POST calls, and finally hands every enriched line to a simulated
 * slow consumer running on a dedicated parallel scheduler.
 */
public class ReactiveMain {

    /** Number of ids emitted per generated batch. */
    private static final int BATCH = 20;
    /** Lower bound passed to the stubs' uniform random response delay. */
    private static final int LOWER_WAIT = 250;
    /** Upper bound passed to the stubs' uniform random response delay. */
    private static final int UPPER_WAIT = 500;
    /** Milliseconds the simulated consumer sleeps per item. */
    private static final int HARD_WORK = 250;

    /** Port the embedded stub server listens on; also used to build the request URLs. */
    private static final int PORT = 8089;
    private static final String BASE_URL = "http://localhost:" + PORT;

    /** Maximum number of ids grouped into one enrichment request pair. */
    private static final int REQUEST_GROUP_SIZE = 5;
    /** Maximum time to wait for a request group to fill up before emitting it anyway. */
    private static final Duration REQUEST_GROUP_TIMEOUT = Duration.ofMillis(50);
    /** Number of parallel rails and of worker threads processing enriched items. */
    private static final int PARALLELISM = 8;
    /** Prefetch amount used when splitting into rails and when hopping to the workers. */
    private static final int PREFETCH = 100;

    /**
     * Starts the stub server, wires up the pipeline and then blocks the calling thread until
     * it is interrupted. On the way out the subscription, the worker scheduler, the HTTP
     * client and the stub server are all released.
     *
     * @param args ignored
     * @throws Exception if the main thread is interrupted while waiting, or if closing the
     *     HTTP client fails
     */
    public static void main(String[] args) throws Exception {
        WireMockServer server = new WireMockServer(PORT);
        server.stubFor(post(urlMatching("/part")).willReturn(aResponse()
            .withUniformRandomDelay(LOWER_WAIT, UPPER_WAIT).withStatus(200).withBody("partsinfo")));
        server.stubFor(post(urlMatching("/boks")).willReturn(aResponse()
            .withUniformRandomDelay(LOWER_WAIT, UPPER_WAIT).withStatus(200).withBody("skatteinfo")));
        server.stubFor(get(urlMatching("/next")).willReturn(aResponse()
            .withStatus(200).withBody("0")));
        server.start();

        // Kept in a variable (instead of being created inline) so it can be disposed below.
        Scheduler workers = Schedulers.newParallel("my-processor", PARALLELISM);
        try (AsyncHttpClient httpClient = asyncHttpClient()) {
            Disposable pipeline = Flux
                // Alternative source, defined outside this file:
                //.from(new AsyncPartGenerator(httpClient))
                .<List<Long>, AtomicLong>generate(AtomicLong::new, (count, sink) -> {
                    long batch = count.getAndIncrement();
                    System.out.println("Generate: Batch " + batch + " on " + Thread.currentThread());
                    // Batch n carries the ids n * BATCH + 1 .. (n + 1) * BATCH.
                    sink.next(LongStream.rangeClosed(1, BATCH)
                        .map(offset -> offset + batch * BATCH)
                        .boxed()
                        .collect(Collectors.toList()));
                    return count;
                })
                .flatMap(Flux::fromIterable)
                // Regroup the flattened ids into small groups, one request pair per group.
                .bufferTimeout(REQUEST_GROUP_SIZE, REQUEST_GROUP_TIMEOUT)
                .flatMap(parter -> enrich(httpClient, parter))
                .parallel(PARALLELISM, PREFETCH)
                .runOn(workers, PREFETCH)
                .subscribe(
                    ReactiveMain::simulateHardWork,
                    // Without an error consumer a failed HTTP call would only surface as an
                    // "error callback not implemented" failure inside Reactor.
                    error -> System.err.println("Pipeline failed: " + error));
            try {
                // Keep the JVM's main thread parked while the pipeline runs asynchronously.
                Thread.sleep(Long.MAX_VALUE);
            } finally {
                // Cancel upstream before the HTTP client is closed by try-with-resources.
                pipeline.dispose();
            }
        } finally {
            workers.dispose();
            server.stop();
        }
    }

    /**
     * Enriches one group of ids by firing the {@code /part} and {@code /boks} POST requests
     * concurrently and combining both response bodies with every id of the group.
     *
     * @param httpClient client used to execute both requests
     * @param parter ids belonging to this group
     * @return a flux emitting one {@code "<id>: <part body>, <boks body>"} line per id
     */
    private static Flux<String> enrich(AsyncHttpClient httpClient, List<Long> parter) {
        CompletableFuture<List<String>> enriched = postForBody(httpClient, "/part")
            .thenCombine(
                postForBody(httpClient, "/boks"),
                (partinfo, skatteinfo) -> parter.stream()
                    .map(part -> part + ": " + partinfo + ", " + skatteinfo)
                    .collect(Collectors.toList()));
        return Mono.fromCompletionStage(enriched).flatMapIterable(Function.identity());
    }

    /**
     * Executes a body-less POST against the stub server and exposes the response body.
     *
     * @param httpClient client used to execute the request
     * @param path request path, starting with {@code /}
     * @return a future completing with the response body
     */
    private static CompletableFuture<String> postForBody(AsyncHttpClient httpClient, String path) {
        return httpClient
            .executeRequest(new RequestBuilder()
                .setMethod("POST")
                .setUrl(BASE_URL + path))
            .toCompletableFuture()
            .thenApply(Response::getResponseBody);
    }

    /**
     * Simulates an expensive processing step by sleeping for {@link #HARD_WORK} milliseconds
     * and then printing the item together with the executing thread.
     *
     * <p>If the thread is interrupted while sleeping, the interrupt flag is restored and the
     * item is reported as aborted instead of processed.
     *
     * @param part enriched line to process
     */
    private static void simulateHardWork(String part) {
        try {
            Thread.sleep(HARD_WORK);
        } catch (InterruptedException e) {
            // Restore the flag so the owning worker can observe the interruption and stop.
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while processing: " + part + " on " + Thread.currentThread());
            return;
        }
        System.out.println("Processing: " + part + " on " + Thread.currentThread());
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment