Created
May 16, 2014 06:21
-
-
Save daschl/8559d892ded3adbda9b8 to your computer and use it in GitHub Desktop.
Reactor Examples from JAX 2014
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example.couchbase; | |
import io.netty.buffer.ByteBuf; | |
import io.netty.buffer.Unpooled; | |
import reactor.core.Environment; | |
import reactor.core.Reactor; | |
import reactor.core.spec.Reactors; | |
import reactor.event.Event; | |
import reactor.function.Consumer; | |
import reactor.net.NetChannel; | |
import reactor.net.netty.tcp.NettyTcpClient; | |
import reactor.net.tcp.TcpClient; | |
import reactor.net.tcp.spec.TcpClientSpec; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.locks.LockSupport; | |
import static reactor.event.selector.Selectors.T; | |
public class HighPerf { | |
static final Environment ENV = new Environment(); | |
private static final int PAYLOAD_SIZE = 27; | |
public static void main(String[] args) throws Exception { | |
TcpClient<ByteBuf, ByteBuf> client = new TcpClientSpec<ByteBuf, ByteBuf>(NettyTcpClient.class) | |
.env(ENV) | |
.connect("localhost", 11210) | |
.get(); | |
final NetChannel<ByteBuf, ByteBuf> conn1 = client.open().await(); | |
final NetChannel<ByteBuf, ByteBuf> conn2 = client.open().await(); | |
final NetChannel<ByteBuf, ByteBuf> conn3 = client.open().await(); | |
final NetChannel<ByteBuf, ByteBuf> conn4 = client.open().await(); | |
Reactor r = Reactors.reactor() | |
.env(new Environment()) | |
.dispatcher(Environment.RING_BUFFER) | |
.get(); | |
r.on(T(ByteBuf.class), new Node(conn1)); | |
r.on(T(ByteBuf.class), new Node(conn2)); | |
r.on(T(ByteBuf.class), new Node(conn3)); | |
r.on(T(ByteBuf.class), new Node(conn4)); | |
final ByteBuf sourceBuf = Unpooled.buffer(PAYLOAD_SIZE) | |
.writeByte(0x80) | |
.writeByte(0x09) | |
.writeBytes(new byte[]{0x00, 0x03}) | |
.writeByte(0x00) | |
.writeByte(0x00) | |
.writeBytes(new byte[]{0x00, 0x00}) | |
.writeBytes(new byte[]{0x00, 0x00, 0x00, 0x03}) | |
.writeBytes(new byte[]{0x00, 0x00, 0x00, 0x00}) | |
.writeBytes(new byte[]{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}); | |
AtomicInteger i = new AtomicInteger(0); | |
while (i.get() < 100000000) { | |
r.notify(ByteBuf.class, (reactor.function.Supplier<Event<ByteBuf>>) () -> { | |
ByteBuf buf = sourceBuf.copy() | |
.writeBytes(new byte[]{0x65, 0x6F, (byte) (i.getAndIncrement() % 9)}); | |
return Event.wrap(buf); | |
}); | |
} | |
ENV.shutdown(); | |
} | |
private static class Node implements Consumer<Event<ByteBuf>> { | |
private final NetChannel<ByteBuf, ByteBuf> conn; | |
private final ByteBuf cache; | |
private final int BATCH_SIZE = 1452 * 20; | |
public Node(NetChannel<ByteBuf, ByteBuf> conn) { | |
this.conn = conn; | |
cache = Unpooled.buffer(BATCH_SIZE); | |
} | |
@Override | |
public void accept(Event<ByteBuf> ev) { | |
ByteBuf buf = ev.getData().copy(); | |
if (buf.readableBytes() > cache.writableBytes()) { | |
conn.sendAndForget(cache.copy()); | |
LockSupport.parkNanos(TimeUnit.MICROSECONDS.toNanos(300)); | |
cache.clear(); | |
} | |
cache.writeBytes(buf); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package examples.core; | |
import reactor.core.Environment; | |
import reactor.core.processor.Processor; | |
import reactor.core.processor.spec.ProcessorSpec; | |
import reactor.io.Buffer; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicLong; | |
public class ProcessorSamples { | |
static final Environment ENV = new Environment(); | |
static final int runs = 10000000; // 10M | |
public static void main(String... args) throws Exception { | |
CountDownLatch latch = new CountDownLatch(runs); | |
AtomicLong sum = new AtomicLong(); | |
Processor<Buffer> processor = new ProcessorSpec<Buffer>() | |
.singleThreadedProducer() | |
.dataBufferSize(1024 * 16) // 16k | |
.dataSupplier(() -> new Buffer(4, true)) // 4 => int | |
.consume(buff -> { | |
sum.addAndGet(buff.readInt()); // incrementing int here | |
buff.clear(); | |
latch.countDown(); | |
}) | |
.get(); | |
final AtomicInteger i = new AtomicInteger(0); | |
long start = System.nanoTime(); | |
while(i.get() < runs) { | |
//Operation<Buffer> op = processor.prepare(); | |
//op.get().append(i.getAndIncrement()).flip(); | |
//op.commit(); | |
processor.batch(512, buff -> buff.append(i.getAndIncrement()).flip()); | |
} | |
latch.await(5, TimeUnit.SECONDS); | |
double end = System.nanoTime(); | |
System.out.println("throughput: "+(long) (runs / ((end - start) / 1000000000))+" ops/sec"); | |
ENV.shutdown(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package examples.core; | |
import reactor.core.Environment; | |
import reactor.core.composable.Deferred; | |
import reactor.core.composable.Promise; | |
import reactor.core.composable.spec.Promises; | |
import reactor.function.support.Boundary; | |
public class PromiseSamples { | |
static final Environment ENV = new Environment(); | |
public static void main(String[] args) { | |
simplePromise(); | |
//doubleComplete(); | |
ENV.shutdown(); | |
} | |
public static void simplePromise() { | |
Boundary b = new Boundary(); | |
Deferred<String,Promise<String>> deferred = Promises.defer(ENV); | |
Promise<String> promise = deferred.compose(); | |
promise | |
.onComplete(p -> System.out.println("Completed Promise: " + p)) | |
.onSuccess(s -> System.out.println("Success with Value: " + s)) | |
.onError(t -> System.err.println("Got Throwable: " + t)); | |
deferred.accept("Hello World!"); | |
deferred.accept(new Throwable("Something went wrong :(")); | |
b.await(); | |
} | |
public static void doubleComplete() { | |
Boundary b = new Boundary(); | |
Deferred<String,Promise<String>> deferred = Promises.defer(ENV); | |
Promise<String> promise = deferred.compose(); | |
promise.onSuccess(s -> System.out.println("Success with Value: " + s)); | |
deferred.accept("Hello World!"); | |
deferred.accept("Hello DevFest!"); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package examples.core; | |
import reactor.core.Environment; | |
import reactor.core.Reactor; | |
import reactor.core.spec.Reactors; | |
import reactor.event.Event; | |
import reactor.function.Consumer; | |
import reactor.function.Function; | |
import static reactor.event.selector.Selectors.*; | |
public class ReactorSamples { | |
static final Environment ENV = new Environment(); | |
public static void main(String[] args) { | |
//simpleReactor(); | |
replyReactor(); | |
ENV.shutdown(); | |
} | |
public static void simpleReactor() { | |
Reactor reactor = Reactors.reactor() | |
.env(ENV) | |
.dispatcher(Environment.RING_BUFFER) | |
.get(); | |
//reactor.<Event<String>>on($("test"), ev -> System.out.println("hi " + ev.getData())); | |
reactor.on($("test"), new Consumer<Event<String>>() { | |
@Override | |
public void accept(Event<String> ev) { | |
System.out.println("hi " + ev.getData()); | |
} | |
}); | |
reactor.notify("test", Event.wrap("DevFest")); | |
} | |
public static void replyReactor() { | |
Reactor reactor = Reactors.reactor() | |
.env(ENV) | |
.dispatcher(Environment.RING_BUFFER) | |
.get(); | |
reactor.receive(U("/weather/{city}"), new Function<Event<?>, Object>() { | |
@Override | |
public Object apply(Event<?> event) { | |
String city = event.getHeaders().get("city"); | |
if (city.equals("vienna")) { | |
return "Always raining"; | |
} else if (city.equals("barcelona")) { | |
return "Sunny!"; | |
} else { | |
throw new IllegalArgumentException("I don't know " + city); | |
} | |
} | |
}); | |
reactor.on($("weatherStatus"), new Consumer<Event<String>>() { | |
@Override | |
public void accept(Event<String> event) { | |
System.out.println("The requested weather is: " + event.getData()); | |
} | |
}); | |
reactor.on(T(Exception.class), new Consumer<Event<Exception>>() { | |
@Override | |
public void accept(Event<Exception> event) { | |
System.err.println("I can handle: " + event.getData().getMessage()); | |
} | |
}); | |
reactor.send("/weather/vienna", new Event<Void>((Void) null).setReplyTo("weatherStatus")); | |
reactor.send("/weather/barcelona", new Event<Void>((Void) null).setReplyTo("weatherStatus")); | |
reactor.send("/weather/munich", new Event<Void>((Void) null).setReplyTo("weatherStatus")); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example.couchbase; | |
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/** | |
* Represents a list of pools. | |
*/ | |
@JsonIgnoreProperties(ignoreUnknown = true) | |
final class RestPools { | |
private final List<Map<String, String>> pools; | |
private final String version; | |
@JsonCreator | |
RestPools( | |
@JsonProperty("pools") List<Map<String, String>> pools, | |
@JsonProperty("implementationVersion") String version) { | |
this.pools = pools; | |
this.version = version; | |
} | |
List<Map<String, String>> getPools() { | |
return pools; | |
} | |
String getVersion() { | |
return version; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example.couchbase; | |
import com.fasterxml.jackson.databind.ObjectMapper; | |
import io.netty.channel.ChannelPipeline; | |
import io.netty.handler.codec.http.*; | |
import io.netty.util.CharsetUtil; | |
import reactor.core.Environment; | |
import reactor.core.composable.Deferred; | |
import reactor.core.composable.Stream; | |
import reactor.core.composable.spec.Streams; | |
import reactor.function.Consumer; | |
import reactor.function.Function; | |
import reactor.function.support.Boundary; | |
import reactor.net.NetChannel; | |
import reactor.net.netty.NettyClientSocketOptions; | |
import reactor.net.netty.tcp.NettyTcpClient; | |
import reactor.net.tcp.TcpClient; | |
import reactor.net.tcp.spec.TcpClientSpec; | |
import java.util.concurrent.atomic.AtomicReference; | |
public class RestWalk { | |
static final Environment ENV = new Environment(); | |
static final ObjectMapper mapper = new ObjectMapper(); | |
public static void main(String... args) { | |
Deferred<String,Stream<String>> deferred = Streams.defer(ENV, Environment.THREAD_POOL); | |
Stream<String> stream = deferred.compose(); | |
Boundary b = new Boundary(); | |
stream | |
.map(new ConnectFunction()) | |
.map(new Step1Function("/pools")) | |
.map(new Step2Function()) | |
.consume(b.bind(s -> System.out.println("Now go look at: " + s))); | |
deferred.accept("localhost"); | |
b.await(); | |
ENV.shutdown(); | |
} | |
private static class ConnectFunction implements Function<String, NetChannel<HttpResponse, HttpRequest>> { | |
@Override | |
public NetChannel<HttpResponse, HttpRequest> apply(String host) { | |
TcpClient<HttpResponse, HttpRequest> client = new TcpClientSpec<HttpResponse, HttpRequest>(NettyTcpClient.class) | |
.env(ENV) | |
.options(new NettyClientSocketOptions().pipelineConfigurer(new Consumer<ChannelPipeline>() { | |
@Override | |
public void accept(ChannelPipeline pipeline) { | |
pipeline.addLast(new HttpClientCodec()); | |
pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE)); | |
} | |
})) | |
.connect(host, 8091) | |
.get(); | |
try { | |
return client.open().await(); | |
} catch (InterruptedException e) { | |
throw new IllegalStateException("woops"); | |
} | |
} | |
} | |
private static class Step1Function implements Function<NetChannel<HttpResponse, HttpRequest>, String> { | |
private final String uri; | |
public Step1Function(String uri) { | |
this.uri = uri; | |
} | |
@Override | |
public String apply(NetChannel<HttpResponse, HttpRequest> conn) { | |
HttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri); | |
final Boundary b = new Boundary(); | |
final AtomicReference<String> content = new AtomicReference(); | |
conn.sendAndReceive(req).onSuccess(b.bind(new Consumer<HttpResponse>() { | |
@Override | |
public void accept(HttpResponse resp) { | |
FullHttpResponse r = (FullHttpResponse) resp; | |
content.set(r.content().toString(CharsetUtil.UTF_8)); | |
} | |
})); | |
b.await(); | |
return content.get(); | |
} | |
} | |
private static class Step2Function implements Function<String, String> { | |
@Override | |
public String apply(String json) { | |
try { | |
RestPools result = mapper.readValue(json, RestPools.class); | |
return result.getPools().get(0).get("uri"); | |
} catch (Exception ex) { | |
throw new RuntimeException("Something went wrong while parsing"); | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package examples.core; | |
import reactor.core.Environment; | |
import reactor.core.composable.Deferred; | |
import reactor.core.composable.Stream; | |
import reactor.core.composable.spec.Streams; | |
import reactor.function.Function; | |
import reactor.function.Predicate; | |
import reactor.function.support.Boundary; | |
public class StreamSamples { | |
static final Environment ENV = new Environment(); | |
public static void main(String[] args) { | |
//simpleStream(); | |
//transformValues(); | |
filterValues(); | |
ENV.shutdown(); | |
} | |
public static void simpleStream() { | |
Boundary b = new Boundary(); | |
Deferred<String,Stream<String>> deferred = Streams.defer(ENV); | |
Stream<String> stream = deferred.compose(); | |
stream.consume(b.<String>bind(s -> System.out.println("Got String: " + s))); | |
deferred.accept("Hello World!"); | |
deferred.accept("Hello DevFest!"); | |
b.await(); | |
} | |
public static void transformValues() { | |
Boundary b = new Boundary(); | |
Deferred<String,Stream<String>> deferred = Streams.defer(ENV); | |
Stream<String> stream = deferred.compose(); | |
stream | |
.map(new Function<String, String>() { | |
@Override | |
public String apply(String s) { | |
return s.toUpperCase(); | |
} | |
}) | |
.consume(b.bind(s -> System.out.println("Got String: " + s))); | |
deferred.accept("Hello World!"); | |
deferred.accept("Hello DevFest!"); | |
b.await(); | |
} | |
public static void filterValues() { | |
Boundary b = new Boundary(); | |
Deferred<String,Stream<String>> deferred = Streams.defer(ENV); | |
Stream<String> stream = deferred.compose(); | |
stream.filter(new Predicate<String>() { | |
@Override | |
public boolean test(String s) { | |
return s.contains("DevFest"); | |
} | |
}).consume(b.<String>bind(s -> System.out.println("Got String: " + s))); | |
deferred.accept("Hello World!"); | |
deferred.accept("Hello DevFest!"); | |
b.await(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment