@abridgett
Last active April 29, 2021 22:03
airflow XCOM notification example
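# Import paths as of Airflow 1.x; they differ in other releases.
from airflow.models import Variable
from airflow.operators.email_operator import EmailOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.slack_operator import SlackAPIPostOperator

# `dag` is assumed to be an existing DAG object defined elsewhere.

# Slack attachment template; the Jinja expressions are rendered at run time.
MAP_SLACK_ATTACHMENTS = [
    {
        "fallback": "{{params.map}} {{ task_instance.xcom_pull(task_ids=params.map, key='slack_status') }}",
        "pretext": "{{params.map}} update {{ task_instance.xcom_pull(task_ids=params.map, key='slack_status') }}",
        "fields": [
            {
                "title": "Copied",
                "value": "{{ task_instance.xcom_pull(task_ids=params.map, key='copied') }}",
                "short": True
            }
        ]
    }
]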
MAP_EMAIL_CONTENT = """
<b>Map:</b> {{ params.map }}<br>
<b>Date:</b> {{ ds }}<br>
<p>
<b>Copied:</b> {{ task_instance.xcom_pull(task_ids=params.map, key='copied') }}<br>
"""
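def update_hdfs(ds, **kwargs):
    # .... (elided in the original; this part sets `success` and the regex match `m`)
    # Default to 'danger' so a failed run is reported as such.
    kwargs['ti'].xcom_push(key='slack_status', value='danger')
    if success:
        kwargs['ti'].xcom_push(key='slack_status', value='good')
        kwargs['ti'].xcom_push(key='copied', value=int(m.group(1)))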
speedmap = PythonOperator(
    task_id='speedmap',
    python_callable=update_hdfs,
    params={'map': 'speedmap'},
    provide_context=True,
    pool='speedmap',  # no simultaneous runs
    dag=dag)
map_slack = SlackAPIPostOperator(
    task_id='speedmap_slack',
    channel="#airflow-test",
    token=Variable.get('slack_token'),
    # params['map'] must be the task id string, not the operator object:
    # the templates pass it to xcom_pull(task_ids=...).
    params={'map': 'speedmap'},
    text='',
    attachments=MAP_SLACK_ATTACHMENTS,
    trigger_rule='all_done',
    dag=dag)
dag.set_dependency('speedmap', 'speedmap_slack')
map_email = EmailOperator(
    task_id='speedmap_email',
    to="adrian@opensignal.com",
    # Use the task id string here; a bare `map` would be Python's builtin.
    params={'map': 'speedmap'},
    subject="speedmap {{ ds }} {{ task_instance.xcom_pull(task_ids=params.map, key='slack_status') }}",
    html_content=MAP_EMAIL_CONTENT,
    trigger_rule='all_done',
    dag=dag)
# Just to show an alternative approach
map_email.set_upstream(speedmap)
@hardkap

hardkap commented Apr 7, 2017

When I try to run this in my local Airflow, it fails with the following error. Any ideas?
[2017-04-07 11:34:44,659] {models.py:1286} ERROR - (sqlite3.InterfaceError) Error binding parameter 1 - probably unsupported type. [SQL: u'SELECT xcom.value AS xcom_value \nFROM xcom \nWHERE xcom."key" = ? AND xcom.task_id = ? AND xcom.dag_id = ? AND xcom.execution_date = ? ORDER BY xcom.execution_date DESC, xcom.timestamp DESC\n LIMIT ? OFFSET ?'] [parameters: ('slack_status', <Task(PythonOperator): speedmap>, 'end-date_v22', '2017-04-06 00:00:00.000000', 1, 0)]
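
The bound parameters in that log include `<Task(PythonOperator): speedmap>` where the query expects a task id. That points to an operator object having been passed as `params['map']` instead of the string `'speedmap'`. Below is a minimal sketch of the same failure using only the standard-library `sqlite3` module. The `FakeTask` class, the `pull_status` helper, and the three-column `xcom` table are hypothetical stand-ins for illustration, not Airflow's actual model or schema.

```python
import sqlite3


class FakeTask:
    """Hypothetical stand-in for an operator object (not an Airflow class)."""

    def __init__(self, task_id):
        self.task_id = task_id


def pull_status(conn, task_id):
    """Look up slack_status by task id, loosely mirroring the query in the log."""
    row = conn.execute(
        'SELECT value FROM xcom WHERE "key" = ? AND task_id = ?',
        ("slack_status", task_id),
    ).fetchone()
    return row[0] if row else None


# Simplified, assumed table layout; the real xcom table has more columns.
conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE xcom ("key" TEXT, task_id TEXT, value TEXT)')
conn.execute(
    "INSERT INTO xcom VALUES (?, ?, ?)",
    ("slack_status", "speedmap", "good"),
)

task = FakeTask("speedmap")

# Binding the task id string works.
status = pull_status(conn, task.task_id)

# Binding the object itself fails: sqlite3 cannot adapt an arbitrary object.
# The exact exception class varies by Python version (InterfaceError or
# ProgrammingError); both derive from sqlite3.Error.
try:
    pull_status(conn, task)
    bind_failed = False
except sqlite3.Error:
    bind_failed = True
```

If this is the cause, passing the task id string in `params` (for example `params={'map': 'speedmap'}`) should let `xcom_pull(task_ids=params.map, ...)` bind correctly.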
