Created March 2, 2012 02:42
Asynchronous Hadoop MapRunner
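The gist contains two files: the runner itself (AsyncMapRunner.java) and a small local-mode test (TestAsyncMapRunner.java). To use the runner, a job points its map runnable at AsyncMapRunner and, optionally, bounds the key/value hand-off queue via mapred.map.asyncmaprunner.queue.size (the runner defaults to 1024). The driver below is a minimal sketch based on the settings the test uses; the class name AsyncRunnerDriver, the queue size of 512, and the input/output arguments are placeholders for the example, not part of the gist.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.AsyncMapRunner;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class AsyncRunnerDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(AsyncRunnerDriver.class);
    conf.setJobName("async-runner-example");

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);
    // no mapper/reducer set here, so the identity implementations are used

    // swap the default MapRunner for the asynchronous one
    conf.setMapRunnerClass(AsyncMapRunner.class);
    // optional: bound the key/value hand-off queue (the runner defaults to 1024)
    conf.setInt("mapred.map.asyncmaprunner.queue.size", 512);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
  }
}

The full runner source follows, then the test.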
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Async map runner that separates I/O and CPU workloads into 2 threads. It can
 * be used instead of the default implementation to gain a good throughput.
 */
public class AsyncMapRunner<K1, V1, K2, V2> implements
    MapRunnable<K1, V1, K2, V2> {

  private static final Log LOG = LogFactory.getLog(AsyncMapRunner.class
      .getName());

  private static final int DEFAULT_QUEUE_SIZE = 1024;

  private BlockingQueue<KvPair<K1, V1>> queue;
  private final KvPair<K1, V1> FINISH_TOKEN = new KvPair<K1, V1>();
  private Mapper<K1, V1, K2, V2> mapper;
  private ExecutorService executorService;
  private boolean incrProcCount;

  static class KvPair<K1, V1> {
    K1 key;
    V1 value;

    // used for the finish token
    KvPair() {
    }

    KvPair(K1 key, V1 value) {
      this.key = key;
      this.value = value;
    }
  }

  @Override
  @SuppressWarnings("unchecked")
  public void configure(JobConf job) {
    int size =
        job.getInt("mapred.map.asyncmaprunner.queue.size", DEFAULT_QUEUE_SIZE);
    queue = new ArrayBlockingQueue<KvPair<K1, V1>>(size);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Configuring jobConf " + job.getJobName() + " to use a " + size
          + " length queue");
    }
    this.mapper = ReflectionUtils.newInstance(job.getMapperClass(), job);
    // increment processed counter only if skipping feature is enabled
    this.incrProcCount =
        SkipBadRecords.getMapperMaxSkipRecords(job) > 0
            && SkipBadRecords.getAutoIncrMapperProcCount(job);
    executorService = Executors.newSingleThreadExecutor();
  }

  @Override
  public void run(RecordReader<K1, V1> input,
      final OutputCollector<K2, V2> output, final Reporter reporter)
      throws IOException {
    try {
      Future<Void> future = executorService.submit(new Callable<Void>() {
        public Void call() throws IOException {
          while (true) {
            try {
              KvPair<K1, V1> pair = queue.take();
              if (pair == FINISH_TOKEN) {
                // finished
                break;
              }
              mapper.map(pair.key, pair.value, output, reporter);
              if (incrProcCount) {
                reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
                    SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
              }
            } catch (InterruptedException ex) {
              // task is cancelled or executor service is shutdown
              break;
            }
          }
          return null;
        }
      });

      // allocate the initial key & value instances; fresh instances are
      // created for every record because the mapper consumes them in
      // another thread
      K1 key = input.createKey();
      V1 value = input.createValue();
      while (input.next(key, value)) {
        while (!queue.offer(new KvPair<K1, V1>(key, value))
            && !future.isDone()) {
          // the queue is full
          Thread.yield();
        }
        key = input.createKey();
        value = input.createValue();
      }
      while (!queue.offer(FINISH_TOKEN) && !future.isDone()) {
        // the queue is full
        Thread.yield();
      }

      // wait until the task is done or an exception was thrown
      try {
        future.get();
      } catch (ExecutionException ee) {
        throw new IOException(ee.getCause());
      } catch (InterruptedException inter) {
        Thread.currentThread().interrupt();
        // we don't need the result, so cancel the task
        future.cancel(true);
      }
    } finally {
      executorService.shutdownNow();
      mapper.close();
    }
  }
}
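As the class javadoc above notes, the runner splits record reading (I/O) and mapping (CPU) across two threads. The mechanism is a bounded producer/consumer hand-off: the thread driving the RecordReader offers key/value pairs into an ArrayBlockingQueue, a single-thread executor drains the queue and invokes the mapper, and FINISH_TOKEN acts as a poison pill that tells the consumer to stop. The snippet below is an illustrative, stand-alone sketch of that same hand-off without any Hadoop types; the class name HandoffSketch and the record strings are invented for the example and are not part of the gist.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class HandoffSketch {
  // sentinel compared by identity, like the runner's FINISH_TOKEN
  private static final String FINISH = new String("FINISH");

  public static void main(String[] args) throws Exception {
    final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(4);
    ExecutorService pool = Executors.newSingleThreadExecutor();

    // consumer thread: plays the role of the mapper
    Future<?> consumer = pool.submit(new Runnable() {
      public void run() {
        try {
          while (true) {
            String record = queue.take();
            if (record == FINISH) {
              break; // poison pill reached, stop consuming
            }
            System.out.println("processed " + record);
          }
        } catch (InterruptedException ie) {
          // cancelled or shut down
        }
      }
    });

    // producer loop: plays the role of the thread driving the RecordReader
    for (int i = 0; i < 10; i++) {
      while (!queue.offer("record-" + i) && !consumer.isDone()) {
        Thread.yield(); // queue is full, back off like the runner does
      }
    }
    queue.put(FINISH);

    consumer.get(); // propagate any consumer failure
    pool.shutdownNow();
  }
}

The test case below wires the runner into a local job and checks that mapper exceptions still fail the job.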
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred.lib;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.AsyncMapRunner;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.HadoopTestCase;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class TestAsyncMapRunner extends HadoopTestCase {

  public TestAsyncMapRunner() throws IOException {
    super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
  }

  public void testOKRun() throws Exception {
    run(false, false);
  }

  public void testIOExRun() throws Exception {
    run(true, false);
  }

  public void testRuntimeExRun() throws Exception {
    run(false, true);
  }

  private void run(boolean ioEx, boolean rtEx) throws Exception {
    Path inDir = new Path("testing/mt/input");
    Path outDir = new Path("testing/mt/output");

    // Hack for local FS that does not have the concept of a 'mounting point'
    if (isLocalFS()) {
      String localPathRoot = System.getProperty("test.build.data", "/tmp")
          .replace(' ', '+');
      inDir = new Path(localPathRoot, inDir);
      outDir = new Path(localPathRoot, outDir);
    }

    JobConf conf = createJobConf();
    FileSystem fs = FileSystem.get(conf);

    fs.delete(outDir, true);
    if (!fs.mkdirs(inDir)) {
      throw new IOException("Mkdirs failed to create " + inDir.toString());
    }
    {
      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
      file.writeBytes("a\nb\n\nc\nd\ne");
      file.close();
    }

    conf.setJobName("mt");
    conf.setInputFormat(TextInputFormat.class);

    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);

    conf.setMapOutputKeyClass(LongWritable.class);
    conf.setMapOutputValueClass(Text.class);

    conf.setOutputFormat(TextOutputFormat.class);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);

    conf.setMapperClass(IDMap.class);
    conf.setReducerClass(IDReduce.class);

    FileInputFormat.setInputPaths(conf, inDir);
    FileOutputFormat.setOutputPath(conf, outDir);

    conf.setMapRunnerClass(AsyncMapRunner.class);
    conf.setInt("mapred.map.asyncmaprunner.queue.size", 2);

    if (ioEx) {
      conf.setBoolean("async.ioException", true);
    }
    if (rtEx) {
      conf.setBoolean("async.runtimeException", true);
    }

    JobClient jc = new JobClient(conf);
    RunningJob job = jc.submitJob(conf);
    while (!job.isComplete()) {
      Thread.sleep(100);
    }

    if (job.isSuccessful()) {
      assertFalse(ioEx || rtEx);
    } else {
      assertTrue(ioEx || rtEx);
    }
  }

  public static class IDMap implements Mapper<LongWritable, Text,
      LongWritable, Text> {
    private boolean ioEx = false;
    private boolean rtEx = false;

    public void configure(JobConf job) {
      ioEx = job.getBoolean("async.ioException", false);
      rtEx = job.getBoolean("async.runtimeException", false);
    }

    public void map(LongWritable key, Text value,
        OutputCollector<LongWritable, Text> output,
        Reporter reporter)
        throws IOException {
      if (ioEx) {
        throw new IOException();
      }
      if (rtEx) {
        throw new RuntimeException();
      }
      output.collect(key, value);
      try {
        Thread.sleep(100);
      } catch (InterruptedException ex) {
        throw new RuntimeException(ex);
      }
    }

    public void close() throws IOException {
    }
  }

  public static class IDReduce implements Reducer<LongWritable, Text,
      LongWritable, Text> {

    public void configure(JobConf job) {
    }

    public void reduce(LongWritable key, Iterator<Text> values,
        OutputCollector<LongWritable, Text> output,
        Reporter reporter)
        throws IOException {
      while (values.hasNext()) {
        output.collect(key, values.next());
      }
    }

    public void close() throws IOException {
    }
  }
}