Skip to content

Instantly share code, notes, and snippets.

@Tandoy

Tandoy/py Secret

Last active July 20, 2022 03:13
Show Gist options
  • Save Tandoy/a74cb53e2d166bb791ba8269dee380a4 to your computer and use it in GitHub Desktop.
Save Tandoy/a74cb53e2d166bb791ba8269dee380a4 to your computer and use it in GitHub Desktop.
COMPLEMENT_DATA.py
import datetime
import time
import requests
# Seconds to wait before each poll of the primary workflow's instance list.
_POLL_INTERVAL_SECONDS = 10
# Pause after submitting a primary backfill run (kept from the original script).
_POST_SUBMIT_DELAY_SECONDS = 20
# Per-request timeout so an unresponsive server cannot hang the script forever.
_REQUEST_TIMEOUT_SECONDS = 30
# Instance states that mean the primary workflow is still busy.
_ACTIVE_STATES = ("RUNNING_EXECUTION", "SUBMITTED_SUCCESS")


def _parse_schedule_times(schedule_time):
    """Return the non-empty, whitespace-stripped entries of a comma-separated string."""
    return [part.strip() for part in schedule_time.split(",") if part.strip()]


def _api_data(response):
    """Return the ``data`` field of a DolphinScheduler API response.

    Raises:
        requests.HTTPError: if the HTTP status is 4xx/5xx.
        RuntimeError: if the JSON body carries a non-zero ``code``.
    """
    response.raise_for_status()
    body = response.json()
    # NOTE(review): assumes the server signals success with code 0 — confirm for your version.
    if body.get("code", 0) != 0:
        raise RuntimeError(f"DolphinScheduler API error {body.get('code')}: {body.get('msg')}")
    return body.get("data")


def _count_active_instances(server_id, project_name, definition_id, headers):
    """Return how many instances of ``definition_id`` are running or submitted."""
    url = server_id + "/dolphinscheduler/projects/" + project_name + "/instance/list-paging"
    total = 0
    for state in _ACTIVE_STATES:
        params = {
            "projectName": project_name,
            "pageNo": 1,
            "pageSize": 2,  # only "total" is read, so a tiny page is enough
            "processDefinitionId": definition_id,
            "stateType": state,
        }
        response = requests.get(
            url=url, params=params, headers=headers, timeout=_REQUEST_TIMEOUT_SECONDS
        )
        total += _api_data(response)["total"]
    return total


def _wait_until_idle(server_id, project_name, definition_id, headers, verbose=False):
    """Block until ``definition_id`` has no running or submitted instances.

    Sleeps ``_POLL_INTERVAL_SECONDS`` before every poll; when ``verbose`` is true,
    prints the active-instance count after each poll.
    """
    while True:
        time.sleep(_POLL_INTERVAL_SECONDS)
        total_count = _count_active_instances(server_id, project_name, definition_id, headers)
        if verbose:
            print("total_count:", total_count)
        if total_count == 0:
            return


def _start_complement(server_id, project_name, definition_id, schedule_day, headers):
    """Submit a COMPLEMENT_DATA (backfill) run of ``definition_id`` for one schedule time.

    The schedule range is sent as "<day>,<day>" (start == end). Returns
    ``(params, response)`` so the caller can log and check them.
    """
    url = (server_id + "/dolphinscheduler/projects/" + project_name
           + "/executors/start-process-instance")
    params = {
        "failureStrategy": "CONTINUE",  # END or CONTINUE
        "processDefinitionId": definition_id,
        "processInstancePriority": "MEDIUM",
        "projectName": project_name,
        "scheduleTime": schedule_day + "," + schedule_day,
        "warningGroupId": 0,
        "warningType": "NONE",
        "execType": "COMPLEMENT_DATA",  # COMPLEMENT_DATA = backfill, START_PROCESS = normal run
        "runMode": "RUN_MODE_SERIAL",
    }
    response = requests.post(
        url=url, data=params, headers=headers, timeout=_REQUEST_TIMEOUT_SECONDS
    )
    return params, response


def _definition_id(process_dict, name, project_name):
    """Return the workflow definition id for ``name``, raising a descriptive KeyError."""
    try:
        return process_dict[name]
    except KeyError:
        raise KeyError(f"workflow {name!r} not found in project {project_name!r}") from None


def main(server_id, userName, userPassword, projectName, process_name, sec_process_name, scheduleTime):
    """Backfill ``process_name`` for each schedule time, optionally chaining a merge workflow.

    For each entry of ``scheduleTime``, wait until the primary workflow has no
    running/submitted instance, then submit a COMPLEMENT_DATA run for that time.
    If ``sec_process_name`` is non-empty, the merge workflow for batch k is
    submitted during iteration k+1, after the wait that confirmed batch k's
    primary run finished. One extra iteration past the last entry waits for the
    final primary run and then merges it.

    Args:
        server_id: base URL of the DolphinScheduler API server, e.g. "http://host:12345".
        userName: currently unused; authentication uses the hard-coded token.
        userPassword: currently unused; authentication uses the hard-coded token.
        projectName: project that owns both workflows.
        process_name: name of the workflow to backfill.
        sec_process_name: name of the merge workflow, or "" to skip merging.
        scheduleTime: comma-separated schedule times, e.g.
            "2020-01-31 00:00:00,2020-02-29 00:00:00".

    Raises:
        KeyError: if a workflow name is not defined in the project.
        RuntimeError, requests.HTTPError: if a DolphinScheduler API call fails.
    """
    schedule_times = _parse_schedule_times(scheduleTime)
    if not schedule_times:
        return  # nothing to backfill; skip all API calls

    # Token of an already logged-in session; no login request is made here.
    execute_headers = {
        "token": "xxxxxxxxxxxxxxxxx"
    }

    # Map workflow name -> definition id for the project.
    process_list_url = server_id + "/dolphinscheduler/projects/" + projectName + "/process/list"
    process_list = _api_data(
        requests.get(url=process_list_url, headers=execute_headers, timeout=_REQUEST_TIMEOUT_SECONDS)
    )
    process_dict = {definition["name"]: definition["id"] for definition in process_list or []}

    primary_id = _definition_id(process_dict, process_name, projectName)
    merge_id = _definition_id(process_dict, sec_process_name, projectName) if sec_process_name else None

    last = len(schedule_times)
    # Runs one step past the last schedule time so the final batch also gets merged.
    for i in range(last + 1):
        if i < last:
            execute_day = schedule_times[i]
            print("当前调度时间:", execute_day)
            # Runs are submitted one at a time: wait for the previous one to finish.
            _wait_until_idle(server_id, projectName, primary_id, execute_headers, verbose=True)
            params, response = _start_complement(
                server_id, projectName, primary_id, execute_day, execute_headers
            )
            print("data:", params)
            print("post:", response.text)
            _api_data(response)  # stop instead of silently skipping a date on failure
            time.sleep(_POST_SUBMIT_DELAY_SECONDS)

        if merge_id is not None and i > 0:
            sec_execute_day = schedule_times[i - 1]  # merge the previous batch
            if i == last:
                # Extra step: no newer primary run waited for batch i-1, so wait here.
                _wait_until_idle(server_id, projectName, primary_id, execute_headers)
            _, response = _start_complement(
                server_id, projectName, merge_id, sec_execute_day, execute_headers
            )
            _api_data(response)
if __name__ == '__main__':
    server_id = "http://xxxx:12345"  # DolphinScheduler API server address
    # Fixed: a trailing comma previously made this a 1-tuple instead of a string.
    userName = "xxxx"
    userPassword = "xxx"
    # The ${...} values are placeholders, presumably substituted by the scheduler's
    # parameter mechanism before this script runs — confirm in the task definition.
    projectName = '${projectName}'  # project name, e.g. "mgt_behavior_flow"
    process_name = '${process_name}'  # workflow to backfill, e.g. "tapd_story"
    sec_process_name = '${sec_process_name}'  # merge workflow name; "" when no merge is needed
    scheduleTime = '${scheduleTime}'  # comma-separated times, e.g. 2020-01-31 00:00:00,2020-02-29 00:00:00
    main(server_id, userName, userPassword, projectName, process_name, sec_process_name, scheduleTime)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment