Airflow DAG that runs bash tasks as an AWS Batch array job, driven by a tasks.txt file in S3.
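Each Batch array child evals exactly one line of tasks.txt (line number = AWS_BATCH_JOB_ARRAY_INDEX + 1), so every line must be a self-contained shell command. A hypothetical example file (these commands are illustrative placeholders, not part of the gist):

echo "running task one"
aws s3 ls s3://workshop-111-222-333/
date -u

Note that read_txt() below splits this same file on commas when pushing to XCom, so avoid commas in the commands unless that is intended.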
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.batch import AwsBatchOperator

import logging

LOGGER = logging.getLogger("airflow.task")
LOGGER.info("airflow.task >>> 2 - INFO logger test")

# S3 bucket and file holding one shell command per line, plus the local
# path the file is downloaded to.
BUCKET = 'workshop-111-222-333'
LOCAL_FILE_DOWNLOAD = '/tmp/tasks.txt'
FILE_IN_S3 = 'tasks.txt'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['admin@admin.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def download_file(source, bucket, key):
    """Download `source` from `bucket` to the local path `key`."""
    import boto3
    s3 = boto3.resource('s3')
    s3.Bucket(bucket).download_file(source, key)

def download_from_s3():
    # Return the operator so the DAG block can wire dependencies.
    return PythonOperator(
        task_id='download_from_s3',
        python_callable=download_file,
        op_kwargs={'bucket': BUCKET, 'key': LOCAL_FILE_DOWNLOAD, 'source': FILE_IN_S3}
    )

def print_txt():
    return BashOperator(
        task_id='print_txt',
        bash_command=f'cat {LOCAL_FILE_DOWNLOAD}'
    )

# Command for each AWS Batch array child: fetch tasks.txt from S3, then
# eval the line matching this child's AWS_BATCH_JOB_ARRAY_INDEX (0-based,
# hence the +1 for sed's 1-based line numbers).
batch_params = [
    "sh", "-c",
    "yum install awscli -y; \
    aws s3 cp s3://%s/%s .; \
    LINE=$((AWS_BATCH_JOB_ARRAY_INDEX + 1)); \
    eval $(sed -n ${LINE}p %s)" % (BUCKET, FILE_IN_S3, FILE_IN_S3)
]

container_overrides = {
    'command': batch_params,
    'environment': [
        {'name': 'BUCKET', 'value': BUCKET},
        {'name': 'FILE', 'value': FILE_IN_S3},
    ]
}

def run_aws_batch_operator():
    # NOTE: read_file_lines() runs at DAG-parse time, so the scheduler must
    # already have /tmp/tasks.txt locally, and AWS Batch requires an array
    # size of at least 2.
    return AwsBatchOperator(
        job_name='task-batch-based-s3',
        task_id='task-batch-based-s3',
        array_properties={"size": read_file_lines()},
        overrides=container_overrides,
        job_definition='getting-started',
        job_queue='getting-started'
    )

def read_file_lines():
    # Number of lines in the local copy of tasks.txt (one Batch child per line).
    try:
        with open(LOCAL_FILE_DOWNLOAD) as f:
            return len(f.readlines())
    except OSError:
        return False

def read_txt(**kwargs):
    # Push the file's comma-separated values to XCom for downstream tasks.
    try:
        with open(LOCAL_FILE_DOWNLOAD, 'r') as text_file:
            tasks = text_file.read().split(',')
        kwargs['ti'].xcom_push(key='campaign_id', value=tasks)
        return tasks
    except OSError:
        return False

with DAG(
    'tasks_batch_based_s3',
    default_args=default_args,
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False
) as dag:
    download = download_from_s3()
    show = print_txt()
    tasks = PythonOperator(
        task_id='read_it',
        python_callable=read_txt,
    )
    batch = run_aws_batch_operator()

    # tasks.txt must be on the local filesystem before anything reads it
    # (this assumes the scheduler and worker share /tmp).
    download >> show >> tasks >> batch
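Because schedule_interval is None, the DAG never runs on a schedule; trigger it manually, for example from the Airflow 2.x CLI:

airflow dags trigger tasks_batch_based_s3

With the hypothetical three-line tasks.txt above, read_file_lines() yields 3, so the AwsBatchOperator submits an array job of size 3 and each child executes its own line.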