Skip to content

Instantly share code, notes, and snippets.

@anson0370
Created December 15, 2011 03:58
Show Gist options
  • Save anson0370/1479771 to your computer and use it in GitHub Desktop.
Save anson0370/1479771 to your computer and use it in GitHub Desktop.
core run of kongur
private void run(KongurContext context, ActivityNode currentNode, Long processInstanceId, String nodeType,
PvmExecution executor) throws SchedulerException, BizException {
// 如果有流程实例 先更新流程实例 再去做节点内容
// 先select再update 避免mysql中的死锁
if (processInstanceId != null) {
Activity activity = activityDAO.selectFailedByProcessInstanceIdAndActivityName(processInstanceId,
currentNode.getName());
if (activity == null) {
if (nodeType.endsWith(NodeExecutor.NOTIFY_EXECUTOR)) {
// 这种情况允许继续执行 去重在节点的notify执行器中做
// 现在可能的就是taskComplete和webpageComplete
logger.info("notify当前节点时没有未完成的activity记录[processId:" + processInstanceId + " activityName:"
+ currentNode.getName() + "],继续执行");
// 直接插入成功的activity记录
saveActivityRecord(context, currentNode, processInstanceId, KongurContextKeys.STATUS_NOTIFY, true);
} else {
logger.warn("不存在未完成的activity记录[processId:" + processInstanceId + " activityName:"
+ currentNode.getName() + "]");
// 挂起流程
context.setStatus(KongurContextKeys.STATUS_WAIT);
return;
}
} else {
// 乐观锁去重
int rows = activityDAO.updateActivityToSuccess(activity.getId());
if (rows == 0) {
logger.warn("重复执行被拦截[activityId:" + activity.getId() + "]");
// 挂起流程
context.setStatus(KongurContextKeys.STATUS_WAIT);
return;
}
}
}
logger.debug("ready for execution " + "[nodeName:" + currentNode.getName() + ", nodeType:" + nodeType
+ ", executor:" + executor.getClass().getName() + "]");
context.getProps().put(VariableKeys.CURRENT_PROCESS_NODE_NAME, currentNode.getName());
context.getProps().put(VariableKeys.CURRENT_PROCESS_NODE_TYPE, currentNode.getType());
try {
// 执行执行器
ExecutionExecutor.execute(executor, context);
// 异常处理
ExceptionStrategyHelper.doExceptionStrategy(context, currentNode);
// 执行后置拦截器
InterceptorExecutor.runAfterInterceptors(context);
} catch (ExecutionException e) {
throw new SchedulerException("执行中出错,执行器:" + executor.getClass().getName(), e);
}
if (processInstanceId != null) {
variableHandler.persistVariables(context.getProps(), processInstanceId, context.getProcessDefinition()
.getBizApp(), context.getProcessDefinition().getBizModule());
}
// 知道下一步要去哪里就应该在当前事务中预先创建下一步的activity记录
if (context.isReady() || context.isAsync()) {
//异常策略会直接设置currentNode
if (!"end".equals(context.getCurrentNode().getType())) {
context.setCurrentNode(computeNextNode(context));
}
if (processInstanceId != null) {
ProcessInstance processInstance = processInstanceDAO.selectByPrimaryKey(processInstanceId);
processInstance.setLastActivity(processInstance.getCurrntActivity());
processInstance.setCurrntActivity(context.getCurrentNode().getName());
processInstance.setGmtModified(new Date());
processInstanceDAO.updateByPrimaryKey(processInstance);
saveActivityRecord(context, context.getCurrentNode(), processInstanceId,
KongurContextKeys.STATUS_READY, false);
}
} else if (context.isFork()) {
// 对于fork的节点 就把所有fork路径开始的activity都创建了
context.getPrivateProps().put(KongurContextKeys.CURRENT_FORK_ACTIVITY_LIST,
forkActivity(context, currentNode, processInstanceId));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment