@cheeyeo · Created May 22, 2024 15:27
First DAG for processing S3 files
import json
import logging
from datetime import datetime
from pathlib import Path

from airflow.models.dag import DAG
from airflow.api.client.local_client import Client
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor

logger = logging.getLogger(__name__)

# Placeholder: replace with the URL of the queue receiving the S3 event notifications
SQS_QUEUE = "https://sqs.eu-west-2.amazonaws.com/123456789012/s3-events"


def analyze_data(ti):
    airflow_client = Client(None, None)
    # The SqsSensor pushes the received messages to XCom under the "messages" key
    messages = ti.xcom_pull(task_ids="sqs_sensor", key="messages")
    for msg in messages:
        # Each message body is an S3 event notification containing a "Records" list
        records = json.loads(msg["Body"])
        for rec in records["Records"]:
            file = rec["s3"]["object"]["key"]
            p = Path(file)
            filename = p.stem
            ext = p.suffix
            source_key = f"s3://inputs/{file}"
            dest_key = f"s3://targets/{filename}_processed{ext}"
            logger.info(dest_key)
            # Trigger one s3_processor run per S3 object, passing the source
            # and destination keys through the DAG run conf
            airflow_client.trigger_dag(
                dag_id="s3_processor",
                conf={"source_filename": source_key, "dest_filename": dest_key},
                replace_microseconds=False,
            )
    return True


with DAG(
    dag_id="s3_files_controller",
    start_date=datetime(year=2024, month=5, day=16),
    catchup=False,
    schedule=None,
    tags=["CHEE", "S3"],
) as dag:
    # Poll SQS for S3 event notifications; matched messages are deleted
    # from the queue and pushed to XCom for the downstream task
    sensor = SqsSensor(
        task_id="sqs_sensor",
        sqs_queue=SQS_QUEUE,
        max_messages=5,
        num_batches=1,
        delete_message_on_reception=True,
    )

    analyze = PythonOperator(
        task_id="analyze_data",
        python_callable=analyze_data,
    )

    sensor >> analyze
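
For context, the parsing loop above assumes the standard S3 event notification format that AWS delivers to SQS. A trimmed sketch of one message body (the bucket and key are illustrative, not from the gist):

# Trimmed example of one SQS message body, as delivered by an
# S3 "ObjectCreated" event notification (bucket/key are illustrative)
body = {
    "Records": [
        {
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "inputs"},
                "object": {"key": "reports/2024/data.csv"},
            },
        }
    ]
}

# From this record, analyze_data would derive:
#   source_key = "s3://inputs/reports/2024/data.csv"
#   dest_key   = "s3://targets/data_processed.csv"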
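The s3_processor DAG itself is not part of this gist, only triggered by it. A minimal sketch of how it might consume the conf set by trigger_dag, assuming the task simply reads the two filenames from the DAG run (the processing logic is a placeholder):

# Hypothetical downstream DAG: reads the conf passed by the controller
from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator


def process_file(dag_run=None):
    # dag_run.conf carries the keys set by the controller's trigger_dag call
    source = dag_run.conf["source_filename"]
    dest = dag_run.conf["dest_filename"]
    print(f"processing {source} -> {dest}")  # placeholder for the real work


with DAG(
    dag_id="s3_processor",
    start_date=datetime(2024, 5, 16),
    catchup=False,
    schedule=None,
) as dag:
    PythonOperator(task_id="process_file", python_callable=process_file)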