Skip to content

Instantly share code, notes, and snippets.

@daschl
Created May 16, 2014 06:21
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save daschl/8559d892ded3adbda9b8 to your computer and use it in GitHub Desktop.
Save daschl/8559d892ded3adbda9b8 to your computer and use it in GitHub Desktop.
Reactor Examples from JAX 2014
package example.couchbase;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.spec.Reactors;
import reactor.event.Event;
import reactor.function.Consumer;
import reactor.net.NetChannel;
import reactor.net.netty.tcp.NettyTcpClient;
import reactor.net.tcp.TcpClient;
import reactor.net.tcp.spec.TcpClientSpec;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import static reactor.event.selector.Selectors.T;
public class HighPerf {
static final Environment ENV = new Environment();
private static final int PAYLOAD_SIZE = 27;
public static void main(String[] args) throws Exception {
TcpClient<ByteBuf, ByteBuf> client = new TcpClientSpec<ByteBuf, ByteBuf>(NettyTcpClient.class)
.env(ENV)
.connect("localhost", 11210)
.get();
final NetChannel<ByteBuf, ByteBuf> conn1 = client.open().await();
final NetChannel<ByteBuf, ByteBuf> conn2 = client.open().await();
final NetChannel<ByteBuf, ByteBuf> conn3 = client.open().await();
final NetChannel<ByteBuf, ByteBuf> conn4 = client.open().await();
Reactor r = Reactors.reactor()
.env(new Environment())
.dispatcher(Environment.RING_BUFFER)
.get();
r.on(T(ByteBuf.class), new Node(conn1));
r.on(T(ByteBuf.class), new Node(conn2));
r.on(T(ByteBuf.class), new Node(conn3));
r.on(T(ByteBuf.class), new Node(conn4));
final ByteBuf sourceBuf = Unpooled.buffer(PAYLOAD_SIZE)
.writeByte(0x80)
.writeByte(0x09)
.writeBytes(new byte[]{0x00, 0x03})
.writeByte(0x00)
.writeByte(0x00)
.writeBytes(new byte[]{0x00, 0x00})
.writeBytes(new byte[]{0x00, 0x00, 0x00, 0x03})
.writeBytes(new byte[]{0x00, 0x00, 0x00, 0x00})
.writeBytes(new byte[]{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00});
AtomicInteger i = new AtomicInteger(0);
while (i.get() < 100000000) {
r.notify(ByteBuf.class, (reactor.function.Supplier<Event<ByteBuf>>) () -> {
ByteBuf buf = sourceBuf.copy()
.writeBytes(new byte[]{0x65, 0x6F, (byte) (i.getAndIncrement() % 9)});
return Event.wrap(buf);
});
}
ENV.shutdown();
}
private static class Node implements Consumer<Event<ByteBuf>> {
private final NetChannel<ByteBuf, ByteBuf> conn;
private final ByteBuf cache;
private final int BATCH_SIZE = 1452 * 20;
public Node(NetChannel<ByteBuf, ByteBuf> conn) {
this.conn = conn;
cache = Unpooled.buffer(BATCH_SIZE);
}
@Override
public void accept(Event<ByteBuf> ev) {
ByteBuf buf = ev.getData().copy();
if (buf.readableBytes() > cache.writableBytes()) {
conn.sendAndForget(cache.copy());
LockSupport.parkNanos(TimeUnit.MICROSECONDS.toNanos(300));
cache.clear();
}
cache.writeBytes(buf);
}
}
}
package examples.core;
import reactor.core.Environment;
import reactor.core.processor.Processor;
import reactor.core.processor.spec.ProcessorSpec;
import reactor.io.Buffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class ProcessorSamples {
static final Environment ENV = new Environment();
static final int runs = 10000000; // 10M
public static void main(String... args) throws Exception {
CountDownLatch latch = new CountDownLatch(runs);
AtomicLong sum = new AtomicLong();
Processor<Buffer> processor = new ProcessorSpec<Buffer>()
.singleThreadedProducer()
.dataBufferSize(1024 * 16) // 16k
.dataSupplier(() -> new Buffer(4, true)) // 4 => int
.consume(buff -> {
sum.addAndGet(buff.readInt()); // incrementing int here
buff.clear();
latch.countDown();
})
.get();
final AtomicInteger i = new AtomicInteger(0);
long start = System.nanoTime();
while(i.get() < runs) {
//Operation<Buffer> op = processor.prepare();
//op.get().append(i.getAndIncrement()).flip();
//op.commit();
processor.batch(512, buff -> buff.append(i.getAndIncrement()).flip());
}
latch.await(5, TimeUnit.SECONDS);
double end = System.nanoTime();
System.out.println("throughput: "+(long) (runs / ((end - start) / 1000000000))+" ops/sec");
ENV.shutdown();
}
}
package examples.core;
import reactor.core.Environment;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.spec.Promises;
import reactor.function.support.Boundary;
public class PromiseSamples {
static final Environment ENV = new Environment();
public static void main(String[] args) {
simplePromise();
//doubleComplete();
ENV.shutdown();
}
public static void simplePromise() {
Boundary b = new Boundary();
Deferred<String,Promise<String>> deferred = Promises.defer(ENV);
Promise<String> promise = deferred.compose();
promise
.onComplete(p -> System.out.println("Completed Promise: " + p))
.onSuccess(s -> System.out.println("Success with Value: " + s))
.onError(t -> System.err.println("Got Throwable: " + t));
deferred.accept("Hello World!");
deferred.accept(new Throwable("Something went wrong :("));
b.await();
}
public static void doubleComplete() {
Boundary b = new Boundary();
Deferred<String,Promise<String>> deferred = Promises.defer(ENV);
Promise<String> promise = deferred.compose();
promise.onSuccess(s -> System.out.println("Success with Value: " + s));
deferred.accept("Hello World!");
deferred.accept("Hello DevFest!");
}
}
package examples.core;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.spec.Reactors;
import reactor.event.Event;
import reactor.function.Consumer;
import reactor.function.Function;
import static reactor.event.selector.Selectors.*;
public class ReactorSamples {
static final Environment ENV = new Environment();
public static void main(String[] args) {
//simpleReactor();
replyReactor();
ENV.shutdown();
}
public static void simpleReactor() {
Reactor reactor = Reactors.reactor()
.env(ENV)
.dispatcher(Environment.RING_BUFFER)
.get();
//reactor.<Event<String>>on($("test"), ev -> System.out.println("hi " + ev.getData()));
reactor.on($("test"), new Consumer<Event<String>>() {
@Override
public void accept(Event<String> ev) {
System.out.println("hi " + ev.getData());
}
});
reactor.notify("test", Event.wrap("DevFest"));
}
public static void replyReactor() {
Reactor reactor = Reactors.reactor()
.env(ENV)
.dispatcher(Environment.RING_BUFFER)
.get();
reactor.receive(U("/weather/{city}"), new Function<Event<?>, Object>() {
@Override
public Object apply(Event<?> event) {
String city = event.getHeaders().get("city");
if (city.equals("vienna")) {
return "Always raining";
} else if (city.equals("barcelona")) {
return "Sunny!";
} else {
throw new IllegalArgumentException("I don't know " + city);
}
}
});
reactor.on($("weatherStatus"), new Consumer<Event<String>>() {
@Override
public void accept(Event<String> event) {
System.out.println("The requested weather is: " + event.getData());
}
});
reactor.on(T(Exception.class), new Consumer<Event<Exception>>() {
@Override
public void accept(Event<Exception> event) {
System.err.println("I can handle: " + event.getData().getMessage());
}
});
reactor.send("/weather/vienna", new Event<Void>((Void) null).setReplyTo("weatherStatus"));
reactor.send("/weather/barcelona", new Event<Void>((Void) null).setReplyTo("weatherStatus"));
reactor.send("/weather/munich", new Event<Void>((Void) null).setReplyTo("weatherStatus"));
}
}
package example.couchbase;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Map;
/**
* Represents a list of pools.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
final class RestPools {
private final List<Map<String, String>> pools;
private final String version;
@JsonCreator
RestPools(
@JsonProperty("pools") List<Map<String, String>> pools,
@JsonProperty("implementationVersion") String version) {
this.pools = pools;
this.version = version;
}
List<Map<String, String>> getPools() {
return pools;
}
String getVersion() {
return version;
}
}
package example.couchbase;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import reactor.core.Environment;
import reactor.core.composable.Deferred;
import reactor.core.composable.Stream;
import reactor.core.composable.spec.Streams;
import reactor.function.Consumer;
import reactor.function.Function;
import reactor.function.support.Boundary;
import reactor.net.NetChannel;
import reactor.net.netty.NettyClientSocketOptions;
import reactor.net.netty.tcp.NettyTcpClient;
import reactor.net.tcp.TcpClient;
import reactor.net.tcp.spec.TcpClientSpec;
import java.util.concurrent.atomic.AtomicReference;
public class RestWalk {
static final Environment ENV = new Environment();
static final ObjectMapper mapper = new ObjectMapper();
public static void main(String... args) {
Deferred<String,Stream<String>> deferred = Streams.defer(ENV, Environment.THREAD_POOL);
Stream<String> stream = deferred.compose();
Boundary b = new Boundary();
stream
.map(new ConnectFunction())
.map(new Step1Function("/pools"))
.map(new Step2Function())
.consume(b.bind(s -> System.out.println("Now go look at: " + s)));
deferred.accept("localhost");
b.await();
ENV.shutdown();
}
private static class ConnectFunction implements Function<String, NetChannel<HttpResponse, HttpRequest>> {
@Override
public NetChannel<HttpResponse, HttpRequest> apply(String host) {
TcpClient<HttpResponse, HttpRequest> client = new TcpClientSpec<HttpResponse, HttpRequest>(NettyTcpClient.class)
.env(ENV)
.options(new NettyClientSocketOptions().pipelineConfigurer(new Consumer<ChannelPipeline>() {
@Override
public void accept(ChannelPipeline pipeline) {
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
}
}))
.connect(host, 8091)
.get();
try {
return client.open().await();
} catch (InterruptedException e) {
throw new IllegalStateException("woops");
}
}
}
private static class Step1Function implements Function<NetChannel<HttpResponse, HttpRequest>, String> {
private final String uri;
public Step1Function(String uri) {
this.uri = uri;
}
@Override
public String apply(NetChannel<HttpResponse, HttpRequest> conn) {
HttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri);
final Boundary b = new Boundary();
final AtomicReference<String> content = new AtomicReference();
conn.sendAndReceive(req).onSuccess(b.bind(new Consumer<HttpResponse>() {
@Override
public void accept(HttpResponse resp) {
FullHttpResponse r = (FullHttpResponse) resp;
content.set(r.content().toString(CharsetUtil.UTF_8));
}
}));
b.await();
return content.get();
}
}
private static class Step2Function implements Function<String, String> {
@Override
public String apply(String json) {
try {
RestPools result = mapper.readValue(json, RestPools.class);
return result.getPools().get(0).get("uri");
} catch (Exception ex) {
throw new RuntimeException("Something went wrong while parsing");
}
}
}
}
package examples.core;
import reactor.core.Environment;
import reactor.core.composable.Deferred;
import reactor.core.composable.Stream;
import reactor.core.composable.spec.Streams;
import reactor.function.Function;
import reactor.function.Predicate;
import reactor.function.support.Boundary;
public class StreamSamples {
static final Environment ENV = new Environment();
public static void main(String[] args) {
//simpleStream();
//transformValues();
filterValues();
ENV.shutdown();
}
public static void simpleStream() {
Boundary b = new Boundary();
Deferred<String,Stream<String>> deferred = Streams.defer(ENV);
Stream<String> stream = deferred.compose();
stream.consume(b.<String>bind(s -> System.out.println("Got String: " + s)));
deferred.accept("Hello World!");
deferred.accept("Hello DevFest!");
b.await();
}
public static void transformValues() {
Boundary b = new Boundary();
Deferred<String,Stream<String>> deferred = Streams.defer(ENV);
Stream<String> stream = deferred.compose();
stream
.map(new Function<String, String>() {
@Override
public String apply(String s) {
return s.toUpperCase();
}
})
.consume(b.bind(s -> System.out.println("Got String: " + s)));
deferred.accept("Hello World!");
deferred.accept("Hello DevFest!");
b.await();
}
public static void filterValues() {
Boundary b = new Boundary();
Deferred<String,Stream<String>> deferred = Streams.defer(ENV);
Stream<String> stream = deferred.compose();
stream.filter(new Predicate<String>() {
@Override
public boolean test(String s) {
return s.contains("DevFest");
}
}).consume(b.<String>bind(s -> System.out.println("Got String: " + s)));
deferred.accept("Hello World!");
deferred.accept("Hello DevFest!");
b.await();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment