Getting Started With Airflow

Install Airflow

1. Install Airflow

Follow the installation instructions on the Airflow website.

Update Airflow Configurations

To configure Airflow to use Postgres rather than the default SQLite, open airflow.cfg and set the executor to LocalExecutor:

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = LocalExecutor

The LocalExecutor can parallelize task instances locally.

Also update the SQLAlchemy connection string to point to the database you are about to create.

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
sql_alchemy_conn = postgresql+psycopg2://localhost/airflow
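
As a quick sanity check, the two settings above can be read back with Python's standard configparser. This is a minimal sketch, not part of Airflow: the read_core_settings helper is hypothetical, and it assumes both keys live under the [core] section, as they do in the stock Airflow 1.x airflow.cfg.

```python
import configparser
from urllib.parse import urlsplit

# Sample fragment mirroring the edits above; the [core] section name is an
# assumption based on the stock Airflow 1.x config layout.
SAMPLE_CFG = """
[core]
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://localhost/airflow
"""


def read_core_settings(cfg_text):
    """Return (executor, db_scheme, db_host, db_name) from airflow.cfg text."""
    # interpolation=None so '%' characters in values are not treated as
    # configparser interpolation markers.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    executor = parser.get("core", "executor")
    url = urlsplit(parser.get("core", "sql_alchemy_conn"))
    return executor, url.scheme, url.hostname, url.path.lstrip("/")


if __name__ == "__main__":
    print(read_core_settings(SAMPLE_CFG))
```

To check a real file, pass the contents of ~/airflow/airflow.cfg instead of the sample string.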

Next open a PostgreSQL shell.

psql

Then create a new Postgres database.

CREATE DATABASE airflow;

You're now ready to initialize the DB in Airflow. In bash run:

airflow initdb

Create a DAG

1. Create a DAG folder.

In the console run:

mkdir -p ~/airflow/dags

2. Add the necessary connections.

The first connection is for my API call:

  • A connection type of HTTP.
  • A connection identifier of moves_profile.
  • A host string of the full API endpoint: https://moves....

The second connection is for my project database:

  • A connection type of Postgres.
  • A connection identifier of users (name of the table).
  • A host string of 127.0.0.1.
  • A schema string (database name) of kojak.
  • A login of postgres (default).
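
Besides the web UI, Airflow can also pick up a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> whose value is a URI. The sketch below builds such a URI for the Postgres connection from the fields listed above. The connection_uri helper is hypothetical, and URI handling can differ between Airflow versions, so treat this as an assumption to verify against your install.

```python
from urllib.parse import quote


def connection_uri(conn_type, host, schema="", login="", password="", port=None):
    """Build a URI of the form type://login:password@host:port/schema."""
    # Percent-encode credentials so characters like '@' do not break the URI.
    auth = quote(login, safe="")
    if password:
        auth += ":" + quote(password, safe="")
    if auth:
        auth += "@"
    netloc = auth + host + (":%d" % port if port else "")
    return "%s://%s/%s" % (conn_type, netloc, schema)


if __name__ == "__main__":
    # Field values are the ones listed for the project database connection.
    uri = connection_uri("postgres", "127.0.0.1", schema="kojak", login="postgres")
    # Hypothetical usage: export this before starting the scheduler/webserver.
    print("export AIRFLOW_CONN_USERS='%s'" % uri)
```

The variable name is the connection identifier in upper case with the AIRFLOW_CONN_ prefix, so the users connection becomes AIRFLOW_CONN_USERS.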

3. Create a DAG Python configuration file.

In the console run:

touch ~/airflow/dags/moves_profile.py

Then add your DAG configs.

"""
DAG pulls a user's profile information from the Moves API.
"""
from airflow import DAG
from airflow.hooks.http_hook import HttpHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import json

def get_profile(ds, **kwargs):
    pg_hook = PostgresHook(postgres_conn_id='users')
    api_hook = HttpHook(http_conn_id='moves_profile', method='GET')

    # Get profile info from Moves API
    resp = api_hook.run('')
    profile = json.loads(resp.content.decode('utf-8'))
    moves_user_id = profile['userId']
    moves_first_date = profile['profile']['firstDate']
    timezone = profile['profile']['currentTimeZone']['id']

    # Insert profile values into Postgres DB
    user_insert = """INSERT INTO users (moves_user_id, moves_start_date, timezone)
                      VALUES (%s, %s, %s);"""

    pg_hook.run(user_insert, parameters=(moves_user_id, moves_first_date, timezone))

default_args = {
    'owner': 'rosiehoyem',
    'depends_on_past': False,
    'start_date': datetime(2017, 3, 21),
    'email': ['rosiehoyem@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('moves_profile', default_args=default_args, schedule_interval=timedelta(days=1))


get_profile_task = \
    PythonOperator(task_id='get_profile',
                   provide_context=True,
                   python_callable=get_profile,
                   dag=dag)
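
The parsing step inside get_profile can be exercised without Airflow or network access. The sketch below pulls the same three fields out of a decoded response body. The extract_profile_fields helper is hypothetical and the sample payload is made up for illustration; only the key names are taken from the DAG code above.

```python
import json


def extract_profile_fields(body):
    """Return (user_id, first_date, timezone) from a profile response body."""
    profile = json.loads(body)
    return (
        profile["userId"],
        profile["profile"]["firstDate"],
        profile["profile"]["currentTimeZone"]["id"],
    )


if __name__ == "__main__":
    # Made-up payload; only the key names come from the DAG code.
    sample = json.dumps({
        "userId": 123,
        "profile": {
            "firstDate": "20170101",
            "currentTimeZone": {"id": "America/Chicago"},
        },
    })
    print(extract_profile_fields(sample))
```

The returned tuple is in the same order as the parameters passed to pg_hook.run, so a failing lookup here points at the API response rather than at the insert.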

Deploy with Docker

1. Set up an EC2 instance

Instructions can be found here and here.

Be sure you've set up port 8080:

  • Custom TCP Rule
  • Port Range: 8080 (for the Airflow web UI)
  • Source: Anywhere

2. Install Docker on the EC2 instance.

Instructions to do this can be found here.

3. Pull and run the docker-airflow image on your EC2 instance

Instructions for this image can be found on its GitHub page.

docker pull puckel/docker-airflow
docker run -d -p 8080:8080 puckel/docker-airflow

4. Create a tunnel from your local terminal into your EC2 instance on port 8080.

ssh -i ~/.ssh/aws_key_file.pem -NL 12345:localhost:8080 ubuntu@XX.XXX.XXX.XXX
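
With the tunnel open, the Airflow web UI should be reachable in a local browser at http://localhost:12345. A quick way to confirm the forwarded port is accepting connections is a plain socket check. The port_is_open helper below is hypothetical and uses only the standard library; the port number is the one from the ssh command above.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # 12345 is the local end of the SSH tunnel.
    print(port_is_open("localhost", 12345))
```

A False result usually means the ssh command is not running or the container is not listening on 8080 on the instance.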