Skip to content

Instantly share code, notes, and snippets.

@NiteshKant
Created June 1, 2015 21:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save NiteshKant/58522e8ccba0fcec51a3 to your computer and use it in GitHub Desktop.
Save NiteshKant/58522e8ccba0fcec51a3 to your computer and use it in GitHub Desktop.
Sample for Drew
/*
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactivex.netty.examples.tcp.echo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.reactivex.netty.examples.AbstractClientExample;
import io.reactivex.netty.protocol.tcp.client.ConnectionRequest;
import io.reactivex.netty.protocol.tcp.client.TcpClient;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;
import rx.subjects.PublishSubject;
import java.nio.charset.Charset;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* A TCP "Hello World" example. There are three ways of running this example:
*
* <h2>Default</h2>
*
* The default way is to just run this class with no arguments, which will start a server ({@link EchoServer}) on
* an ephemeral port and then send an HTTP request to that server and print the response.
*
* <h2>After starting {@link EchoServer}</h2>
*
* If you want to see how {@link EchoServer} work, you can run {@link EchoServer} by yourself and then pass
* the port on which the server started to this class as a program argument:
*
<PRE>
java io.reactivex.netty.examples.tcp.echo.EchoClient [server port]
</PRE>
*
* <h2>Existing HTTP server</h2>
*
* You can also use this client to send a GET request "/hello" to an existing HTTP server (different than
* {@link EchoServer}) by passing the port fo the existing server similar to the case above:
*
<PRE>
java io.reactivex.netty.examples.tcp.echo.EchoClient [server port]
</PRE>
*
* In all the above usages, this client will print the response received from the server.
*/
public final class EchoClient extends AbstractClientExample {
public static void main(String[] args) throws InterruptedException {
/*
* Retrieves the server port, using the following algorithm:
* <ul>
<li>If an argument is passed, then use the argument as the server port.</li>
<li>Otherwise, see if the server in the passed server class is already running. If so, use that port.</li>
<li>Otherwise, start the passed server class and use that port.</li>
</ul>
*/
int port = getServerPort(EchoServer.class, args);
TestScheduler testScheduler = Schedulers.test();
final RequestSender sender = new RequestSender(port, testScheduler);
int requestCount = 10;
final CountDownLatch latch = new CountDownLatch(requestCount);
for (int i = 0; i < requestCount; i++) {
sender.send(i)
.take(1)
.doOnTerminate(latch::countDown)
.forEach(System.out::println);
}
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
latch.await(1, TimeUnit.MINUTES);
}
private static class RequestSender {
private final ConcurrentLinkedQueue<RequestHolder> queuedRequests;
private final ConcurrentLinkedQueue<RequestHolder> sentRequests;
private final ConnectionRequest<ByteBuf, ByteBuf> connectionRequest;
private RequestSender(int port) {
this(port, Schedulers.computation());
}
private RequestSender(int port, Scheduler tickScheuler) {
queuedRequests = new ConcurrentLinkedQueue<>();
sentRequests = new ConcurrentLinkedQueue<>();
connectionRequest = TcpClient.<ByteBuf, ByteBuf>newClient("127.0.0.1", port)
.maxConnections(10)
.<ByteBuf, ByteBuf>addChannelHandlerLast("decoder",
() -> new LengthFieldBasedFrameDecoder(
1024, 0, 2, 0, 2))
.<ByteBuf, ByteBuf>addChannelHandlerLast("encoder",
() -> new LengthFieldPrepender(2))
.createConnectionRequest();
Observable.interval(1, TimeUnit.SECONDS, tickScheuler)
.flatMap(aTick ->
connectionRequest.flatMap(conn ->
conn.write(poll().map(holder -> {
sentRequests.add(holder);
return Unpooled.buffer().writeBytes(("Interval: " + holder.getRequest()).getBytes());
})).cast(ByteBuf.class)
.concatWith(conn.getInput())
.map(bb -> bb.toString(Charset.defaultCharset()))
.map(response -> {
RequestHolder holder = sentRequests.poll();
holder.pushResult(response);
return null;
})
))
.subscribe();
}
public Observable<String> send(int request) {
final RequestHolder newHolder = new RequestHolder(request);
queuedRequests.add(newHolder);
return newHolder.resultSubject;
}
private Observable<RequestHolder> poll() {
return Observable.create(subscriber -> {
// TODO: Backpressure.
RequestHolder nextRequest;
while (null != (nextRequest = queuedRequests.poll())) {
subscriber.onNext(nextRequest);
}
subscriber.onCompleted();
});
}
}
private static class RequestHolder {
private final int request;
private final PublishSubject<String> resultSubject;
private RequestHolder(int request) {
this.request = request;
resultSubject = PublishSubject.create();
}
public int getRequest() {
return request;
}
public void pushResult(String result) {
resultSubject.onNext(result);
resultSubject.onCompleted();
}
public void pushError(Throwable error) {
resultSubject.onError(error);
}
}
}
/*
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactivex.netty.examples.tcp.echo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufProcessor;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.reactivex.netty.channel.Connection;
import io.reactivex.netty.examples.AbstractServerExample;
import io.reactivex.netty.protocol.tcp.server.TcpServer;
import rx.functions.Func1;
public final class EchoServer extends AbstractServerExample {
public static void main(final String[] args) {
TcpServer<ByteBuf, ByteBuf> server;
server = TcpServer.newServer()
.<ByteBuf, ByteBuf>addChannelHandlerLast("decoder",
() -> new LengthFieldBasedFrameDecoder(1024, 0, 2, 0,
2))
.<ByteBuf, ByteBuf>addChannelHandlerLast("encoder", () -> new LengthFieldPrepender(2))
.start(connection -> {
return connection.writeAndFlushOnEach(connection.getInput()
.map(new Converter(connection)));
});
if (shouldWaitForShutdown(args)) {
/*When testing the args are set, to avoid blocking till shutdown*/
server.awaitShutdown();
}
setServerPort(server.getServerPort());
}
private static class Converter implements Func1<ByteBuf, ByteBuf> {
private final Connection<ByteBuf, ByteBuf> connection;
public Converter(Connection<ByteBuf, ByteBuf> connection) {
this.connection = connection;
}
@Override
public ByteBuf call(ByteBuf bb) {
ByteBuf buffer = connection.unsafeNettyChannel().alloc().buffer();
bb.forEachByte(new ByteBufProcessor() {
@Override
public boolean process(byte value) throws Exception {
buffer.writeByte(value + 1);
return true;
}
});
bb.release();
return buffer;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment