Skip to content

Instantly share code, notes, and snippets.

View sandboxws's full-sized avatar
🏠
Debug → Analyze → Refactor → Performance Gains → Repeat

Ahmed Elhossaini sandboxws

🏠
Debug → Analyze → Refactor → Performance Gains → Repeat
View GitHub Profile
@sandboxws
sandboxws / get_addons_host.gql
Created March 17, 2020 17:23
GetAddOns as a host
{
"operationName": "GetAddOns",
"variables": {
"hostId": "5dd43565611ffd0029bc80b2",
"listingId": "5e6b8d8ed39f1e00505f0294",
"currency": "CAD",
"offset": 0,
"limit": 15,
"state": "active"
},
// Extract errors PCollectionTuple
PCollection<String> bqTableRowsErrors =
bqTableRowsTuple.get(bqTableRowsFailedTag)
.setCoder(NullableCoder.of(StringUtf8Coder.of()));
// Log errors to a text file under cloud storage.
bqTableRowsErrors
.apply(
"Write Errors",
TextIO.write().to("gs://beam-tutorial/album_errors.txt")
// Convert each input row to a BigQuery TableRow via HashMapToTableRowFn.
// The ParDo declares bqTableRowsSuccessTag as its main output and
// bqTableRowsFailedTag as an additional output. Both come back bundled in the
// resulting PCollectionTuple. (The DoFn is given both tags; presumably it routes
// rows it cannot convert to the failed tag — confirm in HashMapToTableRowFn.)
PCollectionTuple bqTableRowsTuple = rows.apply(
    "HashMap to TableRow",
    ParDo
        .of(new HashMapToTableRowFn(bqTableRowsSuccessTag, bqTableRowsFailedTag))
        .withOutputTags(
            bqTableRowsSuccessTag,
            TupleTagList.of(bqTableRowsFailedTag)));
// Output tags for the row-conversion ParDo: converted TableRows on the success
// tag, String error payloads on the failed tag. Each tag is instantiated as an
// anonymous subclass (the trailing "{}") so Beam can recover the element type
// despite generic erasure, which it uses for coder inference.
final TupleTag<TableRow> bqTableRowsSuccessTag = new TupleTag<TableRow>() {};
final TupleTag<String> bqTableRowsFailedTag = new TupleTag<String>() {};
package com.sandboxws.chinook;
import com.google.api.services.bigquery.model.TableRow;
import com.sandboxws.beam.AppOptions;
import com.sandboxws.beam.coders.TableRowCoder;
import com.sandboxws.chinook.bigquery.schema.AlbumTableSchema;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
# Run the "albums" Gradle task, forwarding the pipeline options through --args.
# GOOGLE_CLOUD_PROJECT_ID appears to be a placeholder — substitute a real GCP project id.
gradle albums \
  --args="--pgDatabase=chinook_development --pgUsername=root --project=GOOGLE_CLOUD_PROJECT_ID --outputTable=dwh.albums --tempLocation=gs://beam_tutorial/temp --stagingLocation=gs://beam_tutorial/staging"
// Sink: write the TableRows into BigQuery.
//   - destination table: options.getOutputTable(), supplied on the command line
//   - schema: AlbumTableSchema.schema()
//   - values that are not defined in the table schema are ignored
//   - rows are appended (WRITE_APPEND); the table is created if it does not
//     exist yet (CREATE_IF_NEEDED)
bqTableRows.apply(
    "Write to BigQuery",
    BigQueryIO.writeTableRows()
        .to(options.getOutputTable())
        .withSchema(AlbumTableSchema.schema())
        .ignoreUnknownValues()
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
PCollection<TableRow> bqTableRows = rows.apply(
"HashMap to TableRow",
ParDo.of(new HashMapToTableRowFn())
).setCoder(TableRowJsonCoder.of());
// Register the custom options interface BEFORE parsing the command line.
// PipelineOptionsFactory handles --help (and checks compatibility with other
// registered options) using the interfaces registered at parse time, so
// registering only after fromArgs(...).as(...) is too late to affect this parse.
PipelineOptionsFactory.register(AppOptions.class);
// Parse args into AppOptions. withValidation() makes as() enforce the validation
// criteria declared on AppOptions (if any — e.g. required options) and fail fast.
AppOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(AppOptions.class);