This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python
"""
A python script to run many "airflow dags trigger {dag} -e {execution_datetime}" commands via the airflow rest api.

One dag run is triggered per whole day between --start and --end; the end date itself is exclusive.

Example usage:
    python airflow_trigger_dags.py --dag 'dev_dag' --start '2021-10-01 00:00:01' --end '2021-10-31 00:00:01'
    python airflow_trigger_dags.py -d 'dev_dag' -s '2022-05-20 00:00:01' -e '2022-05-24 00:00:01'

Example usage to just trigger dag for now:
    python airflow_trigger_dags.py -d 'dev_dag' -s 'now'
"""
import argparse
import os
from datetime import datetime, timedelta, timezone
from pprint import pprint

import requests
from dotenv import load_dotenv
from requests.auth import HTTPBasicAuth
# Pull variables from a local .env file into the environment so that the
# os.getenv() lookups in main() can see them.
load_dotenv()

# Command-line interface. Arguments are parsed at import time and exposed as
# the module-level `args`, which main() reads directly.
parser = argparse.ArgumentParser(
    description="Trigger one Airflow dag run per day over a date range via the REST API."
)
# --dag and --start are mandatory: main() cannot build a request URL or an
# execution date without them, so let argparse reject the call up front
# instead of failing later with an opaque TypeError.
parser.add_argument("-d", "--dag", help="name of dag to run.", type=str, required=True)
parser.add_argument(
    "-s",
    "--start",
    help="start date as 'YYYY-MM-DD HH:MM:SS', or the literal 'now'.",
    type=str,
    required=True,
)
parser.add_argument(
    "-e",
    "--end",
    help="end date as 'YYYY-MM-DD HH:MM:SS' (exclusive); not used when --start is 'now'.",
    type=str,
)
args = parser.parse_args()
def _resolve_run_window(start_arg, end_arg):
    """Turn the --start/--end command-line strings into a (start, end) datetime pair.

    Args:
        start_arg: 'now', or a date string in '%Y-%m-%d %H:%M:%S' format.
        end_arg: a date string in '%Y-%m-%d %H:%M:%S' format, or None.
            Ignored when start_arg is 'now'.

    Returns:
        A (start, end) tuple. When start_arg is 'now', or when end_arg is
        None, end is exactly one day after start so a single run is triggered.

    Raises:
        ValueError: if a supplied date string does not match the expected format.
    """
    cli_format = '%Y-%m-%d %H:%M:%S'
    one_day = timedelta(days=1)

    if start_arg == 'now':
        # The execution date is sent with a 'Z' (UTC) suffix, so "now" must be
        # taken in UTC; naive local time would be mislabelled as UTC.
        # NOTE(review): the one-hour offset is kept from the original code,
        # presumably to keep the logical date safely in the past - confirm.
        start = datetime.now(timezone.utc) - timedelta(hours=1)
        return start, start + one_day

    start = datetime.strptime(start_arg, cli_format)
    if end_arg is None:
        # No end given: trigger a single run for the start date.
        return start, start + one_day
    return start, datetime.strptime(end_arg, cli_format)


def main():
    """Trigger the dag named by the command-line arguments once per day in the requested window.

    Reads the Airflow connection details from the AIRFLOW_HOST,
    AIRFLOW_USERNAME and AIRFLOW_PASSWORD environment variables and the
    dag/start/end values from the module-level `args`. For every whole day in
    [start, end) it POSTs a new dag run to
    http://{AIRFLOW_HOST}/api/v1/dags/{dag}/dagRuns using HTTP basic auth and
    prints the response.

    Raises:
        SystemExit: if AIRFLOW_HOST is unset, or --dag / --start are missing.
        ValueError: if --start or --end is not in '%Y-%m-%d %H:%M:%S' format.
    """
    # get airflow host,user,pwd from ENV variables
    airflow_host = os.getenv("AIRFLOW_HOST")
    airflow_username = os.getenv("AIRFLOW_USERNAME")
    airflow_password = os.getenv("AIRFLOW_PASSWORD")
    if not airflow_host:
        # Without this check the request would silently go to 'http://None/...'.
        raise SystemExit("AIRFLOW_HOST is not set (export it or add it to .env)")

    # handle args
    dag = args.dag
    if not dag or not args.start:
        raise SystemExit("both --dag and --start are required")
    start, end = _resolve_run_window(args.start, args.end)
    days = (end - start).days
    print(dag, start, end, days)
    if days < 1:
        print('nothing to trigger: --end must be at least one full day after --start')
        return

    # These do not change between runs, so build them once.
    url = f'http://{airflow_host}/api/v1/dags/{dag}/dagRuns'
    headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
    auth = HTTPBasicAuth(airflow_username, airflow_password)

    # trigger dag for each day
    for i in range(days):
        execution_datetime = (start + timedelta(days=i)).strftime('%Y-%m-%dT%H:%M:%SZ')
        print(f'dag={dag}, e={execution_datetime}')
        data = {
            'dag_run_id': f'manual_api_{execution_datetime}',
            'logical_date': execution_datetime,
        }
        # requests never times out by default; bound the wait so an
        # unreachable host cannot hang the script forever.
        result = requests.post(
            url,
            json=data,
            headers=headers,
            auth=auth,
            timeout=30,
        )
        try:
            result_json = result.json()
        except ValueError:
            # The body was not JSON (e.g. a proxy or auth error page); show it
            # raw and carry on with the remaining days.
            print(f'HTTP {result.status_code}: {result.text}')
            continue
        pprint(result_json)
# Script entry point: only run when executed directly, not when imported.
if __name__ == "__main__":
    # main() returns None, so this exits with status 0 on success.
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment