// Hackademy - BigQueryClient
package ar.com.zupcat.lib.bean.audit;

import ar.com.zupcat.lib.bean.IClosure;
import ar.com.zupcat.lib.bean.RetryingExecutor;
import ar.com.zupcat.lib.bean.enums.ErrorType;
import ar.com.zupcat.lib.exception.GAEException;
import ar.com.zupcat.lib.util.RandomUtils;
import com.google.api.client.util.Data;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.*;

import java.io.IOException;
import java.util.*;
public final class BigQueryClient { | |
private final String projectNumberId; | |
private final Bigquery bigquery; | |
protected BigQueryClient(final String _projectNumberId, final Bigquery _bigquery) { | |
this.projectNumberId = _projectNumberId; | |
this.bigquery = _bigquery; | |
} | |
public void createDatasetAndTable(final String datasetName, final String tableName, final LinkedHashMap<String, String> fields) { | |
try { | |
final Dataset dataset = new Dataset(); | |
final DatasetReference datasetRef = new DatasetReference(); | |
datasetRef.setProjectId(projectNumberId); | |
datasetRef.setDatasetId(datasetName); | |
dataset.setDatasetReference(datasetRef); | |
bigquery.datasets().insert(projectNumberId, dataset).execute(); | |
final TableSchema schema = new TableSchema(); | |
final List<TableFieldSchema> tableFieldSchema = new ArrayList<>(); | |
for (final Map.Entry<String, String> entry : fields.entrySet()) { | |
final TableFieldSchema schemaEntry = new TableFieldSchema(); | |
schemaEntry.setName(entry.getKey()); | |
schemaEntry.setType(entry.getValue()); | |
tableFieldSchema.add(schemaEntry); | |
} | |
schema.setFields(tableFieldSchema); | |
final Table table = new Table(); | |
table.setSchema(schema); | |
final TableReference tableRef = new TableReference(); | |
tableRef.setDatasetId(datasetName); | |
tableRef.setProjectId(projectNumberId); | |
tableRef.setTableId(tableName); | |
table.setTableReference(tableRef); | |
bigquery.tables().insert(projectNumberId, datasetName, table).execute(); | |
} catch (final Exception _exception) { | |
throw new GAEException(ErrorType.PROGRAMMING, _exception); | |
} | |
} | |
private void saveRows(final List<TableRow> rows, final List<Map<String, Object>> result) { | |
if (rows != null) { | |
for (final TableRow row : rows) { | |
int i = 0; | |
final Map<String, Object> map = new HashMap<>(); | |
result.add(map); | |
for (final TableCell cell : row.getF()) { | |
map.put("" + i, Data.isNull(cell.getV()) ? "null" : cell.getV().toString()); | |
i++; | |
} | |
} | |
} | |
} | |
public List<Map<String, Object>> executeQuery(final String query) { | |
final List<Map<String, Object>> result = new ArrayList<>(); | |
try { | |
final QueryRequest queryRequest = new QueryRequest().setQuery(query); | |
final QueryResponse queryResponse = bigquery.jobs().query(projectNumberId, queryRequest).execute(); | |
if (queryResponse.getJobComplete()) { | |
saveRows(queryResponse.getRows(), result); | |
if (null == queryResponse.getPageToken()) { | |
return result; | |
} | |
} | |
// This loop polls until results are present, then loops over result pages. | |
String pageToken = null; | |
while (true) { | |
final GetQueryResultsResponse queryResults = bigquery.jobs() | |
.getQueryResults(projectNumberId, queryResponse.getJobReference() | |
.getJobId()) | |
.setPageToken(pageToken).execute(); | |
if (queryResults.getJobComplete()) { | |
saveRows(queryResults.getRows(), result); | |
pageToken = queryResults.getPageToken(); | |
if (null == pageToken || queryResults.getRows() == null) { | |
return result; | |
} | |
} | |
} | |
} catch (final Exception _exception) { | |
throw new GAEException(ErrorType.PROGRAMMING, _exception); | |
} | |
} | |
public TableDataInsertAllResponse insertRow(final String datasetName, final String tableName, final Map<String, Object> rowValues) { | |
final List<Map<String, Object>> items = new ArrayList<>(); | |
items.add(rowValues); | |
return insertRows(datasetName, tableName, items); | |
} | |
public TableDataInsertAllResponse insertRows(final String datasetName, final String tableName, final List<Map<String, Object>> rowValues) { | |
try { | |
final List<TableDataInsertAllRequest.Rows> rowList = new ArrayList<>(rowValues.size()); | |
long randomValue = RandomUtils.getInstance().getRandomLong() - 10000; | |
for (final Map<String, Object> rowValue : rowValues) { | |
final TableRow row = new TableRow(); | |
for (final Map.Entry<String, Object> entry : rowValue.entrySet()) { | |
row.set(entry.getKey(), entry.getValue()); | |
} | |
final TableDataInsertAllRequest.Rows rows = new TableDataInsertAllRequest.Rows(); | |
rows.setInsertId("" + randomValue); | |
rows.setJson(row); | |
rowList.add(rows); | |
randomValue++; | |
} | |
final TableDataInsertAllRequest content = new TableDataInsertAllRequest().setRows(rowList); | |
final TableDataInsertAllResponse[] responses = new TableDataInsertAllResponse[1]; | |
responses[0] = null; | |
final RetryingExecutor retryingExecutor = new RetryingExecutor(5, 1000, new IClosure() { | |
@Override | |
public void execute(final Object params) throws Exception { | |
responses[0] = bigquery.tabledata().insertAll(projectNumberId, datasetName, tableName, content).execute(); | |
} | |
}, null); | |
retryingExecutor.startExecution(); | |
return responses[0]; | |
} catch (final Exception _exception) { | |
throw new GAEException(ErrorType.PROGRAMMING, _exception); | |
} | |
} | |
public List<String> getAllTableNames() { | |
final List<String> result = new ArrayList<>(); | |
try { | |
final Bigquery.Datasets.List datasetRequest = bigquery.datasets().list(projectNumberId); | |
final DatasetList datasetList = datasetRequest.execute(); | |
for (final DatasetList.Datasets dataset : datasetList.getDatasets()) { | |
result.add(dataset.getDatasetReference().getDatasetId()); | |
} | |
} catch (final IOException _ioException) { | |
throw new GAEException(ErrorType.PROGRAMMING, _ioException); | |
} | |
return result; | |
} | |
} |