TL;DR

Real data behaves in many unexpected ways that can break even the best-engineered data pipelines. To catch as much of this weird behaviour as possible, the Wholesale Banking Advanced Analytics (WBAA) team has created 7 layers of data testing that they use in their CI setup and Apache Airflow pipelines to stay in control of their data. The 7 layers are:

  • DAG Integrity Tests; have your CI check if your DAG is an actual DAG
  • Split your ingestion from your deployment; keep the logic you use to ingest data sources separate from the logic that deploys your application
  • Data Tests; check if your logic is outputting what you'd expect
  • Chuck Norris; get alerts from your data pipelines when they blow up
  • Nuclear Git; always make sure you're running your latest verified code
  • Mock Pipeline Tests; in your CI, create fake data so you know exactly what to expect when testing your logic
  • DTAP; split your data across four different environments: development is really small, just to see if the pipeline runs; test takes a representative sample of your data for first sanity checks; acceptance is a carbon copy of production, allowing you to test performance and have a Product Owner do checks before releasing to production

Examples of these layers can be found at: https://github.com/danielvdende/data-testing-with-airflow

Split Ingestion from Deployment

In an environment where you ingest (multiple) data sources that end up in your project, you often have ETL specifically for your ingestion that can be logically separated from the ETL that goes into your project. At WBAA we ended up doing this by creating one Airflow DAG per data source that we ingest and one DAG that does the ETL for our project. There were several reasons for us to do this:

  • If your ingestion fails, you do not want to immediately fail your project ETL. It might be perfectly fine that (one of) your data sources is not completely up-to-date. Your project can just use the data that was previously ingested correctly
  • It created a nice place to enforce an interface between data sources that are logically the same, but arrive in (vastly) different formats
  • It also allowed us to have one Git repository per ingestion and per project, which meant that we could separate all the code and CI tests that we want to run per repository
  • It makes it easy to add and remove data sources from a project

How to use

This layer is fairly easy to implement; all you need to do is logically separate your ingestion and deployment. The way it is implemented at WBAA:

  1. Create a Git repository per data source ingestion and one per project
  2. Keep all the logic and CI tests belonging to source/project isolated
  3. Define an interface per logical part

We will give an example of an interface we defined that forces all the different systems in ING that process payment transactions into the same shape. At the end of the ingestion ETL for a payment transaction data source, we enforce that the data looks like the following:

  • payer account
  • payer name
  • beneficiary account
  • beneficiary name
  • amount
  • currency
  • date

Why is this necessary? Because one source system could send things in a party/counterparty format where the + or - sign on the amount indicates which way the money went. Another could do the exact same but use a credit or debit column instead; you get the picture :-).
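As an illustration, here is a minimal PySpark sketch of mapping one such hypothetical credit/debit-style source onto this interface. All source-side table and column names are assumptions for illustration, not WBAA's actual schema:

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical source with a party/counterparty layout and a credit/debit column.
raw = spark.table("ingest_sourcesystem_x_transactions")

# Map the source-specific layout onto the common transaction interface.
transactions = raw.select(
    F.when(F.col("credit_debit") == "D", F.col("party_account"))
     .otherwise(F.col("counterparty_account")).alias("payer_account"),
    F.when(F.col("credit_debit") == "D", F.col("party_name"))
     .otherwise(F.col("counterparty_name")).alias("payer_name"),
    F.when(F.col("credit_debit") == "D", F.col("counterparty_account"))
     .otherwise(F.col("party_account")).alias("beneficiary_account"),
    F.when(F.col("credit_debit") == "D", F.col("counterparty_name"))
     .otherwise(F.col("party_name")).alias("beneficiary_name"),
    F.abs(F.col("amount")).alias("amount"),
    F.col("currency"),
    F.col("booking_date").alias("date"),
)

# The project ETL only ever sees tables in this shape, whatever the source looked like.
transactions.write.mode("overwrite").saveAsTable("transactions_sourcesystem_x")
```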

Nuclear Git (WIP)

Chuck Norris

When things go wrong (and we assume that this will happen), it is important that we, as engineers, are notified, so that we can respond quickly. Chuck Norris helps us with this. Chuck Norris is a Slack 'bot' that posts to a designated Slack channel when a DAG fails, with some details on which DAG and which task in this DAG failed. For privacy reasons, we don't post any more information on the error in Slack, but you could add this if your data is not privacy-sensitive. Chuck Norris allows us to avoid constantly checking a DAG's progress. One danger is that you can become reliant on Chuck to notify you of failure, so if Chuck himself fails (for instance, because a firewall opening is closed), you would wrongfully assume all is well.

How to use

When we initially added Chuck Norris, Airflow's Slack operator did not meet our requirements. We therefore use a simple BashOperator. Because we don't want you good people posting into our Slack channel, we haven't included this layer in the GitHub repository. You can, however, easily integrate this in your own code with the following steps:

  1. In the default_args object that you use to create your DAG object, add the following: https://gist.github.com/77c207916acdb0ea16d6d0c546512b01

  2. Add the following function to your Airflow DAG file: https://gist.github.com/e462c841541a8488cff41dae6f4a4ebb

Now, every time a task fails, the send_slack_alert() function is called, which can perform any logic you want (in this case, post a Slack message).
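The gists are not inlined here; below is a minimal sketch of the pattern on Airflow 1.x, posting straight from a Python callback with requests rather than via a BashOperator as in the original setup. The webhook URL and message format are placeholders:

```python
import requests
from datetime import datetime, timedelta

# Placeholder: keep the real webhook URL in an Airflow connection or secret, not in code.
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"


def send_slack_alert(context):
    """Called by Airflow whenever a task fails; posts the DAG and task id to Slack."""
    message = ":boom: DAG {dag} failed on task {task}.".format(
        dag=context["task_instance"].dag_id,
        task=context["task_instance"].task_id,
    )
    requests.post(SLACK_WEBHOOK_URL, json={"text": message})


default_args = {
    "owner": "airflow",
    "start_date": datetime(2018, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    # Chuck Norris: alert us on every task failure.
    "on_failure_callback": send_slack_alert,
}
```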

DAG integrity tests

DAG integrity tests are the first layer of the data inferno environment. The idea of this layer is to prevent any typos or mistakes in the definition of our Airflow DAG. It does this by performing a very simple check: is the DAG you have written a valid DAG? It verifies this by using Airflow's own internal API to check whether the DAG object is a correct, valid instance of the Airflow DAG model. These tests can be easily integrated into your CI pipeline, which enables you to automatically check a DAG's validity every time you push to git. The power of this test is not only in catching typos: one of the easiest mistakes to make in Airflow is to accidentally define cycles. An example of a DAG with a cycle could be: https://gist.github.com/cf46f31f7c1220dd6e047b4f9b5d0ddd

The mistake here is the task_id value given to the final operator. This kind of mistake can easily happen with quick copy-pasting, and can be very annoying to debug. But the DAG integrity tests flag this issue before you deploy your code to your Airflow environment. So instead of seeing the cycle error in Airflow:

[Image: cycle-in-DAG error in Airflow]

you'll see the error in your CI:

[Image: cycle-in-DAG error in CI]

How to use

Integrating this layer requires a number of steps (assuming you already have a CI environment set up):

  1. In your CI, set up a task/stage that installs pytest, airflow, and coverage.

  2. Define a conftest.py file in which you add any custom Airflow configuration you might need.

  3. For every file in your repository (get the paths to these files yourself), run the following code: https://gist.github.com/7a72838f46a637194640d3b564548bab

    This code verifies that the file in which you claim to have defined a DAG actually contains a valid Airflow DAG.
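Since the gist is not inlined here, the following is one common way to implement such a check (a sketch, not necessarily identical to the linked code): load each DAG file through Airflow's DagBag and assert that it produced no import errors, which is also where cycle errors typically surface. The dags/*.py path is an assumption about your repository layout:

```python
import glob

import pytest
from airflow.models import DagBag

# Adjust this glob to wherever your DAG definition files live.
DAG_PATHS = glob.glob("dags/*.py")


@pytest.mark.parametrize("dag_path", DAG_PATHS)
def test_dag_integrity(dag_path):
    """Fail the CI build if a DAG file does not produce a valid, cycle-free DAG."""
    dag_bag = DagBag(dag_folder=dag_path, include_examples=False)
    # Import errors cover syntax errors, missing imports and (typically) cycles
    # detected while bagging the DAG.
    assert not dag_bag.import_errors, dag_bag.import_errors
    # The file should actually define at least one DAG object.
    assert len(dag_bag.dags) > 0
```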

Data Tests

You can see a data test as a way to check whether the logic you have just applied in an ETL step has done what you expected it to do. It combines a kind of unit test for your ETL step with a more statistical test to see if the data still makes sense after processing.

As previously mentioned, at WBAA we have split our Airflow DAGs into those for ingestion and those for project ETL. We also make a distinction in the kinds of data tests we run in these DAGs, to make sure that after running a DAG it has done what we expect.

Examples of data tests for ingestion

  • Are there files available for ingestion?
  • Did we get the columns that we expected?
  • Are the rows that are in there valid?
  • Did the row count of your table only increase?

The valid-rows check is where we verify that the customer IDs we got in a data source actually reference a real record in our central customer database. You'd be surprised how often a simple test like this picks out weird records. The same goes for checking that you have more records after ingestion than before: on several occasions this simple test has told us that a change in our logic was a mistake, because the number of records suddenly and unexpectedly decreased.
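As a rough sketch, the last two checks could look like this. Table and column names are hypothetical, and at WBAA such checks run as tasks inside the ingestion DAG against the freshly ingested data:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


def check_customer_ids_are_valid():
    """Every customer id in the ingested source must exist in the central customer database."""
    orphans = spark.sql("""
        SELECT COUNT(*) AS n
        FROM ingested_transactions t
        LEFT JOIN central_customers c ON t.customer_id = c.customer_id
        WHERE c.customer_id IS NULL
    """).collect()[0]["n"]
    if orphans > 0:
        raise ValueError("{} rows reference unknown customer ids".format(orphans))


def check_row_count_only_increased(count_before_ingestion):
    """After ingestion the table should never contain fewer rows than before."""
    count_after = spark.table("ingested_transactions").count()
    if count_after < count_before_ingestion:
        raise ValueError("row count decreased: {} -> {}".format(
            count_before_ingestion, count_after))
```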

Examples of project ETL data tests

Imagine you're dealing with payment transaction data that contains information on both companies and private individuals (PIs), and you need to perform an ETL step that aggregates the PIs into one line per week for all transactions from PIs to companies and vice versa, filters out PI-to-PI transactions, and keeps company-to-company transactions untouched.

An example of tests can then be:

  • Are the known private individuals filtered out?
  • Are the known companies still in?
  • Has the aggregation of PIs worked correctly?

How to use
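The linked repository contains worked examples; as a minimal sketch, the three project ETL checks above could be expressed as PySpark assertions that run directly after the aggregation step. All table and column names below are hypothetical:

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()


def run_project_data_tests():
    """Run after the aggregation step; a failed assertion fails the task."""
    aggregated = spark.table("aggregated_transactions")

    # Known private individuals must no longer appear on either side of a transaction.
    pi_accounts = [row.account for row in spark.table("known_private_individuals").collect()]
    leaked = aggregated.filter(
        F.col("payer_account").isin(pi_accounts)
        | F.col("beneficiary_account").isin(pi_accounts)
    ).count()
    assert leaked == 0, "private individuals leaked into the aggregated output"

    # Known companies must still be present.
    company_accounts = [row.account for row in spark.table("known_companies").collect()]
    assert aggregated.filter(
        F.col("beneficiary_account").isin(company_accounts)
    ).count() > 0, "expected company transactions are missing"

    # The aggregation should leave at most one line per account pair per week.
    duplicates = (
        aggregated.groupBy("payer_account", "beneficiary_account", "week")
        .count()
        .filter(F.col("count") > 1)
        .count()
    )
    assert duplicates == 0, "aggregation produced more than one line per account pair per week"
```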

Fake data tests

One of the most difficult aspects of designing and implementing reliable data pipelines is the fact that when working with real data, there are two moving parts: the data (and its quality) and your code. In order to reliably test your code, it’s very important to ensure that your code is the only ‘moving part’. This layer of tests fixes the data in place, allowing the tests to focus on the code. By fixing the data, you know what data goes into the functions in your code, and you can verify that the output of those functions matches what you expect. In a sense, we are treating our code as a black box and verifying its behaviour given certain input data.

[Image: black box with input and output data]

Implementing this layer is quite involved, and requires a good understanding and definition of your data. The usefulness of these tests will depend very strongly on how well your fake data models the real data. Of course, as you learn more about your data and its characteristics, you can and should update your fake data.

How to use

We have implemented our fake data tests with PySpark, as almost all of our data processing logic leverages Spark. In order to use the fake data tests, you’ll need a couple of things (assuming you have a working CI environment):

  1. In your CI, set up a task/stage that installs pytest, pyspark, and coverage
  2. Create a conftest.py file in which you define the schemas of your data. Once that is in place, use these schemas to generate a given number of rows of data in Spark dataframes. Be sure to create rows of data that are representative of your real data. Write out each Spark dataframe to a table. By default, Spark will use Derby as its metastore. For testing purposes this is fine, but you should be careful to ensure that your data generation in conftest.py uses the same metastore and spark-warehouse that the tests will use (otherwise the data will be missing).
  3. Create as many test files as you want. The general rule of thumb here is one test file for every Spark script in your data pipeline, possibly with multiple tests per file. In each test, call the function you’re testing. Once this completes, use Spark SQL to verify that the output tables now contain the data you would expect given the input data you defined. As an example, imagine you work for a bank and have to ensure that North Korean transactions are removed from your dataset.

Step 1: Define your fake data

https://gist.github.com/d3096c5ead8e195762f28eaf568cb026

Step 2: Run your function

https://gist.github.com/f3e177ebd9d6e1121304754954c6b09b

Step 3: Verify the output of the function

https://gist.github.com/a2bc53eacac33412af794db46ab0f9e7
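Since the gists are not inlined here, below is a condensed sketch of the three steps: the fixtures in conftest.py, and a test that runs the function and verifies its output. The module and function names (etl.filter_transactions, filter_north_korean_transactions), the table names, and the fake rows are assumptions for illustration:

```python
# conftest.py -- step 1: define the fake data once per test session.
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark():
    # One shared local session, so the fixtures and the tests see the same spark-warehouse.
    return SparkSession.builder.master("local[1]").getOrCreate()


@pytest.fixture(scope="session", autouse=True)
def fake_transactions(spark):
    rows = [
        ("NL01BANK0123456789", "ACME BV", "KP01BANK0000000001", "PYONGYANG TRADING", 100.0, "EUR", "2018-01-01"),
        ("NL01BANK0123456789", "ACME BV", "DE01BANK0000000002", "BERLIN GMBH", 250.0, "EUR", "2018-01-02"),
    ]
    columns = ["payer_account", "payer_name", "beneficiary_account",
               "beneficiary_name", "amount", "currency", "date"]
    spark.createDataFrame(rows, columns).write.mode("overwrite").saveAsTable("transactions")


# test_filter_transactions.py -- steps 2 and 3: run the function and verify its output.
from etl.filter_transactions import filter_north_korean_transactions  # hypothetical module under test


def test_north_korean_transactions_are_removed(spark):
    filter_north_korean_transactions(spark, source_table="transactions",
                                      target_table="filtered_transactions")
    result = spark.table("filtered_transactions")
    # "KP" account prefix marks the fake North Korean counterparty defined above.
    assert result.filter(result.beneficiary_account.startswith("KP")).count() == 0
    assert result.count() == 1  # only the non-North-Korean row remains
```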

DTAP (WIP)

Splitting application back-ends and front-ends across multiple environments has been a best practice in software development for many years. Generally, four environments are defined: development, test, acceptance, and production. Each of these serves a distinct purpose. At WBAA, in order to deal with the reality of real data, we have extended this concept of multiple environments to our data. Normally, only the production environment of an application runs on production (i.e. real) data. In our case, however, we have created 4 distinct environments, all of which use real data. We do this because real data always contains unexpected, surprising characteristics that cannot be predicted beforehand. We use 4 environments:

  • Development: contains a completely random, very small (0.0025% in our case) sample of your data. The data won’t make any sense, and as a consequence checking the development application using this data will not make any sense for a user. This environment’s purpose is to verify that the tasks in your data pipeline can be executed and complete successfully (i.e. with zero exit codes). Because the amount of data is very small, this pipeline will run very quickly compared to the production one.
  • Test: contains a slightly larger, non-random sample of the data. This non-random sample is achieved in our case by handpicking some entities for which to keep the data. The advantage of this is that the data for these entities will now make sense in your application. Therefore, you can now access the test application and verify (for this limited set of entities) that the data has been processed correctly. The definition of these entities is very important, and will depend heavily on the application and its use cases. Although the amount of data is significantly smaller than for the production pipeline (and so much quicker), the Test environment contains slightly more data than the Development environment due to the sampling method, and therefore will be slightly slower.
  • Acceptance: contains a carbon copy of the production data. The value of the acceptance environment lies in multiple aspects:
    1. This environment contains the full dataset. In some cases, mistakes in processing logic will only come to light when running on the full dataset (for example, incorrect statistics).
    2. Due to the sampling in the previous two environments, the Acceptance environment is also the first place in which the computational performance of the data pipeline can be examined and verified.
    3. Because all data is present, this environment can be used for your Product Owner to verify changes before pushing them to production (where the users will see them)
  • Production: contains all production data. This is the environment your users will use. As such, it is very important that problems with the data are kept to a minimum. Of course, it is very difficult to never have any data issues in your application, but this circle of data testing hell (and the others) can help keep the number of issues to a minimum.

Each of these environments is a separate git branch in our data pipeline repository. Each branch is separately checked out in our Airflow environment, so we have 4 branches of the same repository checked out, resulting in 4 DAGs. In order to define which environment a DAG represents, we add a simple text file to each checked-out directory, identifying the environment.

In order to keep manual work to a minimum, we have added automatic promotion functionality to our DTAP procedure. In short, an average workflow on a feature is as follows:
  • One of our engineers works on the feature on a branch off the development branch.
  • When finished, his/her code is reviewed and merged into the development branch.
  • The development DAG is manually triggered.
  • If the development DAG successfully runs through, the final two tasks first promote the code to test and then trigger the test DAG to run.
  • If the test DAG successfully runs through, the same procedure promotes the code to acceptance and triggers the acceptance DAG (a sketch of this promotion mechanism is shown below).
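A sketch of how the environment file and the automatic promotion could look inside an Airflow 1.x DAG file. The branch names, file layout, DAG ids, and the git command are assumptions; BashOperator and TriggerDagRunOperator are standard Airflow operators:

```python
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator

# Each checked-out branch contains a small text file identifying its environment.
DAG_DIR = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(DAG_DIR, "environment.conf")) as f:
    ENVIRONMENT = f.read().strip()  # e.g. "development"

# Promotion order: development -> test -> acceptance (production is released manually).
NEXT_ENVIRONMENT = {"development": "test", "test": "acceptance"}.get(ENVIRONMENT)

dag = DAG("etl_{}".format(ENVIRONMENT), start_date=datetime(2018, 1, 1),
          schedule_interval=None)

# ... the regular ETL and data test tasks of this environment go here ...

if NEXT_ENVIRONMENT:
    # Promote the verified code by pushing this branch onto the next environment's branch.
    promote_code = BashOperator(
        task_id="promote_to_{}".format(NEXT_ENVIRONMENT),
        bash_command="cd {repo} && git push origin {env}:{next_env}".format(
            repo=DAG_DIR, env=ENVIRONMENT, next_env=NEXT_ENVIRONMENT),
        dag=dag,
    )
    # ... and kick off that environment's DAG.
    trigger_next_dag = TriggerDagRunOperator(
        task_id="trigger_{}_dag".format(NEXT_ENVIRONMENT),
        trigger_dag_id="etl_{}".format(NEXT_ENVIRONMENT),
        # Older Airflow versions require a python_callable that returns the dag_run object.
        python_callable=lambda context, dag_run_obj: dag_run_obj,
        dag=dag,
    )
    promote_code >> trigger_next_dag
```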