This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow import DAG | |
from airflow.contrib.operators.bigquery_operator import BigQueryOperator | |
from airflow.operators.dummy_operator import DummyOperator | |
from datetime import datetime, timedelta | |
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator | |
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator | |
default_args = { | |
'owner': 'Tuan Nguyen', | |
'depends_on_past': False, |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow import DAG | |
from airflow.contrib.operators.bigquery_operator import BigQueryOperator | |
from airflow.operators.dummy_operator import DummyOperator | |
from datetime import datetime, timedelta | |
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator | |
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator | |
default_args = { | |
'owner': 'Tuan Nguyen', | |
'depends_on_past': False, |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
project_id = 'cloud-data-lake' | |
staging_dataset = 'IMMIGRATION_DWH_STAGING' | |
dwh_dataset = 'IMMIGRATION_DWH' | |
gs_bucket = 'cloud-data-lake-gcp' | |
dag = DAG('cloud-data-lake-pipeline', | |
start_date=datetime.now(), | |
schedule_interval='@once', | |
concurrency=5, | |
max_active_runs=1, |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
check_us_cities_demo = BigQueryCheckOperator( | |
task_id = 'check_us_cities_demo', | |
use_legacy_sql=False, | |
sql = f'SELECT count(*) FROM `{project_id}.{staging_dataset}.us_cities_demo`' | |
) | |
check_airports = BigQueryCheckOperator( | |
task_id = 'check_airports', | |
use_legacy_sql=False, |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
load_us_cities_demo = GoogleCloudStorageToBigQueryOperator( | |
task_id = 'load_us_cities_demo', | |
bucket = gs_bucket, | |
source_objects = ['cities/us-cities-demographics.csv'], | |
destination_project_dataset_table = f'{project_id}:{staging_dataset}.us_cities_demo', | |
schema_object = 'cities/us_cities_demo.json', | |
write_disposition='WRITE_TRUNCATE', | |
source_format = 'csv', | |
field_delimiter=';', | |
skip_leading_rows = 1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Define task dependencies | |
dag >> start_pipeline >> [load_us_cities_demo, load_airports, load_weather, load_immigration_data] | |
load_us_cities_demo >> check_us_cities_demo | |
load_airports >> check_airports | |
load_weather >> check_weather | |
load_immigration_data >> check_immigration_data | |
[check_us_cities_demo, check_airports, check_weather,check_immigration_data] >> loaded_data_to_staging |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
resource "google_storage_bucket" "gcs-data-lake-landing" { | |
name = "${google_project.data-lake.project_id}-landing" | |
project = google_project.data-lake.project_id | |
location = local.region | |
force_destroy = true | |
storage_class = "STANDARD" | |
} | |
resource "google_storage_bucket" "gcs-data-lake-sensitive" { | |
name = "${google_project.data-lake.project_id}-sensitive" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
resource "google_storage_bucket_acl" "gcs-data-lake-landing-acl" { | |
bucket = google_storage_bucket.gcs-data-lake-landing.name | |
role_entity = [ | |
"OWNER:${local.unique_id}-de@gmail.com", | |
"READER:${local.unique_id}-ds@gmail.com", | |
] | |
} | |
resource "google_storage_bucket_acl" "gcs-data-lake-sensitive-acl" { | |
bucket = google_storage_bucket.gcs-data-lake-sensitive.name |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
resource "google_storage_bucket_acl" "gcs-data-lake-landing-acl" { | |
bucket = google_storage_bucket.gcs-data-lake-landing.name | |
role_entity = [ | |
"OWNER:${local.unique_id}-de@gmail.com", | |
"READER:${local.unique_id}-ds@gmail.com", | |
] | |
} | |
resource "google_storage_bucket_acl" "gcs-data-lake-sensitive-acl" { | |
bucket = google_storage_bucket.gcs-data-lake-sensitive.name |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Project definition | |
resource "google_project" "data-lake" { | |
name = "Data Lake" | |
project_id = "cloud-iac-data-lake" # Replace with an unique project ID | |
billing_account = "" # Replace with your billing ID | |
} | |
resource "google_project" "data-warehouse" { | |
name = "Data Warehouse" | |
project_id = "cloud-iac-data-warehouse" # Replace with an unique project ID |
OlderNewer