Skip to content

Instantly share code, notes, and snippets.

@sureshsaggar
Created August 13, 2012 15:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sureshsaggar/3342047 to your computer and use it in GitHub Desktop.
Save sureshsaggar/3342047 to your computer and use it in GitHub Desktop.
Custom loader UDF that extends Pig Latin's LoadFunc.
package com.ss.analytics.pig;
/**
 * Pig {@code LoadFunc} that reads text lines, splits each line on {@code '|'}, and emits a tuple
 * containing only the columns whose names were requested in the constructor.
 *
 * <p>Field handling, by position within a line:
 * <ul>
 *   <li>field 0 is emitted when {@code "date"} is requested;</li>
 *   <li>field 1 is emitted when {@code "type"} is requested;</li>
 *   <li>the last field is treated as JSON: it is emitted verbatim when it is field 2 and
 *       {@code "raw"} is requested, and every top-level key of the JSON object that is also a
 *       requested name is emitted as an additional column;</li>
 *   <li>any other middle field is ignored.</li>
 * </ul>
 * NOTE(review): presumably input lines look like {@code date|type|{json}} — confirm against the
 * log producer.
 */
public class SSPigLoader extends LoadFunc {

    /** Byte that separates fields within a line. */
    private final byte fieldDel = '|';

    /** Hadoop {@code Text} holds UTF-8 bytes, so decode explicitly instead of using the platform charset. */
    private static final java.nio.charset.Charset UTF8 = java.nio.charset.Charset.forName("UTF-8");

    protected RecordReader in = null;

    /** Columns accumulated for the tuple currently being built; reset after each tuple is emitted. */
    private ArrayList<Object> tokensArrayList = null;

    private final TupleFactory tupleFactory = TupleFactory.getInstance();

    /** Names of the columns the caller wants in each output tuple. */
    private final List<String> reqKeyNames;

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new TextInputFormat();
    }

    /** Creates a loader that requests the positional columns {@code date}, {@code type} and {@code raw}. */
    public SSPigLoader() {
        this.reqKeyNames = new ArrayList<String>();
        this.reqKeyNames.add("date");
        this.reqKeyNames.add("type");
        this.reqKeyNames.add("raw");
    }

    /**
     * Creates a loader that requests the given column names.
     *
     * @param keysAsStrings positional names ({@code date}, {@code type}, {@code raw}) and/or JSON keys
     */
    public SSPigLoader(String... keysAsStrings) {
        this.reqKeyNames = new ArrayList<String>();
        for (String key : keysAsStrings) {
            reqKeyNames.add(key);
        }
    }

    /**
     * Creates a loader from a single space-separated list of column names.
     *
     * @param keysAsStringsDelimited names separated by spaces, e.g. {@code "date type userId"}
     */
    public SSPigLoader(String keysAsStringsDelimited) {
        StringTokenizer strToken = new StringTokenizer(keysAsStringsDelimited, " ");
        this.reqKeyNames = new ArrayList<String>();
        while (strToken.hasMoreTokens()) {
            reqKeyNames.add(strToken.nextToken());
        }
    }

    /**
     * Reads the next line and converts it into a tuple of the requested columns.
     *
     * @return the next tuple, or {@code null} when the input split is exhausted
     * @throws IOException if the underlying reader fails or is interrupted
     */
    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!in.nextKeyValue()) {
                return null;
            }
            Text value = (Text) in.getCurrentValue();
            byte[] buf = value.getBytes();
            // getBytes() may return a backing array longer than the valid data; only getLength() bytes count.
            int len = value.getLength();
            int start = 0;
            int fieldID = -1;
            for (int i = 0; i < len; i++) {
                if (buf[i] == fieldDel) {
                    fieldID++;
                    readField(buf, start, i, false, fieldID);
                    // Skip the delimiter itself.
                    start = i + 1;
                }
            }
            // Whatever follows the last delimiter (or the whole line, if none) is the JSON field.
            fieldID++;
            readField(buf, start, len, true, fieldID);
            Tuple t = tupleFactory.newTupleNoCopy(tokensArrayList);
            tokensArrayList = null;
            return t;
        } catch (InterruptedException e) {
            // Preserve the interrupt status for code further up the stack.
            Thread.currentThread().interrupt();
            int errCode = 6018;
            String errMsg = "Error while reading input";
            throw new ExecException(errMsg, errCode, PigException.REMOTE_ENVIRONMENT, e);
        }
    }

    /**
     * Appends the requested values contributed by one field of the current line.
     *
     * @param buf           line bytes
     * @param start         inclusive start offset of the field
     * @param end           exclusive end offset of the field
     * @param is_value_json true for the last field of the line, which is parsed as JSON
     * @param fieldID       zero-based position of the field within the line
     */
    private void readField(byte[] buf, int start, int end, boolean is_value_json, int fieldID) {
        if (tokensArrayList == null) {
            tokensArrayList = new ArrayList<Object>();
        }
        // Positional fields carry no embedded name in the log, so they are selected by index.
        boolean positionalWanted = is_value_json
                ? (fieldID == 2 && reqKeyNames.contains("raw"))
                : ((fieldID == 0 && reqKeyNames.contains("date"))
                        || (fieldID == 1 && reqKeyNames.contains("type")));
        if (start == end) {
            // An empty field yields null only if the column was requested; otherwise
            // later columns would shift by one.
            if (positionalWanted) {
                tokensArrayList.add(null);
            }
            return;
        }
        String valueStr = new String(buf, start, end - start, UTF8);
        if (positionalWanted) {
            tokensArrayList.add(valueStr);
        }
        if (is_value_json) {
            addJsonFields(valueStr);
        }
    }

    /**
     * Parses {@code valueStr} as a JSON object and appends the value of every requested key.
     * Malformed JSON, or JSON that is not an object, contributes no columns (best effort).
     *
     * <p>NOTE(review): columns follow the JSON object's iteration order, and keys absent from a
     * record are skipped, so column positions may differ between records — confirm callers
     * tolerate this.
     */
    private void addJsonFields(String valueStr) {
        Object parsed;
        try {
            parsed = new JSONParser().parse(valueStr);
        } catch (ParseException e) {
            System.err.println("SSPigLoader: skipping unparseable JSON field: " + e.getMessage());
            return;
        }
        if (!(parsed instanceof JSONObject)) {
            // e.g. a JSON array or scalar; previously this threw ClassCastException and failed the task.
            return;
        }
        JSONObject json = (JSONObject) parsed;
        for (Object rawKey : json.keySet()) {
            String key = rawKey.toString();
            if (reqKeyNames.contains(key)) {
                tokensArrayList.add(json.get(key));
            }
        }
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        in = reader;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment