Push mode data shuffling design notes

MapReduce Pull mode

Push mode

When a Map task starts, it needs to know the addresses of all downstream Reduce tasks.

When a Reduce task starts, it needs to know the addresses of all upstream Map tasks.
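A minimal sketch of the location metadata each side could receive at startup; the TaskLocation type and its field names are assumptions for illustration, not part of the original notes:

// hypothetical wire-up info handed to every task at launch
class TaskLocation {
  final String host;  // address of the peer task's node
  final int port;     // port its shuffle endpoint listens on
  TaskLocation(String host, int port) { this.host = host; this.port = port; }
}

// a map task receives the locations of all downstream reduces;
// a reduce task receives the locations of all upstream maps
TaskLocation[] peerLocations;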

Fault tolerance

Data skew

Pseudocode

1. Build the mapping from partitions to NetworkChannels

// Since partition ids are consecutive integers, an array can represent the
// partition-to-NetworkChannel mapping.
// This is more efficient than a HashMap<PartitionId, Channel>.
Channel[] channels = new Channel[partitions.length];
// one spill file per partition
String[] files = new String[partitions.length];

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup) ...

for (int i = 0; i < partitions.length; i++) {
  final int idx = i;  // the loop variable must be final to be captured by the listener
  ChannelFuture future = bootstrap.connect(
      new InetSocketAddress(partitions[i].targetIP, partitions[i].targetPort));
  future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) {
      channels[idx] = future.channel();
      files[idx] = "../stageid/part-" + idx;
    }
  });
}
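For reference, a minimal sketch of the bootstrap setup elided by the "..." above, assuming Netty 4's NIO transport; the handler wiring is an assumption that uses the SpillAndSendHandler from step 3:

// sketch of the elided setup; transport choice and handler wiring are assumptions
EventLoopGroup workerGroup = new NioEventLoopGroup();
bootstrap.group(workerGroup)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer<SocketChannel>() {
           @Override
           protected void initChannel(SocketChannel ch) {
             // install the spill-and-send handler from step 3 on every
             // outbound connection to a reduce node
             ch.pipeline().addLast(new SpillAndSendHandler());
           }
         });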

2. The user's map() method emits KV pairs

void write(K key, V value) {
  int partition = partitioner.getPartition(key);
  // a handler further down the pipeline decides: if the buffer is full,
  // spill the buffer and send it (see step 3).
  // KVMessage (sketched below) bundles the pair with its partition id,
  // since a Netty channel write takes a single message object
  channels[partition].write(new KVMessage<K, V>(key, value, partition));
}
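The KVMessage wrapper used above is not in the original notes; a minimal sketch, with a toy length-prefixed encoding standing in for a real serializer:

import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;

// hypothetical message type: bundles a KV pair with its partition id so the
// outbound handler knows which spill file and target node it belongs to
class KVMessage<K, V> {
  final K key;
  final V value;
  final int partition;

  KVMessage(K key, V value, int partition) {
    this.key = key;
    this.value = value;
    this.partition = partition;
  }

  int getPartition() { return partition; }

  // toy encoding for the sketch: length-prefixed UTF-8 of key and value
  void encodeInto(ByteBuf out) {
    byte[] k = key.toString().getBytes(CharsetUtil.UTF_8);
    byte[] v = value.toString().getBytes(CharsetUtil.UTF_8);
    out.writeInt(k.length).writeBytes(k);
    out.writeInt(v.length).writeBytes(v);
  }
}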

3. The handler that processes map output

// This handler decides: if the buffer is full, spill it to disk and send it
class SpillAndSendHandler extends ChannelOutboundHandlerAdapter {
  private static final int BUFFER_SIZE = 64 * 1024;  // illustrative spill threshold
  private ByteBuf buf;

  @Override
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws IOException {
    KVMessage kv = (KVMessage) msg;
    if (buf == null) {
      buf = ctx.alloc().buffer(BUFFER_SIZE);       // fresh buffer from the pooled allocator
    }
    kv.encodeInto(buf);                            // append the serialized KV pair
    if (buf.readableBytes() >= BUFFER_SIZE) {      // buffer is full
      spillToDisk(files[kv.getPartition()], buf);  // write buf to disk, blocking io
      ctx.writeAndFlush(buf, promise);             // send buf to the target partition's node
      buf = null;                                  // the next write grabs a new buffer
    }
  }
}
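The spillToDisk helper is assumed above but never defined; a minimal sketch that appends the buffer's readable bytes to the partition's file without consuming them, so the same buffer can still be flushed to the network afterwards:

import java.io.FileOutputStream;
import java.io.IOException;
import io.netty.buffer.ByteBuf;

// blocking disk write; getBytes() does not advance the reader index,
// so buf stays intact for the network flush that follows
static void spillToDisk(String file, ByteBuf buf) throws IOException {
  try (FileOutputStream out = new FileOutputStream(file, true)) {  // append mode
    buf.getBytes(buf.readerIndex(), out, buf.readableBytes());
  }
}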

Benchmark Design

Akka cluster can monitor node up and down events, which makes it convenient to coordinate the worker nodes. First start a master that watches these up/down events. Then start all slave nodes; once the master sees that every node is up, it assigns some nodes to run map tasks and the rest to run reduce tasks. When a map node is assigned, it is told the locations of all reduce nodes; when a reduce node is assigned, it is told the locations of all map nodes. Once a map task starts, it enters a loop that keeps generating data, partitions it, and pushes each partition to the corresponding reduce side. When a reduce task starts, it uses netty to listen for network events; as soon as data arrives, it enters the data-receiving phase of reduce. A sketch of the master's event handling follows.
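A minimal sketch of the master's cluster-event subscription, assuming Akka 2.x's classic Java API; the BenchmarkMaster name and the role-assignment logic are illustrative only:

import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.MemberUp;

public class BenchmarkMaster extends UntypedActor {
  private final Cluster cluster = Cluster.get(getContext().system());

  @Override
  public void preStart() {
    // receive MemberUp/MemberRemoved so the master sees every worker join and leave
    cluster.subscribe(getSelf(), MemberUp.class, MemberRemoved.class);
  }

  @Override
  public void postStop() {
    cluster.unsubscribe(getSelf());
  }

  @Override
  public void onReceive(Object message) {
    if (message instanceof MemberUp) {
      // record the worker; once all expected workers are up, split them into
      // map and reduce roles and send each side the other's addresses (elided)
    } else if (message instanceof MemberRemoved) {
      // a worker went down; fault-tolerance handling would go here
    } else {
      unhandled(message);
    }
  }
}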
