Skip to content

Instantly share code, notes, and snippets.

@andrewm4894
Last active July 7, 2022 11:54
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andrewm4894/d83fe3a9aa194ae40a7c4acb6ee5eb02 to your computer and use it in GitHub Desktop.
Save andrewm4894/d83fe3a9aa194ae40a7c4acb6ee5eb02 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
"""
A python script to run many "airflow run dags trigger {dag} -e {execution_datetime}" commands via the airflow rest api.
Example usage:
python airflow_trigger_dags.py --dag 'dev_dag' --start '2021-10-01 00:00:01' --end '2021-10-31 00:00:01'
python airflow_trigger_dags.py -d 'dev_dag' -s '2022-05-20 00:00:01' -e '2022-05-24 00:00:01'
Example usage to just trigger dag for now:
python airflow_trigger_dags.py -d 'dev_dag' -s 'now'
"""
import argparse
from datetime import datetime, timedelta
import os
import requests
from dotenv import load_dotenv
from pprint import pprint
from requests.auth import HTTPBasicAuth
load_dotenv()
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--dag", help="name of dag to run.", type=str)
parser.add_argument("-s", "--start", help="start date.", type=str)
parser.add_argument("-e", "--end", help="end date.", type=str)
args = parser.parse_args()
def main():
# get airflow host,user,pwd from ENV variables
airflow_host = os.getenv("AIRFLOW_HOST")
airflow_username = os.getenv("AIRFLOW_USERNAME")
airflow_password = os.getenv("AIRFLOW_PASSWORD")
# handle args
dag = args.dag
if args.start == 'now':
start = datetime.now() - timedelta(hours=1)
end = start + timedelta(days=1)
else:
start = datetime.strptime(args.start, '%Y-%m-%d %H:%M:%S')
end = datetime.strptime(args.end, '%Y-%m-%d %H:%M:%S')
days = (end - start).days
print(dag, start, end, days)
# trigger dag for each day
for i in range(days):
execution_datetime = (start + timedelta(days=i)).strftime('%Y-%m-%dT%H:%M:%SZ')
print(f'dag={dag}, e={execution_datetime}')
url = f'http://{airflow_host}/api/v1/dags/{dag}/dagRuns'
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
data = {
'dag_run_id': f'manual_api_{execution_datetime}',
'logical_date': execution_datetime,
}
result = requests.post(
url,
json=data,
headers=headers,
auth=HTTPBasicAuth(airflow_username, airflow_password)
)
result_json = result.json()
pprint(result_json)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment