@anna-anisienia
Created March 3, 2021 21:50
Example Airflow DAG for a Medium post: combining the TaskFlow API with TaskGroups and the S3ToRedshiftOperator.
import os
import tempfile

import boto3
from airflow import DAG
from airflow.decorators import task
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

# S3 bucket names may not contain underscores, hence hyphens here.
S3_BUCKET = 'test-bucket-123'
S3_KEY = 'some_data.txt'

default_args = {'start_date': days_ago(1)}


@task
def extract_task_source_1():
    return 'some extracted data'


@task
def extract_task_source_2():
    return 'other extracted data'


@task
def transform_task_1(data):
    return data.replace('extracted', 'transformed')


@task
def transform_task_2(data):
    return data.replace('extracted', 'transformed')


@task
def merge_data(data_1, data_2):
    return data_1 + data_2


@task
def load_data_to_s3(data):
    # Write the merged data to a temporary local file, then upload it to S3.
    with tempfile.TemporaryDirectory() as tempdir:
        local_path = os.path.join(tempdir, 'data.txt')
        with open(local_path, 'w') as myfile:
            myfile.write(data)
        s3_client = boto3.client('s3')
        s3_client.upload_file(local_path, S3_BUCKET, S3_KEY)


with DAG('ex_taskflow_taskgroup',
         schedule_interval='@once',
         default_args=default_args,
         catchup=False) as dag:

    # Each TaskGroup bundles one source's extract + transform steps,
    # so they render as collapsible groups in the Airflow UI.
    with TaskGroup('source_1') as source_1:
        data = extract_task_source_1()
        transformed_data_1 = transform_task_1(data)

    with TaskGroup('source_2') as source_2:
        data = extract_task_source_2()
        transformed_data_2 = transform_task_2(data)

    # TaskFlow API: passing returned values wires the dependencies (and the
    # data hand-off via XCom) automatically.
    final_data = merge_data(transformed_data_1, transformed_data_2)

    redshift_task = S3ToRedshiftOperator(task_id="redshift_task",
                                         redshift_conn_id="redshift_default",
                                         aws_conn_id="aws_default",
                                         schema="stage",
                                         table="some_data",
                                         s3_bucket=S3_BUCKET,
                                         s3_key=S3_KEY)

    # The COPY into Redshift must run only after the file lands in S3.
    load_data_to_s3(final_data) >> redshift_task
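
A minimal integrity-test sketch, not part of the original gist: it checks that the DAG above parses cleanly and contains the expected tasks. The file name tests/test_dag_integrity.py is a hypothetical choice; the sketch assumes the DAG file sits in your configured dags_folder and that the Amazon provider package is installed.

# tests/test_dag_integrity.py (hypothetical path) -- run with pytest
from airflow.models import DagBag


def test_ex_taskflow_taskgroup_loads():
    # include_examples=False keeps Airflow's bundled example DAGs out of the bag.
    dagbag = DagBag(include_examples=False)
    assert not dagbag.import_errors, f"Import errors: {dagbag.import_errors}"

    dag = dagbag.get_dag("ex_taskflow_taskgroup")
    assert dag is not None
    # 2 extracts + 2 transforms + merge + load + Redshift COPY = 7 tasks.
    assert len(dag.tasks) == 7

Alternatively, a single end-to-end run can be triggered from the CLI with "airflow dags test ex_taskflow_taskgroup 2021-03-03"; unlike the parse-only check above, that actually executes the tasks, so it needs working aws_default and redshift_default connections.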