Skip to content

Instantly share code, notes, and snippets.

@cheeyeo
Created May 22, 2024 15:37
Show Gist options
  • Save cheeyeo/03af6902585ac4a242b0d01fbbb91df3 to your computer and use it in GitHub Desktop.
Save cheeyeo/03af6902585ac4a242b0d01fbbb91df3 to your computer and use it in GitHub Desktop.
Example second workflow for processing images
from urllib.parse import urlparse

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3FileTransformOperator
def file_from_s3(prefix, bucket_name="airflow-targets"):
    """
    Fetch an S3-processed file from the remote bucket to a local temp path
    so inference can run on it.

    :param prefix: S3 URI or key of the object to fetch; only the path
        component is used as the object key.
    :param bucket_name: bucket to download from. Defaults to the same
        bucket used elsewhere in this DAG, so existing callers are
        unaffected.
    :return: local filesystem path of the downloaded file, as produced
        by ``S3Hook.download_file``.
    """
    # urlparse discards any scheme/host (e.g. "s3://bucket/key" -> "/key");
    # lstrip removes the leading slash so the result is a valid object key.
    key = urlparse(prefix).path.lstrip('/')
    s3_hook = S3Hook()
    return s3_hook.download_file(
        key,
        bucket_name=bucket_name,
    )
def file_to_s3(tmp_filepath, s3_target_path, bucket_name="airflow-targets"):
    """
    Upload a local file to S3, replacing any existing object at the key.

    :param tmp_filepath: local filesystem path of the file to upload.
    :param s3_target_path: S3 URI or key of the destination object; only
        the path component is used as the object key.
    :param bucket_name: bucket to upload into. Defaults to the same
        bucket used elsewhere in this DAG, so existing callers are
        unaffected.
    """
    # urlparse discards any scheme/host (e.g. "s3://bucket/key" -> "/key");
    # lstrip removes the leading slash so the result is a valid object key.
    key = urlparse(s3_target_path).path.lstrip('/')
    s3_hook = S3Hook()
    s3_hook.load_file(
        tmp_filepath,
        key,
        bucket_name=bucket_name,
        replace=True,
    )
# DAG "s3_processor": triggered manually (schedule=None) with a conf payload
# supplying "source_filename" and "dest_filename".  Pipeline:
#   transform -> download -> DINO inference -> SAM inference -> upload
with DAG(
    dag_id='s3_processor',
    catchup=False,
    schedule=None,
    tags=["CHEE", "S3"],
):
    # Run the transform script on the source object and write the result to
    # the destination key, replacing any existing object.
    transformer = S3FileTransformOperator(
        task_id='transform_file',
        source_s3_key="{{ dag_run.conf.get('source_filename') }}",
        dest_s3_key="{{ dag_run.conf.get('dest_filename') }}",
        transform_script='/opt/airflow/dags/image_processor.py',
        replace=True,
    )

    # Pull the transformed object down to a local path for the inference
    # steps.  NOTE(review): provide_context was removed here — it is
    # deprecated/removed in Airflow 2.x (this file already uses the 2.4+
    # `schedule` parameter) and the task context is passed automatically.
    download_file = PythonOperator(
        task_id="download_file",
        python_callable=file_from_s3,
        op_kwargs={"prefix": "{{ dag_run.conf.get('dest_filename') }}"},
    )

    # Run DINO on the downloaded file; its stdout is pushed to XCom
    # (do_xcom_push=True) so the SAM step can consume it.
    run_dino_inference = BashOperator(
        task_id='run_dino_inference',
        bash_command="python /opt/airflow/dags/model_dino_inference.py {{ ti.xcom_pull(task_ids='download_file') }}",
        do_xcom_push=True,
    )

    # Run SAM on the same file, passing the DINO output as a quoted second
    # argument (Jinja renders before bash executes, so the nested quotes
    # are resolved at template time).
    run_sam_inference = BashOperator(
        task_id="run_sam_inference",
        bash_command="python /opt/airflow/dags/model_sam_mobile_inference.py {{ ti.xcom_pull(task_ids='download_file') }} '{{ ti.xcom_pull(task_ids='run_dino_inference', key='return_value') }}'",
    )

    # Push the local file back to the destination key.
    # NOTE(review): this uploads the path returned by download_file —
    # presumably the inference scripts modify that file in place; confirm.
    upload_file = PythonOperator(
        task_id="upload_file",
        python_callable=file_to_s3,
        op_kwargs={
            "tmp_filepath": "{{ ti.xcom_pull(task_ids='download_file') }}",
            "s3_target_path": "{{ dag_run.conf.get('dest_filename') }}",
        },
    )

    transformer >> download_file >> run_dino_inference >> run_sam_inference >> upload_file
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment