/*
 * PigJsonLoader.java — shared as GitHub Gist ayonsinha/979913 by @ayonsinha
 * (created May 19, 2011), forked from kimsterv/PigJsonLoader.java.
 *
 * Parses JSONArrays into Bags.
 */
import java.io.IOException;
import java.util.Map;

import com.google.common.collect.Maps;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Pig {@link LoadFunc} for newline-delimited JSON. Each input line is expected to hold one JSON
 * object, which is returned as a single-field tuple containing a {@code Map<String, Object>}.
 *
 * <p>Each value of a JSON object is converted as follows:
 * <ul>
 *   <li>JSON array: becomes a {@link DataBag}. Nested arrays are flattened into the enclosing
 *       bag instead of being nested.</li>
 *   <li>JSON object: becomes a single-field tuple wrapping a map built by the same rules.</li>
 *   <li>Anything else: becomes its {@code toString()} value, or {@code null} for JSON null.</li>
 * </ul>
 *
 * <p>Blank lines are skipped. Lines that are not a parsable JSON object are logged and skipped.
 * In neither case does the loader end the input early.
 */
public class PigJsonLoader extends LoadFunc {
    private static final Logger LOG = LoggerFactory.getLogger(PigJsonLoader.class);
    private static final TupleFactory tupleFactory_ = TupleFactory.getInstance();
    private static final BagFactory bagFactory_ = BagFactory.getInstance();

    // Per-instance parser; not shared between loader instances.
    private final JSONParser jsonParser_ = new JSONParser();

    /** Line reader handed to us by Pig in {@link #prepareToRead}; null until then. */
    private LineRecordReader in = null;

    public PigJsonLoader() {
    }

    @SuppressWarnings("unchecked")
    @Override
    public InputFormat getInputFormat() throws IOException {
        return new PigTextInputFormat();
    }

    /**
     * Returns the next successfully parsed input line as a tuple.
     *
     * <p>Pig interprets a {@code null} return as "no more tuples". Blank and unparsable lines are
     * therefore skipped inside this loop. Returning {@code null} for them would silently discard
     * every remaining record in the split.
     *
     * @return the next tuple, or {@code null} once the underlying reader is exhausted
     * @throws IOException if the underlying record reader fails
     */
    @Override
    public Tuple getNext() throws IOException {
        while (in.nextKeyValue()) {
            Text val = in.getCurrentValue();
            if (val == null) {
                continue;
            }
            String line = val.toString();
            if (line.trim().length() == 0) {
                continue; // blank line: nothing to parse
            }
            Tuple t = parseStringToTuple(line);
            if (t != null) {
                return t;
            }
            // Parse failures are already logged by parseStringToTuple; try the next line.
        }
        return null;
    }

    /**
     * Parses one line of JSON into a single-field tuple holding a {@code Map<String, Object>}.
     *
     * @param line raw input line
     * @return the tuple, or {@code null} (after logging a warning) in three cases: the line is
     *     not valid JSON, its top-level value is not a JSON object, or decoding throws
     *     {@link NumberFormatException} (logged as a number too large for a long)
     */
    protected Tuple parseStringToTuple(String line) {
        try {
            Object parsed = jsonParser_.parse(line);
            if (!(parsed instanceof JSONObject)) {
                // Top-level arrays, scalars and null cannot become a map; skip instead of
                // crashing the task with a ClassCastException/NullPointerException.
                LOG.warn("Expected a JSON object but got: " + line);
                return null;
            }
            Map<String, Object> values = Maps.newHashMap();
            flatten_value((JSONObject) parsed, values);
            return tupleFactory_.newTuple(values);
        } catch (ParseException e) {
            LOG.warn("Could not json-decode string: " + line, e);
            return null;
        } catch (NumberFormatException e) {
            LOG.warn("Very big number exceeds the scale of long: " + line, e);
            return null;
        }
    }

    /**
     * Copies every entry of {@code jsonObj} into {@code values} under its key's string form.
     * Arrays become bags, nested objects become single-field tuples wrapping a converted map,
     * and every other value is stored as its {@code toString()} (or {@code null}).
     */
    private void flatten_value(JSONObject jsonObj, Map<String, Object> values) {
        for (Object o : jsonObj.entrySet()) {
            Map.Entry<?, ?> entry = (Map.Entry<?, ?>) o;
            String pref = entry.getKey().toString();
            Object value = entry.getValue();
            if (value instanceof JSONArray) {
                DataBag bag = bagFactory_.newDefaultBag();
                for (Object innervalue : (JSONArray) value) {
                    flatten_array(innervalue, bag);
                }
                values.put(pref, bag);
            } else if (value instanceof JSONObject) {
                Map<String, Object> values2 = Maps.newHashMap();
                flatten_value((JSONObject) value, values2);
                values.put(pref, tupleFactory_.newTuple(values2));
            } else {
                values.put(pref, value != null ? value.toString() : null);
            }
        }
    }

    /**
     * Adds one JSON array element to {@code bag}, according to its type:
     * <ul>
     *   <li>Nested arrays are flattened recursively into the same bag.</li>
     *   <li>Objects are added as a single-field tuple wrapping their converted map.</li>
     *   <li>Other non-null values are added as a single-field tuple holding the parsed value.</li>
     *   <li>JSON nulls are dropped.</li>
     * </ul>
     */
    private void flatten_array(Object value, DataBag bag) {
        if (value instanceof JSONArray) {
            // Same result as filling a temporary bag and addAll-ing it, without the extra bag.
            for (Object innervalue : (JSONArray) value) {
                flatten_array(innervalue, bag);
            }
        } else if (value instanceof JSONObject) {
            Map<String, Object> values2 = Maps.newHashMap();
            flatten_value((JSONObject) value, values2);
            bag.add(tupleFactory_.newTuple(values2));
        } else if (value != null) {
            // NOTE(review): unlike flatten_value, scalars here keep their parsed Java type
            // rather than toString(); confirm downstream Pig schemas expect that.
            bag.add(tupleFactory_.newTuple(value));
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public void prepareToRead(RecordReader reader, PigSplit split)
            throws IOException {
        in = (LineRecordReader) reader;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        PigFileInputFormat.setInputPaths(job, location);
    }
}
// (Gist page footer) Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.