@nnguyen168
Last active June 19, 2020 07:09
An example of job scheduling in Apache Airflow
# import libraries
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
# the default parameters for each operation/task
default_args = {
    'owner': 'big_data_guy',
    'depends_on_past': False,
    'start_date': datetime(2020, 1, 1),
    'email': ['big_data_guy@example.com'],
    'email_on_failure': True,
    'retries': 42,
    'retry_delay': timedelta(minutes=5),
    'sla': timedelta(hours=48)
}
# initialize the DAG
dag = DAG(
    'airflow_example',
    default_args=default_args,
    description='A simple example of Apache Airflow',
    schedule_interval=timedelta(days=1),
)
# Define the Python callables used by the PythonOperator tasks
def api_web_crawler(config):
    # Retrieve data from an API endpoint (placeholder body)
    pass

def hdfs_storage(config):
    # Store the retrieved data in HDFS (placeholder body)
    pass

# Pipeline configuration; get_config() is assumed to be defined or imported elsewhere
config = get_config()
# Instantiate the operators (tasks)
op1 = BashOperator(task_id='print_date', bash_command='date', dag=dag)
op2 = BashOperator(task_id='introduction',
                   bash_command='echo Executing Airflow Big Data example',
                   dag=dag)
op3 = PythonOperator(task_id='api_web',
                     python_callable=api_web_crawler,
                     op_kwargs={'config': config},
                     dag=dag)
op4 = PythonOperator(task_id='hdfs_storage',
                     python_callable=hdfs_storage,
                     op_kwargs={'config': config},
                     dag=dag)
# Set up the dependencies
op1 >> op2 >> op3 >> op4
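
The listing calls get_config() without defining it, so the configuration is presumably provided elsewhere in the original project. As a minimal sketch, assuming the callables only need an API endpoint and an HDFS target path (both keys and values below are placeholders, not part of the original gist), a stand-in could look like:

# Hypothetical stand-in for the undefined get_config(); keys and values are
# illustrative assumptions, not taken from the original gist.
def get_config():
    return {
        'api_url': 'https://api.example.com/data',  # placeholder API endpoint
        'hdfs_path': '/user/big_data_guy/raw/',     # placeholder HDFS directory
    }

Each PythonOperator forwards this dictionary to its callable through op_kwargs={'config': config}, so api_web_crawler and hdfs_storage receive it as their config argument. The >> bit-shift syntax on the last line is Airflow's shorthand for op1.set_downstream(op2), so the tasks run in order: op1, op2, op3, op4.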