Skip to content

Instantly share code, notes, and snippets.

@pveentjer
Created June 25, 2014 07:12
Show Gist options
  • Save pveentjer/4072e4b7bd14072083d6 to your computer and use it in GitHub Desktop.
Save pveentjer/4072e4b7bd14072083d6 to your computer and use it in GitHub Desktop.
package com.hazelcast.projectx.io.maggot;
import com.hazelcast.projectx.InvocationFuture;
import com.hazelcast.projectx.config.Config;
import com.hazelcast.projectx.io.Address;
import com.hazelcast.projectx.io.AddressComparator;
import com.hazelcast.projectx.io.Network;
import com.hazelcast.projectx.io.PartitionEndpoint;
import com.hazelcast.projectx.spi.SpiInvoker;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.util.ReferenceCountUtil;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Netty-based {@link Network} implementation.
 * <p/>
 * {@link #start()} binds one server port per partition thread (starting at {@code config.port}),
 * fills the partition-to-endpoint table and then sets up the client bootstraps.
 */
public class MaggotNetwork implements Network {
// Configuration taken from the Settings passed to the constructor.
private final Config config;
// Invoked by OperationServerHandler for every decoded operation.
private final SpiInvoker invoker;
// All addresses produced by initAddresses(), sorted with AddressComparator.
private final Address[] addresses;
// One slot per partition (config.partitionCount); entries stay null until startEndpoints() runs.
private final Endpoint[] endpoints;
/**
 * Creates the network from the given settings. Only in-memory state is prepared here;
 * no port is bound until {@link #start()} is called.
 *
 * @param settings supplies the configuration and the invoker used for incoming operations
 */
public MaggotNetwork(Settings settings) {
    invoker = settings.invoker;
    config = settings.config;
    // The endpoint table has one slot per partition; it is populated by startEndpoints().
    endpoints = new Endpoint[config.partitionCount];
    // Must run after config is assigned: initAddresses() reads it.
    addresses = initAddresses();
}
/**
 * Expands every configured base address into {@code config.partitionThreadCount} addresses on
 * consecutive ports (starting at the base port) and returns them sorted by
 * {@link AddressComparator}.
 *
 * @return the sorted array of all expanded addresses
 */
private Address[] initAddresses() {
    List<Address> expanded = new LinkedList<Address>();
    for (Address base : config.addresses) {
        int firstPort = base.port;
        int endPort = firstPort + config.partitionThreadCount;
        int port = firstPort;
        while (port < endPort) {
            expanded.add(new Address(base.hostname, port));
            port++;
        }
    }
    Collections.sort(expanded, new AddressComparator());
    Address[] result = new Address[expanded.size()];
    return expanded.toArray(result);
}
/**
 * Returns the endpoint stored for the given partition.
 *
 * @param partitionId index into the endpoint table
 * @return the endpoint for that partition; {@code null} if the table has not been filled yet
 */
@Override
public PartitionEndpoint getEndpoint(int partitionId) {
    final PartitionEndpoint endpoint = endpoints[partitionId];
    return endpoint;
}
/**
 * Starts the network: binds one server port per partition thread ({@code config.port + k}),
 * fills the endpoint table and then sets up one client bootstrap per partition thread.
 * <p/>
 * If the calling thread is interrupted while starting, the interrupt status is restored,
 * the remaining start-up steps are skipped and the success message is not printed.
 */
@Override
public void start() {
    try {
        // Start the server threads.
        for (int k = 0; k < config.partitionThreadCount; k++) {
            startServerThread(config.port + k);
        }

        startEndpoints();

        // Start the client threads.
        for (int k = 0; k < config.partitionThreadCount; k++) {
            startClientThread();
        }
    } catch (InterruptedException e) {
        // Restore the flag so callers further up the stack can still observe the interruption;
        // continuing would only make every following blocking call fail as well.
        Thread.currentThread().interrupt();
        e.printStackTrace();
        System.err.println("Maggot Network start interrupted");
        return;
    }
    System.out.println("Maggot Network started");
}
/**
 * Prepares a client-side bootstrap with keep-alive enabled and an {@link OperationClientHandler}
 * in its pipeline. Work in progress: the bootstrap is configured but never connected.
 * <p/>
 * NOTE(review): the event loop group created here is neither stored nor shut down.
 *
 * @throws InterruptedException declared for the future blocking connect; not thrown at present
 */
private void startClientThread() throws InterruptedException {
    EventLoopGroup clientGroup = new NioEventLoopGroup();
    ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new OperationClientHandler());
        }
    };
    Bootstrap bootstrap = new Bootstrap()
            .group(clientGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .handler(initializer);
    // TODO: start the client, e.g. bootstrap.connect(host, port).sync();
}
/**
 * Starts a server thread on a given port. To add more parallelization, we just open more ports. Each port will
 * be responsible for handling a set of partitions. On the client side you calculate the partition, then you
 * calculate the port of the remote machine and send the operation directly to the right cpu.
 * <p/>
 * If binding fails or is interrupted, the event loop groups created for this port are shut down
 * again so that a failed start does not leak them.
 *
 * @param port the local port to bind the server socket to
 * @throws InterruptedException if the calling thread is interrupted while waiting for the bind to complete
 */
private void startServerThread(int port) throws InterruptedException {
    // We only want to have 1 accepting (boss) thread per port.
    // NOTE(review): the worker group still uses Netty's default thread count.
    int threadCount = 1;
    EventLoopGroup bossGroup = new NioEventLoopGroup(threadCount);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    boolean bound = false;
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new OperationDecoder(), new OperationServerHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        // Bind and start to accept incoming connections; sync() rethrows a bind failure.
        b.bind(port).sync();
        bound = true;
        System.out.println("started: " + port);
    } finally {
        if (!bound) {
            // The bind did not succeed: release the groups instead of leaving them behind.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
/**
 * Fills the endpoint table: every partition is mapped to the endpoint of the address returned
 * by {@link #getAddress(int)}. Partitions that resolve to the same address share a single
 * {@link Endpoint} instance.
 */
private void startEndpoints() {
    Map<Address, Endpoint> lookup = new HashMap<Address, Endpoint>();
    for (int partitionId = 0; partitionId < endpoints.length; partitionId++) {
        Address address = getAddress(partitionId);
        Endpoint endpoint = lookup.get(address);
        if (endpoint == null) {
            endpoint = new Endpoint(address);
            // Remember the endpoint so later partitions on the same address reuse it;
            // without this the lookup never hits and every partition gets its own endpoint.
            lookup.put(address, endpoint);
        }
        endpoints[partitionId] = endpoint;
    }
}
/**
 * Maps a partition to an address by taking the partition id modulo the number of addresses.
 *
 * @param partitionId the partition to look up
 * @return the address at position {@code partitionId % addresses.length}
 */
private Address getAddress(int partitionId) {
    return addresses[partitionId % addresses.length];
}
/**
 * Currently a no-op.
 * <p/>
 * NOTE(review): the event loop groups created in startServerThread and startClientThread are
 * local variables of those methods, so nothing is released here.
 */
@Override
public void shutdown() {
}
/**
 * Responsible for decoding an operation. This is run server-side.
 * <p/>
 * The wire format is a 4-byte length prefix followed by that many bytes of operation data. Each
 * complete operation is emitted as a {@code byte[]}. The length prefix comes from the network and
 * is therefore validated before it is used to allocate memory.
 */
public class OperationDecoder extends ByteToMessageDecoder {
    /**
     * Upper bound for a single operation, so a corrupt or hostile length prefix cannot trigger a
     * huge allocation. NOTE(review): 1 MiB is an assumed limit, far above the 1024-byte buffers
     * used elsewhere in this file; adjust if larger operations are expected.
     */
    private static final int MAX_OPERATION_SIZE = 1 << 20;

    /** Marker meaning that the length prefix of the next operation has not been read yet. */
    private static final int SIZE_UNKNOWN = -1;

    // Size of the operation currently being received, or SIZE_UNKNOWN between operations.
    private int size = SIZE_UNKNOWN;

    /**
     * Decodes at most one operation from the accumulated bytes.
     *
     * @param ctx the channel context (unused)
     * @param in  the accumulated inbound bytes
     * @param out receives the decoded operation as a {@code byte[]}
     * @throws CorruptedFrameException if the length prefix is negative
     * @throws TooLongFrameException   if the length prefix exceeds the maximum operation size
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (size == SIZE_UNKNOWN) {
            if (in.readableBytes() < 4) {
                // Length prefix not complete yet; wait for more data.
                return;
            }
            int announced = in.readInt();
            if (announced < 0) {
                // Also covers -1, which would otherwise be mistaken for "size not read yet".
                throw new CorruptedFrameException("negative operation size: " + announced);
            }
            if (announced > MAX_OPERATION_SIZE) {
                throw new TooLongFrameException(
                        "operation size " + announced + " exceeds maximum " + MAX_OPERATION_SIZE);
            }
            size = announced;
        }

        if (in.readableBytes() < size) {
            // Body not complete yet; the size is remembered for the next call.
            return;
        }

        byte[] operation = new byte[size];
        in.readBytes(operation);
        out.add(operation);
        size = SIZE_UNKNOWN;
    }
}
/**
 * Server-side handler that passes every decoded operation to the invoker and writes the
 * response back on the same channel.
 * <p/>
 * This functionality works fine. A few possible enhancements:
 * - instead of creating a response buffer per operation, reuse an existing one
 * - instead of copying the operation into an array, pass the byte-buffer (to prevent unwanted copying).
 */
public class OperationServerHandler extends ChannelInboundHandlerAdapter {

    // TODO: allocated per channel but currently not used.
    private ByteBuf buf;

    /** Allocates the (currently unused) per-channel buffer. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(1024);
    }

    /** Releases the per-channel buffer allocated in {@link #handlerAdded}. */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release();
        buf = null;
    }

    /**
     * Invokes the received operation and queues the response for writing. The response is only
     * written here; it is flushed in {@link #channelReadComplete}.
     *
     * @param ctx the channel context used to write the response
     * @param msg the decoded operation, cast to {@code byte[]}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            byte[] operation = (byte[]) msg;
            ByteBuffer response = ByteBuffer.allocate(1024);
            invoker.invoke(operation, response);
            // Switch the response from writing to reading mode before wrapping it.
            response.flip();
            ByteBuf reply = Unpooled.wrappedBuffer(response);
            ctx.write(reply);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    /** Flushes the responses written during the current read batch. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    /** Prints the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
/**
 * Responsible for translating a client request, e.g. map.get(foo), to an operation, sending
 * it to the right port on the right machine and waiting for a response.
 * <p/>
 * The Endpoint is fixed, but the address it points to can change over time (in the future), e.g. when
 * partitions are moving around. So you only need to get the PartitionEndpoint once and then you can
 * stick it into a field, for example, instead of looking it up for every call.
 */
public class Endpoint implements PartitionEndpoint {

    // Socket address built from the hostname and port handed to the constructor.
    private final SocketAddress address;
    // Source of call ids; the first id handed out is 1.
    private final AtomicLong nextCallId = new AtomicLong();
    // Registered futures keyed by call id.
    private final ConcurrentMap<Long, InvocationFuture> calls = new ConcurrentHashMap<>();
    // Not wired up yet.
    private final OperationClientHandler clientHandler = null;

    /**
     * @param address the hostname and port this endpoint points to
     */
    public Endpoint(Address address) {
        this.address = new InetSocketAddress(address.hostname, address.port);
    }

    /**
     * @return a new 1024-byte heap buffer
     */
    @Override
    public ByteBuffer newOperationByteBuffer() {
        return ByteBuffer.allocate(1024);
    }

    /**
     * Stores the future under a newly generated call id.
     *
     * @param invocationFuture the future to register
     * @return the call id under which the future was stored
     */
    @Override
    public long register(InvocationFuture invocationFuture) {
        final long id = nextCallId.incrementAndGet();
        calls.put(id, invocationFuture);
        return id;
    }

    /**
     * Flips the buffer into reading mode. Nothing is sent yet.
     *
     * @param operation the buffer holding the operation
     */
    @Override
    public void write(ByteBuffer operation) {
        operation.flip();
    }
}
//todo: work in progress
/**
 * Responsible for receiving operations from an endpoint, and sending them to the right machine.
 * In our nio implementation, we give the selection-key for a channel a kick after we offer
 * some work on the work-queue. This triggers the client-thread to drain the work-queue and
 * send the operations to the right machine. The OperationClientHandler is also responsible for receiving
 * the result and notifying the future associated with that operation (through a call-id).
 * <p/>
 * At present the handler only remembers the channel context when the channel becomes active,
 * ignores inbound messages, and closes the channel on any exception.
 */
public class OperationClientHandler extends SimpleChannelInboundHandler<Object> {
    // Context of the active channel; assigned in channelActive.
    private ChannelHandlerContext ctx;
    // Not used yet.
    private int receivedMessages;
    // Not used yet.
    private int next = 1;
    // Not used yet; parameterized with Object instead of being a raw type.
    final ConcurrentLinkedQueue<Object> answer = new ConcurrentLinkedQueue<Object>();

    /** Remembers the context of the channel that just became active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    /** Inbound messages are currently ignored. */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, final Object msg) {
    }

    /** Prints the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
/**
 * Plain holder pairing an address with an endpoint. Both fields are public and mutable.
 */
public static class ConnectRequest {
    /** The endpoint associated with this request. */
    public Endpoint endpoint;

    /** The address associated with this request. */
    public Address address;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment