Saumalya Sarkar (saumalya75)

  • Wipro Limited
  • Kolkata
{
  "dag": {
    "dag_id": "configurable_data_pipeline_demo",
    "schedule_interval_unit": "days",
    "schedule_interval_val": 1,
    "default_args": {
      "owner": "saumalya75",
      "depends_on_past": false,
      "start_date": {
        "year": 2020,
saumalya75 / airflow_arena:airflow_on_container:timedelta_helper.py
Last active May 29, 2020 06:22
Helper function to handle date-time related configuration
from datetime import timedelta


def _get_timedelta(unit, value):
    if unit.upper() == "DAYS":
        return timedelta(days=value)
    if unit.upper() == "MONTHS":
        # timedelta has no 'months' argument; approximate one month as 30 days
        return timedelta(days=30 * value)
    if unit.upper() == "YEARS":
        # timedelta has no 'years' argument; approximate one year as 365 days
        return timedelta(days=365 * value)
    if unit.upper() == "HOURS":
        return timedelta(hours=value)
    if unit.upper() == "MINUTES":
        return timedelta(minutes=value)
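
Fed the two schedule fields from the JSON above, the helper yields the timedelta the DAG uses as its schedule_interval; a minimal usage sketch with the sample values:

schedule_interval = _get_timedelta("days", 1)  # -> timedelta(days=1)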
import json
from pathlib import Path

try:
    CONFIG_FILE_NAME = "dag_configuration_demo.json"
    CONFIG_DB_KEY = "configurable_data_pipeline_demo_config"
    config_file = Path(__file__).with_name(CONFIG_FILE_NAME)
    with config_file.open() as config_data:
        pipeline_config = json.loads(config_data.read())
except Exception as e:
    print("Something went wrong while reading the dag configuration: " + str(e))
    print("~" * 100)
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

try:
    dag_config = pipeline_config['dag']
    if dag_config['schedule_interval_unit'] and dag_config['schedule_interval_val']:
        dag_config['schedule_interval'] = _get_timedelta(
            dag_config['schedule_interval_unit'],
            dag_config['schedule_interval_val']
        )
        del dag_config['schedule_interval_unit']
        del dag_config['schedule_interval_val']
    else:

with DAG(**dag_config) as dag:
    # Declare pipeline start and end tasks
    start_task = DummyOperator(task_id='pipeline_start')
    end_task = DummyOperator(task_id='pipeline_end')

    for account_details in pipeline_config['task_details']['accounts']:
        # Declare account start and end tasks
        if account_details['runable']:
            acct_start_task = DummyOperator(task_id=account_details['account_id'] + '_start')
            acct_start_task.set_upstream(start_task)
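            # Hypothetical continuation, the preview stops here: give each runnable
            # account its own end task and chain it to the shared pipeline_end task
            # so the per-account branch closes back into the main flow.
            acct_end_task = DummyOperator(task_id=account_details['account_id'] + '_end')
            acct_end_task.set_downstream(end_task)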
import boto3
import os
import re
from airflow.exceptions import AirflowException
from airflow.hooks.S3_hook import S3Hook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class S3Creds(object):
    """S3 credentials"""


class S3SensorFromProvidedValue(BaseSensorOperator):
    @apply_defaults
    def __init__(self,
                 conn_type: str = '',
                 endpoint_url: str = 'http://127.0.0.1:9000',
                 bucket_name: str = 'default_bucket',
                 identifier: str = 'default_ind',
                 wildcard_match: bool = False,
                 aws_conn_id='',
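
The constructor preview is cut short; the sensor's poke method would typically just ask the hook whether the expected object has landed. A minimal sketch, assuming the constructor stores its arguments on self; check_for_key and check_for_wildcard_key are standard S3Hook methods, the attribute names simply mirror the parameters above:

    def poke(self, context):
        # Check the configured bucket for the expected key, honouring the
        # wildcard_match flag; every self.* attribute here is assumed to be set
        # from the constructor arguments shown in the preview.
        hook = S3Hook(aws_conn_id=self.aws_conn_id)
        if self.wildcard_match:
            return hook.check_for_wildcard_key(self.identifier, self.bucket_name)
        return hook.check_for_key(self.identifier, self.bucket_name)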
from airflow.models.baseoperator import BaseOperator
from hooks.custom_s3_minio_hook import CustomS3MinioHook
from airflow.utils.decorators import apply_defaults

# Demo application integration imports
import myscript as ms
from myscript.run import check
from myscript import run as r


class CustomFileProcessingOperator(BaseOperator):
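
The preview ends at the class declaration; an execute method for such an operator would plausibly pull the trigger-file details the sensor pushed to XCom and hand them to the demo application. A rough sketch under those assumptions; the XCom key and the check() signature are not shown in the gist:

    def execute(self, context):
        # Assumed body: read the sensor's XCom payload and pass it to the demo
        # application's entry point (myscript.run.check).
        file_details = context['ti'].xcom_pull(key='source_file_details')
        return check(file_details)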
from airflow.plugins_manager import AirflowPlugin
from sensors.custom_s3_sensor import CustomS3Sensor
from operators.custom_file_load_operator import CustomFileProcessingOperator


# Defining the plugin class
class CustomPlugin(AirflowPlugin):
    name = "custom_plugin"
    sensors = [CustomS3Sensor]
    operators = [CustomFileProcessingOperator]
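
Once the plugin is registered, Airflow 1.10's plugin manager exposes these classes under the plugin name, so DAG files can import them as if they were built-ins; the module paths below follow that legacy convention:

# In a DAG file (Airflow 1.10.x legacy plugin imports):
from airflow.sensors.custom_plugin import CustomS3Sensor
from airflow.operators.custom_plugin import CustomFileProcessingOperator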
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from hooks.custom_s3_minio_hook import CustomS3MinioHook
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
import json
import sys
import traceback


def _trigger_file_to_xcom(hook, key, bucket, task_instance, task_key):
    source_file_details = hook.read_key(key, bucket)
    if source_file_details:
        source_file_details_json = json.loads(source_file_details)
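        # Hypothetical continuation, the preview stops here: publish the parsed
        # trigger-file contents to XCom under task_key so downstream tasks can
        # pull them with xcom_pull.
        task_instance.xcom_push(key=task_key, value=source_file_details_json)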