Getting Started With Airflow

Install Airflow

1. Install Airflow

Follow the installation instructions on the Airflow website.
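
The link above has moved over the years; the package itself installs with pip (older releases were published as airflow rather than apache-airflow), and the postgres extra pulls in psycopg2, which the Postgres setup below assumes:

pip install "apache-airflow[postgres]"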

Update Airflow Configurations

To configure Airflow to use Postgres rather than the default SQLite, open airflow.cfg and set the executor to LocalExecutor:

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = LocalExecutor

The LocalExecutor can parallelize task instances locally.
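
How many task instances run at once is capped by the parallelism setting in the same [core] section of airflow.cfg; 32 is the shipped default, shown here only as an illustration:

# Max number of task instances that should run simultaneously
parallelism = 32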

Also update the SqlAlchemy connection string to point to the database you are about to create.

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines; more information
# on their website
sql_alchemy_conn = postgresql+psycopg2://localhost/airflow
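
If your Postgres instance needs credentials or a non-default port, the same key accepts a full SQLAlchemy URL (the user, password, and port below are placeholders):

sql_alchemy_conn = postgresql+psycopg2://airflow_user:airflow_pass@localhost:5432/airflow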

Next open a PostgreSQL shell.

psql

And create a new Postgres database:

CREATE DATABASE airflow;

You're now ready to initialize the Airflow metadata DB. In bash, run:

airflow initdb
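
To confirm the metadata DB and executor are wired up, you can start the webserver and scheduler, each in its own terminal:

airflow webserver -p 8080
airflow scheduler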

Create a DAG

1. Create a DAG folder.

In the console run:

mkdir airflow/dags

2. Add the necessary connections (in the Airflow UI under Admin > Connections; a scripted alternative is sketched after the list below).

The first connection is for my API call:

  • A connection type of HTTP.
  • A connection identifier of moves_profile.
  • A host string of the full API endpoint: https://moves....

The second connection is for my project database:

  • A connection type of Postgres.
  • A connection identifier of users (name of the table).
  • A host string of 127.0.0.1.
  • A schema string (database name) of kojak.
  • A login of postgres (default).
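
If you'd rather script these connections than click through the UI, a minimal sketch using Airflow's Connection model looks like this (the Moves host is truncated in the list above, so the value below is a placeholder; fill in your full endpoint):

from airflow import settings
from airflow.models import Connection

# HTTP connection for the Moves API (replace host with the full endpoint)
moves_conn = Connection(
    conn_id='moves_profile',
    conn_type='http',
    host='https://moves...')

# Postgres connection for the project database
users_conn = Connection(
    conn_id='users',
    conn_type='postgres',
    host='127.0.0.1',
    schema='kojak',
    login='postgres')

# Persist both connections in the Airflow metadata DB
session = settings.Session()
session.add(moves_conn)
session.add(users_conn)
session.commit()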

3. Create a DAG Python configuration file.

In the console run:

touch ~/airflow/dags/moves_profile.py

Then add your DAG configs.

"""
DAG pulls a user's profile information from the Moves API.
"""
from airflow import DAG
from airflow.hooks.http_hook import HttpHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import json

def get_profile(ds, **kwargs):
    pg_hook = PostgresHook(postgres_conn_id='users')
    api_hook = HttpHook(http_conn_id='moves_profile', method='GET')

    # Get profile info from Moves API
    resp = api_hook.run('')
    profile = json.loads(resp.content.decode('utf-8'))
    moves_user_id = profile['userId']
    moves_first_date = profile['profile']['firstDate']
    timezone = profile['profile']['currentTimeZone']['id']

    # Insert profile values into Postgres DB
    user_insert = """INSERT INTO users (moves_user_id, moves_start_date, timezone)
                      VALUES (%s, %s, %s);"""

    pg_hook.run(user_insert, parameters=(moves_user_id, moves_first_date, timezone))

default_args = {
    'owner': 'rosiehoyem',
    'depends_on_past': False,
    'start_date': datetime(2017, 3, 21),
    'email': ['rosiehoyem@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('moves_profile', default_args=default_args, schedule_interval=timedelta(days=1))


get_profile_task = \
    PythonOperator(task_id='get_profile',
                   provide_context=True,
                   python_callable=get_profile,
                   dag=dag)
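
One thing the steps above never create is the users table in the kojak database that the INSERT statement writes to. A minimal schema consistent with that statement might look like this (column types are a guess):

CREATE TABLE users (
    moves_user_id BIGINT,
    moves_start_date DATE,
    timezone TEXT
);

You can also smoke-test the task without waiting on the scheduler, using the Airflow 1.x CLI (the date is any execution date on or after start_date):

airflow test moves_profile get_profile 2017-03-21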

Deploy with Docker

1. Set up an EC2 instance

Instructions can be found here and here.

Be sure you've set up an inbound rule for port 8080:

  • Custom TCP Rule
  • Port Range: 8080 (for the Airflow webserver)
  • Source: Anywhere

2. Install Docker on the EC2 instance.

Instructions to do this can be found here.

3. Pull and run the docker-airflow image on your EC2 instance

Instructions for this image can be found on its GitHub page.

docker pull puckel/docker-airflow
docker run -d -p 8080:8080 puckel/docker-airflow
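
By default the container only sees the example DAGs baked into the image. To have it pick up the DAG written above, mount your local dags folder into the image's DAG directory (the path below is the documented default for puckel/docker-airflow; check the image's README if it has changed):

docker run -d -p 8080:8080 -v ~/airflow/dags:/usr/local/airflow/dags puckel/docker-airflow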

4. Create a tunnel from your local terminal into your EC2 instance on port 8080.

ssh -i ~/.ssh/aws_key_file.pem -NL 12345:localhost:8080 ubuntu@XX.XXX.XXX.XXX
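
With the tunnel open, the Airflow UI served by the container is available locally at http://localhost:12345.
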
@evanagovino

FYI, your link to install docker on an EC2 instance goes to a private page :)

@homebrewcode

Were you able to successfully set up Airflow with the Postgres backend?
I tried the exact instructions on a Mac but hit the following error on 'airflow initdb'. Did you face a similar issue?

sqlalchemy.exc.ProgrammingError: (psycopg2.errors.InvalidSchemaName) no schema has been selected to create in LINE 2: CREATE TABLE alembic_version ( ^

@jandersson

Hi, thanks for the article. Your airflow installation docs point to http://pythonhosted.org/airflow/installation.html which is 404.
