public
Last active

Finding a maximum value using counters in MapReduce (MR)

  • Download Gist
MaxUsingCounters.java
Java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
 
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
 
import junit.framework.Assert;
 
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
public class MaxUsingCounters extends Configured implements Tool {
 
public static class CounterMapper extends
Mapper<LongWritable, Text, NullWritable, NullWritable> {
 
long max = -1;
 
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
max = ((FileSplit) context.getInputSplit()).getPath().getName()
.endsWith("1") ? 1 : 2;
System.out.println("Max is: " + max);
}
 
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
context.getCounter("foo", "bar").increment(max);
}
}
 
@SuppressWarnings("deprecation")
public int run(String[] args) throws Exception {
FileContext fc = FileContext.getFileContext(getConf());
 
createSampleFile(fc, new Path("sample1"));
createSampleFile(fc, new Path("sample2"));
 
Job job = new Job(getConf());
job.setMapperClass(CounterMapper.class);
job.setJarByClass(getClass());
FileInputFormat.addInputPath(job, new Path("sample*"));
job.setOutputFormatClass(NullOutputFormat.class);
job.waitForCompletion(false);
 
if (job.isSuccessful()) {
JobClient client = new JobClient(getConf());
List<Long> ctrsList = new ArrayList<Long>();
for (TaskReport mapCtrs : client.getMapTaskReports(new JobID(job
.getJobID().getJtIdentifier(), job.getJobID().getId()))) {
ctrsList
.add(mapCtrs.getCounters().findCounter("foo", "bar").getValue());
}
long max = Collections.max(ctrsList);
System.out.println("Max foo-bar found as (should be 2): " + max);
} else {
return 1;
}
return 0;
}
 
private void createSampleFile(FileContext fc, Path inputFile)
throws IOException, FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, UnsupportedFileSystemException, IOException {
FSDataOutputStream os = fc.create(inputFile,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
CreateOpts.repFac((short) 1));
os.write("foo bar\n".getBytes());
os.close();
}
 
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new MaxUsingCounters(), args));
}
 
}
Output.txt
1 2 3
➜ mr-max-using-counters $HADOOP_MR1_HOME/bin/hadoop jar /Users/harshchouraria/Work/workspace/mr-max-using-counters/target/mr-max-using-counters-0.1.jar MaxUsingCounters
Max foo-bar found as (should be 2): 2
pom.xml
XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.foo</groupId>
  <artifactId>mr-max-using-counters</artifactId>
  <version>0.1</version>
  <name>A MapReduce job to find a max value using counters.</name>
  <properties>
    <!-- Pin the source encoding so builds do not depend on the platform default. -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <repositories>
    <!-- Cloudera repository (releases only), presumably needed for the CDH-specific
         hadoop-client version below. -->
    <repository>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
      <id>cloudera</id>
      <name>cloudera-repos</name>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    </repository>
  </repositories>
  <dependencies>
    <!-- Provided: the job is launched with `hadoop jar`, which puts Hadoop on the classpath. -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.0.0-mr1-cdh4.1.0</version>
      <scope>provided</scope>
    </dependency>
    <!-- Kept at compile scope: MaxUsingCounters.java (main code) imports junit.framework.Assert. -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
    </dependency>
  </dependencies>
</project>

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.