In this post I'll discuss at a high-level why Geoblink chose Apache Airflow as its workflow management tool and give a brief introduction on its most relevant features.
Data pipelines at Geoblink cover a bunch of use cases such as:
- Downloading, cleaning and formatting sociodemographic data
- Spatial aggregation and disaggregation of sociodemographic and commercial indicators based on land registry data
- Processing large quantities of GPS or Telco data in order to estimate footfall or population flows
- Retraining Machine Learning models
Managing those pipelines, most of which run periodically, can quickly become extremely costly. Data pipelines have the unpleasant tendency to become "hairballs": dependencies between tasks get more and more intricate over time. Last year we talked about how we were automating data science pipelines with Jenkins. While that worked well when our pipelines were relatively simple, as is common in high-velocity start-ups you need to adapt quickly to new requirements. In our case, new requirements often mean handling new data sources that provide added value to our location intelligence platform. So a few months ago we decided to get into action and search for a more powerful and flexible workflow management tool.
In recent years the data engineering community has seen the development of several tools designed to tackle these problems, such as Luigi (open-sourced by Spotify), Azkaban (open-sourced by LinkedIn) and Apache Oozie. However, it's the newest kid on the block, Apache Airflow, that has gained the most traction in the last couple of years. Airflow attempts to become the glue for (batch) data infrastructure/pipelining, as opposed to the other tools (e.g. Luigi lacks its own scheduler, while Oozie and Azkaban are very much focused on running Hadoop jobs), and the flood of talks and articles about Airflow appearing lately online and at data engineering conferences seems to indicate that the conceptual model it proposes is on the right track.
Airflow was created at Airbnb in 2014, subsequently open-sourced, and entered the Apache Incubator in 2016. As put by Airflow's creator, Maxime Beauchemin, Airflow is a "platform to programmatically author, schedule and monitor workflows (a.k.a. DAG = Directed Acyclic Graph)" or, in other words, a "batch workflow orchestrator".
Some of the features that Airflow provides that convinced us to make it a key part of our data engineering infrastructure are:
- Open-source
- Glue for (batch) data infrastructure: provides common conceptual framework for data pipelining tasks
- Written in Python: the main programming language used by Geoblink's data scientists, who are in charge of adding pipelines to handle new data sources.
- Workflows-as-code: lets you build reusable abstractions for data processing tasks by extending existing Airflow Operators and Hooks (see below) or rolling your own.
- Thriving community
- Dynamic workflows
- Ships with
- DAG Scheduler
- Web application (UI)
- Powerful CLI
- Kubernetes executor
Airflow puts the concept of Directed Acyclic Graphs (DAG) at its core. Each pipeline is just a DAG where the nodes of the graph are tasks and the edges define dependencies between tasks. Tasks are instantiations of an Airflow construct called Operator, which either performs a certain action, moves data between different systems, or keeps running until a certain criterion is met (the latter are called sensors).
Airflow's architecture is composed of the following main blocks:
- Metadata database
- Scheduler
- Executor
The metadata database stores information about the state of tasks and pipelines. The scheduler monitors all tasks and DAGs, and launches task instances accordingly when dependencies are met. Launching a task instance means starting an executor instance, and executors can be of several types. In general, they are message queueing processes which assign tasks to specific workers (Celery worker, Kubernetes pod, subprocess...).
[Figure: Airflow architecture diagram. Credit: https://towardsdatascience.com/data-pipelines-luigi-airflow-everything-you-need-to-know-18dc741449b7]
Finally, Airflow provides a powerful UI to monitor DAGs and task instances and to interact with them (activating/deactivating, restarting failed pipelines, etc.). In particular, there are multiple ways to visualize the dependencies between tasks in a DAG, the time taken by each of them in successive DAG runs, etc.
I am not going to go into more detail, as this post doesn't intend to be an Airflow tutorial and there are already some very good resources out there (see this basic tutorial or this more advanced one). However, I would like to give a taste of how easy it is to extend Airflow's hooks/operators for our needs. The following example deals with a very common use case at Geoblink, where we need to ingest a geographic layer into PostGIS for further cleaning. In this case, we extend the PostgresHook (hooks are interfaces to external platforms and databases like S3, Postgres...) by adding a method that loads a shapefile into a Postgres instance.
```python
import logging
import subprocess

from airflow.hooks.postgres_hook import PostgresHook


class PostgresWithPostgisHook(PostgresHook):
    """
    Extends the basic Postgres hook to be able to load not only CSV files
    but also shapefiles.
    """

    def load_shapefile(self, shapefile_path, output_table, original_srid,
                       output_schema='public', create_spatial_index=True):
        """
        Load a shapefile into a PostGIS-enabled Postgres table via shp2pgsql.

        :param shapefile_path: path to the .shp file to ingest
        :param output_table: name of the destination table
        :param original_srid: SRID of the shapefile's coordinate system
        :param output_schema: destination schema (defaults to 'public')
        :param create_spatial_index: whether to create a spatial index on the
            geometry column
        """
        logger = logging.getLogger(__name__)
        conn = self.get_connection(self.postgres_conn_id)
        conn_args = dict(
            host=conn.host,
            user=conn.login,
            password=conn.password,
            dbname=self.schema or conn.schema,
            port=conn.port)

        # Drop the table if it exists (DROP TABLE IF EXISTS is not supported
        # by shp2pgsql's -d flag yet)
        self.run("DROP TABLE IF EXISTS {schema}.{table};".format(
            schema=output_schema, table=output_table))

        create_index_flag = '-I' if create_spatial_index else ''

        bash_command = (
            "shp2pgsql -D {create_index_flag} -s {original_srid} {path_to_shp} "
            "{output_schema}.{output_table} | PGPASSWORD={password} psql -U {user} "
            "-p {port} -d {database} -h {host} -v ON_ERROR_STOP=1"
        ).format(
            create_index_flag=create_index_flag,
            original_srid=original_srid,
            path_to_shp=shapefile_path,
            output_schema=output_schema,
            output_table=output_table,
            password=conn_args['password'],
            user=conn_args['user'],
            port=conn_args['port'],
            database=conn_args['dbname'],
            host=conn_args['host']
        )
        logger.debug('shp2pgsql command: %s', bash_command)

        try:
            subprocess.check_call(bash_command, shell=True)
        except subprocess.CalledProcessError:
            logger.error('shp2pgsql ingestion failed for %s', shapefile_path)
            raise
```
By extending hooks in such a way, we can develop a common interface to the different systems with which our pipelines interact, hiding the details of the underlying access methods. The official and community-contributed hooks, operators and executors provide a solid basis on which to develop functionality tailored to your business needs.
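One design choice that keeps such hooks easy to test is separating command construction from command execution. As a sketch of that idea (a hypothetical refactor of the hook above, not part of Airflow's API), the shp2pgsql part of the command could be built by a pure function:

```python
def build_shp2pgsql_command(shapefile_path, output_table, original_srid,
                            output_schema='public', create_spatial_index=True):
    """Build the shp2pgsql half of the ingestion pipeline.

    Pure function: no DB connection needed, so it can be unit-tested in isolation.
    """
    create_index_flag = '-I' if create_spatial_index else ''
    return 'shp2pgsql -D {flag} -s {srid} {path} {schema}.{table}'.format(
        flag=create_index_flag,
        srid=original_srid,
        path=shapefile_path,
        schema=output_schema,
        table=output_table,
    )

# Example invocation (paths and SRID are illustrative)
print(build_shp2pgsql_command('/data/land_registry.shp', 'land_registry', 25830,
                              output_schema='staging'))
# shp2pgsql -D -I -s 25830 /data/land_registry.shp staging.land_registry
```

The hook's `load_shapefile` method would then only have to pipe this command into `psql`, and the string-building logic can be verified without touching a database.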
One of the features at which Airflow excels is the dynamic generation and definition of DAGs. This differentiates Airflow from other workflow management tools, and in particular from Luigi: in Airflow a task is defined by instantiating an Operator, whereas in Luigi a task is created by inheriting from the base Task class. Dynamically generating tasks through inheritance is much harder than through instantiation, and that design decision probably hampers Luigi's adoption in the workflow management ecosystem.
In short, adopting Airflow at Geoblink has allowed us to:
- Use it as glue for all our (batch) data science pipelines
- Bring a lot of sanity into our data pipelines by building reusable abstractions and improving dependency tracking & visualization
While Airflow is a relatively young project and there are parts of it that might not be as developed as we'd like, or plain confusing (see the FAQ section discussing the meaning of the start_date of a DAG), it's also true that the community is really active, and both the features introduced in the latest release and the announced future developments are really exciting. In fact, the introduction of the experimental KubernetesExecutor in version 1.10 looks really promising: by allowing users to auto-scale workers via Kubernetes (making sure that resources are not wasted), it makes Airflow more cloud-native than ever.
Airflow has come to Geoblink to stay: in following blog posts we'll explore the new KubernetesExecutor and how to integrate data validation into our DAGs.
Stay tuned!