@thomas-a-neil
Created August 22, 2016 20:59
Sample DAG to download from S3, sleep, and reupload
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# default arguments for each task
default_args = {
    'owner': 'nthomas',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('test_s3_download',
          default_args=default_args,
          schedule_interval=None)  # None means this DAG is only run by external commands (see the trigger example below)

TEST_BUCKET = '23andme-nthomas'
TEST_KEY = 'foo.txt'
LOCAL_FILE = '/Users/nthomas/scratch/pipeline/airflow/foo.txt'


# simple download task
def download_file(bucket, key, destination):
    import boto3
    s3 = boto3.resource('s3')
    s3.meta.client.download_file(bucket, key, destination)


# simple upload task
def upload_file(source, bucket, key):
    import boto3
    s3 = boto3.resource('s3')
    s3.Bucket(bucket).upload_file(source, key)


download_from_s3 = PythonOperator(
    task_id='download_from_s3',
    python_callable=download_file,
    op_kwargs={'bucket': TEST_BUCKET, 'key': TEST_KEY, 'destination': LOCAL_FILE},
    dag=dag)

sleep_task = BashOperator(
    task_id='sleep_for_1',
    bash_command='sleep 1',
    dag=dag)

upload_to_s3 = PythonOperator(
    task_id='upload_to_s3',
    python_callable=upload_file,
    op_kwargs={'bucket': TEST_BUCKET, 'key': TEST_KEY, 'source': LOCAL_FILE},
    dag=dag)

# wire the tasks: download -> sleep -> upload
download_from_s3.set_downstream(sleep_task)
sleep_task.set_downstream(upload_to_s3)
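
Because schedule_interval is None, this DAG never runs on its own; it has to be triggered externally. With the Airflow 1.x CLI this gist was written against, that would look something like:

airflow trigger_dag test_s3_download

The scheduler then picks up the triggered run and executes download_from_s3, sleep_for_1, and upload_to_s3 in order.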
@quenchua commented Jan 7, 2021

Nice example script.
How would you do something like this with the CeleryExecutor, where each task may run on a different machine?

@irshadgit

@quenchua: In that case you can follow either of the methods below.

  • Mount a common file system such as GlusterFS on all your workers, so that the files are shared across workers.
  • Use a dedicated queue for your workers and set that queue on your DAG task, so that certain tasks are only pushed to specific workers (see the sketch after this list). You can find a detailed explanation here.
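
As a minimal sketch of the second option, assuming the CeleryExecutor is already configured and reusing download_file, upload_file, TEST_BUCKET, TEST_KEY, LOCAL_FILE, and dag from the gist above, the two S3 tasks can be pinned to the same queue. The queue name s3_workers is just an example; a matching worker would be started with "airflow worker -q s3_workers".

download_from_s3 = PythonOperator(
    task_id='download_from_s3',
    python_callable=download_file,
    op_kwargs={'bucket': TEST_BUCKET, 'key': TEST_KEY, 'destination': LOCAL_FILE},
    queue='s3_workers',  # example queue name; must match the worker's -q flag
    dag=dag)

upload_to_s3 = PythonOperator(
    task_id='upload_to_s3',
    python_callable=upload_file,
    op_kwargs={'bucket': TEST_BUCKET, 'key': TEST_KEY, 'source': LOCAL_FILE},
    queue='s3_workers',  # same queue, so it is consumed by the same worker(s)
    dag=dag)

The sleep task can stay on the default queue; only the tasks that read or write LOCAL_FILE need to land on the same machine. Note that this only guarantees a shared local disk if a single worker consumes that queue; with multiple workers on the queue you would still need a shared file system as in the first option.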
