Example Airflow DAG for a Medium post: TaskFlow API tasks organized into TaskGroups, with the merged result uploaded to S3 and then copied into Redshift.
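Prerequisite (assuming Airflow 2.x): the S3ToRedshiftOperator and the S3 transfer modules ship in the separately installed Amazon provider package, and the upload task needs boto3 credentials available to the worker:

    pip install apache-airflow-providers-amazon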
import os
import tempfile

import boto3
from airflow import DAG
from airflow.decorators import task
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

S3_BUCKET = 'test-bucket-123'  # S3 bucket names may not contain underscores
S3_KEY = 'some_data.txt'

default_args = {'start_date': days_ago(1)}


@task
def extract_task_source_1():
    return 'some extracted data'


@task
def extract_task_source_2():
    return 'other extracted data'


@task
def transform_task_1(data):
    return data.replace('extracted', 'transformed')


@task
def transform_task_2(data):
    return data.replace('extracted', 'transformed')


@task
def merge_data(data_1, data_2):
    return data_1 + data_2


@task
def load_data_to_s3(data):
    # Write the merged data to a temporary local file, then upload it to S3
    with tempfile.TemporaryDirectory() as tempdir:
        local_path = os.path.join(tempdir, 'data.txt')
        with open(local_path, 'w') as myfile:
            myfile.write(data)
        s3_client = boto3.client('s3')
        s3_client.upload_file(local_path, S3_BUCKET, S3_KEY)


with DAG('ex_taskflow_taskgroup',
         schedule_interval='@once',
         default_args=default_args,
         catchup=False) as dag:

    # Each TaskGroup bundles the extract + transform pair for one source
    with TaskGroup('source_1') as source_1:
        data = extract_task_source_1()
        transformed_data_1 = transform_task_1(data)

    with TaskGroup('source_2') as source_2:
        data = extract_task_source_2()
        transformed_data_2 = transform_task_2(data)

    # Merging the two XCom results joins both groups into a single stream
    final_data = merge_data(transformed_data_1, transformed_data_2)

    # COPY the uploaded file from S3 into the Redshift staging table
    redshift_task = S3ToRedshiftOperator(task_id="redshift_task",
                                         redshift_conn_id="redshift_default",
                                         aws_conn_id="aws_default",
                                         schema="stage",
                                         table="some_data",
                                         s3_bucket=S3_BUCKET,
                                         s3_key=S3_KEY)

    # The S3 upload must finish before the Redshift COPY starts
    redshift_task.set_upstream(load_data_to_s3(final_data))
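To exercise the DAG end to end (a sketch, assuming a configured aws_default connection, an existing bucket, and a Redshift cluster reachable via redshift_default), a single run can be triggered with the Airflow 2.x CLI:

    airflow dags test ex_taskflow_taskgroup 2021-03-03

Note the dependency wiring: Airflow infers the edges within and between the TaskGroups from the TaskFlow data flow (merge_data consumes both transformed outputs), so only the final upload-then-COPY edge needs an explicit set_upstream call.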