In this post I'll discuss at a high level why Geoblink chose Apache Airflow as its workflow management tool and give a brief introduction to its most relevant features.

Data pipelines at Geoblink cover a bunch of use cases such as:

  • Downloading, cleaning and formatting sociodemographic data
  • Spatial aggregation and disaggregation of sociodemographic and commercial indicators based on land registry data
  • Processing large quantities of GPS or telco data in order to estimate footfall or population flows
  • Retraining Machine Learning models

Managing those pipelines, most of which run periodically, can quickly become extremely costly. Data pipelines have an unpleasant tendency to turn into "hairballs": dependencies between tasks get more and more intricate over time. Last year we talked about how we were automating data science pipelines with Jenkins. That worked well while our pipelines were relatively simple, but, as is common in high-velocity start-ups, we need to adapt quickly to new requirements. In our case, new requirements often mean handling new data sources that add value to our location intelligence platform. So a few months ago we decided to take action and look for a more powerful and flexible workflow management tool.

In recent years the data engineering community has seen the development of several tools designed to tackle these problems, such as Luigi (open-sourced by Spotify), Azkaban (open-sourced by LinkedIn) or Apache Oozie. However, it's the latest kid on the block, Apache Airflow, that has gained the most traction over the last couple of years. Airflow aims to become the glue for (batch) data infrastructure and pipelining, as opposed to the other tools (e.g. Luigi lacks its own scheduler, while Oozie and Azkaban are very much focused on running Hadoop jobs), and the flood of talks and articles about Airflow appearing lately online and at data engineering conferences suggests that the conceptual model it proposes is on the right track.

TODO: add here picture of google trends

What is Airflow?

Airflow was open-sourced by Airbnb in 2014 and entered the Apache Incubator in 2016. As put by Airflow's creator, Maxime Beauchemin, Airflow is a "platform to programmatically author, schedule and monitor workflows (a.k.a. DAG = Directed Acyclic Graph)" or, in other words, a "batch workflow orchestrator".

Some of the features that convinced us to make Airflow a key part of our data engineering infrastructure are:

  • Open-source
  • Glue for (batch) data infrastructure: provides a common conceptual framework for data pipelining tasks
  • Written in Python: the main programming language used by Geoblink's data scientists, who are in charge of adding pipelines to handle new data sources
  • Workflows-as-code: lets us build reusable abstractions for data processing tasks by extending Airflow's existing Operators and Hooks (see below) or rolling out our own
  • Thriving community
  • Dynamic workflows
  • Ships with
    • DAG Scheduler
    • Web application (UI)
    • Powerful CLI
    • Kubernetes executor

Airflow puts the concept of the Directed Acyclic Graph (DAG) at its core: each pipeline is simply a DAG whose nodes are tasks and whose edges define the dependencies between tasks. Tasks are instantiations of an Airflow construct called an Operator, which either performs a certain action, moves data between different systems, or keeps running until a certain criterion is met (in which case it is called a sensor).
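
To make this concrete, here is a minimal sketch of what a DAG definition looks like in Airflow 1.x. The DAG id, schedule and task names below are made up for illustration, and the commands are just placeholders.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'data-engineering',   # hypothetical owner
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='sociodemographic_ingestion',   # hypothetical DAG id
    default_args=default_args,
    start_date=datetime(2018, 1, 1),
    schedule_interval='@daily',
)

download = BashOperator(
    task_id='download_raw_data',
    bash_command='echo "download raw files"',  # placeholder command
    dag=dag,
)


def _clean_data(**context):
    """Placeholder for the actual cleaning/formatting logic."""
    pass


clean = PythonOperator(
    task_id='clean_and_format',
    python_callable=_clean_data,
    provide_context=True,
    dag=dag,
)

# The edge of the DAG: clean_and_format runs only after download_raw_data succeeds
download >> clean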

TODO: add DAG image

Airflow's architecture is composed of the following main blocks:

  • Metadata database
  • Scheduler
  • Executor

The metadata database stores information about the state of tasks and pipelines. The scheduler monitors all tasks and DAGs, and launches task instances when their dependencies are met. Launching a task instance means starting an executor instance. Executors come in several types; in general, they are message queueing processes that assign tasks to specific workers (a Celery worker, a Kubernetes pod, a subprocess...).

Finally, Airflow provides a powerful UI to monitor DAGs and task instances and to interact with them (activating/deactivating, restarting failed pipelines, etc.). In particular, there are multiple ways to visualize the dependencies between tasks in a DAG, the time taken by each of them in successive DAG runs, etc.

TODO: add images

Example: PostgresWithPostgisHook

I won't go into much more detail, as this post isn't meant to be an Airflow tutorial and there are already some very good resources out there (see this basic tutorial or this more advanced one). However, I would like to give a taste of how easy it is to extend Airflow's hooks and operators for our needs. The following example deals with a very common use case at Geoblink, where we need to ingest a geographic layer into PostGIS for further cleaning. In this case, we extend the PostgresHook (hooks are interfaces to external platforms and databases like S3, Postgres...) with a method that loads a shapefile into a Postgres instance.

import logging
import subprocess

from airflow.hooks.postgres_hook import PostgresHook


class PostgresWithPostgisHook(PostgresHook):
    """
    Extends the basic postgres hook to be able to load not only from CSV but also shapefiles
    """
    def __init__(self, *args, **kwargs):
        super(PostgresWithPostgisHook, self).__init__(*args, **kwargs)

    def load_shapefile(self, shapefile_path, output_table, original_srid,
                       output_schema='public', create_spatial_index=True):
        """
        Load a shapefile into a Postgres/PostGIS instance using shp2pgsql.

        :param shapefile_path: path to the .shp file to load
        :param output_table: name of the destination table
        :param original_srid: SRID of the shapefile's coordinate system
        :param output_schema: schema in which the destination table is created
        :param create_spatial_index: whether to create a spatial (GiST) index on the geometry column
        """
        logger = logging.getLogger(__name__)

        conn = self.get_connection(self.postgres_conn_id)
        conn_args = dict(
            host=conn.host,
            user=conn.login,
            password=conn.password,
            dbname=self.schema or conn.schema,
            port=conn.port)

        # Drop the table if it already exists: shp2pgsql's -d flag issues a plain
        # DROP TABLE (no IF EXISTS), which fails when the table does not exist,
        # so we drop it ourselves first
        self.run("DROP TABLE IF EXISTS {schema}.{table};".format(schema=output_schema, table=output_table))

        if create_spatial_index:
            create_index_flag = '-I'
        else:
            create_index_flag = ''

        bash_command = "shp2pgsql -D {create_index_flag} -s {original_srid} {path_to_shp} {output_schema}.{output_table} | PGPASSWORD={password} psql -U {user} -p {port} -d {database} -h {host} -v ON_ERROR_STOP=1".format(
            password=conn_args['password'],
            create_index_flag=create_index_flag,
            original_srid=original_srid,
            path_to_shp=shapefile_path,
            output_schema=output_schema,
            output_table=output_table,
            user=conn_args['user'],
            port=conn_args['port'],
            database=conn_args['dbname'],
            host=conn_args['host']
        )
        logger.debug('shp2pgsql command: {shp2pgsql}'.format(shp2pgsql=bash_command))
        try:
            subprocess.check_call(bash_command, shell=True)
        except subprocess.CalledProcessError:
            logger.error('shp2pgsql load of %s failed', shapefile_path)
            raise

By extending hooks in this way, we can develop a common interface to the different systems our pipelines interact with, hiding the details of the underlying access methods. The official and community-contributed hooks, operators and executors provide a solid basis on which to develop functionality tailored to your business needs.
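
As a rough sketch of how such a hook might be used in practice (the connection id, file path and table names below are made up for illustration), a task could call it through a PythonOperator:

from airflow.operators.python_operator import PythonOperator


def ingest_municipalities_layer():
    # 'postgres_geodata' is a hypothetical Airflow connection id
    hook = PostgresWithPostgisHook(postgres_conn_id='postgres_geodata')
    hook.load_shapefile(
        shapefile_path='/data/raw/municipalities.shp',
        output_table='municipalities_raw',
        original_srid=4326,
        output_schema='staging',
    )


load_layer = PythonOperator(
    task_id='load_municipalities_layer',
    python_callable=ingest_municipalities_layer,
    dag=dag,  # assumes a DAG object defined as in the earlier sketch
)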

Dynamic DAGs

One of the areas where Airflow excels is the dynamic generation and definition of DAGs. This differentiates it from other workflow management tools, and in particular from Luigi: in Airflow a task is defined by instantiating an Operator, whereas in Luigi a task is created by inheriting from the base Task class. Generating tasks dynamically through inheritance is considerably harder than doing so through instantiation, and that design decision probably hampers Luigi's adoption in the workflow management ecosystem.
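
As a small illustrative sketch (the data sources and task names are invented), this is how a DAG can fan out into one task per data source, generated by a plain Python loop at DAG parse time:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

# Hypothetical list of data sources; in practice this could come from a config file
DATA_SOURCES = ['census', 'land_registry', 'telco']

dag = DAG(
    dag_id='download_all_sources',   # hypothetical DAG id
    start_date=datetime(2018, 1, 1),
    schedule_interval='@weekly',
)

start = DummyOperator(task_id='start', dag=dag)

# One download task per source, created dynamically
for source in DATA_SOURCES:
    download = BashOperator(
        task_id='download_{}'.format(source),
        bash_command='echo "downloading {}"'.format(source),  # placeholder command
        dag=dag,
    )
    start >> download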

Final thoughts

In short, adopting Airflow at Geoblink has allowed us to:

  • Use it as the glue for all our (batch) data science pipelines
  • Bring a lot of sanity to our data pipelines by building reusable abstractions and improving dependency tracking & visualization

While Airflow is a relatively young project, and some parts of it are not as developed as we'd like or are plain confusing (see the FAQ section discussing the meaning of a DAG's start_date), the community is really active, and both the features introduced in the latest release and the announced future developments are really exciting. In particular, the experimental KubernetesExecutor introduced in version 1.10 looks really promising: by allowing users to auto-scale workers via Kubernetes (making sure that resources are not wasted), it makes Airflow more cloud-native than ever.

Airflow has come to Geoblink to stay: in following blog posts we'll explore the new KubernetesExecutor and how to integrate data validation into our DAGs. Stay tuned!

Resources
