Skip to content

Instantly share code, notes, and snippets.

@tf0054
Created March 22, 2012 14:54
Show Gist options
  • Save tf0054/2158785 to your computer and use it in GitHub Desktop.
Save tf0054/2158785 to your computer and use it in GitHub Desktop.
Hack#48
package org.hadoophacks.pig;
import org.apache.pig.StoreFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
/**
 * Pig {@link StoreFunc} that writes tuples to a Hadoop SequenceFile as
 * {@code Text} key / {@code Text} value pairs.
 *
 * <p>For each tuple, field 0 becomes the key and fields 1..n-1 are joined
 * with the configured delimiter to form the value. Tuples with fewer than
 * two fields, or whose first field is null, are skipped without error.
 * Null value fields are written as empty strings.
 */
public class SequenceFileStore extends StoreFunc {

    /** Delimiter used when no explicit one is supplied. */
    private static final String DEFAULT_DELIM = " ";

    /** Record writer handed over by Pig in {@link #prepareToWrite(RecordWriter)}. */
    private RecordWriter<Text, Text> writer;

    /** Separator placed between the value fields (fields 1..n-1). */
    private final String delim;

    /** Creates a store function that joins value fields with a single space. */
    public SequenceFileStore() {
        this(DEFAULT_DELIM);
    }

    /**
     * Creates a store function that joins value fields with the given delimiter.
     *
     * @param delim separator inserted between value fields
     */
    public SequenceFileStore(String delim) {
        this.delim = delim;
    }

    /**
     * Configures the job's output path and declares {@code Text} as both the
     * output key class and the output value class.
     *
     * @param location output location to write to
     * @param job      job to configure
     * @throws IOException if the output path cannot be set
     */
    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        FileOutputFormat.setOutputPath(job, new Path(location));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
    }

    /**
     * Returns a new {@code SequenceFileOutputFormat<Text, Text>}.
     *
     * @return the output format used for writing
     */
    @Override
    public OutputFormat getOutputFormat() {
        return new SequenceFileOutputFormat<Text, Text>();
    }

    /**
     * Stores the record writer that {@link #putNext(Tuple)} writes to.
     *
     * @param writer record writer to use for all subsequent writes
     */
    // The cast is unchecked because the overridden method receives a raw RecordWriter;
    // getOutputFormat() returns a <Text, Text> output format.
    @SuppressWarnings("unchecked")
    @Override
    public void prepareToWrite(RecordWriter writer) {
        this.writer = (RecordWriter<Text, Text>) writer;
    }

    /**
     * Writes one tuple as a key/value record.
     *
     * <p>Field 0 is the key; the remaining fields are joined with the
     * delimiter to form the value. The tuple is skipped when it has fewer
     * than two fields or when field 0 is null.
     *
     * @param tuple tuple to write
     * @throws IOException if a field cannot be read or the record cannot be
     *                     written; an interrupted write is reported as an
     *                     {@link ExecException} with the interrupt status restored
     */
    @Override
    public void putNext(Tuple tuple) throws IOException {
        int tupleSize = tuple.size();
        if (tupleSize < 2) {
            return;
        }

        Object keyField = tuple.get(0);
        if (keyField == null) {
            return;
        }

        Text key = new Text(keyField.toString());
        StringBuilder value = new StringBuilder(fieldToString(tuple.get(1)));
        for (int i = 2; i < tupleSize; i++) {
            value.append(delim).append(fieldToString(tuple.get(i)));
        }

        try {
            writer.write(key, new Text(value.toString()));
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still see it.
            Thread.currentThread().interrupt();
            throw new ExecException(e);
        }
    }

    /** Renders a tuple field as text, mapping a null field to the empty string. */
    private static String fieldToString(Object field) {
        return field == null ? "" : field.toString();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment