Skip to content

Instantly share code, notes, and snippets.

@CribberSix
Created January 23, 2022 10:44
Show Gist options
  • Save CribberSix/1f7b1621130219b320933bc3090390ee to your computer and use it in GitHub Desktop.
Save CribberSix/1f7b1621130219b320933bc3090390ee to your computer and use it in GitHub Desktop.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.base_hook import BaseHook
from airflow.utils.trigger_rule import TriggerRule
import json
from datetime import datetime
import requests
from urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) # disable warnings
def get_nifi_processor():
print("Testing connection to NiFi...")
CONN_ID = 'mynifi_connection'
CONN_NIFI = BaseHook.get_connection(CONN_ID)
nifi_api_url = CONN_NIFI.host + ":" + str(CONN_NIFI.port) + "/nifi-api"
# Replace with your own ID!
processor_id = '1462a26f-0000-0000-0000-35550911fe94'
# you could pass a bearer auth-token via the header!
header = {'Content-Type':'application/json'}
rest_endpoint = f'{nifi_api_url}/processors/{processor_id}'
print(f"Rest endpoint: {rest_endpoint}")
# GET processor and parse to JSON
response = requests.get(rest_endpoint
, headers=header
, verify=False)
print(f"Response: {response}")
print(f"Processor JSON: {json.loads(response.content)}")
print("Finished connection-test to NiFi.")
def on_failure():
"""The method gets executed only if at least one previous task fails."""
print("something failed... cleaning up.")
with DAG(
dag_id='hello_nifi',
start_date=datetime(2021, 9, 5),
schedule_interval='0 3 * * 7', # on Sunday at 03:00:00
catchup=False,
tags=['something'],
) as dag:
startup_task = PythonOperator(
task_id='get_nifi_processor_task',
python_callable=get_nifi_processor,
provide_context=False,
)
failure_logging = PythonOperator(
task_id='failure_logging',
python_callable=on_failure,
trigger_rule=TriggerRule.ONE_FAILED,
provide_context=False,
)
startup_task > failure_logging
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment