Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
/**
 * Pig {@link LoadFunc} that reads line-oriented JSON text and emits one tuple per input line.
 *
 * <p>Each line is expected to hold a single JSON object. The object's top-level keys are copied
 * into a {@code Map<String, String>}: keys via {@code toString()}, values via {@code toString()},
 * except JSON {@code null}, which stays {@code null}. That map becomes the only field of the
 * emitted tuple, so a schema such as {@code (json: map[])} fits. Nested objects and arrays are not
 * expanded; they appear as whatever their {@code toString()} returns.
 *
 * <p>Blank lines are skipped. Lines that are not valid JSON, or whose top-level value is not a
 * JSON object, are logged at WARN and skipped. None of these end the input early.
 */
public class PigJsonLoader extends LoadFunc {
    private static final Logger LOG = LoggerFactory.getLogger(PigJsonLoader.class);
    private static final TupleFactory tupleFactory_ = TupleFactory.getInstance();
    // One parser per loader instance, reused for every line this loader reads.
    private final JSONParser jsonParser_ = new JSONParser();
    // Reader supplied by Pig in prepareToRead(); null until then.
    private LineRecordReader in = null;

    /** Creates a loader; it takes no configuration. */
    public PigJsonLoader() {
    }

    /**
     * Returns the input format Pig uses to split and read the input: Pig's text input format,
     * read line by line.
     */
    @SuppressWarnings("unchecked")
    @Override
    public InputFormat getInputFormat() throws IOException {
        return new PigTextInputFormat();
    }

    /**
     * Returns the tuple for the next line that decodes to a JSON object, or {@code null} once the
     * reader is exhausted.
     *
     * <p>Pig interprets a {@code null} return as "no more data". Lines that cannot be turned into
     * a tuple are therefore skipped here rather than reported as {@code null}; otherwise a single
     * blank or malformed line would silently drop everything after it in the split.
     *
     * @return the next tuple, or {@code null} at end of input
     * @throws IOException if the underlying record reader fails
     */
    @Override
    public Tuple getNext() throws IOException {
        while (in.nextKeyValue()) {
            Text value = in.getCurrentValue();
            if (value == null) {
                continue;
            }
            String line = value.toString();
            if (line.length() == 0) {
                // Blank line: nothing to decode.
                continue;
            }
            Tuple tuple = parseStringToTuple(line);
            if (tuple != null) {
                return tuple;
            }
            // parseStringToTuple has already logged why this line was rejected; keep reading.
        }
        return null;
    }

    /**
     * Decodes one line of JSON into a single-field tuple holding a {@code Map<String, String>}.
     *
     * <p>Only the top level of the object is read. Each key is converted with
     * {@code toString()}. Each value is converted with {@code toString()} unless it is JSON
     * {@code null}, which is stored as {@code null}.
     *
     * @param line one input line, expected to contain a JSON object
     * @return the tuple, or {@code null} if the line is not valid JSON or its top-level value is
     *     not a JSON object (the reason is logged at WARN)
     */
    protected Tuple parseStringToTuple(String line) {
        Object parsed;
        try {
            parsed = jsonParser_.parse(line);
        } catch (ParseException e) {
            LOG.warn("Could not json-decode string: " + line, e);
            return null;
        } catch (NumberFormatException e) {
            // Raised while converting a numeric literal, e.g. one too large for a long.
            LOG.warn("Very big number exceeds the scale of long: " + line, e);
            return null;
        }
        if (!(parsed instanceof JSONObject)) {
            // Arrays, scalars and a bare `null` are valid JSON but have no keys to map. Casting
            // them directly would throw ClassCastException/NullPointerException and fail the task.
            LOG.warn("Expected a JSON object, skipping line: " + line);
            return null;
        }
        JSONObject jsonObj = (JSONObject) parsed;
        Map<String, String> values = Maps.newHashMap();
        for (Object key : jsonObj.keySet()) {
            Object value = jsonObj.get(key);
            values.put(key.toString(), value == null ? null : value.toString());
        }
        return tupleFactory_.newTuple(values);
    }

