Created
August 13, 2012 15:43
-
-
Save sureshsaggar/3342047 to your computer and use it in GitHub Desktop.
Custom loader UDF extending the LoadFunc class of Apache Pig (Pig Latin).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ss.analytics.pig; | |
public class SSPigLoader extends LoadFunc{ | |
private byte fieldDel = '|'; | |
protected RecordReader in = null; | |
private ArrayList<Object> tokensArrayList = null; | |
private TupleFactory tupleFactory = TupleFactory.getInstance(); | |
private List<String> reqKeyNames; | |
@Override | |
public InputFormat getInputFormat() throws IOException { | |
return new TextInputFormat(); | |
} | |
public SSPigLoader() { | |
this.reqKeyNames = new ArrayList<String>(); | |
this.reqKeyNames.add("date"); | |
this.reqKeyNames.add("type"); | |
this.reqKeyNames.add("raw"); | |
} | |
public SSPigLoader(String...keysAsStrings) { | |
this.reqKeyNames = new ArrayList<String>(); | |
for(String key : keysAsStrings) { | |
reqKeyNames.add(key); | |
} | |
} | |
public SSPigLoader(String keysAsStringsDelimited) { | |
StringTokenizer strToken = new StringTokenizer(keysAsStringsDelimited, " "); | |
this.reqKeyNames = new ArrayList<String>(); | |
while(strToken.hasMoreTokens()){ | |
reqKeyNames.add(strToken.nextToken()); | |
} | |
} | |
@Override | |
public Tuple getNext() throws IOException { | |
try { | |
boolean notDone = in.nextKeyValue(); | |
if (!notDone) { | |
return null; | |
} | |
Text value = (Text) in.getCurrentValue(); | |
byte[] buf = value.getBytes(); | |
int len = value.getLength(); | |
int start = 0; | |
int fieldID = -1; | |
for (int i = 0; i < len; i++) { | |
if (buf[i] == fieldDel) { | |
fieldID++; | |
readField(buf, start, i, false, fieldID); | |
// Skip the delimiter itself | |
start = i + 1; | |
} | |
} | |
fieldID++; | |
// pick up the last field | |
readField(buf, start, len, true, fieldID); | |
Tuple t = tupleFactory.newTupleNoCopy(tokensArrayList); | |
tokensArrayList = null; | |
System.out.println("Answer:" + t.toString()); | |
return t; | |
} catch (InterruptedException e) { | |
int errCode = 6018; | |
String errMsg = "Error while reading input"; | |
throw new ExecException(errMsg, errCode, PigException.REMOTE_ENVIRONMENT, e); | |
} | |
} | |
private void readField(byte[] buf, int start, int end, boolean is_value_json, int fieldID) { | |
if (tokensArrayList == null) { | |
tokensArrayList = new ArrayList<Object>(); | |
} | |
if (start == end) { | |
tokensArrayList.add(null); | |
} else { | |
String valueStr = new String(buf, start, end-start); | |
if(!is_value_json){ | |
/* | |
* These is done just for fields r not (key, value) pair i.e. | |
* basically they don't have a name embedded in the log. So need | |
* position based indexing. | |
*/ | |
if((fieldID == 0 && reqKeyNames.contains("date")) || (fieldID == 1 && reqKeyNames.contains("type"))){ | |
tokensArrayList.add(valueStr); | |
} | |
}else{ | |
if(fieldID == 2 && reqKeyNames.contains("raw")){ | |
tokensArrayList.add(valueStr); | |
} | |
JSONObject json = null; | |
try { | |
json = (JSONObject)new JSONParser().parse(valueStr); | |
Set<Object> keys = json.keySet(); | |
Iterator<Object> ir = keys.iterator(); | |
while(ir.hasNext()){ | |
String key = ir.next().toString(); | |
if(reqKeyNames.contains(key)){ | |
tokensArrayList.add(json.get(key)); | |
} | |
} | |
} catch (ParseException e) { | |
System.out.println(e.getMessage()); | |
} | |
} | |
} | |
} | |
@Override | |
public void prepareToRead(RecordReader reader, PigSplit split) throws IOException { | |
in = reader; | |
} | |
@Override | |
public void setLocation(String location, Job job) throws IOException { | |
FileInputFormat.setInputPaths(job, location); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment