Skip to content

Instantly share code, notes, and snippets.

@CribberSix
Last active October 24, 2021 17:45
Show Gist options
  • Save CribberSix/7c276caa551db1728cfc1011da59e461 to your computer and use it in GitHub Desktop.
Save CribberSix/7c276caa551db1728cfc1011da59e461 to your computer and use it in GitHub Desktop.
def wait_for_update(*, poll_interval_seconds=60, timeout_seconds=None):
    """DAG task: block until a NiFi processor's state property changes.

    Reads the processor's state once to capture a starting value of
    ``timestamp_property`` and then polls the state repeatedly until the
    value differs from that starting value.

    Args:
        poll_interval_seconds: Value handed to ``pause`` between two polls.
            Defaults to 60, the interval that was previously hard-coded.
        timeout_seconds: Maximum time to keep polling, measured from just
            after the starting value was read. ``None`` (the default)
            means poll indefinitely, which matches the previous behaviour.

    Raises:
        TimeoutError: If ``timeout_seconds`` is set and no change was
            observed before the deadline passed.
    """
    # Local import so the block stays self-contained regardless of which
    # modules the surrounding file imports.
    import time

    # Initialize the following variables according to your setup / needs:
    url_nifi_api = "https://our.cluster.address.com:9443/nifi-api/"
    processor_id = ""  # e.g. pass it via the DAG's `provide_context` functionality
    access_payload = ""  # e.g. retrieve it via Airflow's `BaseHook` functionality
    timestamp_property = "last_tms"  # the processor's attribute name

    # NOTE(review): the token is fetched once and reused for every poll;
    # confirm its lifetime covers the longest expected wait.
    token = get_token(url_nifi_api, access_payload)

    # Capture the starting value that later polls are compared against.
    processor_state = get_processor_state(url_nifi_api, processor_id, token=token)
    value_start = parse_state(processor_state, timestamp_property)

    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = None
    if timeout_seconds is not None:
        deadline = time.monotonic() + timeout_seconds

    # Query and wait until an update happens or we time out.
    while True:
        processor_state = get_processor_state(url_nifi_api, processor_id, token=token)
        value_current = parse_state(processor_state, timestamp_property)

        if value_start != value_current:
            print(f"Update found: {value_current}")
            return

        # Check the deadline only after a fresh poll, so a change that
        # arrived during the last pause is never reported as a timeout.
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"No update of '{timestamp_property}' within "
                f"{timeout_seconds} seconds."
            )

        print("Waiting...")
        pause(poll_interval_seconds)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment