public
Created

Reducer implementation that emits an Avro GenericRecord as its output instead of a Pair.

  • Download Gist
BAMCoverage.java
Java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
public static class AvroBamCoverageReducer
extends AvroReducer<String, Integer, GenericRecord> {
 
public void reduce(String key, Iterator<Integer> values,
AvroCollector<GenericRecord> collector, Reporter reporter)
throws IOException {
int sum = 0;
while(values.hasNext()) {
Integer value = values.next();
sum += value.intValue();
}
 
GenericRecord record = new GenericData.Record(coverageSchema);
String[] parts = key.toString().split(":");
record.put("chromosome", parts[0]);
record.put("position", parts[1]);
record.put("depth", sum);
collector.collect(record);
}
}
 
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: BAMCoverage <in> <out>");
System.exit(2);
}
 
Job job = new Job(conf, "BAM Coverage");
job.setJarByClass(BAMCoverage.class);
 
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
Path outputPath = new Path(otherArgs[1]);
outputPath.getFileSystem(conf).delete(outputPath);
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
 
job.setMapperClass(BamCoverageMapper.class);
job.setInputFormatClass(AnySAMInputFormat.class);
Schema stringSchema = Schema.create(Schema.Type.STRING);
Schema intSchema = Schema.create(Schema.Type.INT);
Schema mapPairSchema = Pair.getPairSchema(stringSchema, intSchema);
AvroJob.setMapOutputSchema(conf, mapPairSchema);
 
AvroJob.setReducerClass(conf, AvroBamCoverageReducer.class);
AvroJob.setOutputSchema(conf, coverageSchema);
 
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Stack Trace
1 2 3 4 5 6 7 8 9 10 11 12 13
java.lang.IllegalArgumentException: Not a Pair schema: {"type":"record","name":"PerPositionCoverage","namespace":"edu.wustl.genome.hadoop.coverage","fields":[{"name":"chromosome","type":"string"},{"name":"position","type":"int"},{"name":"depth","type":"int"}]}
at org.apache.avro.mapred.Pair.checkIsPairSchema(Pair.java:61)
at org.apache.avro.mapred.Pair.<init>(Pair.java:49)
at org.apache.avro.mapred.AvroReducer.reduce(AvroReducer.java:47)
at org.apache.avro.mapred.HadoopReducerBase.reduce(HadoopReducerBase.java:61)
at org.apache.avro.mapred.HadoopReducerBase.reduce(HadoopReducerBase.java:30)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:469)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:417)
at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1232)
at org.apache.hadoop.mapred.Child.main(Child.java:264)
pom.xml
XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>edu.wustl.genome.hadoop.coverage</groupId>
  <artifactId>bam</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>bam_coverage</name>
  <url>http://maven.apache.org</url>

  <repositories>
    <!-- Needed for the CDH-specific hadoop-core version below. -->
    <repository>
      <id>cloudera-repos</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    </repository>
  </repositories>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <!-- Hadoop is supplied by the cluster when the job runs via `hadoop jar`.
         Scope "provided" keeps it available for compilation but out of the
         jar-with-dependencies assembly, which avoids shipping a second copy. -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>2.0.0-mr1-cdh4.1.0</version>
      <scope>provided</scope>
    </dependency>
    <!-- NOTE(review): avro-tools looks like a command-line tool rather than a library
         this job needs. As a compile-scope dependency it is bundled into the
         jar-with-dependencies assembly; confirm it is required before keeping it. -->
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-tools</artifactId>
      <version>1.7.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.7.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-mapred</artifactId>
      <version>1.7.1</version>
    </dependency>
    <dependency>
      <groupId>net.sf.samtools</groupId>
      <artifactId>sam</artifactId>
      <version>1.76</version>
    </dependency>
    <dependency>
      <groupId>net.sf.picard</groupId>
      <artifactId>picard</artifactId>
      <version>1.76</version>
    </dependency>
    <dependency>
      <groupId>fi.tkk.ics.hadoop.bam</groupId>
      <artifactId>hadoop-bam</artifactId>
      <version>6.0-alpha</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.5.1</version>
        <configuration>
          <source>6</source>
          <target>6</target>
        </configuration>
      </plugin>
      <!-- Builds the self-contained job jar during `package`. The version is pinned
           because Maven warns about (and may vary) unversioned plugins. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.