@kimsterv /PigJsonLoader.java
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Maps;

/**
 * A Pig LoadFunc that parses one JSON object per line into a single-field
 * tuple containing a map of stringified top-level values.
 */
public class PigJsonLoader extends LoadFunc {
    private static final Logger LOG = LoggerFactory.getLogger(PigJsonLoader.class);
    private static final TupleFactory tupleFactory_ = TupleFactory.getInstance();

    private final JSONParser jsonParser_ = new JSONParser();
    private LineRecordReader in = null;

    public PigJsonLoader() {
    }

    @SuppressWarnings("unchecked")
    @Override
    public InputFormat getInputFormat() throws IOException {
        return new PigTextInputFormat();
    }

    @Override
    public Tuple getNext() throws IOException {
        // Skip blank and unparseable lines rather than returning null for
        // them: returning null tells Pig the split is exhausted, which would
        // silently truncate the input at the first bad record.
        while (in.nextKeyValue()) {
            Text val = in.getCurrentValue();
            if (val == null) {
                continue;
            }
            String line = val.toString();
            if (line.length() > 0) {
                Tuple t = parseStringToTuple(line);
                if (t != null) {
                    return t;
                }
            }
        }
        return null;
    }

    protected Tuple parseStringToTuple(String line) {
        try {
            Map<String, String> values = Maps.newHashMap();
            JSONObject jsonObj = (JSONObject) jsonParser_.parse(line);
            for (Object key : jsonObj.keySet()) {
                Object value = jsonObj.get(key);
                values.put(key.toString(), value != null ? value.toString() : null);
            }
            return tupleFactory_.newTuple(values);
        } catch (ParseException e) {
            LOG.warn("Could not json-decode string: " + line, e);
            return null;
        } catch (NumberFormatException e) {
            LOG.warn("Very big number exceeds the scale of long: " + line, e);
            return null;
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        in = (LineRecordReader) reader;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        PigFileInputFormat.setInputPaths(job, location);
    }
}
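For anyone reading along, the flattening that parseStringToTuple performs can be sketched with plain JDK maps, standing in for the parsed JSONObject (a stdlib-only illustration; the class FlattenSketch and its stringify helper are hypothetical names, not part of the loader):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class FlattenSketch {
    // Mirrors the loop body of parseStringToTuple: every top-level value is
    // stringified with toString(), and null values are preserved as null.
    static Map<String, String> stringify(Map<String, Object> jsonObj) {
        Map<String, String> values = new HashMap<String, String>();
        for (Map.Entry<String, Object> e : jsonObj.entrySet()) {
            Object value = e.getValue();
            values.put(e.getKey(), value != null ? value.toString() : null);
        }
        return values;
    }

    public static void main(String[] args) {
        Map<String, Object> parsed = new LinkedHashMap<String, Object>();
        parsed.put("method", "GET");
        parsed.put("timestamp", 1309400000L);
        parsed.put("user", null);
        // Numbers come out as strings, e.g. "1309400000".
        System.out.println(stringify(parsed).get("timestamp"));
    }
}
```

This is why nested objects are inaccessible in the original version: a nested JSONObject is stringified wholesale, which is what the later comments about arrays and nested structures address.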
@LazyBoy248

Can you show a sample Pig script that uses this JSONLoader?

Thanks for sharing!

@kimsterv
Owner

--example rolling up logs by day/user/method

--jar containing PigJsonLoader
register shadoop-0.0.1.jar;
register guava-r06.jar;
register json-simple-1.1.jar;
register piggybank.jar;
register joda-time-1.6.jar;

DEFINE PigJsonLoader com.simplegeo.elephantgeo.pig.load.PigJsonLoader();
DEFINE UnixToISO org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO();
DEFINE ISOToDay org.apache.pig.piggybank.evaluation.datetime.truncate.ISOToDay();

logs = LOAD '/logs' USING PigJsonLoader AS (json: map[]);

logs = FOREACH logs GENERATE UnixToISO((long)((double)json#'timestamp')) AS ISOTime:chararray, json#'method' AS method, json#'user:id' AS user, json#'path' AS path;

logs_by_day_user = FOREACH logs GENERATE (chararray) user, (chararray) ISOToDay(ISOTime) AS day, method;

grouped_by_day_user = GROUP logs_by_day_user BY ((chararray) user, day, (chararray) method) PARALLEL 32;
counted_by_day_user = FOREACH grouped_by_day_user GENERATE FLATTEN(group), COUNT(logs_by_day_user);

STORE counted_by_day_user INTO '/output' USING PigStorage(',');

@alexmc6

Thanks!

Can this be added to PiggyBank or core Pig? It looks really useful.

@alexmc6

Thanks again Kim. I've compiled this into my own piggybank and got a basic JSON file loaded. Is it supposed to deal with nested JSON data structures? I see you have json#'user:id' in the example above, but I can't get anything like that to work. In my case I try something like
samplecols = FOREACH fewlogs GENERATE json#'user' , json#'text' ;
and json#'user' is itself something like
{"location":"Chicago","statuses_count":57,"profile_background_tile":true,"lang":"en","profile_link_color":"f51883","id":237881805,"following":null,"protected":false,......}
And I can't access any entries in there.
Do I need to take your code and turn it into another UDF that works on fields already loaded, rather than only parsing JSON at load time?

@sanjayvacharya

Kim, nice parser. One enhancement that would make nested JSON accessible is to return a Map when a value is itself JSON. I have this working with:

public Map<String, Object> parse(String line) throws ParseException {
    return parse(line, new JSONParser());
}

private Map<String, Object> parse(String line, JSONParser jsonParser) throws ParseException {
    Map<String, Object> resultMap = new HashMap<String, Object>();
    JSONObject jsonObj = null;

    try {
        jsonObj = (JSONObject) jsonParser.parse(line);
    } catch (ClassCastException e) {
        throw new ParseException(ParseException.ERROR_UNEXPECTED_TOKEN, "unexpected object");
    }

    if (jsonObj.isEmpty()) {
        return resultMap;
    }

    for (Object key : jsonObj.keySet()) {
        String keyStr = String.valueOf(key);
        String valueStr = String.valueOf(jsonObj.get(key));

        try {
            // Recurse: if the value is itself JSON, store it as a nested Map.
            resultMap.put(keyStr, parse(valueStr));
        } catch (ParseException e) {
            // Not JSON; keep the plain string value.
            resultMap.put(keyStr, valueStr);
        }
    }
    return resultMap;
}

So for accessing valB from X = {"keyA": {"keyB":"valB", "keyC":"valC"}}, one can use x#'keyA'#'keyB', where X is the record being read.
Thanks again.
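The two-level access pattern above can be pictured with plain JDK maps (an illustrative stdlib-only sketch; NestedLookup and lookup are hypothetical names standing in for what Pig does for x#'keyA'#'keyB' once nested values are returned as maps):

```java
import java.util.HashMap;
import java.util.Map;

public class NestedLookup {
    // Two-level lookup, the plain-Java analogue of x#'keyA'#'keyB' in Pig.
    @SuppressWarnings("unchecked")
    static Object lookup(Map<String, Object> record, String outer, String inner) {
        Map<String, Object> nested = (Map<String, Object>) record.get(outer);
        return nested == null ? null : nested.get(inner);
    }

    public static void main(String[] args) {
        // Inner map stands in for the nested JSON object under "keyA".
        Map<String, Object> innerMap = new HashMap<String, Object>();
        innerMap.put("keyB", "valB");
        Map<String, Object> record = new HashMap<String, Object>();
        record.put("keyA", innerMap);
        System.out.println(lookup(record, "keyA", "keyB")); // prints valB
    }
}
```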

@kimsterv
Owner

We should probably put this stuff into piggybank. I just haven't had the time to figure out how yet :-/

@sanjayvacharya

Definitely useful contribution if made :-)

@alexmc6

Hi Folks,

I have a version of this which is nearly ready for Piggybank. I'd appreciate some hand holding - or I can post it as a gist for someone else to submit.

I have gotten it to create a nested tree of maps AND to insert arrays as new tuples. (In the code above, arrays get inserted as one big bytearray.)

I have removed the reference to Google Collections/Guava, since that is one extra jar needed.

If I submit it to PiggyBank, it will make sense to use the Jackson JSON parser instead of json-simple, because Jackson is already packaged with PiggyBank/Pig.

@kimsterv
Owner

alexmc6,
There are docs here on contributing to piggybank: http://wiki.apache.org/pig/PiggyBank
I've never done it so I can't give you any lessons learned. If you need additional help, I would email the pig forum.

@ccattuto

[on a related topic]

I have forked elephant-bird (gerritjvv's repository) to implement some very preliminary support for nested JSON structures (that does not rely on re-parsing values as in the code snippet above):
https://github.com/ccattuto/elephant-bird

com.twitter.elephantbird.mapreduce.input.LzoJsonRecordReader now returns a nested MapWritable (with Writable values) that mirrors the parsed JSONObject.

com.twitter.elephantbird.pig8.load.LzoJsonLoader now rewrites the MapWritable returned by the LzoJsonRecordReader into a Map that Pig can use.

This allows one to access deep fields in JSON records like this:
X = LOAD .... AS (json: map[]);
Y = FOREACH X GENERATE json#'xxx'#'yyy' as yyy;

I am no expert, so the code is very rough and currently it only walks nested dictionaries,
filtering out Bags/Tuples.

@ayonsinha

I modified this code to convert JSONArray to bags and nested structures to maps. (It needs additional imports: org.json.simple.JSONArray, org.apache.pig.data.DataBag, org.apache.pig.data.DefaultBagFactory.)

protected Tuple parseStringToTuple(String line) {
    try {
        Map<String, Object> values = Maps.newHashMap();
        JSONObject jsonObj = (JSONObject) jsonParser_.parse(line);
        flatten_value(jsonObj, values);
        return tupleFactory_.newTuple(values);
    } catch (ParseException e) {
        LOG.warn("Could not json-decode string: " + line, e);
        return null;
    } catch (NumberFormatException e) {
        LOG.warn("Very big number exceeds the scale of long: " + line, e);
        return null;
    }
}

private void flatten_value(JSONObject jsonObj, Map<String, Object> values) {
    for (Object key : jsonObj.keySet()) {
        String pref = key.toString();
        Object value = jsonObj.get(key);
        if (value instanceof JSONArray) {
            // JSON arrays become Pig bags.
            DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
            for (Object innerValue : (JSONArray) value) {
                flatten_array(innerValue, bag);
            }
            values.put(pref, bag);
        } else if (value instanceof JSONObject) {
            // Nested JSON objects become maps wrapped in a tuple.
            Map<String, Object> values2 = Maps.newHashMap();
            flatten_value((JSONObject) value, values2);
            values.put(pref, tupleFactory_.newTuple(values2));
        } else {
            values.put(pref, value != null ? value.toString() : null);
        }
    }
}

private void flatten_array(Object value, DataBag bag) {
    if (value instanceof JSONArray) {
        // Nested arrays are flattened into the parent bag.
        DataBag b = DefaultBagFactory.getInstance().newDefaultBag();
        for (Object innerValue : (JSONArray) value) {
            flatten_array(innerValue, b);
        }
        bag.addAll(b);
    } else if (value instanceof JSONObject) {
        Map<String, Object> values2 = Maps.newHashMap();
        flatten_value((JSONObject) value, values2);
        bag.add(tupleFactory_.newTuple(values2));
    } else if (value != null) {
        bag.add(tupleFactory_.newTuple(value));
    }
}
@mmay

Hi - Thank you for this, it's great! Nice work.

Just curious, any reason why you chose to use the Google Collections version of HashMap instead of the standard Java implementation? When I used this I changed the Google Collections HashMap to a plain java.util.HashMap in the interest of keeping the jar small and depending on as few external libraries as possible.
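For what it's worth, Maps.newHashMap() is only a type-inference convenience; a drop-in JDK replacement behaves identically (a minimal sketch; the class name MapsDemo is hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class MapsDemo {
    // Plain-JDK equivalent of Guava's Maps.newHashMap(); swapping this in
    // removes the com.google.common.collect dependency from the loader.
    static Map<String, String> newHashMap() {
        return new HashMap<String, String>();
    }

    public static void main(String[] args) {
        Map<String, String> values = newHashMap();
        values.put("method", "GET");
        System.out.println(values.get("method")); // prints GET
    }
}
```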

Also, any word on if/when this will be included in PiggyBank? As far as I can see it's not in there. :(

Thanks!

@dvryaboy

Pig already has guava, and so does ElephantBird, so there is no real extra dependency.

@mmay

Well, that's weird... When I try to run this script without registering the google-collections (Guava) jar, I get a ClassNotFoundException: com.google.common.collect.Maps. Am I doing something wrong?

@kimsterv
Owner
@mmay

I see. I'm running 0.8.0

@kimsterv
Owner
@mmay

Cool.

If you guys are cool with it, I'd be more than happy to roll up this little class into a patch and get it submitted into PiggyBank for all of you. I've never done it before, but it seems straightforward enough from the directions. The only thing I would need to add is some unit tests (shouldn't be a big deal). There is a jira issue pertaining to just this at: https://issues.apache.org/jira/browse/PIG-1914

Basically, if the authors (kimsterv and ayonsinha, or anyone else who helped write this that I left out) are cool with the apache license then I'd love to go ahead and get a patch going. I think this is definitely a must-have UDF and would be appreciated by the Pig user community.

Just let me know,
Thanks

@kimsterv
Owner
@mmay

I'm running into an error when trying to access a field of nested JSON, maybe one of you can give me some insight. I'm using ayonsinha's version of the JSON parser which /should/ convert a nested structure to a map? Maybe I'm just missing something?

e.g.
grunt> x = load '/my_json_data' using ...JsonLoader() as (json:map[]);
grunt> y = foreach x generate json#'field1'#'field2' as field2;
grunt> describe y
2011-06-29 18:24:16,980 [main] WARN org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_MAP 1 time(s).
y: {user_id: bytearray}
grunt> dump y
...
ERROR 1081: Cannot cast to map. Expected bytearray but received: tuple

This error seems reasonable to me since getNext is returning a Tuple. Has anyone else run into this, and if so, did you get around it somehow?

Thanks.

@ayonsinha

Yes, I have run into this issue, and the way I got around it was by extracting each value in one step and flattening. Each value is a tuple that you have to flatten out.
e.g.
exp = foreach all_data generate flatten($0#'request_details') as recs, (long)$0#'timestamp' as ts, $0#'user_id' as uid;
exp = filter exp by ts is not null and ts != 0;
req = foreach exp generate recs#'request_type' as req_type, ts, uid, flatten(recs#'_checkin_request_details') as loc_details, flatten(recs#'award_details') as award;
