@mrshu · Last active June 7, 2019 08:47
csv_to_parquet.py:

#!/usr/bin/env python3
# Called by the S3FileTransformOperator below with the downloaded source
# file and the destination file as positional arguments.
import sys

import pandas as pd

pd.read_csv(sys.argv[1]).to_parquet(sys.argv[2])
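
As a quick local sanity check (not part of the gist; sample.csv and sample.parquet are placeholder names, and pandas needs a Parquet engine such as pyarrow installed), the same conversion can be exercised directly:

import pandas as pd

# Write a tiny placeholder CSV, convert it the same way the script does,
# and read the Parquet file back to confirm the round trip.
pd.DataFrame({'_col0': range(5)}).to_csv('sample.csv', index=False)
pd.read_csv('sample.csv').to_parquet('sample.parquet')
print(pd.read_parquet('sample.parquet'))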
The DAG definition:

from datetime import datetime

from airflow.models import DAG
from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
from airflow.operators.s3_file_transform_operator import S3FileTransformOperator


class XComEnabledAWSAthenaOperator(AWSAthenaOperator):
    def execute(self, context):
        super(XComEnabledAWSAthenaOperator, self).execute(context)
        # Return the execution id just so that it gets `xcom_push`(ed)
        # and becomes available to downstream templated fields.
        return self.query_execution_id


with DAG(dag_id='athena_query_and_move',
         schedule_interval=None,
         start_date=datetime(2019, 6, 7)) as dag:

    run_query = XComEnabledAWSAthenaOperator(
        task_id='run_query',
        query='SELECT * FROM UNNEST(SEQUENCE(0, 100))',
        output_location='s3://my-bucket/my-path/',
        database='my_database'
    )

    # Athena stores the result set as <query_execution_id>.csv under
    # output_location, so the source key must point at the same bucket
    # and path as run_query's output_location.
    move_results = S3FileTransformOperator(
        task_id='move_results',
        source_s3_key='s3://my-bucket/my-path/'
                      '{{ task_instance.xcom_pull(task_ids="run_query") }}.csv',
        dest_s3_key='s3://my-bucket/other-path/myresults.parquet',
        transform_script='csv_to_parquet.py'
    )

    move_results.set_upstream(run_query)
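
Returning a value from execute works because Airflow pushes an operator's return value to XCom under the return_value key, which is what xcom_pull(task_ids="run_query") fetches by default. A more explicit variant (a sketch, not from the gist) would push under a named key instead:

from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator


class ExplicitXComAthenaOperator(AWSAthenaOperator):
    def execute(self, context):
        super(ExplicitXComAthenaOperator, self).execute(context)
        # Push the execution id under an explicit key rather than
        # relying on the operator's return value.
        context['task_instance'].xcom_push(key='query_execution_id',
                                           value=self.query_execution_id)

# The templated pull then has to name the key explicitly:
# '{{ task_instance.xcom_pull(task_ids="run_query", key="query_execution_id") }}.csv'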