Skip to content

Instantly share code, notes, and snippets.

@mattyb149
Created September 7, 2012 21:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mattyb149/3669583 to your computer and use it in GitHub Desktop.
Save mattyb149/3669583 to your computer and use it in GitHub Desktop.
PDI UDJC code for Google Docs Fusion Table Input step
import com.google.gdata.client.ClientLoginAccountType;
import com.google.gdata.client.GoogleService;
import com.google.gdata.client.Service.GDataRequest;
import com.google.gdata.client.Service.GDataRequest.RequestType;
import com.google.gdata.util.AuthenticationException;
import com.google.gdata.util.ContentType;
import com.google.gdata.util.ServiceException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URL;
import java.net.URLEncoder;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
/**
* Google Fusion Tables API URL.
* All requests to the Google Fusion Tables service begin with this URL.
*/
final String SERVICE_URL = "https://www.google.com/fusiontables/api/query";
final SimpleDateFormat sdf = new SimpleDateFormat("MM/dd/yyyy");
/**
* Service to handle requests to Google Fusion Tables.
*/
GoogleService service;
String tableIdField;
String emailField;
String passwordField;
// TODO - replace these with the desired fields from the table
String quoteField;
String whoField;
String dateField;
String ratingField;
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
// First, get a row from the default input hop
//
Object[] r = getRow();
// If the row object is null, we are done processing.
//
if (r == null) {
setOutputDone();
return false;
}
// Let's look up parameters only once for performance reason.
//
if (first) {
tableIdField = getParameter("TABLEID_FIELD");
emailField = getParameter("EMAIL_FIELD");
passwordField = getParameter("PWORD_FIELD");
// TODO - Replace with the fields from the table
quoteField = getParameter("QUOTE_FIELD");
whoField = getParameter("WHO_FIELD");
dateField = getParameter("DATE_FIELD");
ratingField = getParameter("RATING_FIELD");
first=false;
}
try {
service = new GoogleService("fusiontables", "fusiontables.ApiExample");
service.setUserCredentials(get(Fields.In,emailField).getString(r), get(Fields.In,passwordField).getString(r), ClientLoginAccountType.GOOGLE);
// It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
// enough to handle any new fields you are creating in this step.
//
//Object[] outputRow = createOutputRow(null, data.outputRowMeta.size());
String tableIdString = get(Fields.In,tableIdField).getString(r);
String query = "SELECT * FROM "+tableIdString;
boolean isUsingEncId = true;
String lowercaseQuery = query.toLowerCase();
String encodedQuery = URLEncoder.encode(query, "UTF-8");
GDataRequest request;
// If the query is a select, describe, or show query, run a GET request.
if (lowercaseQuery.startsWith("select") ||
lowercaseQuery.startsWith("describe") ||
lowercaseQuery.startsWith("show")) {
URL url = new URL(SERVICE_URL + "?sql=" + encodedQuery + "&encid=" + isUsingEncId);
request = service.getRequestFactory().getRequest(RequestType.QUERY, url,
ContentType.TEXT_PLAIN);
} else {
// Otherwise, run a POST request.
URL url = new URL(SERVICE_URL + "?encid=" + isUsingEncId);
request = service.getRequestFactory().getRequest(RequestType.INSERT, url,
new ContentType("application/x-www-form-urlencoded"));
OutputStreamWriter writer = new OutputStreamWriter(request.getRequestStream());
writer.append("sql=" + encodedQuery);
writer.flush();
}
request.execute();
InputStreamReader inputStreamReader = new InputStreamReader(request.getResponseStream());
BufferedReader bufferedStreamReader = new BufferedReader(inputStreamReader);
Object[] outputRow = new Object[5];
for(int i=0;i<5;i++) {outputRow[i] = new String("");}
String line = bufferedStreamReader.readLine();
line = bufferedStreamReader.readLine();
while(line != null) {
Object[] cols = splitCSV(line,",");
data.outputRowMeta = new RowMeta();
meta.getFields(data.outputRowMeta, null, null, null, null);
// Set the value in the output field
//
get(Fields.Out,tableIdField).setValue(outputRow,tableIdString);
get(Fields.Out,quoteField).setValue(outputRow,cols[0]);
get(Fields.Out,whoField).setValue(outputRow,cols[1]);
get(Fields.Out,dateField).setValue(outputRow,sdf.parse((String)cols[2]));
get(Fields.Out,ratingField).setValue(outputRow,cols[3]);
// putRow will send the row on to the default output hop.
//
putRow(data.outputRowMeta, outputRow);
line = bufferedStreamReader.readLine();
}
}
catch(Exception e) {throw new KettleException(e.getLocalizedMessage());}
setOutputDone();
return false;
}
public Object[] splitCSV(String line, String delimiter) {
if(line.contains("\"")) {
ArrayList strList = new ArrayList();
boolean inString = false;
StringBuffer currString = new StringBuffer("");
int currIndex = 0;
char currChar;
while(currIndex < line.length()) {
currChar = line.charAt(currIndex);
if(currChar == '"') {
inString = !inString;
}
else if(currChar == delimiter.charAt(0)) {
if(inString)
currString.append(currChar);
else {
strList.add(currString.toString());
currString = new StringBuffer("");
}
}
else {
currString.append(currChar);
}
currIndex++;
}
strList.add(currString.toString());
return strList.toArray(new String[]{});
}
else {
return line.split(delimiter);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment