Skip to content

Instantly share code, notes, and snippets.

@danharvey
Created January 31, 2011 18:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save danharvey/804551 to your computer and use it in GitHub Desktop.
Save danharvey/804551 to your computer and use it in GitHub Desktop.
package uk.co.danharvey.pig.storage;
import java.io.IOException;
/**
 * Pig {@code StoreFunc} that writes each tuple as one JSON document per output record.
 *
 * <p>Records are emitted through a {@code TextOutputFormat<Text, NullWritable>}: the key carries
 * the JSON text and the value is {@code NullWritable}. The key/value separator is set to the
 * empty string in {@link #setStoreLocation(String, Job)}.
 *
 * <p>NOTE(review): {@code tupleToJson} is called by {@link #putNext(Tuple)} but is not defined in
 * this part of the file; it is presumably a helper declared elsewhere - confirm it is present.
 */
public class JsonStorage extends StoreFunc {

    /** Writer handed over by the framework in {@link #prepareToWrite(RecordWriter)}. */
    private RecordWriter<Text, NullWritable> writer;

    /**
     * Schema received in {@link #checkSchema(ResourceSchema)}.
     *
     * <p>NOTE(review): nothing visible in this class reads this field ({@code putNext} passes
     * {@code null} to {@code tupleToJson}); confirm whether it is still needed.
     */
    private ResourceSchema schema;

    /**
     * Supplies the output format used to write the records.
     *
     * @return a new {@code TextOutputFormat<Text, NullWritable>}
     * @throws IOException declared by the overridden method; not thrown here
     */
    @Override
    public OutputFormat getOutputFormat() throws IOException {
        return new TextOutputFormat<Text, NullWritable>();
    }

    /**
     * Stores the record writer that {@link #putNext(Tuple)} will write to.
     *
     * @param writer the record writer to use for all subsequent writes
     * @throws IOException declared by the overridden method; not thrown here
     */
    @Override
    // The parameter is a raw type in the overridden signature; the writer is expected to come
    // from the TextOutputFormat<Text, NullWritable> returned by getOutputFormat().
    @SuppressWarnings("unchecked")
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
    }

    /**
     * Converts the tuple to JSON and writes it as a single record.
     *
     * @param tuple the tuple to serialise
     * @throws IOException if the write fails or the writing thread is interrupted
     */
    @Override
    public void putNext(Tuple tuple) throws IOException {
        JSONObject json = tupleToJson(null, tuple);
        try {
            // Exactly one record per tuple: the JSON text as key, no value.
            writer.write(new Text(json.toString()), NullWritable.get());
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /**
     * Configures the job's output path and, based on the location's suffix, output compression.
     *
     * <p>A location ending in {@code .bz2} enables BZip2 compression, one ending in {@code .gz}
     * enables Gzip compression; any other location leaves the compression settings untouched.
     *
     * @param location the output location
     * @param job the job to configure
     * @throws IOException if the output path cannot be set
     */
    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        // Empty separator so nothing is inserted between the JSON key and the null value.
        job.getConfiguration().set("mapred.textoutputformat.separator", "");
        FileOutputFormat.setOutputPath(job, new Path(location));
        if (location.endsWith(".bz2")) {
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
        } else if (location.endsWith(".gz")) {
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        }
    }

    /**
     * Records the schema of the data being stored.
     *
     * @param s the schema of the data to be stored
     */
    @Override
    public void checkSchema(ResourceSchema s) {
        this.schema = s;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment