@naturalett · Created June 5, 2023 22:44
An Airflow DAG that runs bash tasks through the AWS Batch operator, one task per line of a tasks.txt file stored in S3.
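Each line of tasks.txt is expected to be a self-contained shell command, because every child job in the AWS Batch array evals exactly one line (see batch_params below). A minimal sketch of what the file might look like; the commands and object keys here are hypothetical, only the bucket name comes from the gist:

    # one shell command per line; line N is run by Batch array index N-1
    echo "processing campaign 101"
    aws s3 cp s3://workshop-111-222-333/campaigns/102.json /tmp/102.json
    python run_campaign.py --campaign-id 103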
from datetime import datetime, timedelta
import logging

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
# AwsBatchOperator is the older alias of BatchOperator in the Amazon provider package.
from airflow.providers.amazon.aws.operators.batch import AwsBatchOperator

LOGGER = logging.getLogger("airflow.task")
LOGGER.info("airflow.task >>> 2 - INFO logger test")

BUCKET = 'workshop-111-222-333'
LOCAL_FILE_DOWNLOAD = '/tmp/tasks.txt'   # local path the task file is downloaded to
FILE_IN_S3 = 'tasks.txt'                 # key of the task file in the S3 bucket
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['admin@admin.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
def download_file(source, bucket, key):
    """Download an object from S3; `source` is the S3 key, `key` is the local destination path."""
    import boto3
    s3 = boto3.resource('s3')
    s3.Bucket(bucket).download_file(source, key)


def download_from_s3():
    return PythonOperator(
        task_id='download_from_s3',
        python_callable=download_file,
        op_kwargs={'bucket': BUCKET, 'key': LOCAL_FILE_DOWNLOAD, 'source': FILE_IN_S3}
    )


def print_txt():
    return BashOperator(
        task_id='print_txt',
        bash_command=f'cat {LOCAL_FILE_DOWNLOAD}'
    )
# Command run by every child job in the AWS Batch array: install the AWS CLI,
# pull tasks.txt from S3, then eval the line matching this job's
# AWS_BATCH_JOB_ARRAY_INDEX (array indexes start at 0, sed line numbers at 1).
batch_params = [
    "sh", "-c",
    "yum install awscli -y; \
    aws s3 cp s3://%s/%s .; \
    LINE=$((AWS_BATCH_JOB_ARRAY_INDEX + 1)); \
    eval $(sed -n ${LINE}p %s)" % (BUCKET, FILE_IN_S3, FILE_IN_S3)
]

containerOverrides = {
    'command': batch_params,
    'environment': [
        {
            'name': 'BUCKET',
            'value': BUCKET
        },
        {
            'name': 'FILE',
            'value': FILE_IN_S3
        }
    ]
}
def run_aws_batch_operator():
    # array_properties.size is computed at DAG parse time from the local copy of
    # tasks.txt, so the file must already exist on the scheduler for the Batch
    # fan-out to be sized correctly.
    return AwsBatchOperator(
        job_name='task-batch-based-s3',
        task_id='task-batch-based-s3',
        array_properties={"size": read_file_lines()},
        overrides=containerOverrides,
        job_definition='getting-started',
        job_queue='getting-started'
    )


def read_file_lines(**kwargs):
    """Return the number of lines in the downloaded tasks.txt, or False if it cannot be read."""
    try:
        with open(LOCAL_FILE_DOWNLOAD) as f:
            return len(f.readlines())
    except Exception:
        return False


def read_txt(**kwargs):
    """Read the downloaded tasks.txt and push its comma-separated entries to XCom."""
    try:
        with open(LOCAL_FILE_DOWNLOAD, 'r') as text_file:
            tasks = text_file.read().split(',')
        ti = kwargs['ti']
        ti.xcom_push(key='campaign_id', value=tasks)
        return tasks
    except Exception:
        return False
with DAG(
    'tasks_batch_based_s3',
    default_args=default_args,
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False
) as dag:
    download = download_from_s3()
    show_tasks = print_txt()
    read_it = PythonOperator(
        task_id='read_it',
        python_callable=read_txt,
    )
    batch = run_aws_batch_operator()

    # tasks.txt has to land locally before it can be printed, parsed, or fanned out to Batch
    download >> show_tasks >> read_it >> batch
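To exercise the DAG end to end, it can be triggered manually from the Airflow CLI; the trigger command below is standard Airflow 2.x, while the rest (an S3 bucket holding tasks.txt and an AWS Batch job definition and queue both named getting-started) is an assumption about this particular environment:

    airflow dags trigger tasks_batch_based_s3

Each Batch child job then reads its AWS_BATCH_JOB_ARRAY_INDEX, copies tasks.txt from S3, and evals the matching line of the file.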