Skip to content

Instantly share code, notes, and snippets.

@bderickson
Created November 2, 2012 15:45
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save bderickson/4002132 to your computer and use it in GitHub Desktop.
Reducer implementation using GenericRecord as output instead of Pair
public static class AvroBamCoverageReducer
extends AvroReducer<String, Integer, GenericRecord> {
public void reduce(String key, Iterator<Integer> values,
AvroCollector<GenericRecord> collector, Reporter reporter)
throws IOException {
int sum = 0;
while(values.hasNext()) {
Integer value = values.next();
sum += value.intValue();
}
GenericRecord record = new GenericData.Record(coverageSchema);
String[] parts = key.toString().split(":");
record.put("chromosome", parts[0]);
record.put("position", parts[1]);
record.put("depth", sum);
collector.collect(record);
}
}
/**
 * Job driver: configures and submits the BAM coverage job.
 *
 * <p>Usage: {@code BAMCoverage <in> <out>}. Any pre-existing output
 * directory is deleted so the job can be rerun.
 */
public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: BAMCoverage <in> <out>");
        System.exit(2);
    }

    // BUG FIX: all Avro settings must be applied to conf BEFORE the Job is
    // constructed. Job copies the Configuration at construction time, so
    // mutating conf afterwards (as the original code did) has no effect on
    // the submitted job.
    Schema stringSchema = Schema.create(Schema.Type.STRING);
    Schema intSchema = Schema.create(Schema.Type.INT);
    Schema mapPairSchema = Pair.getPairSchema(stringSchema, intSchema);
    AvroJob.setMapOutputSchema(conf, mapPairSchema);
    AvroJob.setReducerClass(conf, AvroBamCoverageReducer.class);
    AvroJob.setOutputSchema(conf, coverageSchema);
    // NOTE(review): AvroJob here appears to be the old-API
    // org.apache.avro.mapred variant (it takes a JobConf), while the job is
    // submitted through the new-API Job class. The new API may ignore
    // old-API reducer settings — confirm, or submit via JobClient.runJob(conf).

    Job job = new Job(conf, "BAM Coverage");
    job.setJarByClass(BAMCoverage.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    Path outputPath = new Path(otherArgs[1]);
    // Recursive delete (the deprecated single-argument delete is gone in
    // newer Hadoop); output dirs contain part files, so recursion is required.
    outputPath.getFileSystem(conf).delete(outputPath, true);
    FileOutputFormat.setOutputPath(job, outputPath);
    job.setMapperClass(BamCoverageMapper.class);
    job.setInputFormatClass(AnySAMInputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
<!-- Maven build for the BAM coverage MapReduce job (edu.wustl.genome.hadoop.coverage:bam). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>edu.wustl.genome.hadoop.coverage</groupId>
<artifactId>bam</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>bam_coverage</name>
<url>http://maven.apache.org</url>
<!-- Cloudera repository is needed for the CDH-flavored hadoop-core artifact below. -->
<repositories>
<repository>
<id>cloudera-repos</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
</repository>
</repositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- NOTE(review): JUnit 3.8.1 predates annotations (@Test); consider 4.x+ when touching tests. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<!-- CDH4 MR1 Hadoop; version must match the target cluster. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>2.0.0-mr1-cdh4.1.0</version>
</dependency>
<!-- Avro core, mapred bindings, and tools are kept at the same version (1.7.1). -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-tools</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>1.7.1</version>
</dependency>
<!-- SAM/BAM readers: samtools + picard (matching 1.76), plus Hadoop-BAM
     which provides AnySAMInputFormat used by the job driver. -->
<dependency>
<groupId>net.sf.samtools</groupId>
<artifactId>sam</artifactId>
<version>1.76</version>
</dependency>
<dependency>
<groupId>net.sf.picard</groupId>
<artifactId>picard</artifactId>
<version>1.76</version>
</dependency>
<dependency>
<groupId>fi.tkk.ics.hadoop.bam</groupId>
<artifactId>hadoop-bam</artifactId>
<version>6.0-alpha</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Compile for Java 6 bytecode. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.5.1</version>
<configuration>
<source>6</source>
<target>6</target>
</configuration>
</plugin>
<!-- Builds a fat jar (jar-with-dependencies) at package time so the job
     can be submitted to Hadoop with all dependencies bundled. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
java.lang.IllegalArgumentException: Not a Pair schema: {"type":"record","name":"PerPositionCoverage","namespace":"edu.wustl.genome.hadoop.coverage","fields":[{"name":"chromosome","type":"string"},{"name":"position","type":"int"},{"name":"depth","type":"int"}]}
at org.apache.avro.mapred.Pair.checkIsPairSchema(Pair.java:61)
at org.apache.avro.mapred.Pair.<init>(Pair.java:49)
at org.apache.avro.mapred.AvroReducer.reduce(AvroReducer.java:47)
at org.apache.avro.mapred.HadoopReducerBase.reduce(HadoopReducerBase.java:61)
at org.apache.avro.mapred.HadoopReducerBase.reduce(HadoopReducerBase.java:30)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:469)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:417)
at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1232)
at org.apache.hadoop.mapred.Child.main(Child.java:264)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment