Skip to content

Instantly share code, notes, and snippets.

@hernanliendo
Created August 26, 2014 19:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hernanliendo/9a4d9c592a3feb6060cf to your computer and use it in GitHub Desktop.
Save hernanliendo/9a4d9c592a3feb6060cf to your computer and use it in GitHub Desktop.
Hackademy - BigQueryClient
package ar.com.zupcat.lib.bean.audit;
import ar.com.zupcat.lib.bean.IClosure;
import ar.com.zupcat.lib.bean.RetryingExecutor;
import ar.com.zupcat.lib.bean.enums.ErrorType;
import ar.com.zupcat.lib.exception.GAEException;
import ar.com.zupcat.lib.util.RandomUtils;
import com.google.api.client.util.Data;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.*;
import java.io.IOException;
import java.util.*;
/**
 * Thin client around the Google BigQuery v2 API: creates datasets/tables,
 * runs synchronous queries (with result paging), and streams rows via
 * {@code tabledata().insertAll}. All checked failures are wrapped in
 * {@link GAEException} with {@link ErrorType#PROGRAMMING}.
 */
public final class BigQueryClient {

    /** Numeric project id used for every BigQuery request. */
    private final String projectNumberId;
    /** Pre-authorized BigQuery service stub supplied by the factory. */
    private final Bigquery bigquery;

    protected BigQueryClient(final String _projectNumberId, final Bigquery _bigquery) {
        this.projectNumberId = _projectNumberId;
        this.bigquery = _bigquery;
    }

    /**
     * Creates the dataset {@code datasetName} and a table {@code tableName} inside it.
     *
     * @param fields ordered map of column name -> BigQuery type string (e.g. "STRING");
     *               insertion order defines column order in the schema.
     * @throws GAEException wrapping any API failure — note this includes the case
     *         where the dataset or table already exists (the insert is not idempotent).
     */
    public void createDatasetAndTable(final String datasetName, final String tableName, final LinkedHashMap<String, String> fields) {
        try {
            final Dataset dataset = new Dataset();
            final DatasetReference datasetRef = new DatasetReference();
            datasetRef.setProjectId(projectNumberId);
            datasetRef.setDatasetId(datasetName);
            dataset.setDatasetReference(datasetRef);
            bigquery.datasets().insert(projectNumberId, dataset).execute();

            // Build the table schema from the ordered field map.
            final TableSchema schema = new TableSchema();
            final List<TableFieldSchema> tableFieldSchema = new ArrayList<>(fields.size());
            for (final Map.Entry<String, String> entry : fields.entrySet()) {
                final TableFieldSchema schemaEntry = new TableFieldSchema();
                schemaEntry.setName(entry.getKey());
                schemaEntry.setType(entry.getValue());
                tableFieldSchema.add(schemaEntry);
            }
            schema.setFields(tableFieldSchema);

            final Table table = new Table();
            table.setSchema(schema);
            final TableReference tableRef = new TableReference();
            tableRef.setDatasetId(datasetName);
            tableRef.setProjectId(projectNumberId);
            tableRef.setTableId(tableName);
            table.setTableReference(tableRef);
            bigquery.tables().insert(projectNumberId, datasetName, table).execute();
        } catch (final Exception _exception) {
            throw new GAEException(ErrorType.PROGRAMMING, _exception);
        }
    }

    /**
     * Flattens API result rows into {@code result}. Each output map keys cells by
     * their stringified column index ("0", "1", ...); null cells become the
     * literal string "null".
     */
    private void saveRows(final List<TableRow> rows, final List<Map<String, Object>> result) {
        if (rows == null) {
            return;
        }
        for (final TableRow row : rows) {
            final Map<String, Object> map = new HashMap<>();
            result.add(map);
            // getF() can be null for rows without cells — skip them defensively.
            if (row.getF() == null) {
                continue;
            }
            int i = 0;
            for (final TableCell cell : row.getF()) {
                map.put("" + i, Data.isNull(cell.getV()) ? "null" : cell.getV().toString());
                i++;
            }
        }
    }

    /**
     * Runs {@code query} synchronously and returns all result rows, following
     * page tokens until the result set is exhausted.
     *
     * @return list of row maps keyed by stringified column index (see saveRows).
     * @throws GAEException wrapping any API failure.
     */
    public List<Map<String, Object>> executeQuery(final String query) {
        final List<Map<String, Object>> result = new ArrayList<>();
        try {
            final QueryRequest queryRequest = new QueryRequest().setQuery(query);
            final QueryResponse queryResponse = bigquery.jobs().query(projectNumberId, queryRequest).execute();

            String pageToken = null;
            if (queryResponse.getJobComplete()) {
                saveRows(queryResponse.getRows(), result);
                if (null == queryResponse.getPageToken()) {
                    return result;
                }
                // BUGFIX: resume from the token of the page we already consumed.
                // Starting the loop with a null token would re-fetch page 1 and
                // duplicate the rows saved above.
                pageToken = queryResponse.getPageToken();
            }

            // Polls until the job completes, then walks the remaining result pages.
            while (true) {
                final GetQueryResultsResponse queryResults = bigquery.jobs()
                        .getQueryResults(projectNumberId, queryResponse.getJobReference().getJobId())
                        .setPageToken(pageToken).execute();
                if (queryResults.getJobComplete()) {
                    saveRows(queryResults.getRows(), result);
                    pageToken = queryResults.getPageToken();
                    if (null == pageToken || queryResults.getRows() == null) {
                        return result;
                    }
                }
            }
        } catch (final Exception _exception) {
            throw new GAEException(ErrorType.PROGRAMMING, _exception);
        }
    }

    /**
     * Streams a single row into {@code datasetName.tableName}.
     *
     * @return the insertAll response (may carry per-row insert errors).
     */
    public TableDataInsertAllResponse insertRow(final String datasetName, final String tableName, final Map<String, Object> rowValues) {
        final List<Map<String, Object>> items = new ArrayList<>();
        items.add(rowValues);
        return insertRows(datasetName, tableName, items);
    }

    /**
     * Streams a batch of rows into {@code datasetName.tableName}, retrying the
     * whole request up to 5 times with a 1000 ms pause between attempts.
     * Each row carries a best-effort-unique insertId so BigQuery can
     * de-duplicate rows across retries.
     *
     * @return the last insertAll response, or null if the executor never ran
     *         the closure successfully.
     * @throws GAEException wrapping any API failure after retries are exhausted.
     */
    public TableDataInsertAllResponse insertRows(final String datasetName, final String tableName, final List<Map<String, Object>> rowValues) {
        try {
            final List<TableDataInsertAllRequest.Rows> rowList = new ArrayList<>(rowValues.size());
            // Sequential ids seeded from a random base: stable across retries of
            // the same request, distinct across requests.
            long randomValue = RandomUtils.getInstance().getRandomLong() - 10000;
            for (final Map<String, Object> rowValue : rowValues) {
                final TableRow row = new TableRow();
                for (final Map.Entry<String, Object> entry : rowValue.entrySet()) {
                    row.set(entry.getKey(), entry.getValue());
                }
                final TableDataInsertAllRequest.Rows rows = new TableDataInsertAllRequest.Rows();
                rows.setInsertId("" + randomValue);
                rows.setJson(row);
                rowList.add(rows);
                randomValue++;
            }
            final TableDataInsertAllRequest content = new TableDataInsertAllRequest().setRows(rowList);

            // Single-element array lets the anonymous closure publish its result
            // back to this scope (pre-lambda effectively-final workaround).
            final TableDataInsertAllResponse[] responses = new TableDataInsertAllResponse[1];
            final RetryingExecutor retryingExecutor = new RetryingExecutor(5, 1000, new IClosure() {
                @Override
                public void execute(final Object params) throws Exception {
                    responses[0] = bigquery.tabledata().insertAll(projectNumberId, datasetName, tableName, content).execute();
                }
            }, null);
            retryingExecutor.startExecution();
            return responses[0];
        } catch (final Exception _exception) {
            throw new GAEException(ErrorType.PROGRAMMING, _exception);
        }
    }

    /**
     * NOTE(review): despite its name, this returns the project's DATASET ids,
     * not table names — kept as-is for caller compatibility; confirm intent.
     *
     * @return list of dataset ids; empty if the project has no datasets.
     * @throws GAEException wrapping any I/O failure.
     */
    public List<String> getAllTableNames() {
        final List<String> result = new ArrayList<>();
        try {
            final Bigquery.Datasets.List datasetRequest = bigquery.datasets().list(projectNumberId);
            final DatasetList datasetList = datasetRequest.execute();
            // getDatasets() is null (not empty) when the project has no datasets.
            if (datasetList.getDatasets() != null) {
                for (final DatasetList.Datasets dataset : datasetList.getDatasets()) {
                    result.add(dataset.getDatasetReference().getDatasetId());
                }
            }
        } catch (final IOException _ioException) {
            throw new GAEException(ErrorType.PROGRAMMING, _ioException);
        }
        return result;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment