

@anderser
Created April 12, 2016 18:53
A sample pipeline/DAG for Airflow
  1. Fetch some json/XML file from an external API
  2. Store the file on S3 (as a backup) with a file name that includes e.g. a timestamp. Pass the filename on to the next task
  3. Read the json/XML from S3 into some table structure (pandas, agate etc.) and change field types etc.
  4. Store the table in a postgres database in a temp table
  5. Compare the temp table to a "main" table and see if there are changes (some SQL diff). Find out which records have to be added/removed/updated in the "main" table.
  6. If nothing has changed, abort everything. If it has, pass on which records are new, deleted and updated.
  7. a) Insert new records in the main table, alert the newsroom on Slack of new items. b) Delete items in the main table that are not in the temp table. Alert via Slack. c) Update records in the main table, alert via Slack
  8. The end
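Sketched as an Airflow DAG, the steps above might be wired up roughly like this. This is a minimal sketch: the DAG id, schedule, and task names are assumptions, and the import path is Airflow 1.x style.

```python
# Minimal skeleton of the pipeline above; task bodies are left as stubs.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.x path

dag = DAG("api_to_postgres_sync",       # DAG id is an assumption
          schedule_interval="@daily",
          start_date=datetime(2016, 4, 1))

def fetch_feed(**context):
    pass  # steps 1-2: fetch JSON/XML from the API, back it up to S3

def load_to_temp(**context):
    pass  # steps 3-4: read from S3, fix field types, write to a temp table

def diff_tables(**context):
    pass  # steps 5-6: diff temp table against main table

def apply_changes(**context):
    pass  # step 7: insert/update/delete in main table, alert Slack

fetch = PythonOperator(task_id="fetch_feed", python_callable=fetch_feed,
                       provide_context=True, dag=dag)
load = PythonOperator(task_id="load_to_temp", python_callable=load_to_temp,
                      provide_context=True, dag=dag)
diff = PythonOperator(task_id="diff_tables", python_callable=diff_tables,
                      provide_context=True, dag=dag)
apply_ = PythonOperator(task_id="apply_changes", python_callable=apply_changes,
                        provide_context=True, dag=dag)

# chain the tasks: fetch -> load -> diff -> apply
fetch.set_downstream(load)
load.set_downstream(diff)
diff.set_downstream(apply_)
```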
@r39132

r39132 commented Apr 12, 2016

To pass context between operators, especially Python callables, you can use XCom.
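For instance, a small sketch of the XCom hand-off between steps 1-2 and 3 (task ids and the S3 key format are assumptions; with provide_context=True the callable receives the task instance as "ti"):

```python
def fetch_and_store(**context):
    filename = "feed-%s.json" % context["ts"]  # e.g. a timestamped S3 key
    # ... fetch from the API and upload to S3 with boto here ...
    return filename  # the return value is pushed to XCom automatically

def load_from_s3(**context):
    # pull the filename the upstream task pushed
    filename = context["ti"].xcom_pull(task_ids="fetch_and_store")
    # ... read the file from S3 and build the table ...
```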


1. Fetch some json/XML file from an external API - Use PythonOperator
2. Store the file on S3 (as a backup) with a file name that includes e.g. a timestamp. Pass the filename on to the next task - Use PythonOperator with boto
3. Read the json/XML from S3 into some table structure (pandas, agate etc.) and change field types etc. - Use PythonOperator with boto to read from S3 and pandas/agate to do some analysis on it
4. Store the table in a postgres database in a temp table - Use PythonOperator to write to a db -- you can use psycopg2
5. Compare the temp table to a "main" table and see if there are changes (some SQL diff). Find out which records have to be added/removed/updated in the "main" table. - Use ShortCircuitOperator, whose python callable can use psycopg2 to read data from your db (see the sketch after this list)
6. If nothing has changed, abort everything. If it has, pass on which records are new, deleted and updated. — maybe combine this with the item above?
7. a) Insert new records in the main table, alert the newsroom on Slack of new items. b) Delete items in the main table not in the temp table. Alert via Slack. c) Update records in the main table, alert via Slack — use SlackOperator to post to Slack and PythonOperator to do the DB work
8. The end
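A sketch of what 5/6 could look like with ShortCircuitOperator: if the callable returns a falsy value, everything downstream is skipped. The connection string and table/column names are made up for illustration, and the dag object is reused from the skeleton above.

```python
import psycopg2
from airflow.operators.python_operator import ShortCircuitOperator

def table_has_changes(**context):
    conn = psycopg2.connect("dbname=newsroom")  # connection details assumed
    with conn.cursor() as cur:
        # count rows that are new or differ between temp and main
        # (illustrative schema)
        cur.execute("""
            SELECT count(*)
            FROM items_temp t
            LEFT JOIN items_main m USING (id)
            WHERE m.id IS NULL OR m.payload <> t.payload
        """)
        changed = cur.fetchone()[0]
    conn.close()
    return changed > 0  # False short-circuits the rest of the DAG

check_changes = ShortCircuitOperator(task_id="check_changes",
                                     python_callable=table_has_changes,
                                     provide_context=True, dag=dag)
```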

@dgies

dgies commented Apr 13, 2016

For 5/6/7, give careful thought to how large the result size could get. If it's only ever going to be a few rows, it would be reasonable to stuff it in xcom and pass it on to a BranchPythonOperator which checks whether its xcom payload is empty or not.
If you could find yourself returning larger result sets, you might want to store the delta in a temp table and return a "delta not empty" flag.
Or you could, as Sid said, combine them into one big PythonOperator.
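A sketch of the branch-on-xcom idea, assuming the diff task pushed its small result set to XCom and that the task ids match the skeleton earlier in the thread:

```python
from airflow.operators.python_operator import BranchPythonOperator

def choose_branch(**context):
    delta = context["ti"].xcom_pull(task_ids="diff_tables")
    # return the task_id to follow; the other branch gets skipped
    return "apply_changes" if delta else "no_changes"

branch = BranchPythonOperator(task_id="branch_on_delta",
                              python_callable=choose_branch,
                              provide_context=True, dag=dag)
```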

One other thing: while SlackOperator is easy/useful, it's a bit limiting in that the message has to be defined by a template, and any user-supplied params must be specified at task definition time, not execution time. This makes it hard to pass in the result of an upstream task's xcom to create messages like "Job Complete: 5 new records, 3 updated, 2 deleted". You can get around this either by creating an Airflow plugin that defines a new macro for reading those values, or by replacing the SlackOperator with a PythonOperator that reads the result and makes the same API call SlackOperator would.
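A rough sketch of that second workaround: a PythonOperator that pulls the upstream result from XCom and posts to a Slack incoming webhook itself. The webhook URL and the shape of the delta dict are assumptions.

```python
import json

import requests
from airflow.operators.python_operator import PythonOperator

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"  # placeholder

def notify_slack(**context):
    # pull the delta the diff task pushed (assumed dict of record lists)
    delta = context["ti"].xcom_pull(task_ids="diff_tables") or {}
    text = "Job Complete: %d new records, %d updated, %d deleted" % (
        len(delta.get("new", [])),
        len(delta.get("updated", [])),
        len(delta.get("deleted", [])))
    requests.post(SLACK_WEBHOOK_URL, data=json.dumps({"text": text}))

notify = PythonOperator(task_id="notify_slack", python_callable=notify_slack,
                        provide_context=True, dag=dag)
```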
