Skip to content

Instantly share code, notes, and snippets.

@jflatow
Created January 10, 2009 20:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save jflatow/45551 to your computer and use it in GitHub Desktop.
Save jflatow/45551 to your computer and use it in GitHub Desktop.
Hadoop FASTA reader
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.hadoop.io.Text;
/**
* @author Jared Flatow
* A FileInputFormat for reading FASTA records.
*/
public class FastaInputFormat extends FileInputFormat<Text, Text> {

    /**
     * Creates the record reader for one input split.
     *
     * The split's string form is reported as the task status before the
     * reader is constructed.
     *
     * @param genericSplit the split to read; it is cast to {@link FileSplit}
     * @param job          the job configuration handed to the reader
     * @param reporter     receives the status update
     * @return a {@link FastaRecordReader} over the given split
     * @throws IOException if the reader cannot be created
     */
    public RecordReader<Text, Text> getRecordReader(InputSplit genericSplit,
            JobConf job, Reporter reporter) throws IOException {
        final String splitDescription = genericSplit.toString();
        reporter.setStatus(splitDescription);

        final FileSplit fileSplit = (FileSplit) genericSplit;
        return new FastaRecordReader(job, fileSplit);
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataInputStream;
/**
* @author Jared Flatow
* A FASTA record has a header line that is the key, and data lines that are the value
* >header...
* data
* ...
*/
public class FastaRecordReader implements RecordReader<Text, Text> {

    /** Charset used to build the delimiter patterns and to decode record bytes. */
    private static final String ENCODING = "UTF-8";

    /** Byte patterns marking the start of a record and the end of its header line. */
    protected byte[] _RECORD_BEGIN, _KEY_VAL_SEP;
    private FSDataInputStream _in;
    private long _start, _end;

    /**
     * Opens the split's file and positions the stream at the start of the split.
     *
     * @param job   configuration used to resolve the file system of the split's path
     * @param split the byte range of the file this reader is responsible for
     * @throws IOException if the file cannot be opened or positioned
     */
    public FastaRecordReader(JobConf job, FileSplit split) throws IOException {
        _RECORD_BEGIN = ">".getBytes(ENCODING);
        _KEY_VAL_SEP = "\n".getBytes(ENCODING);
        _start = split.getStart();
        _end = _start + split.getLength();
        Path path = split.getPath();
        _in = path.getFileSystem(job).open(path);
        try {
            _in.seek(_start);
        } catch (IOException e) {
            // Do not leak the freshly opened stream when positioning fails.
            try {
                _in.close();
            } catch (IOException ignored) {
                // The seek failure is the error worth reporting.
            }
            throw e;
        }
    }

    /** Closes the underlying input stream. */
    public void close() throws IOException {
        _in.close();
    }

    /** @return a new, empty key object */
    public Text createKey() {
        return new Text();
    }

    /** @return a new, empty value object */
    public Text createValue() {
        return new Text();
    }

    /** @return the current byte offset of the underlying stream */
    public long getPos() throws IOException {
        return _in.getPos();
    }

    /**
     * @return the fraction of the split consumed so far, in the range up to 1.0;
     *         1.0 for an empty split
     */
    public float getProgress() throws IOException {
        float length = _end - _start;
        if (length <= 0) {
            return 1.0f;
        }
        float progress = getPos() - _start;
        // The last record's value is read up to the next record marker, which may
        // lie beyond _end, so the raw ratio can exceed 1.
        return Math.min(1.0f, progress / length);
    }

    /**
     * Reads the next record whose begin marker starts inside this split.
     *
     * Note that the marker is matched anywhere in the stream, not only at the
     * beginning of a line.
     *
     * @param key   receives the header line (marker and line separator removed)
     * @param value receives the record data with newlines removed
     * @return true if a record was read, false when the split is exhausted
     */
    public boolean next(Text key, Text value) throws IOException {
        if (!seekNextRecordBoundary()) {
            return false; // end of file reached without finding another marker
        }
        // The marker has just been consumed, so it began at getPos() - marker length.
        // A record belongs to this split when its marker begins before _end; the
        // reader of the following split starts scanning at _end and therefore only
        // sees markers that begin at or after that offset.
        if (getPos() - _RECORD_BEGIN.length >= _end) {
            return false;
        }
        return readKey(key) && readValue(value);
    }

    /**
     * Reads the rest of the header line into {@code key}, replacing tabs by spaces.
     *
     * @return false if the end of the stream is reached before a line separator,
     *         in which case {@code key} is left untouched
     */
    public boolean readKey(Text key) throws IOException {
        DataOutputBuffer out = new DataOutputBuffer();
        if (readUntilMatch(_KEY_VAL_SEP, true, false, out)) {
            key.set(decode(out).replace('\t', ' '));
            return true;
        }
        return false;
    }

    /**
     * Reads everything up to (but not including) the next record marker, or up to
     * the end of the stream, into {@code value} with all newlines removed.
     *
     * @return always true
     */
    public boolean readValue(Text value) throws IOException {
        DataOutputBuffer out = new DataOutputBuffer();
        readUntilMatch(_RECORD_BEGIN, false, false, out);
        value.set(decode(out).replace("\n", ""));
        return true;
    }

    /** Decodes the valid portion of the buffer using the same charset as the delimiters. */
    private static String decode(DataOutputBuffer buffer) throws IOException {
        return new String(buffer.getData(), 0, buffer.getLength(), ENCODING);
    }

    /**
     * Scans forward until {@code pattern} is found or the stream ends.
     *
     * @param pattern        bytes to search for
     * @param consumePattern if false, the stream is repositioned to the first byte
     *                       of the match so that the pattern can be read again
     * @param emitPattern    if true, the matched pattern is appended to the output
     * @param outBufOrNull   receives every byte skipped before the match; may be null
     * @return true if the pattern was found, false on end of stream
     */
    private boolean readUntilMatch(byte[] pattern, boolean consumePattern, boolean emitPattern, DataOutputBuffer outBufOrNull) throws IOException {
        long matchStart = getPos();
        int numMatchingBytes = 0;
        for (int b = _in.read(); b != -1; b = _in.read()) {
            if ((byte) b == pattern[numMatchingBytes]) {
                if (++numMatchingBytes == pattern.length) {
                    if (!consumePattern)
                        _in.seek(matchStart);
                    if (emitPattern && outBufOrNull != null)
                        outBufOrNull.write(pattern);
                    return true;
                }
                continue; // partial match so far, keep comparing
            }
            if (numMatchingBytes > 0) {
                // A partial match broke off: the byte at matchStart (equal to
                // pattern[0]) is plain data. Emit it and rescan from the byte
                // right after it so overlapping candidates are not missed.
                if (outBufOrNull != null)
                    outBufOrNull.write(pattern[0]);
                _in.seek(matchStart + 1);
                numMatchingBytes = 0;
            } else if (outBufOrNull != null) {
                outBufOrNull.write(b);
            }
            matchStart = getPos();
        }
        // End of stream inside a partial match: those bytes are ordinary data.
        if (outBufOrNull != null)
            outBufOrNull.write(pattern, 0, numMatchingBytes);
        return false;
    }

    /**
     * Skips ahead to just past the next record marker.
     *
     * @return true if a marker was found and consumed, false on end of stream
     */
    private boolean seekNextRecordBoundary() throws IOException {
        return readUntilMatch(_RECORD_BEGIN, true, true, null);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment