Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save samklr/a600aad7c4bda8a389f02deb2f73563c to your computer and use it in GitHub Desktop.
Save samklr/a600aad7c4bda8a389f02deb2f73563c to your computer and use it in GitHub Desktop.
Generate dynamic DAG
# Build the pipeline DAG from configuration.
#
# Resulting task graph, for every account and file whose 'runable' flag is
# truthy:
#
#   pipeline_start
#     -> <account_id>_start
#          -> <file_id>_minio_sensor -> <file_id>_s3_sensor
#          -> <file_id>_data_processor
#     -> <account_id>_end
#   -> pipeline_end
#
# Reads `dag_config` (keyword arguments for DAG) and `pipeline_config`
# (expects pipeline_config['task_details']['accounts'] to be an iterable of
# account dicts, each with 'runable', 'account_id' and 'files').
with DAG(**dag_config) as dag:
    # Declare pipeline start and end task
    start_task = DummyOperator(task_id='pipeline_start')
    end_task = DummyOperator(task_id='pipeline_end')

    # Computed once so every sensor created in this pass shares the same date
    # stamp, even if building the DAG straddles midnight.
    # NOTE(review): this date is fixed when this file is executed, not when
    # the sensor task runs, so re-runs for an earlier date still look for
    # today's trigger file. If CustomS3Sensor templates `identifier`, a
    # run-date macro would be more robust -- confirm against the operator.
    trigger_date_stamp = datetime.now().strftime("%Y%m%d")

    for account_details in pipeline_config['task_details']['accounts']:
        # Accounts that are switched off contribute no tasks at all.
        if not account_details['runable']:
            continue

        # Declare account start and end task
        account_id = account_details['account_id']
        acct_start_task = DummyOperator(task_id=account_id + '_start')
        acct_start_task.set_upstream(start_task)
        acct_end_task = DummyOperator(task_id=account_id + '_end')

        for file_details in account_details['files']:
            # Files that are switched off contribute no tasks.
            if not file_details['runable']:
                continue

            file_id = file_details['file_id']

            # The minio sensor's task id and XCom key are also handed to the
            # two downstream tasks; name them once so they cannot drift apart.
            minio_sensor_task_id = file_id + "_minio_sensor"
            minio_sensor_xcom_key = file_id + "_minio_sensor_xcom_key"

            # Object the minio sensor waits for:
            # <trigger_prefix>/<trigger_identifier><YYYYMMDD>.txt
            trigger_object = (
                file_details['trigger_prefix']
                + "/"
                + file_details['trigger_identifier']
                + trigger_date_stamp
                + ".txt"
            )

            minio_sensor = CustomS3Sensor(
                task_id=minio_sensor_task_id,
                poke_interval=10,
                retry_delay=timedelta(seconds=25),
                xcom_task_id_key=minio_sensor_xcom_key,
                refresh_xcom=True,
                conn_type="minio",
                endpoint_url="http://minio:9000",
                bucket_name="airflow",
                identifier=trigger_object,
                wildcard_match=False,
                dag=dag,
            )

            # Configured to take its input from the minio sensor's XCom entry
            # (from_xcom=True) rather than from a fixed identifier.
            s3_sensor = CustomS3Sensor(
                task_id=file_id + "_s3_sensor",
                poke_interval=10,
                retry_delay=timedelta(seconds=25),
                xcom_task_id_key=file_id + "_s3_sensor_xcom_key",
                refresh_xcom=False,
                conn_type="aws",
                wildcard_match=False,
                from_xcom=True,
                xcom_source_task_id=minio_sensor_task_id,
                xcom_key=minio_sensor_xcom_key,
                dag=dag,
            )

            # Also pointed at the minio sensor's XCom entry (not the S3
            # sensor's), exactly as in the original wiring.
            data_processor = CustomFileProcessingOperator(
                task_id=file_id + "_data_processor",
                xcom_task_id_key=file_id + "_data_processor_xcom_key",
                xcom_source_task_id=minio_sensor_task_id,
                xcom_key=minio_sensor_xcom_key,
                source_bed_type="aws",
                source_conn_id="aws_default",
                dag=dag,
            )

            # account start -> minio sensor -> s3 sensor -> processor
            #   -> account end
            minio_sensor.set_upstream(acct_start_task)
            s3_sensor.set_upstream(minio_sensor)
            data_processor.set_upstream(s3_sensor)
            acct_end_task.set_upstream(data_processor)

        # Wire each runnable account into the pipeline end. This must sit
        # inside the runnable-account branch: acct_end_task is only bound
        # there, and every such account needs its own edge to pipeline_end.
        end_task.set_upstream(acct_end_task)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment