-
-
Save Tandoy/a74cb53e2d166bb791ba8269dee380a4 to your computer and use it in GitHub Desktop.
COMPLEMENT_DATA.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime
import time

import requests
def _count_active_instances(project_url, headers, project_name, definition_id):
    """Return how many instances of a workflow are running or submitted.

    Queries ``/instance/list-paging`` once per state (RUNNING_EXECUTION, then
    SUBMITTED_SUCCESS) and sums the ``total`` field of the two responses.
    """
    active = 0
    # Other state types listed for this API: SUCCESS, FAILURE, WAITTING_THREAD.
    for state in ("RUNNING_EXECUTION", "SUBMITTED_SUCCESS"):
        params = {
            "projectName": project_name,
            "pageNo": 1,
            "pageSize": 2,
            "processDefinitionId": definition_id,
            "stateType": state,
        }
        resp = requests.get(url=project_url + "/instance/list-paging",
                            params=params, headers=headers)
        active += resp.json()["data"]["total"]
    return active


def _wait_until_idle(project_url, headers, project_name, definition_id,
                     busy_sleep=0, verbose=False):
    """Block until no instance of the workflow is running or submitted.

    Sleeps 10 s before every check. While the workflow is still busy, it also
    sleeps ``busy_sleep`` seconds before the next check. With ``verbose`` set,
    each observed count is printed.
    """
    while True:
        time.sleep(10)
        total_count = _count_active_instances(project_url, headers, project_name, definition_id)
        if verbose:
            print("total_count:", total_count)
        if total_count == 0:
            return
        if busy_sleep:
            time.sleep(busy_sleep)


def _start_complement(project_url, headers, project_name, definition_id, schedule_time):
    """Start a COMPLEMENT_DATA run of one workflow for a single schedule time.

    Returns ``(start_process_param, response)`` so callers can log what was sent.
    """
    start_process_param = {
        "failureStrategy": "CONTINUE",  # END or CONTINUE
        "processDefinitionId": definition_id,
        "processInstancePriority": "MEDIUM",
        "projectName": project_name,
        # Range start and end are the same instant: backfill just this one time.
        "scheduleTime": schedule_time + "," + schedule_time,
        "warningGroupId": 0,
        "warningType": "NONE",
        "execType": "COMPLEMENT_DATA",  # COMPLEMENT_DATA = backfill, START_PROCESS = normal run
        "runMode": "RUN_MODE_SERIAL",
    }
    response = requests.post(url=project_url + "/executors/start-process-instance",
                             data=start_process_param, headers=headers)
    return start_process_param, response


def _lookup_process_id(process_dict, name):
    """Return the definition id of workflow ``name``; raise a KeyError naming it if absent."""
    if name not in process_dict:
        raise KeyError("workflow %r not found in project (known: %s)"
                       % (name, ", ".join(sorted(process_dict))))
    return process_dict[name]


def main(server_id, userName, userPassword, projectName, process_name, sec_process_name, scheduleTime):
    """Backfill a DolphinScheduler workflow one schedule time at a time.

    For each time in ``scheduleTime``, the workflow ``process_name`` is started
    in COMPLEMENT_DATA mode for that single instant. Each run starts only after
    no instance of the workflow is running or submitted, so the backfills
    execute strictly one after another.

    When ``sec_process_name`` is non-empty, a second ("merge") workflow is
    started for each batch once the main run for that batch has finished:
    - for batch k, right after batch k + 1 has been submitted;
    - for the last batch, after the main workflow has become idle.

    Args:
        server_id: Base URL of the DolphinScheduler API server,
            e.g. ``"http://xxxx:12345"``.
        userName: Currently unused; requests authenticate with the token below.
        userPassword: Currently unused; requests authenticate with the token below.
        projectName: Project that contains both workflows.
        process_name: Name of the workflow to backfill.
        sec_process_name: Name of the merge workflow, or ``""`` for no merge step.
        scheduleTime: Comma-separated schedule times, e.g.
            ``"2020-01-31 00:00:00,2020-02-29 00:00:00"``. Surrounding spaces
            are trimmed and blank entries (e.g. from a trailing comma) are skipped.

    Raises:
        KeyError: If ``process_name`` or a non-empty ``sec_process_name`` is not
            a workflow defined in the project.
    """
    schedule_times = [t.strip() for t in scheduleTime.split(",") if t.strip()]
    # DolphinScheduler API auth header (token obtained after logging in).
    execute_headers = {
        "token": "xxxxxxxxxxxxxxxxx"
    }
    project_url = server_id + "/dolphinscheduler/projects/" + projectName

    # Map every workflow definition in the project: name -> definition id.
    process_get = requests.get(url=project_url + "/process/list", headers=execute_headers)
    process_dict = {p["name"]: p["id"] for p in process_get.json()["data"]}
    main_id = _lookup_process_id(process_dict, process_name)
    merge_id = None
    if sec_process_name != "":
        merge_id = _lookup_process_id(process_dict, sec_process_name)

    previous_time = None  # schedule time of the most recently submitted main batch
    for execute_day in schedule_times:
        print("当前调度时间:", execute_day)
        # Start the next backfill only when nothing is running or submitted.
        _wait_until_idle(project_url, execute_headers, projectName, main_id,
                         busy_sleep=20, verbose=True)
        start_process_param, r = _start_complement(
            project_url, execute_headers, projectName, main_id, execute_day)
        print("data:", start_process_param)
        print("execute_headers:", execute_headers)
        print("post:", r.text)
        # The wait above guarantees the previous batch's main run has finished,
        # so that batch can now be merged.
        if merge_id is not None and previous_time is not None:
            _start_complement(project_url, execute_headers, projectName, merge_id, previous_time)
        previous_time = execute_day

    # The last batch has no following iteration: wait for its main run, then merge it.
    if merge_id is not None and previous_time is not None:
        _wait_until_idle(project_url, execute_headers, projectName, main_id)
        _start_complement(project_url, execute_headers, projectName, merge_id, previous_time)
if __name__ == '__main__':
    server_id = "http://xxxx:12345"  # DolphinScheduler API server address
    # Must be a plain string: the former trailing comma made this a one-element tuple.
    userName = "xxxx"
    userPassword = "xxx"
    # NOTE(review): the ${...} placeholders are presumably substituted by the
    # scheduler's task-parameter replacement before this script runs -- confirm.
    projectName = '${projectName}'  # project name, e.g. "mgt_behavior_flow"
    process_name = '${process_name}'  # workflow to backfill, e.g. "tapd_story"
    sec_process_name = '${sec_process_name}'  # merge workflow name; "" when no merge step is needed
    scheduleTime = '${scheduleTime}'  # schedule times, e.g. 2020-01-31 00:00:00,2020-02-29 00:00:00
    main(server_id, userName, userPassword, projectName, process_name, sec_process_name, scheduleTime)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment