This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"dag": { | |
"dag_id": "configurable_data_pipeline_demo", | |
"schedule_interval_unit": "days", | |
"schedule_interval_val": 1, | |
"default_args": { | |
"owner": "saumalya75", | |
"depends_on_past": false, | |
"start_date": { | |
"year": 2020, |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def _get_timedelta(unit, value): | |
if unit.upper() == "DAYS": | |
return timedelta(days=value) | |
if unit.upper() == "MONTHS": | |
return timedelta(months=value) | |
if unit.upper() == "YEARS": | |
return timedelta(years=value) | |
if unit.upper() == "HOURS": | |
return timedelta(hours=value) | |
if unit.upper() == "MINUTES": |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
try: | |
CONFIG_FILE_NAME = "dag_configuration_demo.json" | |
CONFIG_DB_KEY = "configurable_data_pipeline_demo_config" | |
config_file = Path(__file__).with_name(CONFIG_FILE_NAME) | |
with config_file.open() as config_data: | |
pipeline_config = json.loads(config_data.read()) | |
except Exception as e: | |
print("Something went wrong while reading the dag configuration: " + str(e)) | |
print("~" * 100) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
try: | |
dag_config = pipeline_config['dag'] | |
if dag_config['schedule_interval_unit'] and dag_config['schedule_interval_val']: | |
dag_config['schedule_interval'] = _get_timedelta( | |
dag_config['schedule_interval_unit'] | |
, dag_config['schedule_interval_val'] | |
) | |
del(dag_config['schedule_interval_unit']) | |
del(dag_config['schedule_interval_val']) | |
else: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
with DAG(**dag_config) as dag: | |
# Declare pipeline start and end task | |
start_task = DummyOperator(task_id='pipeline_start') | |
end_task = DummyOperator(task_id='pipeline_end') | |
for account_details in pipeline_config['task_details']['accounts']: | |
#Declare Account Start and End Task | |
if account_details['runable']: | |
acct_start_task = DummyOperator(task_id=account_details['account_id'] + '_start') | |
acct_start_task.set_upstream(start_task) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import os | |
import re | |
from airflow.exceptions import AirflowException | |
from airflow.hooks.S3_hook import S3Hook | |
class S3Creds(object): | |
"""S3 credentials""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class S3SensorFromProvidedValue(BaseSensorOperator): | |
@apply_defaults | |
def __init__(self, | |
conn_type:str = '', | |
endpoint_url:str = 'http://127.0.0.1:9000', | |
bucket_name:str = 'default_bucket', | |
identifier:str = 'default_ind', | |
wildcard_match:bool = False, | |
aws_conn_id='', |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow.models.baseoperator import BaseOperator | |
from hooks.custom_s3_minio_hook import CustomS3MinioHook | |
from airflow.utils.decorators import apply_defaults | |
# Demo application integration imports | |
import myscript as ms | |
from myscript.run import check | |
from myscript import run as r | |
class CustomFileProcessingOperator(BaseOperator): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow.plugins_manager import AirflowPlugin | |
from sensors.custom_s3_sensor import CustomS3Sensor | |
from operators.custom_file_load_operator import CustomFileProcessingOperator | |
# Plugin registration: exposes the custom sensor/operator to Airflow's
# plugin manager so DAGs can import them through the plugin namespace.
class CustomPlugin(AirflowPlugin):
    """Airflow plugin bundling the custom S3 sensor and file-processing operator."""

    # Identifier Airflow uses to namespace this plugin's components.
    name = "custom_plugin"

    # Components contributed by this plugin (definitions live in the
    # sibling ``operators``/``sensors`` packages).
    operators = [CustomFileProcessingOperator]
    sensors = [CustomS3Sensor]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow.sensors.base_sensor_operator import BaseSensorOperator | |
from hooks.custom_s3_minio_hook import CustomS3MinioHook | |
from airflow.utils.decorators import apply_defaults | |
from airflow.exceptions import AirflowException | |
import json, traceback, sys | |
def _trigger_file_to_xcom(hook, key, bucket, task_instance, task_key): | |
source_file_details = hook.read_key(key, bucket) | |
if source_file_details: | |
source_file_details_json = json.loads(source_file_details) |
OlderNewer