PigJsonLoader.java
import java.io.IOException;
import java.util.Map;
 
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import com.google.common.collect.Maps;
 
public class PigJsonLoader extends LoadFunc {
    private static final Logger LOG = LoggerFactory.getLogger(PigJsonLoader.class);
    private static final TupleFactory tupleFactory_ = TupleFactory.getInstance();
    private final JSONParser jsonParser_ = new JSONParser();
    private LineRecordReader in = null;

    public PigJsonLoader() {
    }

    @SuppressWarnings("unchecked")
    @Override
    public InputFormat getInputFormat() throws IOException {
        return new PigTextInputFormat();
    }

    @Override
    public Tuple getNext() throws IOException {
        boolean notDone = in.nextKeyValue();
        if (!notDone) {
            return null;
        }

        Text val = in.getCurrentValue();
        if (val == null) {
            return null;
        }

        String line = val.toString();
        if (line.length() > 0) {
            Tuple t = parseStringToTuple(line);
            if (t != null) {
                return t;
            }
        }

        return null;
    }

    protected Tuple parseStringToTuple(String line) {
        try {
            Map<String, String> values = Maps.newHashMap();
            JSONObject jsonObj = (JSONObject) jsonParser_.parse(line);
            for (Object key : jsonObj.keySet()) {
                Object value = jsonObj.get(key);
                values.put(key.toString(), value != null ? value.toString() : null);
            }
            return tupleFactory_.newTuple(values);
        } catch (ParseException e) {
            LOG.warn("Could not json-decode string: " + line, e);
            return null;
        } catch (NumberFormatException e) {
            LOG.warn("Very big number exceeds the scale of long: " + line, e);
            return null;
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        in = (LineRecordReader) reader;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        PigFileInputFormat.setInputPaths(job, location);
    }
}

Can you show a Pig script sample that uses this JSONLoader?

Thanks for sharing!

--example rolling up logs by day/user/method

--jar containing jsonpigloader
register shadoop-0.0.1.jar;
register guava-r06.jar;
register json-simple-1.1.jar;
register piggybank.jar;
register joda-time-1.6.jar;
DEFINE PigJsonLoader com.simplegeo.elephantgeo.pig.load.PigJsonLoader();
DEFINE UnixToISO org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO();
DEFINE ISOToDay org.apache.pig.piggybank.evaluation.datetime.truncate.ISOToDay();

logs = LOAD '/logs' using PigJsonLoader as (json: map[]);

logs = FOREACH logs GENERATE UnixToISO((long)((double)json#'timestamp')) AS ISOTime:chararray, json#'method' AS method, json#'user:id' AS user, json#'path' AS path;

logs_by_day_user = FOREACH logs GENERATE (chararray) user, (chararray) ISOToDay(ISOTime) as day, method;

grouped_by_day_user = GROUP logs_by_day_user by ((chararray) user, day, (chararray) method) PARALLEL 32;
counted_by_day_user = FOREACH grouped_by_day_user GENERATE FLATTEN(logs_by_day_user), COUNT(logs_by_day_user);

STORE counted_by_day_user INTO '/output' using PigStorage(',');

Thanks!

Can this be added to the Piggy Bank or core Pig? It looks really useful.

Thanks again Kim. I've compiled this into my own piggybank and got a basic json file loaded. Is it supposed to deal with nested json data structures? I see you have json#'user:id' in the example above - but I can't get anything like that to work. In my case I try something like
samplecols = FOREACH fewlogs GENERATE json#'user' , json#'text' ;
and json#'user' is itself something like
{"location":"Chicago","statuses_count":57,"profile_background_tile":true,"lang":"en","profile_link_color":"f51883","id":237881805,"following":null,"protected":false,......}
And I can't access any entries in there.
Do I need to take your code and turn it into another UDF which works on fields already loaded, rather than only parsing JSON on loading?

Kim, nice parser. One enhancement that makes nested JSON accessible would be to return a Map if a value is itself JSON. I have this working with:

public Map<String, Object> parse(String line) throws ParseException {
  JSONParser parser = new JSONParser();
  return parse(line, parser);
}

private Map<String, Object> parse(String line, JSONParser jsonParser) throws ParseException {
  Map<String, Object> resultMap = new HashMap<String, Object>();
  JSONObject jsonObj = null;

  try {
    jsonObj = (JSONObject) jsonParser.parse(line);
  } catch (ClassCastException e) {
    throw new ParseException(1, "unexpected object");
  }

  if (jsonObj.isEmpty()) {
    return resultMap;
  }

  for (Object key : jsonObj.keySet()) {
    String keyStr = String.valueOf(key);
    String valueStr = String.valueOf(jsonObj.get(key));

    try {
      // Recurse: if the value is itself JSON, store it as a nested Map.
      resultMap.put(keyStr, parse(valueStr, jsonParser));
    } catch (ParseException e) {
      // Nope, not JSON
      resultMap.put(keyStr, valueStr);
    }
  }

  return resultMap;
}

So for accessing valB from X = {"keyA": {"keyB":"valB", "keyC":"valC"}}, one can use X#'keyA'#'keyB' where X is the record being read.
Thanks again.
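To make the access pattern concrete, here is a minimal stdlib-only Java sketch of the nested-Map structure the recursive parse above produces (class and variable names here are purely illustrative, not from the gist); the chained `get` calls correspond to `X#'keyA'#'keyB'` in Pig:

```java
import java.util.HashMap;
import java.util.Map;

public class NestedLookupDemo {
    public static void main(String[] args) {
        // Stand-in for parse("{\"keyA\": {\"keyB\":\"valB\", \"keyC\":\"valC\"}}"):
        Map<String, Object> inner = new HashMap<String, Object>();
        inner.put("keyB", "valB");
        inner.put("keyC", "valC");

        Map<String, Object> record = new HashMap<String, Object>();
        record.put("keyA", inner);

        // Equivalent of X#'keyA'#'keyB' in Pig: two chained map lookups.
        @SuppressWarnings("unchecked")
        Map<String, Object> keyA = (Map<String, Object>) record.get("keyA");
        System.out.println(keyA.get("keyB")); // prints valB
    }
}
```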

We should probably put this stuff into piggybank. I just haven't had the time to figure out how yet :-/

Definitely useful contribution if made :-)

Hi Folks,

I have a version of this which is nearly ready for Piggybank. I'd appreciate some hand holding - or I can post it as a gist for someone else to submit.

I have gotten it to create a nested tree of maps, and it also inserts arrays as new Tuples. (If arrays are found by the code above, they get inserted as one big bytearray.)

I have removed the reference to Google Collections /Guava as that is one extra jar needed.

If I submit it to piggybank it would make sense to use the Jackson JSON parser instead of JSON.simple, because Jackson is already packaged with piggybank/pig.

alexmc6,
There are docs here on contributing to piggybank: http://wiki.apache.org/pig/PiggyBank
I've never done it so I can't give you any lessons learned. If you need additional help, I would email the pig forum.

[on a related topic]

I have forked elephant-bird (gerritjvv's repository) to implement some very preliminary support for nested JSON structures (that does not rely on re-parsing values as in the code snippet above):
https://github.com/ccattuto/elephant-bird

com.twitter.elephantbird.mapreduce.input.LzoJsonRecordReader now returns a nested MapWritable (with Writable values) that mirrors the parsed JSONObject.

com.twitter.elephantbird.pig8.load.LzoJsonLoader now rewrites the MapWritable returned by the LzoJsonRecordReader into a Map that Pig can use.

This allows one to access deep fields in JSON records like this:
X = LOAD .... AS (json: map[]);
Y = FOREACH X GENERATE json#'xxx'#'yyy' as yyy;

I am no expert, so the code is very rough and currently it only walks nested dictionaries,
filtering out Bags/Tuples.

I modified this code to convert JSONArray to Bags and nested Structures to Maps.

protected Tuple parseStringToTuple(String line) {
    try {
        Map<String, Object> values = Maps.newHashMap();
        JSONObject jsonObj = (JSONObject) jsonParser_.parse(line);
        flatten_value(jsonObj, values);
        return tupleFactory_.newTuple(values);
    } catch (ParseException e) {
        LOG.warn("Could not json-decode string: " + line, e);
        return null;
    } catch (NumberFormatException e) {
        LOG.warn("Very big number exceeds the scale of long: " + line, e);
        return null;
    }
}

private void flatten_value(JSONObject jsonObj, Map<String, Object> values) {
    for (Object key : jsonObj.keySet()) {
        String pref = key.toString();

        Object value = jsonObj.get(key);
        if(value instanceof JSONArray) {
            JSONArray array = (JSONArray)value;
            DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
            for (Object innervalue : array) {
                flatten_array(innervalue, bag);
            }
            values.put(pref, bag);
        } else if (value instanceof JSONObject){
            Map<String, Object> values2 = Maps.newHashMap();
            flatten_value((JSONObject)value, values2);
            values.put(pref, tupleFactory_.newTuple(values2));
        } else {
            values.put(pref, value != null ? value.toString(): null);
        }

    }
}


private void flatten_array(Object value, DataBag bag) {
    if(value instanceof JSONArray) {
        JSONArray array = (JSONArray)value;
        DataBag b = DefaultBagFactory.getInstance().newDefaultBag();
        for (Object innervalue : array) {
            flatten_array(innervalue, b);
        }
        bag.addAll(b);
    } else if (value instanceof JSONObject){
        Map<String, Object> values2 = Maps.newHashMap();
        flatten_value((JSONObject)value, values2);
        bag.add(tupleFactory_.newTuple(values2));
    } else {
        if(value !=null) {
            bag.add( tupleFactory_.newTuple(value));
        }
    }

}
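For readers without a Pig/Hadoop classpath handy, here is a hedged, stdlib-only sketch of the same flattening idea: JSON objects become `Map`s, JSON arrays become flat `List`s standing in for bags, and nested arrays collapse into one bag via `addAll`. All names are illustrative; `List` replaces `DataBag`/`JSONArray` only for the sake of a runnable example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FlattenSketch {
    // Recursively flatten a parsed JSON-like structure:
    // Lists (arrays) become flat Lists, Maps (objects) become Maps of
    // flattened values, and scalars become Strings (as in flatten_value).
    static Object flatten(Object value) {
        if (value instanceof List) {
            List<Object> bag = new ArrayList<Object>();
            for (Object inner : (List<?>) value) {
                Object f = flatten(inner);
                if (f instanceof List) {
                    bag.addAll((List<?>) f); // nested arrays collapse into one bag
                } else if (f != null) {
                    bag.add(f);
                }
            }
            return bag;
        } else if (value instanceof Map) {
            Map<String, Object> out = new HashMap<String, Object>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                out.put(String.valueOf(e.getKey()), flatten(e.getValue()));
            }
            return out;
        }
        return value == null ? null : String.valueOf(value);
    }

    public static void main(String[] args) {
        // [[1, 2], [3]] flattens into a single bag.
        List<Object> nested = Arrays.<Object>asList(
                Arrays.asList(1, 2), Arrays.asList(3));
        System.out.println(flatten(nested)); // prints [1, 2, 3]
    }
}
```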

Hi - Thank you for this, it's great! Nice work.

Just curious, any reason why you chose to use the Google Collections version of HashMap instead of the standard Java implementation? When I used this I changed the Google Collections HashMap to a plain java.util.HashMap, in the interest of keeping the jar small and depending on as few external libraries as possible.

Also, any word on if/when this will be included in PiggyBank? As far as I can see it's not in there. :(

Thanks!

Pig already has guava, and so does ElephantBird, so there is no real extra dependency.

Well, that's weird... When I try to run this script without registering the google-collections (guava) jar, I get a ClassNotFoundException: com.google.common.collect.Maps. Am I doing something wrong?

What version of pig are you using? I'm not sure if they've included it with
more recent versions.


I see. I'm running 0.8.0

Yeah, just add the jar to your PIG_CLASSPATH


Cool.

If you guys are cool with it, I'd be more than happy to roll up this little class into a patch and get it submitted into PiggyBank for all of you. I've never done it before, but it seems straight-forward enough from the directions. The only thing I would need to add are some unit tests (shouldn't be a big deal). There is a jira issue pertaining to just this at: https://issues.apache.org/jira/browse/PIG-1914

Basically, if the authors (kimsterv and ayonsinha, or anyone else who helped write this that I left out) are cool with the apache license then I'd love to go ahead and get a patch going. I think this is definitely a must-have UDF and would be appreciated by the Pig user community.

Just let me know,
Thanks

Sure go for it. I might try to hack on a jsonloader that works with version
9 of pig.


I'm running into an error when trying to access a field of nested JSON, maybe one of you can give me some insight. I'm using ayonsinha's version of the JSON parser which /should/ convert a nested structure to a map? Maybe I'm just missing something?

e.g.
grunt> x = load '/my_json_data' using ...JsonLoader() as (json:map[]);
grunt> y = foreach x generate json#'field1'#'field2' as field2;
grunt> describe y
2011-06-29 18:24:16,980 [main] WARN org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_MAP 1 time(s).
y: {user_id: bytearray}
grunt> dump y
...
ERROR 1081: Cannot cast to map. Expected bytearray but received: tuple

This error seems reasonable to me since getNext returns a Tuple. Has anyone else run into this, and if so, did you get around it somehow?

Thanks.

Yes, I have run into this issue; I got around it by extracting each value in one step and flattening. Each value is a Tuple that you have to flatten out.
e.g.
exp = foreach all_data generate flatten($0#'request_details') as recs, (long)$0#'timestamp' as ts, $0#'user_id' as uid;
exp = filter exp by ts is not null and ts != 0;
req = foreach exp generate recs#'request_type' as req_type, ts, uid, flatten(recs#'_checkin_request_details') as loc_details, flatten(recs#'award_details') as award;