    /**
     * Stores the record reader Pig created for this split, for use by {@link #getNext()}.
     *
     * @param reader the split's record reader; assumed to be a {@link LineRecordReader}
     * @param split the split being read (not used)
     * @throws IOException declared by the {@link LoadFunc} contract
     */
    @SuppressWarnings("unchecked")
    @Override
    public void prepareToRead(RecordReader reader, PigSplit split)
            throws IOException {
        // Assumes the reader comes from PigTextInputFormat (line-based). A ClassCastException here
        // means the input format produced a different reader type.
        in = (LineRecordReader) reader;
    }

    /**
     * Sets the job's input paths to {@code location}.
     *
     * @param location the location string given in the Pig LOAD statement
     * @param job the job being configured
     * @throws IOException if the input paths cannot be set
     */
    @Override
    public void setLocation(String location, Job job) throws IOException {
        PigFileInputFormat.setInputPaths(job, location);
    }
}
@LazyBoy248

This comment has been minimized.

Show comment Hide comment
@LazyBoy248

LazyBoy248 Oct 22, 2010

Can you show a sample Pig script that uses this JSONLoader?

Thanks for sharing!

Can you show a sample Pig script that uses this JSONLoader?

Thanks for sharing!

@kimsterv

This comment has been minimized.

Show comment Hide comment
@kimsterv

kimsterv Jan 26, 2011

--example rolling up logs by day/user/method

--jar containing jsonpigloader
register shadoop-0.0.1.jar;
register guava-r06.jar;
register json-simple-1.1.jar;
register piggybank.jar;
register joda-time-1.6.jar;
DEFINE PigJsonLoader com.simplegeo.elephantgeo.pig.load.PigJsonLoader();
DEFINE UnixToISO org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO();
DEFINE ISOToDay org.apache.pig.piggybank.evaluation.datetime.truncate.ISOToDay();

logs = LOAD '/logs' using PigJsonLoader as (json: map[]);

logs = FOREACH data GENERATE UnixToISO((long)((double)json#'timestamp')) AS ISOTime:chararray, json#'method' as method, json#'user:id' as user, json#'path' as path;

logs_by_day_user = FOREACH logs GENERATE (chararray) user, (chararray) ISOToDay(ISOTime) as day, method;

grouped_by_day_user = GROUP logs_by_day_user by ((chararray) user, day, (chararray) method) PARALLEL 32;
counted_by_day_user = FOREACH grouped_by_day_user GENERATE FLATTEN(logs_by_day_user), COUNT(logs_by_day_user);

STORE counted_by_day_user INTO '/output' using PigStorage(',');

Owner

kimsterv commented Jan 26, 2011

--example rolling up logs by day/user/method

--jar containing jsonpigloader
register shadoop-0.0.1.jar;
register guava-r06.jar;
register json-simple-1.1.jar;
register piggybank.jar;
register joda-time-1.6.jar;
DEFINE PigJsonLoader com.simplegeo.elephantgeo.pig.load.PigJsonLoader();
DEFINE UnixToISO org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO();
DEFINE ISOToDay org.apache.pig.piggybank.evaluation.datetime.truncate.ISOToDay();

logs = LOAD '/logs' using PigJsonLoader as (json: map[]);

logs = FOREACH data GENERATE UnixToISO((long)((double)json#'timestamp')) AS ISOTime:chararray, json#'method' as method, json#'user:id' as user, json#'path' as path;

logs_by_day_user = FOREACH logs GENERATE (chararray) user, (chararray) ISOToDay(ISOTime) as day, method;

grouped_by_day_user = GROUP logs_by_day_user by ((chararray) user, day, (chararray) method) PARALLEL 32;
counted_by_day_user = FOREACH grouped_by_day_user GENERATE FLATTEN(logs_by_day_user), COUNT(logs_by_day_user);

STORE counted_by_day_user INTO '/output' using PigStorage(',');

@alexmc6

This comment has been minimized.

Show comment Hide comment
@alexmc6

alexmc6 Jan 27, 2011

Thanks!

Can this be added to the Piggy Bank or core Pig? It looks really useful.

alexmc6 commented Jan 27, 2011

Thanks!

Can this be added to the Piggy Bank or core Pig? It looks really useful.

@alexmc6

This comment has been minimized.

Show comment Hide comment
@alexmc6

alexmc6 Jan 28, 2011

Thanks again Kim. I've compiled this into my own piggybank and got a basic json file loaded. Is it supposed to deal with nested json data structures? I see you have json#'user:id' in the example above - but I can't get anything like that to work. In my case I try something like
samplecols = FOREACH fewlogs GENERATE json#'user' , json#'text' ;
and json#'user' is itself something like
{"location":"Chicago","statuses_count":57,"profile_background_tile":true,"lang":"en","profile_link_color":"f51883","id":237881805,"following":null,"protected":false,......}
And I can't access any entries in there.
Do I need to take your code and turn it into another UDF that works on fields already loaded, rather than only parsing JSON on loading?

alexmc6 commented Jan 28, 2011

Thanks again Kim. I've compiled this into my own piggybank and got a basic json file loaded. Is it supposed to deal with nested json data structures? I see you have json#'user:id' in the example above - but I can't get anything like that to work. In my case I try something like
samplecols = FOREACH fewlogs GENERATE json#'user' , json#'text' ;
and json#'user' is itself something like
{"location":"Chicago","statuses_count":57,"profile_background_tile":true,"lang":"en","profile_link_color":"f51883","id":237881805,"following":null,"protected":false,......}
And I can't access any entries in there.
Do I need to take your code and turn it into another UDF that works on fields already loaded, rather than only parsing JSON on loading?

@sanjayvacharya

This comment has been minimized.

Show comment Hide comment
@sanjayvacharya

sanjayvacharya Feb 7, 2011

Kim, nice parser. One enhancement that would make nested JSON accessible is to return a Map when a value is itself JSON. I have this working with:
public Map<String, Object> parse(String line) throws ParseException {
JSONParser parser = new JSONParser();
return parse(line, parser);
}

private Map<String, Object> parse(String line, JSONParser jsonParser) throws ParseException {
  Map<String, Object> resultMap = new HashMap<String, Object>();
  JSONObject jsonObj = null;

  try {
    jsonObj = (JSONObject) jsonParser.parse(line);
  } catch (ClassCastException e) {
    throw new ParseException(1, "unexpected object");
  }

  if (jsonObj.isEmpty()) {
    return resultMap;
  }

  for (Object key : jsonObj.keySet()) {
    String keyStr = String.valueOf(key);
    String valueStr = String.valueOf(jsonObj.get(key));

    try {
      resultMap.put(keyStr, parse(valueStr));
    } catch (ParseException e) {
      // Nope not json
      resultMap.put(keyStr, valueStr);
    }
    return resultMap;     

}

So to access valB from X = {"keyA": {"keyB":"valB", "keyC":"valC"}}, one can use X#'keyA'#'keyB', where X is the record being read.

}

}
Thanks again.

Kim, nice parser. One enhancement that would make nested JSON accessible is to return a Map when a value is itself JSON. I have this working with:
public Map<String, Object> parse(String line) throws ParseException {
JSONParser parser = new JSONParser();
return parse(line, parser);
}

private Map<String, Object> parse(String line, JSONParser jsonParser) throws ParseException {
  Map<String, Object> resultMap = new HashMap<String, Object>();
  JSONObject jsonObj = null;

  try {
    jsonObj = (JSONObject) jsonParser.parse(line);
  } catch (ClassCastException e) {
    throw new ParseException(1, "unexpected object");
  }

  if (jsonObj.isEmpty()) {
    return resultMap;
  }

  for (Object key : jsonObj.keySet()) {
    String keyStr = String.valueOf(key);
    String valueStr = String.valueOf(jsonObj.get(key));

    try {
      resultMap.put(keyStr, parse(valueStr));
    } catch (ParseException e) {
      // Nope not json
      resultMap.put(keyStr, valueStr);
    }
    return resultMap;     

}

So to access valB from X = {"keyA": {"keyB":"valB", "keyC":"valC"}}, one can use X#'keyA'#'keyB', where X is the record being read.

}

}
Thanks again.

@kimsterv

This comment has been minimized.

Show comment Hide comment
@kimsterv

kimsterv Feb 8, 2011

We should probably put this stuff into piggybank. I just haven't had the time to figure out how yet :-/

Owner

kimsterv commented Feb 8, 2011

We should probably put this stuff into piggybank. I just haven't had the time to figure out how yet :-/

@sanjayvacharya

This comment has been minimized.

Show comment Hide comment
@sanjayvacharya

sanjayvacharya Feb 9, 2011

Definitely useful contribution if made :-)

Definitely useful contribution if made :-)

