Tuan Nguyen (tuanchris)
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
default_args = {
    'owner': 'Tuan Nguyen',
    'depends_on_past': False,
}
project_id = 'cloud-data-lake'
staging_dataset = 'IMMIGRATION_DWH_STAGING'
dwh_dataset = 'IMMIGRATION_DWH'
gs_bucket = 'cloud-data-lake-gcp'
dag = DAG(
    'cloud-data-lake-pipeline',
    start_date=datetime.now(),  # kept as in the original; a fixed start_date is generally preferred
    schedule_interval='@once',
    concurrency=5,
    max_active_runs=1,
    default_args=default_args,
)
# Load staging data from GCS into BigQuery
load_us_cities_demo = GoogleCloudStorageToBigQueryOperator(
    task_id='load_us_cities_demo',
    bucket=gs_bucket,
    source_objects=['cities/us-cities-demographics.csv'],
    destination_project_dataset_table=f'{project_id}:{staging_dataset}.us_cities_demo',
    schema_object='cities/us_cities_demo.json',
    write_disposition='WRITE_TRUNCATE',
    source_format='CSV',
    field_delimiter=';',
    skip_leading_rows=1,
    dag=dag,
)
# BigQueryCheckOperator fails the task if the first row of the result is falsy,
# so an empty staging table (count = 0) fails the pipeline
check_us_cities_demo = BigQueryCheckOperator(
    task_id='check_us_cities_demo',
    use_legacy_sql=False,
    sql=f'SELECT count(*) FROM `{project_id}.{staging_dataset}.us_cities_demo`',
    dag=dag,
)
check_airports = BigQueryCheckOperator(
    task_id='check_airports',
    use_legacy_sql=False,
    # completed to match the other checks; the staging table name is an assumption
    sql=f'SELECT count(*) FROM `{project_id}.{staging_dataset}.airports`',
    dag=dag,
)
# Transform, load, and check fact data
create_immigration_data = BigQueryOperator(
    task_id='create_immigration_data',
    use_legacy_sql=False,
    params={
        'project_id': project_id,
        'staging_dataset': staging_dataset,
        'dwh_dataset': dwh_dataset,
    },
    # Airflow renders this file with Jinja before execution, so the params
    # above are available inside it as {{ params.project_id }} etc.
    sql='./sql/F_IMMIGRATION_DATA.sql',
    dag=dag,
)
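
# The dependency block below references tasks defined elsewhere in the full
# file: the remaining loaders follow the load_us_cities_demo pattern and the
# remaining checks follow check_us_cities_demo. A minimal sketch of the two
# milestone tasks, assuming they are DummyOperator placeholders (the import
# above suggests as much):
start_pipeline = DummyOperator(task_id='start_pipeline', dag=dag)
loaded_data_to_staging = DummyOperator(task_id='loaded_data_to_staging', dag=dag)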
# Define task dependencies
start_pipeline >> [load_us_cities_demo, load_airports, load_weather, load_immigration_data]
load_us_cities_demo >> check_us_cities_demo
load_airports >> check_airports
load_weather >> check_weather
load_immigration_data >> check_immigration_data
[check_us_cities_demo, check_airports, check_weather, check_immigration_data] >> loaded_data_to_staging
tuanchris / project.tf
Last active August 15, 2020 06:59
cloud-iac
# Project definition
resource "google_project" "data-lake" {
  name            = "Data Lake"
  project_id      = "cloud-iac-data-lake" # Replace with a unique project ID
  billing_account = "" # Replace with your billing ID
}

resource "google_project" "data-warehouse" {
  name            = "Data Warehouse"
  project_id      = "cloud-iac-data-warehouse" # Replace with a unique project ID
  billing_account = "" # Replace with your billing ID
}
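
These resources create only the bare projects; the services they host also need their APIs enabled before later resources will apply. A minimal sketch using the standard google_project_service resource, assuming the BigQuery API is wanted on the warehouse project (this block is not part of the gist):

resource "google_project_service" "dwh-bigquery" {
  # Assumed addition: enable the BigQuery API on the warehouse project
  project = google_project.data-warehouse.project_id
  service = "bigquery.googleapis.com"
}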
tuanchris / gcs.tf
Last active August 15, 2020 05:56
cloud-iac-2
resource "google_storage_bucket" "gcs-data-lake-landing" {
name = "${google_project.data-lake.project_id}-landing"
project = google_project.data-lake.project_id
location = local.region
force_destroy = true
storage_class = "STANDARD"
}
resource "google_storage_bucket" "gcs-data-lake-sensitive" {
name = "${google_project.data-lake.project_id}-sensitive"
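
Both gcs.tf snippets reference local values (local.region here, local.unique_id in the ACLs below) that the excerpts never define. A minimal sketch of the missing locals block; the concrete region and prefix are assumptions, not the gist's values:

locals {
  region    = "us-central1" # assumed; pick the region you deploy to
  unique_id = "cloud-iac"   # assumed prefix for the user emails in the ACLs
}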
tuanchris / gcs.tf
Created August 15, 2020 06:10
cloud-iac-2
resource "google_storage_bucket_acl" "gcs-data-lake-landing-acl" {
  bucket = google_storage_bucket.gcs-data-lake-landing.name
  role_entity = [
    "OWNER:${local.unique_id}-de@gmail.com",
    "READER:${local.unique_id}-ds@gmail.com",
  ]
}

resource "google_storage_bucket_acl" "gcs-data-lake-sensitive-acl" {
  bucket = google_storage_bucket.gcs-data-lake-sensitive.name
  # Assumed completion: only the data engineer keeps access to the sensitive bucket
  role_entity = [
    "OWNER:${local.unique_id}-de@gmail.com",
  ]
}