@kimsterv
Created September 28, 2010 16:42
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Maps;

// Loads newline-delimited JSON: each input line becomes a one-field tuple
// holding a map of the top-level keys to their string values.
public class PigJsonLoader extends LoadFunc {

    private static final Logger LOG = LoggerFactory.getLogger(PigJsonLoader.class);
    private static final TupleFactory tupleFactory_ = TupleFactory.getInstance();

    private final JSONParser jsonParser_ = new JSONParser();
    private LineRecordReader in = null;

    public PigJsonLoader() {
    }

    @SuppressWarnings("unchecked")
    @Override
    public InputFormat getInputFormat() throws IOException {
        return new PigTextInputFormat();
    }

    @Override
    public Tuple getNext() throws IOException {
        // Pig treats a null return as "no more tuples in this split", so
        // blank or undecodable lines are skipped instead of returning null.
        while (in.nextKeyValue()) {
            Text val = in.getCurrentValue();
            if (val == null) {
                continue;
            }
            String line = val.toString();
            if (line.length() > 0) {
                Tuple t = parseStringToTuple(line);
                if (t != null) {
                    return t;
                }
            }
        }
        // The reader is exhausted.
        return null;
    }

    // Parses one line of JSON into a single-field tuple wrapping a
    // Map<String, String>. Returns null if the line cannot be decoded.
    protected Tuple parseStringToTuple(String line) {
        try {
            Map<String, String> values = Maps.newHashMap();
            JSONObject jsonObj = (JSONObject) jsonParser_.parse(line);
            for (Object key : jsonObj.keySet()) {
                Object value = jsonObj.get(key);
                // Every value, including nested objects, is stored as a string.
                values.put(key.toString(), value != null ? value.toString() : null);
            }
            return tupleFactory_.newTuple(values);
        } catch (ParseException e) {
            LOG.warn("Could not json-decode string: " + line, e);
            return null;
        } catch (NumberFormatException e) {
            LOG.warn("Very big number exceeds the scale of long: " + line, e);
            return null;
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public void prepareToRead(RecordReader reader, PigSplit split)
            throws IOException {
        in = (LineRecordReader) reader;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        PigFileInputFormat.setInputPaths(job, location);
    }
}
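
A note on the getNext() contract: Pig treats a null return from getNext() as the end of the split, so a loader that wants to skip blank or undecodable lines has to keep reading rather than return null. Below is a minimal stdlib-only sketch of that loop. It is not Pig code: SkippingLineSource, parseOrNull and readAll are hypothetical names, and the "parser" is only a stand-in that accepts lines which look like a JSON object.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Stand-in for a loader: returns the next usable record, or null only at end of input. */
class SkippingLineSource {
    private final BufferedReader reader;

    SkippingLineSource(String input) {
        this.reader = new BufferedReader(new StringReader(input));
    }

    /** Hypothetical parser: accepts only lines that look like a JSON object. */
    static String parseOrNull(String line) {
        String trimmed = line.trim();
        boolean looksLikeObject = trimmed.startsWith("{") && trimmed.endsWith("}");
        return looksLikeObject ? trimmed : null;
    }

    /** Mirrors the getNext() contract: null means "no more records". */
    String next() {
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                String record = parseOrNull(line);
                if (record != null) {
                    return record;
                }
                // Blank or malformed line: keep reading instead of returning null.
            }
            return null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Drains the source into a list, for demonstration. */
    static List<String> readAll(String input) {
        SkippingLineSource source = new SkippingLineSource(input);
        List<String> out = new ArrayList<>();
        for (String rec = source.next(); rec != null; rec = source.next()) {
            out.add(rec);
        }
        return out;
    }
}
```

With this shape, a blank line or a garbage line in the middle of the input does not hide the records that follow it.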
@ccattuto

[on a related topic]

I have forked elephant-bird (gerritjvv's repository) to implement some very preliminary support for nested JSON structures (that does not rely on re-parsing values as in the code snippet above):
https://github.com/ccattuto/elephant-bird

com.twitter.elephantbird.mapreduce.input.LzoJsonRecordReader now returns a nested MapWritable (with Writable values) that mirrors the parsed JSONObject.

com.twitter.elephantbird.pig8.load.LzoJsonLoader now rewrites the MapWritable returned by the LzoJsonRecordReader into a Map<String,Object> that Pig can use.

This allows one to access deep fields in JSON records like this:
X = LOAD .... AS (json: map[]);
Y = FOREACH X GENERATE json#'xxx'#'yyy' as yyy;

I am no expert, so the code is very rough and currently it only walks nested dictionaries,
filtering out Bags/Tuples.
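
The idea described above (walk nested dictionaries, drop arrays, then reach deep fields by chaining keys) can be sketched without Hadoop or Pig on the classpath. This is only an illustration under assumptions, not the code in the fork: plain java.util maps and lists stand in for MapWritable and the parsed JSONObject, and NestedMapSketch, walk and lookup are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class NestedMapSketch {

    /** Copies a parsed record, keeping scalars and nested maps and dropping lists. */
    static Map<String, Object> walk(Map<?, ?> parsed) {
        Map<String, Object> out = new HashMap<>();
        for (Map.Entry<?, ?> e : parsed.entrySet()) {
            String key = String.valueOf(e.getKey());
            Object v = e.getValue();
            if (v instanceof Map) {
                // Nested dictionary: recurse so the result mirrors the input.
                out.put(key, walk((Map<?, ?>) v));
            } else if (v instanceof List) {
                // Arrays are filtered out in this sketch.
                continue;
            } else {
                out.put(key, v);
            }
        }
        return out;
    }

    /** Follows a chain of keys, like json#'xxx'#'yyy' in Pig; null if any step is missing. */
    static Object lookup(Map<String, Object> record, String... path) {
        Object current = record;
        for (String key : path) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(key);
        }
        return current;
    }
}
```

Here lookup(record, "xxx", "yyy") plays the role of json#'xxx'#'yyy': each step only succeeds if the value at the previous step is itself a map.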

@ayonsinha

I modified this code to convert JSONArray to Bags and nested Structures to Maps.

// Additional imports needed by this version:
//   org.json.simple.JSONArray
//   org.apache.pig.data.DataBag
//   org.apache.pig.data.DefaultBagFactory

protected Tuple parseStringToTuple(String line) {
    try {
        Map<String, Object> values = Maps.newHashMap();
        JSONObject jsonObj = (JSONObject) jsonParser_.parse(line);
        flatten_value(jsonObj, values);
        return tupleFactory_.newTuple(values);
    } catch (ParseException e) {
        LOG.warn("Could not json-decode string: " + line, e);
        return null;
    } catch (NumberFormatException e) {
        LOG.warn("Very big number exceeds the scale of long: " + line, e);
        return null;
    }
}

// Copies every key of jsonObj into values: arrays become bags, nested
// objects become a single-field tuple wrapping a map, scalars become strings.
private void flatten_value(JSONObject jsonObj, Map<String, Object> values) {
    for (Object key : jsonObj.keySet()) {
        String pref = key.toString();

        Object value = jsonObj.get(key);
        if (value instanceof JSONArray) {
            JSONArray array = (JSONArray) value;
            DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
            for (Object innervalue : array) {
                flatten_array(innervalue, bag);
            }
            values.put(pref, bag);
        } else if (value instanceof JSONObject) {
            Map<String, Object> values2 = Maps.newHashMap();
            flatten_value((JSONObject) value, values2);
            // Note: the nested map is wrapped in a tuple, not stored directly.
            values.put(pref, tupleFactory_.newTuple(values2));
        } else {
            values.put(pref, value != null ? value.toString() : null);
        }
    }
}

// Adds one array element to bag. Nested arrays are merged into the same
// bag, objects are added as a tuple wrapping a map, and nulls are dropped.
private void flatten_array(Object value, DataBag bag) {
    if (value instanceof JSONArray) {
        JSONArray array = (JSONArray) value;
        DataBag b = DefaultBagFactory.getInstance().newDefaultBag();
        for (Object innervalue : array) {
            flatten_array(innervalue, b);
        }
        bag.addAll(b);
    } else if (value instanceof JSONObject) {
        Map<String, Object> values2 = Maps.newHashMap();
        flatten_value((JSONObject) value, values2);
        bag.add(tupleFactory_.newTuple(values2));
    } else {
        if (value != null) {
            bag.add(tupleFactory_.newTuple(value));
        }
    }
}
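
To make the array handling above easier to follow, here is a small stdlib-only sketch of the same recursion. It is an illustration under assumptions rather than Pig code: a List<List<Object>> stands in for a DataBag, a one-element list stands in for a single-field tuple, and BagFlatteningSketch, addToBag and toBag are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class BagFlatteningSketch {

    /** Appends one array element to the bag; nested arrays are merged into the same bag. */
    static void addToBag(Object value, List<List<Object>> bag) {
        if (value instanceof List) {
            // Nested array: its elements end up directly in the enclosing bag.
            for (Object inner : (List<?>) value) {
                addToBag(inner, bag);
            }
        } else if (value != null) {
            // Scalars (and maps, in this sketch) are wrapped in a one-field "tuple".
            bag.add(Collections.singletonList(value));
        }
        // Null elements are dropped.
    }

    /** Converts a parsed JSON array (as a List) into a bag of one-field tuples. */
    static List<List<Object>> toBag(List<?> array) {
        List<List<Object>> bag = new ArrayList<>();
        for (Object element : array) {
            addToBag(element, bag);
        }
        return bag;
    }
}
```

One consequence worth knowing: the nesting of arrays is not preserved, so [[1, 2], 3] and [1, 2, 3] produce the same bag.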

@mmay

mmay commented Jun 15, 2011

Hi - Thank you for this, it's great! Nice work.

Just curious, is there any reason why you chose the Google Collections version of HashMap instead of the standard Java implementation? When I used this, I replaced the Google Collections HashMap with a plain java.util.HashMap, in the interest of keeping the jar small and depending on as few external libraries as possible.

Also, any word on if/when this will be included in PiggyBank? As far as I can see it's not in there. :(

Thanks!

@dvryaboy

Pig already has guava, and so does ElephantBird, so there is no real extra dependency.

@mmay

mmay commented Jun 21, 2011

Well, that's weird... When I try to run this script without registering the google-collections (guava) jar, I get a ClassNotFoundException: com.google.common.collect.Maps. Am I doing something wrong?

@kimsterv
Author

kimsterv commented Jun 21, 2011 via email

@mmay

mmay commented Jun 21, 2011

I see. I'm running 0.8.0

@kimsterv
Author

kimsterv commented Jun 21, 2011 via email

@mmay

mmay commented Jun 23, 2011

Cool.

If you guys are cool with it, I'd be more than happy to roll this little class up into a patch and submit it to PiggyBank for all of you. I've never done it before, but it seems straightforward enough from the directions. The only thing I would need to add is some unit tests (shouldn't be a big deal). There is a JIRA issue for exactly this at: https://issues.apache.org/jira/browse/PIG-1914

Basically, if the authors (kimsterv and ayonsinha, or anyone else who helped write this that I left out) are cool with the apache license then I'd love to go ahead and get a patch going. I think this is definitely a must-have UDF and would be appreciated by the Pig user community.

Just let me know,
Thanks

@kimsterv
Author

kimsterv commented Jun 23, 2011 via email

@mmay

mmay commented Jun 29, 2011

I'm running into an error when trying to access a field of nested JSON, maybe one of you can give me some insight. I'm using ayonsinha's version of the JSON parser which /should/ convert a nested structure to a map? Maybe I'm just missing something?

e.g.
grunt> x = load '/my_json_data' using ...JsonLoader() as (json:map[]);
grunt> y = foreach x generate json#'field1'#'field2' as field2;
grunt> describe y
2011-06-29 18:24:16,980 [main] WARN org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_MAP 1 time(s).
y: {user_id: bytearray}
grunt> dump y
...
ERROR 1081: Cannot cast to map. Expected bytearray but received: tuple

This error seems reasonable to me, since the nested value comes back as a Tuple rather than a map. Has anyone else run into this, and if so, did you get around it somehow?

Thanks.

@ayonsinha

Yes, I have run into this issue. I got around it by extracting each value in its own step and flattening it: each nested value is a Tuple, so you have to flatten it before you can dereference the map inside.
e.g.
exp = foreach all_data generate flatten($0#'request_details') as recs, (long)$0#'timestamp' as ts, $0#'user_id' as uid;
exp = filter exp by ts is not null and ts != 0;
req = foreach exp generate recs#'request_type' as req_type, ts, uid, flatten(recs#'_checkin_request_details') as loc_details, flatten(recs#'award_details') as award;