@alexmc6

This comment has been minimized.

Show comment Hide comment
@alexmc6

alexmc6 Feb 9, 2011

Hi Folks,

I have a version of this which is nearly ready for Piggybank. I'd appreciate some hand holding - or I can post it as a gist for someone else to submit.

I have gotten it creating a nested tree of maps AND also inserts arrays as new Tuples. (If arrays are found in the above code then it gets inserted as one big bytearray)

I have removed the reference to Google Collections /Guava as that is one extra jar needed.

If I submit it to piggybank, it will make sense to use the Jackson JSON parser instead of JSON.simple, because Jackson is already packaged with piggybank/pig.

alexmc6 commented Feb 9, 2011

Hi Folks,

I have a version of this which is nearly ready for Piggybank. I'd appreciate some hand holding - or I can post it as a gist for someone else to submit.

I have gotten it creating a nested tree of maps AND also inserts arrays as new Tuples. (If arrays are found in the above code then it gets inserted as one big bytearray)

I have removed the reference to Google Collections /Guava as that is one extra jar needed.

If I submit it to piggybank, it will make sense to use the Jackson JSON parser instead of JSON.simple, because Jackson is already packaged with piggybank/pig.

@kimsterv

This comment has been minimized.

Show comment Hide comment
@kimsterv

kimsterv Feb 9, 2011

alexmc6,
There are docs here on contributing to piggybank: http://wiki.apache.org/pig/PiggyBank
I've never done it so I can't give you any lessons learned. If you need additional help, I would email the pig forum.

Owner

kimsterv commented Feb 9, 2011

alexmc6,
There are docs here on contributing to piggybank: http://wiki.apache.org/pig/PiggyBank
I've never done it so I can't give you any lessons learned. If you need additional help, I would email the pig forum.

@ccattuto

This comment has been minimized.

Show comment Hide comment
@ccattuto

ccattuto Mar 17, 2011

[on a related topic]

I have forked elephant-bird (gerritjvv's repository) to implement some very preliminary support for nested JSON structures (that does not rely on re-parsing values as in the code snippet above):
https://github.com/ccattuto/elephant-bird

com.twitter.elephantbird.mapreduce.input.LzoJsonRecordReader now returns a nested MapWritable (with Writable values) that mirrors the parsed JSONObject.

com.twitter.elephantbird.pig8.load.LzoJsonLoader now rewrites the MapWritable returned by the LzoJsonRecordReader into a Map<String,Object> that Pig can use.

This allows one to access deep fields in JSON records like this:
X = LOAD .... AS (json: map[]);
Y = FOREACH X GENERATE json#'xxx'#'yyy' as yyy;

I am no expert, so the code is very rough and currently it only walks nested dictionaries,
filtering out Bags/Tuples.

[on a related topic]

I have forked elephant-bird (gerritjvv's repository) to implement some very preliminary support for nested JSON structures (that does not rely on re-parsing values as in the code snippet above):
https://github.com/ccattuto/elephant-bird

com.twitter.elephantbird.mapreduce.input.LzoJsonRecordReader now returns a nested MapWritable (with Writable values) that mirrors the parsed JSONObject.

com.twitter.elephantbird.pig8.load.LzoJsonLoader now rewrites the MapWritable returned by the LzoJsonRecordReader into a Map<String,Object> that Pig can use.

This allows one to access deep fields in JSON records like this:
X = LOAD .... AS (json: map[]);
Y = FOREACH X GENERATE json#'xxx'#'yyy' as yyy;

I am no expert, so the code is very rough and currently it only walks nested dictionaries,
filtering out Bags/Tuples.

@ayonsinha

This comment has been minimized.

Show comment Hide comment
@ayonsinha

ayonsinha May 19, 2011

I modified this code to convert JSONArray to Bags and nested Structures to Maps.

protected Tuple parseStringToTuple(String line) {
    try {
        Map<String, Object> values = Maps.newHashMap();
        JSONObject jsonObj = (JSONObject) jsonParser_.parse(line);
        flatten_value(jsonObj, values);
        return tupleFactory_.newTuple(values);
    } catch (ParseException e) {
        LOG.warn("Could not json-decode string: " + line, e);
        return null;
    } catch (NumberFormatException e) {
        LOG.warn("Very big number exceeds the scale of long: " + line, e);
        return null;
    }
}

private void flatten_value(JSONObject jsonObj, Map<String, Object> values) {
    for (Object key : jsonObj.keySet()) {
        String pref = key.toString();

        Object value = jsonObj.get(key);
        if(value instanceof JSONArray) {
            JSONArray array = (JSONArray)value;
            DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
            int i = 0;
            for(Object innervalue :array) {
                flatten_array(innervalue, bag);
            }
            values.put(pref, bag);
        } else if (value instanceof JSONObject){
            Map<String, Object> values2 = Maps.newHashMap();
            flatten_value((JSONObject)value, values2);
            values.put(pref, tupleFactory_.newTuple(values2));
        } else {
            values.put(pref, value != null ? value.toString(): null);
        }

    }
}


private void flatten_array(Object value, DataBag bag) {
    if(value instanceof JSONArray) {
        JSONArray array = (JSONArray)value;
        DataBag b = DefaultBagFactory.getInstance().newDefaultBag();
        int i = 0;
        for(Object innervalue :array) {
            flatten_array(innervalue, b);
            i++;
        }
        bag.addAll(b);
    } else if (value instanceof JSONObject){
        Map<String, Object> values2 = Maps.newHashMap();
        flatten_value((JSONObject)value, values2);
        bag.add(tupleFactory_.newTuple(values2));
    } else {
        if(value !=null) {
            bag.add( tupleFactory_.newTuple(value));
        }
    }

}

I modified this code to convert JSONArray to Bags and nested Structures to Maps.

protected Tuple parseStringToTuple(String line) {
    try {
        Map<String, Object> values = Maps.newHashMap();
        JSONObject jsonObj = (JSONObject) jsonParser_.parse(line);
        flatten_value(jsonObj, values);
        return tupleFactory_.newTuple(values);
    } catch (ParseException e) {
        LOG.warn("Could not json-decode string: " + line, e);
        return null;
    } catch (NumberFormatException e) {
        LOG.warn("Very big number exceeds the scale of long: " + line, e);
        return null;
    }
}

private void flatten_value(JSONObject jsonObj, Map<String, Object> values) {
    for (Object key : jsonObj.keySet()) {
        String pref = key.toString();

        Object value = jsonObj.get(key);
        if(value instanceof JSONArray) {
            JSONArray array = (JSONArray)value;
            DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
            int i = 0;
            for(Object innervalue :array) {
                flatten_array(innervalue, bag);
            }
            values.put(pref, bag);
        } else if (value instanceof JSONObject){
            Map<String, Object> values2 = Maps.newHashMap();
            flatten_value((JSONObject)value, values2);
            values.put(pref, tupleFactory_.newTuple(values2));
        } else {
            values.put(pref, value != null ? value.toString(): null);
        }

    }
}


private void flatten_array(Object value, DataBag bag) {
    if(value instanceof JSONArray) {
        JSONArray array = (JSONArray)value;
        DataBag b = DefaultBagFactory.getInstance().newDefaultBag();
        int i = 0;
        for(Object innervalue :array) {
            flatten_array(innervalue, b);
            i++;
        }
        bag.addAll(b);
    } else if (value instanceof JSONObject){
        Map<String, Object> values2 = Maps.newHashMap();
        flatten_value((JSONObject)value, values2);
        bag.add(tupleFactory_.newTuple(values2));
    } else {
        if(value !=null) {
            bag.add( tupleFactory_.newTuple(value));
        }
    }

}
@mmay

This comment has been minimized.

Show comment Hide comment
@mmay

mmay Jun 15, 2011

Hi - Thank you for this, it's great! Nice work.

Just curious, any reason why you chose to use the Google collections version of HashMap instead of the standard java implementation? When I used this I modified the Google collections HashMap to be just a java.util.HashMap in the interest of making the jar small and depending on as few external libraries as possible.

Also, any word on if/when this will be included in PiggyBank? As far as I can see it's not in there. :(

Thanks!

mmay commented Jun 15, 2011

Hi - Thank you for this, it's great! Nice work.

Just curious, any reason why you chose to use the Google collections version of HashMap instead of the standard java implementation? When I used this I modified the Google collections HashMap to be just a java.util.HashMap in the interest of making the jar small and depending on as few external libraries as possible.

Also, any word on if/when this will be included in PiggyBank? As far as I can see it's not in there. :(

Thanks!

@dvryaboy

This comment has been minimized.

Show comment Hide comment
@dvryaboy

dvryaboy Jun 18, 2011

Pig already has guava, and so does ElephantBird, so there is no real extra dependency.

Pig already has guava, and so does ElephantBird, so there is no real extra dependency.

@mmay

This comment has been minimized.

Show comment Hide comment
@mmay

mmay Jun 21, 2011

Well, that's weird... When I try to run this script without registering the google-collections (guava) jar, I get a ClassNotFoundException: com.google.common.collect.Maps. Am I doing something wrong?

mmay commented Jun 21, 2011

Well, that's weird... When I try to run this script without registering the google-collections (guava) jar, I get a ClassNotFoundException: com.google.common.collect.Maps. Am I doing something wrong?

@kimsterv

This comment has been minimized.

Show comment Hide comment
@kimsterv

kimsterv Jun 21, 2011

Owner

kimsterv commented Jun 21, 2011

@mmay

This comment has been minimized.

Show comment Hide comment
@mmay

mmay Jun 21, 2011

I see. I'm running 0.8.0

mmay commented Jun 21, 2011

I see. I'm running 0.8.0

@kimsterv

This comment has been minimized.

Show comment Hide comment
@kimsterv

kimsterv Jun 21, 2011

Owner

kimsterv commented Jun 21, 2011

@mmay

This comment has been minimized.

Show comment Hide comment
@mmay

mmay Jun 23, 2011

Cool.

If you guys are cool with it, I'd be more than happy to roll up this little class into a patch and get it submitted into PiggyBank for all of you. I've never done it before, but it seems straight-forward enough from the directions. The only thing I would need to add are some unit tests (shouldn't be a big deal). There is a jira issue pertaining to just this at: https://issues.apache.org/jira/browse/PIG-1914

Basically, if the authors (kimsterv and ayonsinha, or anyone else who helped write this that I left out) are cool with the apache license then I'd love to go ahead and get a patch going. I think this is definitely a must-have UDF and would be appreciated by the Pig user community.

Just let me know,
Thanks

mmay commented Jun 23, 2011

Cool.

If you guys are cool with it, I'd be more than happy to roll up this little class into a patch and get it submitted into PiggyBank for all of you. I've never done it before, but it seems straight-forward enough from the directions. The only thing I would need to add are some unit tests (shouldn't be a big deal). There is a jira issue pertaining to just this at: https://issues.apache.org/jira/browse/PIG-1914

Basically, if the authors (kimsterv and ayonsinha, or anyone else who helped write this that I left out) are cool with the apache license then I'd love to go ahead and get a patch going. I think this is definitely a must-have UDF and would be appreciated by the Pig user community.

Just let me know,
Thanks

@kimsterv

This comment has been minimized.

Show comment Hide comment
@kimsterv

kimsterv Jun 23, 2011

Owner

kimsterv commented Jun 23, 2011

@mmay

This comment has been minimized.

Show comment Hide comment
@mmay

mmay Jun 29, 2011

I'm running into an error when trying to access a field of nested JSON, maybe one of you can give me some insight. I'm using ayonsinha's version of the JSON parser which /should/ convert a nested structure to a map? Maybe I'm just missing something?

e.g.
grunt> x = load '/my_json_data' using ...JsonLoader() as (json:map[]);
grunt> y = foreach x generate json#'field1'#'field2' as field2;
grunt> describe y
2011-06-29 18:24:16,980 [main] WARN org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_MAP 1 time(s).
y: {user_id: bytearray}
grunt> dump y
...
ERROR 1081: Cannot cast to map. Expected bytearray but received: tuple

This error seems reasonable to me, since getNext is returning a Tuple. Has anyone else run into this, and if so, did you get around it somehow?

Thanks.

mmay commented Jun 29, 2011

I'm running into an error when trying to access a field of nested JSON, maybe one of you can give me some insight. I'm using ayonsinha's version of the JSON parser which /should/ convert a nested structure to a map? Maybe I'm just missing something?

e.g.
grunt> x = load '/my_json_data' using ...JsonLoader() as (json:map[]);
grunt> y = foreach x generate json#'field1'#'field2' as field2;
grunt> describe y
2011-06-29 18:24:16,980 [main] WARN org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_MAP 1 time(s).
y: {user_id: bytearray}
grunt> dump y
...
ERROR 1081: Cannot cast to map. Expected bytearray but received: tuple

This error seems reasonable to me, since getNext is returning a Tuple. Has anyone else run into this, and if so, did you get around it somehow?

Thanks.

@ayonsinha

This comment has been minimized.

Show comment Hide comment
@ayonsinha

ayonsinha Jun 29, 2011

Yes, I have run into this issue. I got around it by extracting each value in one step and flattening it. Each value is a Tuple that you have to flatten out.
e.g.
exp = foreach all_data generate flatten($0#'request_details') as recs, (long)$0#'timestamp' as ts, $0#'user_id' as uid;
exp = filter exp by ts is not null and ts != 0;
req = foreach exp generate recs#'request_type' as req_type, ts, uid, flatten(recs#'_checkin_request_details') as loc_details, flatten(recs#'award_details') as award;

Yes, I have run into this issue. I got around it by extracting each value in one step and flattening it. Each value is a Tuple that you have to flatten out.
e.g.
exp = foreach all_data generate flatten($0#'request_details') as recs, (long)$0#'timestamp' as ts, $0#'user_id' as uid;
exp = filter exp by ts is not null and ts != 0;
req = foreach exp generate recs#'request_type' as req_type, ts, uid, flatten(recs#'_checkin_request_details') as loc_details, flatten(recs#'award_details') as award;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment