Skip to content

Instantly share code, notes, and snippets.

@elandau
Created March 14, 2015 06:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save elandau/2a651e9d15d7d3c86178 to your computer and use it in GitHub Desktop.
Save elandau/2a651e9d15d7d3c86178 to your computer and use it in GitHub Desktop.
package io.reactivex.netty.examples.tcp.loadbalancer;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.string.StringEncoder;
import io.reactivex.netty.channel.Connection;
import io.reactivex.netty.protocol.tcp.client.TcpClient;
import io.reactivex.netty.protocol.tcp.server.TcpServer;
import io.reactivex.netty.protocol.tcp.server.TcpServerImpl;
import io.reactivex.netty.protocol.text.StringLineDecoder;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.Transformer;
import rx.Subscriber;
import rx.functions.Func1;
/**
 * Example of a client-side round-robin load balancer.
 *
 * <p>The constructor starts a number of local TCP servers that answer every
 * received line with {@code "echo => <line>\n"}. {@link #run()} then builds one
 * client per server, opens a connection through each client and, once per
 * second, writes a {@code "Request-<n>\n"} line to the next connection in
 * round-robin order.
 */
public class LoadBalancerTest {
    /** Servers started by the constructor; their ports are read back in {@link #getServers()}. */
    private final List<TcpServer<String, String>> servers = new ArrayList<TcpServer<String, String>>();

    /**
     * Starts {@code nServers} echo servers.
     *
     * @param nServers number of servers to start
     */
    LoadBalancerTest(int nServers) {
        for (int i = 0; i < nServers; i++) {
            servers.add(startServer());
        }
    }

    /**
     * Starts a single echo server whose connection handler prefixes every
     * incoming message with {@code "echo => "} and writes it back followed by
     * a newline.
     *
     * @return the started server
     */
    TcpServer<String, String> startServer() {
        // Port 0 is passed here; the actual port is read later via getServerPort().
        // NOTE(review): presumably 0 requests an ephemeral port - confirm against RxNetty.
        return new TcpServerImpl<ByteBuf, ByteBuf>(0)
            .<ByteBuf, String>addChannelHandlerLast("encoder", StringEncoder::new)
            .<String, String>addChannelHandlerLast("decoder", StringLineDecoder::new)
            .start(connection -> {
                System.out.println("Server connected");
                return connection.writeAndFlushOnEach(
                    connection
                        .getInput()
                        .map(msg -> {
                            System.out.println("Got input : " + msg);
                            return "echo => " + msg + '\n';
                        }));
            });
    }

    /** Immutable host/port pair describing one server a client can be created for. */
    public static class Server {
        private final String host;
        private final int port;

        Server(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /**
     * Emits one {@link Server} descriptor per started server, all on
     * {@code 127.0.0.1} with the port reported by the server itself.
     *
     * @return an observable of the addresses of the locally started servers
     */
    public Observable<Server> getServers() {
        return Observable
            .from(servers)
            .map(server -> new Server("127.0.0.1", server.getServerPort()));
    }

    /**
     * Creates a client with a configured pipeline from a server address.
     * The returned function is applied once for every server address it is
     * mapped over.
     *
     * @return a function that turns a {@link Server} into a {@link TcpClient}
     *         for that server's host and port
     */
    public Func1<Server, TcpClient<String, String>> clientFactory() {
        return new Func1<Server, TcpClient<String, String>>() {
            @Override
            public TcpClient<String, String> call(Server server) {
                return TcpClient.newClient(server.host, server.port)
                    .<String, ByteBuf>addChannelHandlerLast("encoder", StringEncoder::new)
                    .<String, String>addChannelHandlerLast("decoder", StringLineDecoder::new);
            }
        };
    }

    /**
     * Builds a transformer that turns a stream of clients into an observable
     * which, on every subscription, emits exactly one connection chosen in
     * round-robin order and then completes.
     *
     * <p>When the transformer is applied it subscribes to the client stream,
     * requests a connection from every client, prints every response received
     * on a connection and collects the connections in a list. Subscribing to
     * the returned observable while that list is still empty signals a
     * {@link NoSuchElementException}.
     *
     * @return the round-robin connection-selecting transformer
     */
    public Transformer<TcpClient<String, String>, Connection<String, String>> roundRobin() {
        return new Transformer<TcpClient<String, String>, Connection<String, String>>() {
            @Override
            public Observable<Connection<String, String>> call(Observable<TcpClient<String, String>> clients) {
                // Connections are added by the connection-establishment subscription below and
                // read by whoever subscribes to the returned observable, so the list must
                // tolerate concurrent access. Elements are only ever appended.
                final List<Connection<String, String>> connections =
                    new CopyOnWriteArrayList<Connection<String, String>>();
                final AtomicInteger position = new AtomicInteger(0);

                clients
                    .flatMap(client -> client.createConnectionRequest())
                    .doOnNext(conn -> conn.getInput().subscribe(
                        response -> System.out.println("Got response : "+ response),
                        error -> System.err.println("Response stream failed : " + error)))
                    .subscribe(
                        conn -> connections.add(conn),
                        // Without a handler a single failed connect would surface as an
                        // unhandled error on whichever thread delivered it.
                        error -> System.err.println("Failed to establish connection : " + error));

                return Observable.create(new OnSubscribe<Connection<String, String>>() {
                    @Override
                    public void call(Subscriber<? super Connection<String, String>> t1) {
                        // Read the size once: the list only grows, so any index below this
                        // value stays valid for the get() that follows.
                        final int size = connections.size();
                        if (size == 0) {
                            t1.onError(new NoSuchElementException("No connections available"));
                        }
                        else {
                            // floorMod keeps the index non-negative after the counter
                            // overflows past Integer.MAX_VALUE; plain % would go negative.
                            Connection<String, String> conn =
                                connections.get(Math.floorMod(position.incrementAndGet(), size));
                            t1.onNext(conn);
                            t1.onCompleted();
                        }
                    }
                });
            }
        };
    }

    /**
     * Wires servers, clients and the round-robin selector together and starts
     * sending one {@code "Request-<n>\n"} line per second, each to the next
     * connection chosen by {@link #roundRobin()}.
     */
    public void run() {
        Observable<Connection<String, String>> clients = getServers()
            .map(clientFactory())
            .compose(roundRobin());
        Observable
            .interval(1, TimeUnit.SECONDS)
            .flatMap(counter -> clients.switchMap(conn -> conn.writeAndFlush("Request-" + counter + "\n")))
            .subscribe(
                ignored -> { },
                // Report the failure instead of letting it escape as an unhandled error.
                error -> System.err.println("Request loop terminated : " + error));
    }

    /**
     * Starts ten servers, runs the load-balanced request loop and keeps the
     * JVM alive for 100 seconds.
     *
     * @param args unused
     */
    public static void main(final String[] args) {
        new LoadBalancerTest(10).run();
        try {
            TimeUnit.SECONDS.sleep(100);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost.
            Thread.currentThread().interrupt();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment