subscriptions:
  - subscriptionName: projects/my-project/subscriptions/backup.radar.aliens-detected
    topicName: radar.aliens-detected
    displayName: aliens-detected
  - subscriptionName: projects/my-project/subscriptions/backup.tower.spaceships-launched
    topicName: tower.spaceships-launched
    displayName: spaceships-launched
    timestampAttribute: event_ts
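
A minimal sketch of how such a config file could be deserialized on the Java side, assuming Jackson's jackson-dataformat-yaml is on the classpath; the BackupConfig and Subscription class names are illustrative, not part of the original gist:

import java.io.File;
import java.io.IOException;
import java.util.List;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

// Hypothetical POJOs mirroring the YAML keys above.
public class BackupConfig {
    public List<Subscription> subscriptions;

    public static class Subscription {
        public String subscriptionName;
        public String topicName;
        public String displayName;
        public String timestampAttribute; // optional: null when the key is absent
    }

    public static BackupConfig load(File yaml) throws IOException {
        // Jackson's YAML factory parses the file straight into the POJOs.
        return new ObjectMapper(new YAMLFactory()).readValue(yaml, BackupConfig.class);
    }
}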
// Flatten the per-subscription streams into one collection, window into
// one-minute batches, and append each batch to the matching daily partition.
pubsub.apply(Flatten.<TableRow>pCollections())
    .apply("Fixed Windows",
        Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply("BQWrite", BigQueryIO.writeTableRows()
        .to(TableRefPartition.perDay(
            "my-project",
            "my-dataset",
            "my-tables"))
        .withSchema(SchemaFor.backupEvent())
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
package backup;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
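
Only the header of TableRefPartition survives in the gist. Below is a minimal sketch of how the class body could continue, assuming it implements Beam's SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> (the overload that BigQueryIO.Write.to() accepts) and routes each element to a $yyyyMMdd ingestion-time partition derived from the element's timestamp; the body is an illustration, not the original code:

public class TableRefPartition
        implements SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {

    private static final DateTimeFormatter DAY =
            DateTimeFormat.forPattern("yyyyMMdd").withZoneUTC();

    private final String projectId;
    private final String datasetId;
    private final String tablePrefix;

    private TableRefPartition(String projectId, String datasetId, String tablePrefix) {
        this.projectId = projectId;
        this.datasetId = datasetId;
        this.tablePrefix = tablePrefix;
    }

    public static TableRefPartition perDay(String projectId, String datasetId, String tablePrefix) {
        return new TableRefPartition(projectId, datasetId, tablePrefix);
    }

    @Override
    public TableDestination apply(ValueInSingleWindow<TableRow> input) {
        // Partition decorator, e.g. "my-tables$20240101", taken from the
        // element's event timestamp.
        TableReference ref = new TableReference()
                .setProjectId(projectId)
                .setDatasetId(datasetId)
                .setTableId(tablePrefix + "$" + DAY.print(input.getTimestamp()));
        return new TableDestination(ref, null);
    }
}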
.apply("SessionWindows for products", Window.<Event>into(Sessions.withGapDuration(Duration.standardDays(10)))
.triggering(Repeatedly.forever(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(60))
).orFinally(AfterWatermark.pastEndOfWindow()))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.ZERO)
return input.apply("UserSessionWindows",
        Window.<Event>into(Sessions.withGapDuration(Duration.standardMinutes(20)))
            .withAllowedLateness(Duration.standardHours(1)))
    .apply("SessionSwitch", ParDo.of(new SessionSwitchFn()));
gcs2bq_avro_with_schema = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs2bq_avro_with_schema',
    bucket='{{var.value.gcs_bucket}}',
    source_objects=[
        'path/{{ ds_nodash }}/part-*'
    ],
    destination_project_dataset_table='{{var.value.gcq_tempset}}.avro_with_schema{{ ds_nodash }}',
    source_format='AVRO',
    schema_fields=schemas.gsob(),
    create_disposition='CREATE_IF_NEEDED',
)
bq_to_gcp_avro = BigQueryToCloudStorageOperator(
    task_id='bq_to_gcp_avro',
    source_project_dataset_table='{{var.value.gcq_dataset}}.gsod_partition{{ ds_nodash }}',
    destination_cloud_storage_uris=[
        'gs://{{var.value.gcs_bucket}}/path/{{ ds_nodash }}/part-*.avro'
    ],
    export_format='AVRO',
    bigquery_conn_id='gcp_smoke',
)
#standardSQL
SELECT
  ...,
  station.lon AS station_longitude
FROM
  `bigquery-public-data.noaa_gsod.gsod{{execution_date.year-8}}` AS gsob
JOIN
  `bigquery-public-data.noaa_gsod.stations` AS station
ON
  CAST(gsob.stn AS INT64) = station.usaf
with DAG('gcp_smoke_bq', schedule_interval=timedelta(days=1),
         default_args=default_args) as dag:

    bq_extract_one_day = BigQueryOperator(
        task_id='bq_extract_one_day',
        bql='gcp_smoke/gsob_extract_day.sql',
        destination_dataset_table='airflow.gsod_partition{{ ds_nodash }}',
        write_disposition='WRITE_TRUNCATE',
        bigquery_conn_id='gcp_smoke',
    )
import logging
from logging import config

# Module-level logger, created before fileConfig runs; the config is loaded
# with disable_existing_loggers=False so this instance stays wired up.
logger = logging.getLogger('my-app')

if __name__ == "__main__":
    config.fileConfig("logging.ini", disable_existing_loggers=False)
    logger.info("Yes, we love Cloud Logging")
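
fileConfig expects the stdlib's INI-style schema. A minimal logging.ini sketch that would satisfy the snippet above, assuming a single console handler; the handler, formatter names, and log format are assumptions, only the 'my-app' qualname comes from the code:

[loggers]
keys=root,myapp

[handlers]
keys=console

[formatters]
keys=simple

[logger_root]
level=WARNING
handlers=console

[logger_myapp]
level=INFO
handlers=console
qualname=my-app
propagate=0

[handler_console]
class=StreamHandler
level=INFO
formatter=simple
args=(sys.stdout,)

[formatter_simple]
format=%(asctime)s %(name)s %(levelname)s %(message)s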