Created
September 7, 2012 21:00
-
-
Save mattyb149/3669583 to your computer and use it in GitHub Desktop.
Pentaho Data Integration (PDI) User Defined Java Class (UDJC) code for a Google Docs Fusion Tables input step
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.gdata.client.ClientLoginAccountType; | |
import com.google.gdata.client.GoogleService; | |
import com.google.gdata.client.Service.GDataRequest; | |
import com.google.gdata.client.Service.GDataRequest.RequestType; | |
import com.google.gdata.util.AuthenticationException; | |
import com.google.gdata.util.ContentType; | |
import com.google.gdata.util.ServiceException; | |
import java.io.BufferedReader; | |
import java.io.IOException; | |
import java.io.InputStreamReader; | |
import java.io.OutputStreamWriter; | |
import java.net.URL; | |
import java.net.URLEncoder; | |
import java.text.SimpleDateFormat; | |
import java.util.ArrayList; | |
import java.util.Date; | |
/** | |
* Google Fusion Tables API URL. | |
* All requests to the Google Fusion Tables service begin with this URL. | |
*/ | |
final String SERVICE_URL = "https://www.google.com/fusiontables/api/query"; | |
final SimpleDateFormat sdf = new SimpleDateFormat("MM/dd/yyyy"); | |
/** | |
* Service to handle requests to Google Fusion Tables. | |
*/ | |
GoogleService service; | |
String tableIdField; | |
String emailField; | |
String passwordField; | |
// TODO - replace these with the desired fields from the table | |
String quoteField; | |
String whoField; | |
String dateField; | |
String ratingField; | |
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException | |
{ | |
// First, get a row from the default input hop | |
// | |
Object[] r = getRow(); | |
// If the row object is null, we are done processing. | |
// | |
if (r == null) { | |
setOutputDone(); | |
return false; | |
} | |
// Let's look up parameters only once for performance reason. | |
// | |
if (first) { | |
tableIdField = getParameter("TABLEID_FIELD"); | |
emailField = getParameter("EMAIL_FIELD"); | |
passwordField = getParameter("PWORD_FIELD"); | |
// TODO - Replace with the fields from the table | |
quoteField = getParameter("QUOTE_FIELD"); | |
whoField = getParameter("WHO_FIELD"); | |
dateField = getParameter("DATE_FIELD"); | |
ratingField = getParameter("RATING_FIELD"); | |
first=false; | |
} | |
try { | |
service = new GoogleService("fusiontables", "fusiontables.ApiExample"); | |
service.setUserCredentials(get(Fields.In,emailField).getString(r), get(Fields.In,passwordField).getString(r), ClientLoginAccountType.GOOGLE); | |
// It is always safest to call createOutputRow() to ensure that your output row's Object[] is large | |
// enough to handle any new fields you are creating in this step. | |
// | |
//Object[] outputRow = createOutputRow(null, data.outputRowMeta.size()); | |
String tableIdString = get(Fields.In,tableIdField).getString(r); | |
String query = "SELECT * FROM "+tableIdString; | |
boolean isUsingEncId = true; | |
String lowercaseQuery = query.toLowerCase(); | |
String encodedQuery = URLEncoder.encode(query, "UTF-8"); | |
GDataRequest request; | |
// If the query is a select, describe, or show query, run a GET request. | |
if (lowercaseQuery.startsWith("select") || | |
lowercaseQuery.startsWith("describe") || | |
lowercaseQuery.startsWith("show")) { | |
URL url = new URL(SERVICE_URL + "?sql=" + encodedQuery + "&encid=" + isUsingEncId); | |
request = service.getRequestFactory().getRequest(RequestType.QUERY, url, | |
ContentType.TEXT_PLAIN); | |
} else { | |
// Otherwise, run a POST request. | |
URL url = new URL(SERVICE_URL + "?encid=" + isUsingEncId); | |
request = service.getRequestFactory().getRequest(RequestType.INSERT, url, | |
new ContentType("application/x-www-form-urlencoded")); | |
OutputStreamWriter writer = new OutputStreamWriter(request.getRequestStream()); | |
writer.append("sql=" + encodedQuery); | |
writer.flush(); | |
} | |
request.execute(); | |
InputStreamReader inputStreamReader = new InputStreamReader(request.getResponseStream()); | |
BufferedReader bufferedStreamReader = new BufferedReader(inputStreamReader); | |
Object[] outputRow = new Object[5]; | |
for(int i=0;i<5;i++) {outputRow[i] = new String("");} | |
String line = bufferedStreamReader.readLine(); | |
line = bufferedStreamReader.readLine(); | |
while(line != null) { | |
Object[] cols = splitCSV(line,","); | |
data.outputRowMeta = new RowMeta(); | |
meta.getFields(data.outputRowMeta, null, null, null, null); | |
// Set the value in the output field | |
// | |
get(Fields.Out,tableIdField).setValue(outputRow,tableIdString); | |
get(Fields.Out,quoteField).setValue(outputRow,cols[0]); | |
get(Fields.Out,whoField).setValue(outputRow,cols[1]); | |
get(Fields.Out,dateField).setValue(outputRow,sdf.parse((String)cols[2])); | |
get(Fields.Out,ratingField).setValue(outputRow,cols[3]); | |
// putRow will send the row on to the default output hop. | |
// | |
putRow(data.outputRowMeta, outputRow); | |
line = bufferedStreamReader.readLine(); | |
} | |
} | |
catch(Exception e) {throw new KettleException(e.getLocalizedMessage());} | |
setOutputDone(); | |
return false; | |
} | |
public Object[] splitCSV(String line, String delimiter) { | |
if(line.contains("\"")) { | |
ArrayList strList = new ArrayList(); | |
boolean inString = false; | |
StringBuffer currString = new StringBuffer(""); | |
int currIndex = 0; | |
char currChar; | |
while(currIndex < line.length()) { | |
currChar = line.charAt(currIndex); | |
if(currChar == '"') { | |
inString = !inString; | |
} | |
else if(currChar == delimiter.charAt(0)) { | |
if(inString) | |
currString.append(currChar); | |
else { | |
strList.add(currString.toString()); | |
currString = new StringBuffer(""); | |
} | |
} | |
else { | |
currString.append(currChar); | |
} | |
currIndex++; | |
} | |
strList.add(currString.toString()); | |
return strList.toArray(new String[]{}); | |
} | |
else { | |
return line.split(delimiter); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment