Created
April 17, 2024 19:13
-
-
Save JeffreyMFarley/7802e39953ceb4c4001f86195335b96c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Generated by ChatGPT; modified by me to make it work
import json | |
from datetime import datetime, timedelta | |
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor | |
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook | |
from airflow.operators.empty import EmptyOperator | |
from airflow.operators.python import PythonOperator | |
from airflow.utils.dates import days_ago | |
from csm_snowflake_elt.constants import MANAGER, SNOWFLAKE_CONNECTION | |
# Parameters
# URL of the SQS queue handed to the sensor as `sqs_queue`.
# The value below is a placeholder: change it to your SQS queue URL.
queue_url = "https://sqs.us-east-1.amazonaws.com/xxxxxx"
# Airflow connection ID for AWS, handed to the sensor as `aws_conn_id`.
aws_conn_id = "aws_default"
# ------------------------------------------------------------------------------------- | |
def process_message(**kwargs):
    """
    Process the messages pulled from SQS.

    Pulls the value stored under the XCom key ``"messages"`` through the task
    instance found in ``kwargs["ti"]``. Each record's ``Body`` is decoded as
    JSON, and the string stored under that body's ``Message`` key is decoded
    as JSON in turn (presumably an SNS notification envelope -- confirm
    against the queue's subscription settings).

    Args:
        **kwargs: Task context. Must contain ``ti``, an object exposing
            ``xcom_pull(key=...)``.

    Returns:
        list: The decoded ``Message`` payloads, one per pulled record and in
        the same order. A record without ``Body`` or ``Message`` yields an
        empty dict. An empty list is returned when nothing was pulled.

    Raises:
        json.JSONDecodeError: If a ``Body`` or ``Message`` value is not valid
            JSON. This is deliberately not swallowed so a malformed message
            fails the task visibly.
    """
    records = kwargs["ti"].xcom_pull(key="messages")
    if not records:
        return []

    sns_messages = []
    for record in records:
        # The SQS body is itself a JSON document wrapping the real payload.
        body = json.loads(record.get("Body", "{}"))
        # NOTE(review): the original comment said "extract the tv_insights_id",
        # but no field was ever read from the payload; the whole decoded
        # message is returned so the caller can pick the fields it needs.
        sns_messages.append(json.loads(body.get("Message", "{}")))
    return sns_messages
# ------------------------------------------------------------------------------------- | |
# DAG "process_sqs": init -> wait for an SQS message -> process it -> end.
with MANAGER.create_dag(
    dag_id="process_sqs",
    render_template_as_native_obj=True,
    schedule_interval=timedelta(minutes=5),  # Poll every 5 minutes; adjust as needed
    tags=["SQS"],
    catchup=False,
) as dag:
    start = EmptyOperator(task_id="init")

    # Tasks
    # Sensor that polls the queue at `queue_url`. `process_message` later
    # reads the XCom key "messages" (presumably pushed by this sensor --
    # confirm against the Amazon provider version in use).
    check_sqs_for_message = SqsSensor(
        task_id="check_sqs_for_message",
        sqs_queue=queue_url,
        aws_conn_id=aws_conn_id,
        max_messages=1,  # Adjust as needed
        wait_time_seconds=20,
        dag=dag,
    )

    # `provide_context=True` is intentionally not passed: Airflow 2 always
    # supplies the task context to the callable and only warns about the
    # deprecated flag.
    process_sqs_message = PythonOperator(
        task_id="process_sqs_message",
        python_callable=process_message,
        dag=dag,
    )

    end = EmptyOperator(task_id="end")

    # Setting dependencies
    start >> check_sqs_for_message >> process_sqs_message >> end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment