Map启动时, 需要知道下游所有Reduce的地址
Reduce启动时, 需要知道上游所有Map的地址
// 由于partition是连续的整数, 所以可以用数组表示partition与NetworkChannel的对应关系
// 这种方法比HashMap<ParitionId, Channel>更高效
Channel[] channels = new Channel[partitions.length]
// 一个partition对应一个文件
String[] files = new String[partition.length]
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup) ...
for(int i = 0; i < partitions.length; i++) {
ChannelFuture future = bootstrap.connect(new InetSocketAddress(partition[i].targetIP,partition[i].targetPort));
future.addListener( new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
channels[i] = future.channel();
files[i] = "../stageid/part-" + i ""
}
}
}
write(K key, V value) {
int partition = partitioner.getPartition(key);
// pipeline下游的handler判断: 如果buffer已满, 则spill buffer 并send buffer
channels[partition].write(key, value, partition)
}
// 此handler判断: 如果buffer已满, 则spill buffer 并send buffer
class SpillAndSendHandler extends ChannelOutboundHandlerAdapter {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws IOException {
Channel channel = ctx.channel();
Byte buf = channel.unsafe().outboundBuffer();
buf.write(msg);
channel.unsafe().outboundBuffer() = another new buffer from threadpool
if(buf is full) {
spillToDisk(files[msg.getPartition], buf); // buf写入磁盘, block io
channel.writeAndFlush(buf); // 向目标partition节点发送buf
}
}
}
借助akka cluster的功能, 可以监测到节点up和down的事件, 方便统一调度工作节点. 首先启一个master, 这个master用来监测到节点up和down的事件。然后所有slaves节点启动, master监测所有节点都已经启动之后, 分配一些节点作为运行map tasks的节点, 另一些节点作为运行reduce tasks的节点. map 节点分配的同时, 告诉它所有reduce节点的位置. reduce节点分配的同时, 告诉它所有map节点的位置. map task一旦启动, 就进一个循环, 不断地造数据, 分partition后把数据push给对应的reduce端. reduce task启动, 用netty监听网络事件, 有数据到达, 则进入reduce 接收数据的阶段.