Skip to content

Instantly share code, notes, and snippets.

@kimsterv
Created September 28, 2010 16:42
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save kimsterv/601331 to your computer and use it in GitHub Desktop.
Save kimsterv/601331 to your computer and use it in GitHub Desktop.
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
public class PigJsonLoader extends LoadFunc {
private static final Logger LOG = LoggerFactory.getLogger(PigJsonLoader.class);
private static final TupleFactory tupleFactory_ = TupleFactory.getInstance();
private final JSONParser jsonParser_ = new JSONParser();
private LineRecordReader in = null;
public PigJsonLoader() {
}
@SuppressWarnings("unchecked")
@Override
public InputFormat getInputFormat() throws IOException {
return new PigTextInputFormat();
}
@Override
public Tuple getNext() throws IOException {
boolean notDone = in.nextKeyValue();
if (!notDone) {
return null;
}
String line;
Text val = in.getCurrentValue();
if (val == null) {
return null;
}
line = val.toString();
if (line.length() > 0) {
Tuple t = parseStringToTuple(line);
if (t != null) {
return t;
}
}
return null;
}
protected Tuple parseStringToTuple(String line) {
try {
Map<String, String> values = Maps.newHashMap();
JSONObject jsonObj = (JSONObject) jsonParser_.parse(line);
for (Object key : jsonObj.keySet()) {
Object value = jsonObj.get(key);
values.put(key.toString(), value != null ? value.toString()
: null);
}
return tupleFactory_.newTuple(values);
} catch (ParseException e) {
LOG.warn("Could not json-decode string: " + line, e);
return null;
} catch (NumberFormatException e) {
LOG.warn("Very big number exceeds the scale of long: " + line, e);
return null;
}
}
@SuppressWarnings("unchecked")
@Override
public void prepareToRead(RecordReader reader, PigSplit split)
throws IOException {
in = (LineRecordReader) reader;
}
@Override
public void setLocation(String location, Job job) throws IOException {
PigFileInputFormat.setInputPaths(job, location);
}
}
@kimsterv
Copy link
Author

kimsterv commented Jun 21, 2011 via email

@mmay
Copy link

mmay commented Jun 23, 2011

Cool.

If you guys are cool with it, I'd be more than happy to roll up this little class into a patch and get it submitted into PiggyBank for all of you. I've never done it before, but it seems straight-forward enough from the directions. The only thing I would need to add are some unit tests (shouldn't be a big deal). There is a jira issue pertaining to just this at: https://issues.apache.org/jira/browse/PIG-1914

Basically, if the authors (kimsterv and ayonsinha, or anyone else who helped write this that I left out) are cool with the apache license then I'd love to go ahead and get a patch going. I think this is definitely a must-have UDF and would be appreciated by the Pig user community.

Just let me know,
Thanks

@kimsterv
Copy link
Author

kimsterv commented Jun 23, 2011 via email

@mmay
Copy link

mmay commented Jun 29, 2011

I'm running into an error when trying to access a field of nested JSON, maybe one of you can give me some insight. I'm using ayonsinha's version of the JSON parser which /should/ convert a nested structure to a map? Maybe I'm just missing something?

e.g.
grunt> x = load '/my_json_data' using ...JsonLoader() as (json:map[]);
grunt> y = foreach x generate json#'field1'#'field2' as field2;
grunt> describe y
2011-06-29 18:24:16,980 [main] WARN org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_MAP 1 time(s).
y: {user_id: bytearray}
grunt> dump y
...
ERROR 1081: Cannot cast to map. Expected bytearray but received: tuple

Thing error seems reasonable to me since getNext is returning a Tuple. Has anyone else ran into this, and if so did you get around it somehow?

Thanks.

@ayonsinha
Copy link

Yes, I have ran into this issue and I have gotten around it is by extracting each value in one step and flatten. Each value is a Tuple that you have to flatten out.
e.g.
exp = foreach all_data generate flatten($0#'request_details') as recs, (long)$0#'timestamp' as ts, $0#'user_id' as uid;
exp = filter exp by ts is not null and ts != 0;
req = foreach exp generate recs#'request_type' as req_type, ts, uid, flatten(recs#'_checkin_request_details') as loc_details, flatten(recs#'award_details') as award;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment