Skip to content

Instantly share code, notes, and snippets.

@JeffreyMFarley
Created April 17, 2024 19:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JeffreyMFarley/7802e39953ceb4c4001f86195335b96c to your computer and use it in GitHub Desktop.
Save JeffreyMFarley/7802e39953ceb4c4001f86195335b96c to your computer and use it in GitHub Desktop.
# Generated by ChatGPT; Modified to work by me
import json
from datetime import datetime, timedelta
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from csm_snowflake_elt.constants import MANAGER, SNOWFLAKE_CONNECTION
# Parameters
queue_url = "https://sqs.us-east-1.amazonaws.com/xxxxxx" # Change to your SQS queue URL
aws_conn_id = "aws_default" # Connection ID for AWS in Airflow
# -------------------------------------------------------------------------------------
def process_message(**kwargs):
"""
Process the message pulled from SQS.
"""
message = kwargs["ti"].xcom_pull(key="messages")
if message:
for i, record in enumerate(message):
# Extract the tv_insights_id from the message
body = json.loads(record.get("Body", "{}"))
sns_message = json.loads(body.get("Message", "{}"))
# -------------------------------------------------------------------------------------
with MANAGER.create_dag(
dag_id="process_sqs",
render_template_as_native_obj=True,
schedule_interval=timedelta(minutes=5), # Poll every 5 minutes; adjust as needed
tags=["SQS"],
catchup=False,
) as dag:
start = EmptyOperator(task_id="init")
# Tasks
check_sqs_for_message = SqsSensor(
task_id="check_sqs_for_message",
sqs_queue=queue_url,
aws_conn_id=aws_conn_id,
max_messages=1, # Adjust as needed
wait_time_seconds=20,
dag=dag,
)
process_sqs_message = PythonOperator(
task_id="process_sqs_message",
python_callable=process_message,
provide_context=True,
dag=dag,
)
end = EmptyOperator(task_id="end")
# Setting dependencies
start >> check_sqs_for_message >> process_sqs_message >> end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment