This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
subscriptions: | |
- subscriptionName: projects/my-project/subscriptions/backup.radar.aliens-detected | |
topicName: radar.aliens-detected | |
displayName: aliens-detected | |
- subscriptionName: projects/my-project/subscriptions/backup.tower.spaceships-launched | |
topicName: tower.spaceships-launched | |
displayName: spaceships-launched | |
timestampAttribute: event_ts |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pubsub.apply(Flatten.<TableRow>pCollections()) | |
.apply("Fixed Windows", | |
Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1)))) | |
.apply("BQWrite", BigQueryIO.writeTableRows() | |
.to(TableRefPartition.perDay( | |
"my-project", | |
"my-dataset", | |
"my-tables")) | |
.withSchema(SchemaFor.backupEvent()) | |
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package backup; | |
import com.google.api.services.bigquery.model.TableReference; | |
import com.google.api.services.bigquery.model.TableRow; | |
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; | |
import org.apache.beam.sdk.transforms.SerializableFunction; | |
import org.apache.beam.sdk.values.ValueInSingleWindow; | |
import org.joda.time.format.DateTimeFormat; | |
import org.joda.time.format.DateTimeFormatter; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
.apply("SessionWindows for products", Window.<Event>into(Sessions.withGapDuration(Duration.standardDays(10))) | |
.triggering(Repeatedly.forever(AfterProcessingTime | |
.pastFirstElementInPane() | |
.plusDelayOf(Duration.standardSeconds(60)) | |
).orFinally(AfterWatermark.pastEndOfWindow())) | |
.accumulatingFiredPanes() | |
.withAllowedLateness(Duration.ZERO) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
return input.apply("UserSessionWindows", | |
Window.<Event>into(Sessions.withGapDuration(Duration.standardMinutes(20))) | |
.withAllowedLateness(Duration.standardHours(1))) | |
.apply("SessionSwitch", ParDo.of(new SessionSwitchFn())); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
gcs2bq_avro_with_schema = GoogleCloudStorageToBigQueryOperator( | |
task_id='gcs2bq_avro_with_schema', | |
bucket='{{var.value.gcs_bucket}}', | |
source_objects=[ | |
'path/{{ ds_nodash }}/part-*' | |
], | |
destination_project_dataset_table='{{var.value.gcq_tempset}}.avro_with_schema{{ ds_nodash }}', | |
source_format='AVRO', | |
schema_fields=schemas.gsob(), | |
create_disposition='CREATE_IF_NEEDED', |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Export the per-day BigQuery table `<gcq_dataset>.gsod_partition<YYYYMMDD>` to
# GCS as Avro files.
# Strings in `{{ ... }}` are Airflow Jinja templates rendered at run time:
# `var.value.X` reads the Airflow Variable named X, and `ds_nodash` is the
# run's logical date formatted as YYYYMMDD.
# NOTE(review): the Variable name "gcq_dataset" looks like a possible typo
# (gcp/gbq?) — confirm it matches the Variable actually defined in Airflow.
bq_to_gcp_avro = BigQueryToCloudStorageOperator(
    task_id='bq_to_gcp_avro',
    source_project_dataset_table='{{var.value.gcq_dataset}}.gsod_partition{{ ds_nodash }}',
    destination_cloud_storage_uris=[
        # The `*` wildcard lets BigQuery shard the export into several numbered
        # files, which BigQuery requires once the exported data exceeds 1 GB.
        'gs://{{var.value.gcs_bucket}}/path/{{ ds_nodash }}/part-*.avro'
    ],
    export_format='AVRO',
    # Airflow connection id whose credentials are used for the export job.
    bigquery_conn_id='gcp_smoke',
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#standardSQL | |
SELECT | |
..., | |
station.lon AS station_longitude | |
FROM | |
`bigquery-public-data.noaa_gsod.gsod{{execution_date.year-8}}` AS gsob | |
JOIN | |
`bigquery-public-data.noaa_gsod.stations` AS station | |
ON | |
CAST(gsob.stn AS INT64)=station.usaf |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
with DAG('gcp_smoke_bq', schedule_interval=timedelta(days=1), | |
default_args=default_args) as dag: | |
bq_extract_one_day = BigQueryOperator( | |
task_id='bq_extract_one_day', | |
bql='gcp_smoke/gsob_extract_day.sql', | |
destination_dataset_table= | |
'airflow.gsod_partition{{ ds_nodash }}', | |
write_disposition='WRITE_TRUNCATE', | |
bigquery_conn_id='gcp_smoke', |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
from logging import config

# Module-level application logger. It is created at import time, i.e. before
# the file-based logging configuration below is loaded.
logger = logging.getLogger('my-app')

if __name__ == "__main__":
    # Load handlers/formatters/levels from "logging.ini". It is a relative path,
    # so it is resolved against the current working directory, not this file.
    # disable_existing_loggers must be False: with the default (True),
    # fileConfig would disable the 'my-app' logger created above unless
    # logging.ini names it (or an ancestor) explicitly.
    config.fileConfig("logging.ini", disable_existing_loggers=False)
    # NOTE(review): whether this record reaches Cloud Logging depends on the
    # handlers declared in logging.ini, which is not visible here.
    logger.info("Yes, we love Cloud Logging")
Newer Older