Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save IFellowSchool/ff22e456be970957230aafb94d613438 to your computer and use it in GitHub Desktop.
Save IFellowSchool/ff22e456be970957230aafb94d613438 to your computer and use it in GitHub Desktop.
package ru.sber.df.epmp.cc.ce.cm.scenariodesigner.v1.service.impl;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import ru.sber.df.epmp.cc.ce.cm.client.CalculationEngineClient;
import ru.sber.df.epmp.cc.ce.cm.client.CalculationSchedulerClient;
import ru.sber.df.epmp.cc.ce.cm.client.constant.BlockType;
import ru.sber.df.epmp.cc.ce.cm.client.response.ResponseBlockEntities;
import ru.sber.df.epmp.cc.ce.cm.client.response.UpdateScenarioDescriptionResponse;
import ru.sber.df.epmp.cc.ce.cm.common.api.dto.BlockingObjectDto;
import ru.sber.df.epmp.cc.ce.cm.common.api.dto.LockDetailedInfoDto;
import ru.sber.df.epmp.cc.ce.cm.meta.dto.CmMetaObjectDto;
import ru.sber.df.epmp.cc.ce.cm.meta.dto.EditableObjectData;
import ru.sber.df.epmp.cc.ce.cm.meta.dto.ProjectVersionDto;
import ru.sber.df.epmp.cc.ce.cm.meta.exception.MetaObjectException;
import ru.sber.df.epmp.cc.ce.cm.meta.model.CmMetaObjectTypeEnum;
import ru.sber.df.epmp.cc.ce.cm.meta.model.ProjectVersion;
import ru.sber.df.epmp.cc.ce.cm.meta.service.CmMetaObjectService;
import ru.sber.df.epmp.cc.ce.cm.meta.service.ProjectService;
import ru.sber.df.epmp.cc.ce.cm.project.dto.BaseDeployResponse;
import ru.sber.df.epmp.cc.ce.cm.project.exception.ProjectEditException;
import ru.sber.df.epmp.cc.ce.cm.project.service.AbstractEditObjectService;
import ru.sber.df.epmp.cc.ce.cm.repo.service.ProjectListService;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.dto.ConnectionDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.dto.ExpressionDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.dto.ExtendedMappingDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.dto.FormulaVariableDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.dto.InputDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.dto.JoinTypeDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.dto.NodeDetailsDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.dto.OutputDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.dto.RangeDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.dto.ScenarioGraphDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.dto.ScenarioHistoryDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.dto.ScenarioJsonDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.dto.ScenarioNodeInfoDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.dto.ScenarioPropertiesDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.dto.ScenarioViewDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.dto.ShortedScenarioDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.dto.TargetColumnDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.dto.VariableNodeDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.request.CreateNewNodeRequest;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.request.CreateNewProjectRequest;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.request.DeployScenarioClientRequest;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.request.UpdateNodesEdgesRequest;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.request.UpdateScenarioNodeRequest;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.request.UpdateScenarioPropertiesRequest;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.request.WindowOutputDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.api.response.ScenarioProjectResponse;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.domain.EdgeNodes;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.domain.EditModeState;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.domain.ScenarioNode;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.domain.ScenarioProperty;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.domain.ScenarioType;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.domain.Variable;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.domain.input.Input;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.domain.input.MappingObject;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.domain.operation.Join;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.domain.operation.Source;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.domain.operation.Window;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.domain.output.ExpressionVariable;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.domain.output.Output;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.exception.ScenarioNotFoundException;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.exception.ScenarioPanelException;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.exception.ScenarioUpdateException;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.mapper.ScenarioHistoryMapper;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.mapper.ShortedScenarioMapper;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.model.ScenarioEditVersion;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.service.DefaultNodeOutputBuilder;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.service.DictionaryService;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.service.JoinNodeOutputBuilder;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.service.NodeOutputBuilder;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.service.ScenarioInfoDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.service.ScenarioNodeFactory;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.service.ScenarioViewService;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.service.UnionNodeOutputBuilder;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.service.modifier.input.DefaultInputModifier;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.service.modifier.input.InputModifier;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.service.modifier.input.JoinNodeInputModifier;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.service.modifier.input.ResultNodeInputModifier;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.service.modifier.output.OutputModifier;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.service.modifier.output.UnionOutputsModifier;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.utils.ScenarioGraphUtils;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.utils.ScenarioNodeUtils;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.v1.ScenaioSerioDeployException;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.v1.ScenarioNodeWrapper;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.v1.api.controller.Condition;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.v1.api.controller.InputNode;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.v1.api.controller.NodeDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.v1.api.dto.ScenarioDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.v1.api.dto.ScenarioPropertyDto;
import ru.sber.df.epmp.cc.ce.cm.scenariodesigner.v1.service.ScenarioService;
import ru.sber.df.epmp.cc.ce.cm.security.SecurityUtils;
import ru.sber.df.epmp.cc.ce.cm.spaces.model.ProjectSpaceObjectType;
import ru.sber.df.epmp.cc.ce.cm.spaces.service.ProjectSpaceObjectService;
import ru.sber.df.epmp.cc.ce.ds.helper.uuid.Guid;
import ru.sber.df.epmp.cc.ce.ds.model.ce.Scenario;
import ru.sber.df.epmp.cc.ce.ds.model.ce.ScenarioDescription;
import ru.sber.df.epmp.cc.ce.ds.model.ce.ScenarioVersion;
import ru.sber.df.epmp.cc.ce.ds.model.ce.ScenarioVersionPrimaryKey;
import ru.sber.df.epmp.cc.ce.ds.repository.ce.ScenarioRepository;
import ru.sber.df.epmp.cc.ce.ds.repository.ce.ScenarioVersionRepository;
import ru.sber.df.epmp.cc.ce.pe.api.request.select.OrderingType;
import ru.sber.df.epmp.cc.ce.sf.constants.AttributeType;
import ru.sber.df.epmp.cc.ce.sf.constants.DataType;
import ru.sber.df.epmp.cc.ce.sf.constants.JoinType;
import ru.sber.df.epmp.cc.ce.sf.constants.NodeType;
import ru.sber.df.epmp.cc.ce.sf.constants.OperationType;
import ru.sber.df.epmp.cc.ce.sf.constants.WindowType;
import ru.sber.df.epmp.cc.ce.sf.util.LoggingUtils;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import static ru.sber.df.epmp.cc.ce.sf.util.LoggingUtils.OBJECT_ID_NO_ID;
import static ru.sber.df.epmp.cc.ce.sf.util.LoggingUtils.composeLogMessageForCAPJournaling;
@Slf4j
@Service
public class ScenarioServiceImpl extends
AbstractEditObjectService<ScenarioNodeWrapper> implements ScenarioService {
private final ScenarioViewService scenarioViewService;
private final DictionaryService dictionaryService;
private final CalculationEngineClient calculationEngineClient;
private final ScenarioNodeFactory nodeFactory;
private final ScenarioHistoryMapper historyMapper;
private final ObjectMapper objectMapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
private final ShortedScenarioMapper shortedScenarioMapper;
private final Map<OperationType, OutputModifier> outputModifiers;
private final ScenarioRepository scenarioRepository;
private final ScenarioVersionRepository versionRepository;
public ScenarioServiceImpl(ProjectService projectService,
ProjectSpaceObjectService projectSpaceObjectService,
CmMetaObjectService cmMetaObjectService,
ProjectListService projectListService,
CalculationSchedulerClient calculationSchedulerClient, DictionaryService dictionaryService,
CalculationEngineClient calculationEngineClient,
ScenarioNodeFactory nodeFactory,
ScenarioHistoryMapper historyMapper,
ShortedScenarioMapper shortedScenarioMapper,
ScenarioViewService scenarioViewService,
List<OutputModifier> outputModifiers,
ScenarioRepository scenarioRepository, ScenarioVersionRepository scenarioVersionRepository) {
super(projectService, projectSpaceObjectService, cmMetaObjectService, projectListService, calculationSchedulerClient);
this.dictionaryService = dictionaryService;
this.calculationEngineClient = calculationEngineClient;
this.nodeFactory = nodeFactory;
this.historyMapper = historyMapper;
this.shortedScenarioMapper = shortedScenarioMapper;
this.scenarioViewService = scenarioViewService;
this.outputModifiers = outputModifiers.stream()
.collect(Collectors.toMap(OutputModifier::getType,
Function.identity()));
this.scenarioRepository = scenarioRepository;
this.versionRepository = scenarioVersionRepository;
}
@Override
protected ScenarioNodeWrapper initEditValue(ProjectVersion projectVersion, Map<String, Object> additionalArgs) {
if (additionalArgs != null) {
log.warn(
"Additional args is passed to init project method, but not used for transformations");
}
return new ScenarioNodeWrapper(ScenarioNodeUtils.parseScenario(projectVersion.getJson()));
}
@Override
protected ScenarioNodeWrapper jsonToEditValue(String json) {
return deserialize(json);
}
@Override
protected BaseDeployResponse doRemoteDeploy(String objectId, boolean isNew, ScenarioNodeWrapper editValue, String objectName, String objectDescription, Map<String, Object> additionalArgs) {
return doRemoteDeploy(objectId, isNew, editValue, objectName, objectDescription);
}
@Override
public ScenarioInfoDto getScenarioInfo(String scenarioId) {
return scenarioRepository.findById(scenarioId)
.map(this::toScenarioInfoDto)
.orElseThrow(() -> new ScenarioNotFoundException(
String.format("Scenario [%s] not found", scenarioId)));
}
private ScenarioInfoDto toScenarioInfoDto(Scenario scenario) {
return ScenarioInfoDto.builder()
.scenarioId(scenario.getScenarioId())
.actualVersion(scenario.getActualVersion())
.username(scenario.getMetaObject().getUserCreate())
.description(scenario.getScenarioDescription() == null
? "" : scenario.getScenarioDescription().getDescription())
.name(scenario.getScenarioDescription() == null
? "" : scenario.getScenarioDescription().getScenarioName())
.createdAt(LocalDateTime.ofInstant(toInstantWithMskTZ(scenario.getMetaObject().getDateTimeCreate()),
ZoneId.systemDefault()))
.build();
}
private Instant toInstantWithMskTZ(LocalDateTime localDateTime) {
return localDateTime.toInstant(ZoneId.of("Europe/Moscow").getRules().getOffset(localDateTime));
}
protected BaseDeployResponse doRemoteDeploy(String objectId,
boolean isNew,
ScenarioNodeWrapper editValue,
String objectName,
String objectDescription) throws ScenaioSerioDeployException {
try {
ScenarioInfoDto infoDto = calculationEngineClient
.deployScenario(objectId, objectMapper.writeValueAsString(editValue.getAll()), isNew, objectName, objectDescription);
return BaseDeployResponse.builder()
.objectId(infoDto.getScenarioId())
.deployedVersion(infoDto.getActualVersion())
.build();
} catch (JsonProcessingException e) {
log.error("Unable to serialize scenario json for deploy", e);
throw new ScenaioSerioDeployException(e);
}
}
@Override
protected void enrichEditableObjectData(String objectId, EditableObjectData objectData) {
String lastDeployedJson = projectListService.getLastDeployedJson(objectId);
ScenarioInfoDto info = ScenarioGraphUtils.buildScenarioGraphDto(lastDeployedJson).getProperties().getScenarioInfo();
BlockingObjectDto<ScenarioDto> scenario = getScenario(objectId);
objectData.setObjectId(info.getScenarioId());
objectData.setObjectName(scenario.getData().getProperties().getName());
objectData.setObjectDescription(scenario.getData().getProperties().getDescription());
objectData.setObjectType(CmMetaObjectTypeEnum.SCENARIO_PROJECT);
}
@Override
protected CmMetaObjectTypeEnum getObjectType() {
return CmMetaObjectTypeEnum.SCENARIO_PROJECT;
}
@Override
protected ScenarioNodeWrapper deserialize(String json) {
try {
objectMapper.registerModule(new JavaTimeModule());
return new ScenarioNodeWrapper(ScenarioNodeUtils.parseScenario(json));
} catch (Throwable exception) {
log.error("Unable to serialize json of scenario [{}]. Cause: [{}]",
json, exception.getMessage());
throw new ProjectEditException(String.format("Unable to serialize json of scenario [%s]. Cause: [%s]",
json, exception.getMessage()));
}
}
@Override
public BlockingObjectDto<ScenarioDto> createScenario(CreateNewProjectRequest scenarioDto) {
try {
String scenarioId = new Guid().getIDC25();
ProjectVersion scenarioObjectProject = projectService.startObjectEdit(scenarioId,
CmMetaObjectTypeEnum.SCENARIO_PROJECT,
scenarioDto.getName(), scenarioDto.getDescription(), true, null, null);
ScenarioEditVersion scenarioEditVersion = new ScenarioEditVersion(scenarioObjectProject);
setScenarioNodes(scenarioObjectProject,
new ScenarioNodeWrapper(List.of(nodeFactory.createPropertyNode(ScenarioProperty.builder()
.scenarioId(scenarioId)
.name(scenarioDto.getName())
.description(scenarioDto.getDescription())
.build()),
nodeFactory.createResultNode())));
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
projectSpaceObjectService.assignScenarioEditToPath(scenarioId, scenarioDto.getPathId());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioDto.getName(),
"Scenario project created successfully '%s'('%s')"));
return getScenario(scenarioId);
} catch (Throwable e) {
log.error(LoggingUtils.composeLogMessageForCAPJournaling(OBJECT_ID_NO_ID, scenarioDto.getName(),
"Failed to create scenario project '%s' because of error: %s", scenarioDto.getName(), e.getMessage()), e);
throw e;
}
}
@Override
public BlockingObjectDto<ScenarioDto> getScenario(String scenarioId) {
BlockingObjectDto<ScenarioNodeWrapper> scenarioNode = getBlockingObject(scenarioId);
ScenarioGraphDto graphDto = ScenarioGraphUtils.buildScenarioGraphDto(scenarioNode.getJson());
List<ScenarioNode> scenario = ScenarioNodeUtils.parseScenario(scenarioNode.getJson());
ScenarioPropertyDto propertyDto = scenarioNode.getData().getProperty() == null ? null : getScenarioPropertyDto(scenarioNode.getData().getProperty());
ScenarioDto scenarioDto = ScenarioDto.builder()
.graph(graphDto)
.json(scenarioNode.getJson())
.history(historyMapper.map(versionRepository.findAllByScenarioIdOrderByScenarioVersionDesc(scenarioId)))
.variables(scenario.stream().filter(s -> s.getType().equals(NodeType.__VARIABLE__.getId())).map(ScenarioNodeUtils::toVariableNodeDto).toList())
.properties(propertyDto)
.build();
return BlockingObjectDto.<ScenarioDto>builder()
.lockInfo(scenarioNode.getLockInfo())
.json(scenarioNode.getJson())
.additionalParams(scenarioNode.getAdditionalParams())
.objectId(scenarioNode.getObjectId())
.isProject(scenarioNode.isProject())
.data(scenarioDto)
.build();
}
public BlockingObjectDto<ScenarioDto> getScenario(BlockingObjectDto<ScenarioNodeWrapper> scenarioNode) {
ScenarioGraphDto graphDto = ScenarioGraphUtils.buildScenarioGraphDto(scenarioNode.getJson());
List<ScenarioNode> scenario = ScenarioNodeUtils.parseScenario(scenarioNode.getJson());
ScenarioDto scenarioDto = ScenarioDto.builder()
.graph(graphDto)
.json(scenarioNode.getJson())
.history(null)
.variables(scenario.stream().map(ScenarioNodeUtils::toVariableNodeDto).toList())
.properties(getScenarioPropertyDto(scenarioNode.getData().getProperty()))
.build();
return BlockingObjectDto.<ScenarioDto>builder()
.lockInfo(scenarioNode.getLockInfo())
.json(scenarioNode.getJson())
.additionalParams(scenarioNode.getAdditionalParams())
.objectId(scenarioNode.getObjectId())
.isProject(scenarioNode.isProject())
.data(scenarioDto)
.build();
}
private BlockingObjectDto<ScenarioDto> projectVersionToBlockingScenario(ProjectVersion projectVersion,
ScenarioGraphDto graphDto,
List<ScenarioHistoryDto> historyDto) {
String scenarioId = projectVersion.getObjectId();
BlockingObjectDto<ScenarioNodeWrapper> blockingScenarioWrapper = getBlockingObject(scenarioId);
return blockingWrapperToBlockingScenario(graphDto, historyDto, blockingScenarioWrapper, scenarioId, projectVersion.getJson());
}
private BlockingObjectDto<ScenarioDto> blockingWrapperToBlockingScenario(ScenarioGraphDto graphDto,
List<ScenarioHistoryDto> historyDto,
BlockingObjectDto<ScenarioNodeWrapper> blockingScenarioWrapper,
String scenarioId,
String json) {
ScenarioPropertyDto properties = getScenarioPropertyDto(blockingScenarioWrapper.getData().getProperty());
ScenarioDto scenarioDto = ScenarioDto.builder()
.properties(properties)
.graph(graphDto)
.history(historyDto)
.variables(Optional.ofNullable(variableToDto(blockingScenarioWrapper.getData().getVariable(), scenarioId))
.map(List::of)
.orElse(Collections.emptyList()))
.json(json)
.build();
return BlockingObjectDto.<ScenarioDto>builder()
.objectId(blockingScenarioWrapper.getObjectId())
.isProject(blockingScenarioWrapper.isProject())
.data(scenarioDto)
.additionalParams(blockingScenarioWrapper.getAdditionalParams())
.lockInfo(blockingScenarioWrapper.getLockInfo())
.build();
}
private VariableNodeDto variableToDto(Variable variable, String id) {
if (variable == null) return null;
return VariableNodeDto.builder()
.id(id)
.name(variable.getDefaultValue())
.isBatchVariable(variable.getIsBatchVariable())
// .database() todo DBSettings?
// .dbObjectName() todo DBSettings?
// .dbSchema() todo DBSettings?
.build();
}
private ScenarioPropertyDto getScenarioPropertyDto(ScenarioProperty property) {
CmMetaObjectDto meta = cmMetaObjectService.getMetaObject(property.getScenarioId());
Optional<Scenario> scenario = scenarioRepository.findById(property.getScenarioId());
String description = StringUtils.hasText(property.getDescription())
? property.getDescription()
: scenario.map(Scenario::getDescription).orElse("");
String name = StringUtils.hasText(property.getName())
? property.getName()
: scenario.map(Scenario::getName).orElse("");
return ScenarioPropertyDto.builder()
.id(property.getScenarioId())
.userCreate(meta.getUserCreate())
.userChange(meta.getUserChange())
.dateTimeCreate(meta.getDateTimeCreate())
.dateTimeChange(meta.getDateTimeChange())
.name(name)
.description(description)
.version(property.getScenarioVersion())
.frameworkVersion(property.getVersion())
.build();
}
private BlockingObjectDto<ScenarioDto> getScenarioGraph(ProjectVersion scenarioVersion) {
List<ScenarioVersionRepository.ScenarioHistoryView> oldVersion = null;
return projectVersionToBlockingScenario(scenarioVersion,
ScenarioGraphUtils.buildScenarioGraphDto(scenarioVersion.getJson()),
historyMapper.map(oldVersion)); // todo projectService.getPreviousVersion cycle?
}
@Override
public BlockingObjectDto<ScenarioDto> setEditMode(String scenarioId, boolean state) {
updateEditMode(scenarioId, state ? EditModeState.ON : EditModeState.OFF);
return getScenario(scenarioId);
}
@Override
public BlockingObjectDto<ScenarioDto> createNode(String scenarioId, CreateNewNodeRequest node) {
String scenarioName = null;
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> scenarioNodeList = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode scenarioNode =
nodeFactory.createScenarioNode(node.getInfo().getType(), node.getInfo());
scenarioNodeList.add(scenarioNode);
setScenarioNodes(scenarioEditVersion, scenarioNodeList);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Scenario node '%s' added successfully for scenario '%s'('%s')",
node.getInfo().getName(), scenarioId, scenarioName));
return blockingWrapperToBlockingScenario(
ScenarioGraphUtils.buildScenarioGraphDto(scenarioEditVersion.getScenarioJson()),
null,
getEditableObject(scenarioEditVersion.getInternalProjectVersion()),
scenarioId,
scenarioEditVersion.getScenarioJson());
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to add scenario node '%s' for scenario '%s'('%s') because of error: %s",
node.getInfo().getName(), scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
public BlockingObjectDto<ScenarioNodeWrapper> updateEditMode(String objectId, EditModeState editMode, Optional<ProjectVersionDto> version) {
if (EditModeState.OFF == editMode) {
version = projectService.getLastProjectVersion(
objectId, SecurityUtils.getCurrentUsername());
version.ifPresent(
pv -> calculationSchedulerClient.unblockEntity(pv.getBlockingInstanceId(),
SecurityUtils.getCurrentUsername()));
return getBlockingObject(objectId);
}
ResponseBlockEntities responseBlockEntities = calculationSchedulerClient.blockObject(objectId,
BlockType.EDIT);
if (!responseBlockEntities.getStatus()) {
throw new MetaObjectException(String.format("Unable to block object [%s]", objectId));
}
CmMetaObjectDto cmMetaObject = cmMetaObjectService.getMetaObject(objectId);
EditableObjectData objectData = EditableObjectData.builder()
.objectId(objectId)
.objectType(getObjectType())
.objectName(cmMetaObject.getObjectName())
.build();
enrichEditableObjectData(objectId, objectData);
ProjectVersion projectVersion = projectService
.startObjectEdit(objectData, false, null, responseBlockEntities.getBlockerInstanceId(), null);
LockDetailedInfoDto lockInfo = LockDetailedInfoDto.builder()
.objectId(objectId)
.blockedInstanceId(responseBlockEntities.getBlockerInstanceId())
.blockedType(BlockType.EDIT.name()).userBlocked(SecurityUtils.getCurrentUsername())
.difference(false)
.build();
Map<String, Object> additionalParams = buildAdditionalParams(deserialize(projectVersion.getJson(), objectId));
return BlockingObjectDto.<ScenarioNodeWrapper>builder()
.objectId(objectId)
.data(deserialize(projectVersion.getJson(), objectId))
.isProject(false)
.lockInfo(lockInfo)
.additionalParams(additionalParams)
.build();
}
private BlockingObjectDto<ScenarioNodeWrapper> getEditableObject(ProjectVersion pv) {
String objectId = pv.getObjectId();
String blockedInstanceId = pv.getBlockedInstanceId();
CmMetaObjectDto metaObject = cmMetaObjectService.getMetaObject(objectId);
BlockingObjectDto.BlockingObjectDtoBuilder<ScenarioNodeWrapper> resultBuilder = BlockingObjectDto.builder();
String objectJson = pv.getJson();
if (metaObject.getIsProject()) {
resultBuilder.isProject(true);
} else {
LockDetailedInfoDto lockInfo = calculationSchedulerClient.checkBlock(objectId, blockedInstanceId);
resultBuilder.lockInfo(lockInfo);
if (lockInfo == null || lockInfo.getDifference() == null || lockInfo.getDifference()) {
objectJson = projectListService.getLastDeployedJson(objectId);
}
}
Map<String, Object> additionalParams = buildAdditionalParams(deserialize(objectJson, objectId));
return resultBuilder
.objectId(objectId)
.data(deserialize(objectJson, objectId))
.additionalParams(additionalParams)
.build();
}
@Override
public BlockingObjectDto<ScenarioDto> deleteNode(String scenarioId, String nodeId) {
String scenarioName = null, scenarioNodeName = nodeId;
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
Iterator<ScenarioNode> iterator = scenarioNodes.iterator();
while (iterator.hasNext()) {
ScenarioNode scenarioNode = iterator.next();
getInputModifier(scenarioNode).removeEdge(scenarioNode, nodeId);
if (Objects.equals(scenarioNode.getId(), nodeId)) {
if (isResultNode(scenarioNode)) {
throw new ScenarioUpdateException(
String.format("Forbidden to delete the resultNode [%s]",
scenarioNode.getId()));
}
iterator.remove();
scenarioNodeName = scenarioNode.getName();
}
}
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Scenario node '%s' added successfully for scenario '%s'('%s')", scenarioNodeName,
scenarioId, scenarioName));
return blockingWrapperToBlockingScenario(
ScenarioGraphUtils.buildScenarioGraphDto(scenarioEditVersion.getScenarioJson()),
null,
getEditableObject(scenarioEditVersion.getInternalProjectVersion()),
scenarioId,
scenarioEditVersion.getScenarioJson());
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to add scenario node '%s' for scenario '%s'('%s') because of error: %s",
scenarioNodeName, scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
@Override
public BlockingObjectDto<ScenarioDto> createNodeEdge(String scenarioId, UpdateNodesEdgesRequest edge) {
String source = edge.getSource();
String target = edge.getTarget();
String scenarioName = null, sourceNodeName = source, targetNodeName = target;
try {
if (source.equals(target)) {
log.warn("addEdge -> Source and target are equals. Edge not created");
throw new ScenarioUpdateException("Source and target are equals. Edge not created");
}
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
EdgeNodes edgeNodes = getEdgesNodes(scenarioNodes, source, target);
sourceNodeName = edgeNodes.getSource().getName();
targetNodeName = edgeNodes.getTarget().getName();
getInputModifier(edgeNodes.getTarget()).addEdge(edgeNodes);
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
ScenarioProjectResponse scenarioProjectResponse = ScenarioProjectResponse.builder()
.scenarioId(scenarioEditVersion.getScenarioId())
.scenarioGraph(
ScenarioGraphUtils.buildScenarioGraphDto(scenarioEditVersion.getScenarioJson()))
.build();
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Scenario edge (%s -> %s) added successfully for scenario '%s'('%s')", sourceNodeName,
targetNodeName, scenarioId, scenarioName));
return blockingWrapperToBlockingScenario(
scenarioProjectResponse.getScenarioGraph(),
null,
getEditableObject(scenarioEditVersion.getInternalProjectVersion()),
scenarioId,
scenarioEditVersion.getScenarioJson()
);
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to add scenario edge (%s -> %s) for scenario '%s'('%s') because of error: %s",
sourceNodeName, targetNodeName, scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
@Override
public BlockingObjectDto<ScenarioDto> deleteNodeEdge(String scenarioId, UpdateNodesEdgesRequest edge) {
String source = edge.getSource();
String target = edge.getTarget();
String scenarioName = null, sourceNodeName = source, targetNodeName = target;
try {
if (source.equals(target)) {
log.warn("removeEdge -> Source and target are equals. Edge not created");
throw new ScenarioUpdateException("Source and target are equals. Edge not created");
}
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
EdgeNodes edgeNodes = getEdgesNodes(scenarioNodes, source, target);
sourceNodeName = edgeNodes.getSource().getName();
targetNodeName = edgeNodes.getTarget().getName();
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
ScenarioProjectResponse scenarioProjectResponse = ScenarioProjectResponse.builder()
.scenarioId(scenarioEditVersion.getScenarioId())
.scenarioGraph(
ScenarioGraphUtils.buildScenarioGraphDto(scenarioEditVersion.getScenarioJson()))
.build();
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Scenario edge (%s -> %s) removed successfully for scenario '%s'('%s')", sourceNodeName,
targetNodeName, scenarioId, scenarioName));
return blockingWrapperToBlockingScenario(
scenarioProjectResponse.getScenarioGraph(),
null,
getEditableObject(scenarioEditVersion.getInternalProjectVersion()),
scenarioId,
scenarioEditVersion.getScenarioJson()
);
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to remove scenario edge (%s -> %s) for scenario '%s'('%s') because of error: %s",
sourceNodeName, targetNodeName, scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
@Override
public BlockingObjectDto<ScenarioDto> createVariable(String scenarioId, VariableNodeDto variable) {
String scenarioName = null;
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> scenarioNodes = new ArrayList<>(ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson()));
ScenarioNode variableNode = nodeFactory.createVariableNode(variable);
scenarioNodes.add(variableNode);
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Variable node '%s' created successfully for scenario '%s'('%s')",
variable.getName(), scenarioId, scenarioName));
return getScenario(scenarioId);
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to create variable node '%s' for scenario '%s'('%s') because of error: %s",
variable.getName(), scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
@Override
public BlockingObjectDto<ScenarioDto> updateVariable(String scenarioId, VariableNodeDto variable) {
String scenarioName = null;
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode variableNode = findScenarioNode(scenarioNodes, variable.getId());
if (!NodeType.__VARIABLE__.getId().equals(variableNode.getType())) {
throw new ScenarioUpdateException(
String.format("Node [%s] is not variable", variable.getId()));
}
variableNode.setName(variable.getName());
if (variableNode.getOperation().getSource() == null) {
variableNode.getOperation().setSource(new Source());
}
if (variableNode.getOperation().getSource().getDbObject() == null) {
variableNode.getOperation().getSource().setDbObject(new Source.DbObject());
}
variableNode.getOperation().getSource().getDbObject()
.setName(variable.getDbObjectName());
variableNode.getOperation().getSource().getDbObject()
.setDatabase(variable.getDatabase());
variableNode.getOperation().getSource().getDbObject()
.setSchema(variable.getDbSchema());
if (variableNode.getVariable() == null) {
variableNode.setVariable(new Variable());
}
variableNode.getVariable().setIsBatchVariable(variable.getIsBatchVariable());
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Variable node '%s' updated successfully for scenario '%s'('%s')",
variable.getName(), scenarioId, scenarioName));
return getScenario(scenarioId);
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to update variable node '%s' for scenario '%s'('%s') because of error: %s",
variable.getName(), scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
@Override
public BlockingObjectDto<ScenarioDto> deleteVariable(String scenarioId, String nodeId) {
String scenarioName = null;
String[] scenarioNodeName = new String[]{nodeId};
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
scenarioNodes.removeIf(sn -> {
if (nodeId.equals(sn.getId()) && NodeType.__VARIABLE__.getId().equals(sn.getType())) {
scenarioNodeName[0] = sn.getName();
return true;
}
return false;
});
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Variable node '%s' deleted successfully for scenario '%s'('%s')", scenarioNodeName[0],
scenarioId, scenarioName));
return blockingWrapperToBlockingScenario(
ScenarioGraphUtils.buildScenarioGraphDto(scenarioEditVersion.getScenarioJson()),
null,
getEditableObject(scenarioEditVersion.getInternalProjectVersion()),
scenarioId,
scenarioEditVersion.getScenarioJson()
);
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to delete variable node '%s' for scenario '%s'('%s') because of error: %s",
scenarioNodeName[0], scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
@Override
public List<ShortedScenarioDto> getScenario(List<String> objectIds) {
return objectIds.stream().map(id -> {
try {
return cmMetaObjectService.getMetaObject(id);
} catch (MetaObjectException e) {
log.warn("Failed to get scenario '%s' because of error: %s", id, e.getMessage());
}
return null;
})
.filter(Objects::nonNull)
.map(se ->
ShortedScenarioDto.builder()
.id(se.getObjectId())
.user(se.getUserCreate())
.name(se.getObjectName())
.description(null)
.type(se.getIsProject() ? ScenarioType.PROJECT : ScenarioType.SCENARIO)
.objectType(se.getIsProject() ? ProjectSpaceObjectType.SCENARIO_EDIT : ProjectSpaceObjectType.SCENARIO)
.build())
.toList();
}
@Override
public List<ShortedScenarioDto> getUserScenarioProjectsByScenarioIds(List<String> scenarioIds) {
return getScenario(scenarioIds);
}
@Override
public BlockingObjectDto<NodeDto> renameWindowOutput(String scenarioId, String nodeId,
String oldName, String newName) {
String scenarioName = null;
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode windowNode = findScenarioNode(scenarioNodes, nodeId);
if (!Objects.equals(windowNode.getOperation().getType(), OperationType.__WINDOW__.getId())) {
throw new ScenarioUpdateException(
String.format("Impossible to update window output name for not-window node [%s]",
nodeId));
}
Window existingWindow = windowNode.getOperation().getWindow().stream()
.filter(window -> window.getColumn().equals(oldName))
.findFirst()
.orElseThrow(() -> new ScenarioUpdateException(
String.format("Not found existing window with column name [%s]", oldName)));
existingWindow.setColumn(newName);
Output existingWindowOutput = windowNode.getOutput().stream()
.filter(output -> output.getName().equals(oldName))
.findFirst()
.orElseThrow(() -> new ScenarioUpdateException(
String.format("Not found existing output with name [%s]", oldName)));
existingWindowOutput.setName(newName);
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Window node output '%s' / '%s'->'%s' renamed successfully for scenario '%s'('%s')",
nodeId, oldName, newName, scenarioId, scenarioName));
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to rename window node output '%s' / '%s'->'%s' for scenario '%s'('%s') because of error: %s",
nodeId, oldName, newName, scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
return getResponseNode(scenarioId, nodeId);
}
private BlockingObjectDto<NodeDto> getResponseNode(String scenarioId, String nodeId) {
BlockingObjectDto<ScenarioNodeWrapper> editableObject = getBlockingObject(scenarioId);
Map<String, ScenarioNode> nodes = editableObject.getData().getAllByMap();
List<InputDto> input = ScenarioNodeUtils.buildNodeInput(nodes.get(nodeId).getInput());
ScenarioNode scenarioNode = nodes.get(nodeId);
Map<String, Join> collect = Optional.ofNullable(scenarioNode.getOperation().getJoin()).orElse(Collections.emptyList()).stream()
.collect(Collectors.toMap(Join::getId, j -> j));
List<InputNode> inputNodes = null;
if (input != null) {
inputNodes = input.stream()
.map(node -> {
ScenarioNode source = nodes.get(node.getId());
return InputNode.builder()
.id(node.getId())
.joinType(collect.containsKey(node.getId()) ? JoinType.valueOf(collect.get(node.getId()).getType()) : null)
.name(source.getName())
.outputs(Optional.ofNullable(source.getOutput()).orElse(Collections.emptyList()).stream().map(ScenarioNodeUtils::toOutputDto).toList())
.build();
}).toList();
}
NodeOutputBuilder nodeOutputBuilder =
getNodeOutputBuilder(OperationType.valueOf(nodes.get(nodeId).getOperation().getType()));
NodeDetailsDto nodeDetailsDto = nodeOutputBuilder.buildNodeOutput(editableObject.getData().getAll(), nodeId);
NodeDto node = getNodeDto(nodeId, nodes, inputNodes, nodeDetailsDto);
return BlockingObjectDto.<NodeDto>builder()
.lockInfo(editableObject.getLockInfo())
.isProject(editableObject.isProject())
.objectId(editableObject.getObjectId())
.additionalParams(editableObject.getAdditionalParams())
.data(node)
.build();
}
@Override
public BlockingObjectDto<NodeDto> updateWindow(String scenarioId, WindowOutputDto node) {
String scenarioName = null;
try {
validateWindowParams(node);
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode windowNode = findScenarioNode(scenarioNodes, node.getNodeId());
if (!Objects.equals(windowNode.getOperation().getType(), OperationType.__WINDOW__.getId())) {
throw new ScenarioUpdateException(
String.format("Impossible to update window output to not-window node [%s]",
node.getNodeId()));
}
Window existingWindow = windowNode.getOperation().getWindow().stream()
.filter(window -> window.getColumn().equals(node.getOutputName()))
.findFirst()
.orElseThrow(() -> new ScenarioUpdateException(
String.format("Not found existing window with column name [%s]",
node.getOutputName())));
existingWindow.setSort(node.getSortType() == OrderingType.DESC);
existingWindow.setBuckets(node.getBuckets());
existingWindow.setOrder(node.getOrderColumns());
existingWindow.setPartitions(node.getPartitionColumns());
existingWindow.setScalar(node.getScalar());
existingWindow.setOffset(node.getOffset());
Output existingWindowOutput = windowNode.getOutput().stream()
.filter(output -> output.getName().equals(node.getOutputName()))
.findFirst()
.orElseThrow(() -> new ScenarioUpdateException(
String.format("Not found existing output with name [%s]", node.getOutputName())));
existingWindowOutput.setWindowtype(node.getType().getId());
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Window output '%s' updated successfully for scenario '%s'('%s')",
existingWindow.getColumn(), scenarioId, scenarioName));
return getNodeById(scenarioId, node.getNodeId());
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to update window output '%s' for scenario '%s'('%s') because of error: %s",
node.getOutputName(), scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
private void validateWindowParams(WindowOutputDto request) {
WindowType windowType = request.getType();
if (windowType == WindowType.NTILE) {
if (request.getBuckets() == null) {
throwRequiredParamMissedForWindow("buckets", windowType.name());
}
} else if (windowType == WindowType.LAG || windowType == WindowType.LEAD) {
if (request.getScalar() == null) {
throwRequiredParamMissedForWindow("scalar", windowType.name());
}
if (request.getOffset() == null) {
int defaultOffset = 1;
request.setOffset(defaultOffset);
}
}
}
private void throwRequiredParamMissedForWindow(String paramName, String functionName) {
throw new ScenarioUpdateException(
String.format("Param [%s] is required for function [%s]", paramName, functionName));
}
@Override
public BlockingObjectDto<NodeDto> addWindow(String scenarioId, WindowOutputDto node) {
String scenarioName = null;
try {
validateWindowParams(node);
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode windowNode = findScenarioNode(scenarioNodes, node.getNodeId());
if (!Objects.equals(windowNode.getOperation().getType(), OperationType.__WINDOW__.getId())) {
throw new ScenarioUpdateException(
String.format("Impossible to add window output to not-window node [%s]",
node.getNodeId()));
}
Window window = Window.builder()
.id(windowNode.getId())
.column(node.getOutputName())
.sort(node.getSortType() == OrderingType.ASC)
.buckets(node.getBuckets())
.order(node.getOrderColumns())
.partitions(node.getPartitionColumns())
.scalar(node.getScalar())
.offset(node.getOffset()).build();
if (windowNode.getOperation().getWindow() == null) {
windowNode.getOperation().setWindow(new ArrayList<>());
}
windowNode.getOperation().getWindow().add(window);
//user can change datatype later in UI if required
Output windowOutput = Output.builder().type(AttributeType.__WINDOW__.getId())
.name(node.getOutputName())
.datatype(ru.sber.df.epmp.cc.ce.cm.scenariodesigner.domain.output.DataType.builder()
.type(DataType.INTEGER.getId()).build())
.windowtype(node.getType().getId())
.build();
if (windowNode.getOutput() == null) {
windowNode.setOutput(new ArrayList<>());
}
windowNode.getOutput().add(windowOutput);
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Window output '%s' created successfully for scenario '%s'('%s')", window.getColumn(),
scenarioId, scenarioName));
return getNodeById(scenarioId, node.getNodeId());
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to create window output '%s' for scenario '%s'('%s') because of error: %s",
node.getOutputName(), scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
@Override
public BlockingObjectDto<NodeDto> updateInputsOrder(String scenarioId, String nodeId, List<String> inputs) {
String scenarioName = null;
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> nodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode scenarioNode = findScenarioNode(nodes, nodeId);
getInputModifier(scenarioNode).updateInputOrder(scenarioNode, inputs);
setScenarioNodes(scenarioEditVersion, nodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
return getNodeById(scenarioId, nodeId);
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to update join inputs order for scenario '%s'('%s') because of error: %s",
scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
@Override
public BlockingObjectDto<NodeDto> deleteMapRangeOutput(String scenarioId, String nodeId, String outputName) {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode scenarioNode = findScenarioNode(scenarioNodes, nodeId);
getOutputModifier(scenarioNode.getOperation().getType()).deleteOutputRange(scenarioNode,
outputName);
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
return getNodeById(scenarioId, nodeId);
}
@Override
public BlockingObjectDto<NodeDto> addMapRangeOutput(String scenarioId, String nodeId, String outputName, RangeDto range) {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode scenarioNode = findScenarioNode(scenarioNodes, nodeId);
getOutputModifier(scenarioNode.getOperation().getType()).updateRange(scenarioNode, outputName,
range);
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
return getNodeById(scenarioId, nodeId);
}
private OutputModifier getOutputModifier(Integer type) {
return switch (OperationType.valueOf(type)) {
case __UNION__ -> outputModifiers.get(OperationType.__UNION__);
case __JOIN__ -> outputModifiers.get(OperationType.__JOIN__);
default -> outputModifiers.get(OperationType.__PROJECTION__);
};
}
@Override
public BlockingObjectDto<NodeDto> addRange(String scenarioId, String nodeId, String outputName) {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode scenarioNode = findScenarioNode(scenarioNodes, nodeId);
getOutputModifier(scenarioNode.getOperation().getType()).createDefaultRange(scenarioNode,
outputName);
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
return getNodeById(scenarioId, nodeId);
}
@Override
public BlockingObjectDto<NodeDto> createAutoMapRangeOutput(String scenarioId, String unionNodeId) {
String scenarioName = null;
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
((UnionOutputsModifier) getOutputModifier(OperationType.__UNION__.getId()))
.unionAutoMapping(scenarioNodes, unionNodeId);
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Union auto mapping for node '%s' set successfully for scenario '%s'('%s')",
unionNodeId, scenarioId, scenarioName));
return getNodeById(scenarioId, unionNodeId);
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to set union auto mapping for node '%s' for scenario '%s'('%s') because of error: %s",
unionNodeId, scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
@Override
public BlockingObjectDto<NodeDto> addExpression(String scenarioId, String nodeId, String outputName, String expression) {
String scenarioName = null, scenarioNodeName = nodeId;
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode scenarioNode = findScenarioNode(scenarioNodes, nodeId);
scenarioNodeName = scenarioNode.getName();
getOutputModifier(scenarioNode.getOperation().getType())
.updateExpression(scenarioNode, expression, outputName,
dictionaryService.getUserFunctions());
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Expression for node '%s' updated successfully for scenario '%s'('%s')", scenarioNodeName,
scenarioId, scenarioName));
return getNodeById(scenarioId, nodeId);
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to update expression for node '%s' for scenario '%s'('%s') because of error: %s",
scenarioNodeName, scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
@Override
public BlockingObjectDto<NodeDto> renameColumn(String scenarioId, String nodeId, String oldName, String newName) {
String scenarioName = null, scenarioNodeName = nodeId;
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode scenarioNode = findScenarioNode(scenarioNodes, nodeId);
scenarioNodeName = scenarioNode.getName();
getOutputModifier(scenarioNode.getOperation().getType())
.renameOutput(scenarioNodes, scenarioNode, oldName, newName);
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Node output '%s' / '%s'->'%s' renamed successfully for scenario '%s'('%s')",
scenarioNodeName, oldName, newName, scenarioId, scenarioName));
return getNodeById(scenarioId, nodeId);
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to rename node output '%s' / '%s'->'%s' for scenario '%s'('%s') because of error: %s",
scenarioNodeName, oldName, newName, scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
@Override
public BlockingObjectDto<NodeDto> createAutoMapSourceOutput(String scenarioId, String unionNodeId) {
String scenarioName = null;
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
((UnionOutputsModifier) getOutputModifier(OperationType.__UNION__.getId()))
.unionAutoMapping(scenarioNodes, unionNodeId);
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Union auto mapping for node '%s' set successfully for scenario '%s'('%s')",
unionNodeId, scenarioId, scenarioName));
return getNodeById(scenarioId, unionNodeId);
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to set union auto mapping for node '%s' for scenario '%s'('%s') because of error: %s",
unionNodeId, scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
@Override
public BlockingObjectDto<NodeDto> updateMapSourceOutput(String scenarioId, String sourceOutput,
String targetOutput,
String sourceNodeId, String scenarioNodeId) {
String scenarioName = null;
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode scenarioNode = findScenarioNode(scenarioNodes, scenarioNodeId);
getOutputModifier(scenarioNode.getOperation().getType())
.mapSourceOutput(scenarioNodes, scenarioNodeId, sourceNodeId, sourceOutput, targetOutput);
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Source output mapping '%s'->'%s' created successfully for scenario '%s'('%s')",
sourceNodeId, scenarioNodeId, scenarioId, scenarioName));
return getNodeById(scenarioId, scenarioNodeId);
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to remove source output mapping '%s'->'%s' for scenario '%s'('%s') because of error: %s",
sourceNodeId, scenarioNodeId, scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
@Override
public BlockingObjectDto<NodeDto> deleteMapSourceOutput(String scenarioId, String scenarioNodeId,
String sourceNodeId,
String sourceOutput, String targetOutput) {
String scenarioName = null, targetNodeName = scenarioNodeId;
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode scenarioNode = findScenarioNode(scenarioNodes, scenarioNodeId);
targetNodeName = scenarioNode.getName();
getOutputModifier(scenarioNode.getOperation().getType())
.removeTargetMapping(scenarioNode, sourceNodeId, sourceOutput, targetOutput);
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Node target mapping '%s'->'%s' removed successfully for scenario '%s'('%s')",
sourceNodeId, targetNodeName, scenarioId, scenarioName));
return getNodeById(scenarioId, scenarioNodeId);
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to remove union target mapping '%s'->'%s' for scenario '%s'('%s') because of error: %s",
sourceNodeId, targetNodeName, scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
private NodeDetailsDto getSourceOutputs(String scenarioId, String nodeId) {
var scenario = getScenario(scenarioId);
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(scenario.getData().getJson());
return getNodeDetails(scenarioNodes, nodeId);
}
private NodeDetailsDto getNodeDetails(List<ScenarioNode> nodes, String nodeId) {
ScenarioNode scenarioNode = nodes.stream().filter(n -> n.getId().equals(nodeId)).findFirst().orElse(null);
if (scenarioNode == null) {
log.warn("getNodeDetails -> node [{}] not found", nodeId);
return NodeDetailsDto.builder().build();
}
NodeOutputBuilder nodeOutputBuilder =
getNodeOutputBuilder(OperationType.valueOf(scenarioNode.getOperation().getType()));
return nodeOutputBuilder.buildNodeOutput(nodes, nodeId);
}
@Override
public BlockingObjectDto<NodeDto> createMapSourceOutput(String scenarioId, String sourceOutput, String targetOutput, String sourceNodeId, String scenarioNodeId) {
String scenarioName = null;
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode scenarioNode = findScenarioNode(scenarioNodes, scenarioNodeId);
getOutputModifier(scenarioNode.getOperation().getType())
.mapSourceOutput(scenarioNodes, scenarioNodeId, sourceNodeId, sourceOutput, targetOutput);
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Source output mapping '%s'->'%s' created successfully for scenario '%s'('%s')",
sourceNodeId, scenarioNodeId, scenarioId, scenarioName));
return getNodeById(scenarioId, scenarioNodeId);
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to remove source output mapping '%s'->'%s' for scenario '%s'('%s') because of error: %s",
sourceNodeId, scenarioNodeId, scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
@Override
public BlockingObjectDto<NodeDto> createTargetNode(String scenarioId, String nodeId, List<TargetColumnDto> targets) {
String scenarioName = null, scenarioNodeName = nodeId;
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode scenarioNode = findScenarioNode(scenarioNodes, nodeId);
scenarioNodeName = scenarioNode.getName();
getOutputModifier(scenarioNode.getType()).addTargetColumns(scenarioNodes, scenarioNode,
targets);
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Node '%s' targets created successfully for scenario '%s'('%s')", scenarioNodeName,
scenarioId, scenarioName));
return getNodeById(scenarioId, nodeId);
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to create node '%s' targets for scenario '%s'('%s') because of error: %s",
scenarioNodeName, scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
@Override
public BlockingObjectDto<NodeDto> deleteConnection(String scenarioId, String joinNodeId, String targetNodeId, String sourceColumnName, String targetColumnName) {
String scenarioName = null, sourceNodeName = joinNodeId, targetNodeName = targetNodeId;
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> nodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode joinNode = findScenarioNode(nodes, joinNodeId);
sourceNodeName = joinNode.getName();
ScenarioNode targetJoinedNode = findScenarioNode(nodes, targetNodeId);
targetNodeName = targetJoinedNode.getName();
Join join = joinNode.getOperation().getJoin().stream()
.filter(j -> Objects.equals(j.getId(), targetNodeId))
.findFirst()
.orElse(null);
if (join == null) {
log.warn("deleteJoinCondition -> joined node [{}] not found", targetNodeId);
throw new ScenarioUpdateException(String.format("Joined node [%s] not found", targetNodeId));
}
join.getConditions().removeIf(c -> Objects.equals(c.getColumn(), targetColumnName)
&& Objects.equals(c.getConnect(), sourceColumnName));
if (join.getConditions().isEmpty()) {
cleanUpJoinOutputs(joinNode, targetJoinedNode);
join.setType(JoinType.__CROSS__.getId());
}
setScenarioNodes(scenarioEditVersion, nodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Join condition (%s - %s) removed successfully for scenario '%s'('%s')", sourceNodeName,
targetNodeName, scenarioId, scenarioName));
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to remove join condition (%s - %s) for scenario '%s'('%s') because of error: %s",
sourceNodeName, targetNodeName, scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
return getResponseNode(scenarioId, joinNodeId);
}
private void cleanUpJoinOutputs(ScenarioNode joinNode, ScenarioNode targetJoinedNode) {
Input input = joinNode.getInput().stream()
.filter(i -> Objects.equals(i.getId(), targetJoinedNode.getId()))
.findFirst()
.orElse(null);
if (input == null) {
log.warn("cleanUpOutputs -> Join node [{}] input [{}] is null", joinNode.getId(),
targetJoinedNode.getId());
return;
}
if (input.getMappings() == null || input.getMappings().isEmpty()) {
return;
}
List<MappingObject> mappings = input.getMappings();
for (MappingObject mapping : mappings) {
joinNode.getOutput().removeIf(o -> Objects.equals(o.getName(), mapping.getTarget()));
}
input.setMappings(null);
}
@Override
public BlockingObjectDto<NodeDto> createJoinNode(String scenarioId, String sourceNodeId, String joinNodeId, String columnName, String connectName) {
String scenarioName = null, sourceNodeName = sourceNodeId, targetNodeName = joinNodeId;
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> nodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode joinNode = null;
ScenarioNode sourceNode = null;
for (ScenarioNode node : nodes) {
if (Objects.equals(node.getId(), joinNodeId)) {
joinNode = node;
}
if (Objects.equals(node.getId(), sourceNodeId)) {
sourceNode = node;
}
}
if (joinNode == null || !joinNode.getOperation().getType()
.equals(OperationType.__JOIN__.getId())) {
throw new ScenarioUpdateException(String.format("Join node [%s] not found", joinNodeId));
}
if (sourceNode == null) {
throw new ScenarioUpdateException(
String.format("Source node [%s] not found", sourceNodeId));
}
sourceNodeName = sourceNode.getName();
targetNodeName = joinNode.getName();
checkJoinNodeInputs(joinNode, sourceNodeId);
if (joinNode.getOperation().getJoin() == null) {
joinNode.getOperation().setJoin(new ArrayList<>());
}
Join join = joinNode.getOperation().getJoin().stream()
.filter(j -> Objects.equals(j.getId(), sourceNodeId)).findFirst().orElse(null);
if (join == null) {
join = Join.builder().id(sourceNodeId).type(1).conditions(new ArrayList<>()).build();
joinNode.getOperation().getJoin().add(join);
}
checkConnectNameExists(nodes, connectName, joinNode);
checkColumnNameExists(columnName, sourceNode);
if (join.getConditions() == null) {
join.setConditions(new ArrayList<>());
}
join.getConditions().add(new Join.Condition(columnName, connectName));
if (Objects.equals(join.getType(), JoinType.__CROSS__.getId())) {
join.setType(JoinType.__INNER__.getId());
}
setScenarioNodes(scenarioEditVersion, nodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Join condition (%s - %s) created successfully for scenario '%s'('%s')", sourceNodeName,
targetNodeName, scenarioId, scenarioName));
return getResponseNode(scenarioId, joinNodeId);
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to created join condition (%s - %s) for scenario '%s'('%s') because of error: %s",
sourceNodeName, targetNodeName, scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
private void checkColumnNameExists(String columnName, ScenarioNode sourceNode) {
sourceNode.getOutput().stream()
.filter(o -> Objects.equals(o.getName(), columnName))
.findAny()
.orElseThrow(
() -> new ScenarioUpdateException(String.format("Connect [%s] not found in node [%s]",
columnName, sourceNode.getId())));
}
private void checkConnectNameExists(List<ScenarioNode> nodes, String columnName,
ScenarioNode joinNode) {
String mainInputId = joinNode.getInput().get(0).getId();
ScenarioNode targetNode = nodes.stream().filter(n -> Objects.equals(n.getId(), mainInputId))
.findFirst()
.orElseThrow(() -> new ScenarioUpdateException(
String.format("Input node [%s] not found", mainInputId)));
targetNode.getOutput().stream()
.filter(o -> Objects.equals(o.getName(), columnName))
.findAny()
.orElseThrow(() -> new ScenarioUpdateException(
String.format("Column [%s] not found in join main input node [%s]",
columnName, mainInputId)));
}
private void checkJoinNodeInputs(ScenarioNode joinNode, String sourceNodeId) {
Optional.ofNullable(joinNode.getInput()).orElse(Collections.emptyList()).stream()
.filter(input -> Objects.equals(input.getId(), sourceNodeId))
.findAny()
.orElseThrow(
() -> new ScenarioUpdateException(String.format("Node [%s] not found in Join inputs",
sourceNodeId)));
}
@Override
public BlockingObjectDto<NodeDto> deleteScenario(String scenarioId) {
return null;
}
@Override
public BlockingObjectDto<NodeDto> updateScenarioNode(String scenarioId, String nodeId, String sourceNodeId, String columnName) {
String scenarioName = null, scenarioNodeName = nodeId;
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode scenarioNode = findScenarioNode(scenarioNodes, nodeId);
scenarioNodeName = scenarioNode.getName();
getOutputModifier(scenarioNode.getOperation().getType())
.addOutput(scenarioNodes, nodeId, sourceNodeId, columnName);
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Node output '%s'/'%s' added successfully for scenario '%s'('%s')", scenarioNodeName,
columnName, scenarioId, scenarioName));
return getNodeById(scenarioId, nodeId);
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to add node output '%s'/'%s' for scenario '%s'('%s') because of error: %s",
scenarioNodeName, columnName, scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
@Override
public BlockingObjectDto<NodeDto> deleteTarget(String scenarioId, String nodeId, String sourceNodeId, String columnName) {
String scenarioName = null, scenarioNodeName = nodeId;
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
scenarioName = scenarioEditVersion.getName();
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode scenarioNode = findScenarioNode(scenarioNodes, nodeId);
scenarioNodeName = scenarioNode.getName();
getOutputModifier(scenarioNode.getType()).removeOutput(scenarioNode, columnName);
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Node output '%s'/'%s' removed successfully for scenario '%s'('%s')", scenarioNodeName,
columnName, scenarioId, scenarioName));
return getNodeById(scenarioId, nodeId);
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to remove node output '%s'/'%s' for scenario '%s'('%s') because of error: %s",
scenarioNodeName, columnName, scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
@Override
public List<ShortedScenarioDto> getScenario() {
List<Scenario> scenarios = scenarioRepository.findAll(Sort.by(Sort.Direction.ASC, "scenarioDescription.description"));
return shortedScenarioMapper.map(scenarios);
}
@Override
public BlockingObjectDto<NodeDto> updateScenarioNode(String scenarioId, String nodeId,
UpdateScenarioNodeRequest request) {
String scenarioName = null;
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode scenarioNode = findScenarioNode(scenarioNodes, nodeId);
scenarioName = scenarioNode.getName();
scenarioName = scenarioEditVersion.getName();
scenarioNode.setOutput(ScenarioNodeUtils.buildOutputList(request.getOutput()));
scenarioNode.setName(request.getName());
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Scenario node '%s' updated successfully for scenario '%s'('%s')",
request.getName(), scenarioId, scenarioName));
return getResponseNode(scenarioId, nodeId);
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to update scenario node '%s' for scenario '%s'('%s') because of error: %s",
request.getName(), scenarioId, scenarioName, e.getMessage()), e);
throw e;
}
}
@Override
public BlockingObjectDto<NodeDto> getNodeById(String scenarioId, String nodeId) {
return getResponseNode(scenarioId, nodeId);
}
private static NodeDto getNodeDto(String nodeId, Map<String, ScenarioNode> nodes, List<InputNode> inputNodes, NodeDetailsDto nodeDetailsDto) {
NodeDto node = NodeDto.builder()
.info(ScenarioNodeUtils.buildNodeInfo(nodes.get(nodeId)))
.inputNodes(inputNodes)
.outputs(Optional.ofNullable(nodes.get(nodeId).getOutput()).orElse(Collections.emptyList()).stream().map(ScenarioNodeUtils::toOutputDto).toList())
.conditions(connectionToCondition(nodeDetailsDto != null ? nodeDetailsDto.getConditions() : null))
.mappings(connectionToCondition(nodeDetailsDto != null ? nodeDetailsDto.getMappings() : null))
.build();
return node;
}
private static List<Condition> connectionToCondition(List<ConnectionDto> nodeDetailsDto) {
if (nodeDetailsDto == null) return null;
return nodeDetailsDto.stream()
.map(connection -> Condition.builder()
.sourceNodeId(connection.getSourceNodeId())
.sourceColumnName(connection.getSourceColumnName())
.targetNodeId(connection.getTargetNodeId())
.targetColumnName(connection.getTargetColumnName())
.joinType(connection.getJoinType())
.build())
.collect(Collectors.toList());
}
private NodeOutputBuilder getNodeOutputBuilder(OperationType operationType) {
return switch (operationType) {
case __JOIN__ -> new JoinNodeOutputBuilder();
case __UNION__ -> new UnionNodeOutputBuilder();
default -> new DefaultNodeOutputBuilder();
};
}
@Override
public BlockingObjectDto<ScenarioDto> updateProjectJson(String scenarioId, String scenarioJson) {
String scenarioName = null;
try {
ProjectVersion scenarioProjectVersion = projectService.saveAsNewVersion(scenarioId,
scenarioJson);
ScenarioJsonDto scenarioJsonDto = new ScenarioJsonDto(scenarioId, scenarioJson);
scenarioName = scenarioProjectVersion.getProject().getObjectName();
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Scenario json updated successfully for scenario '%s'('%s')"));
return getScenario(scenarioId);
} catch (Throwable e) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to update scenario json for scenario '%s'('%s') because of error: %s", scenarioId,
scenarioName, e.getMessage()), e);
throw e;
}
}
@Override
public BlockingObjectDto<ScenarioDto> getScenario(String scenarioId, Integer version) {
ScenarioVersion scenarioVersion = getScenarioVersion(scenarioId, version);
ScenarioGraphDto graphDto = ScenarioGraphUtils.buildScenarioGraphDto(scenarioVersion.getScenarioJson());
List<ScenarioNode> scenario = ScenarioNodeUtils.parseScenario(scenarioVersion.getScenarioJson());
ScenarioDto scenarioDto = ScenarioDto.builder()
.graph(graphDto)
.json(scenarioVersion.getScenarioJson())
.history(historyMapper.map(versionRepository.findAllByScenarioIdOrderByScenarioVersionDesc(scenarioId)))
.variables(scenario.stream().filter(s -> s.getType().equals(NodeType.__VARIABLE__.getId())).map(ScenarioNodeUtils::toVariableNodeDto).toList())
.properties(null)
.build();
return BlockingObjectDto.<ScenarioDto>builder()
.lockInfo(null)
.json(scenarioVersion.getScenarioJson())
.additionalParams(null)
.objectId(scenarioId)
.isProject(false)
.data(scenarioDto)
.build();
}
private ScenarioVersion getScenarioVersion(String scenarioId, Integer version) {
return versionRepository.findById(new ScenarioVersionPrimaryKey(scenarioId, version)).orElseThrow(
() -> new ScenarioNotFoundException(
String.format("Scenario [%s] version [%s] not found", scenarioId, version)));
}
public BlockingObjectDto<ScenarioDto> deployScenario(String scenarioId, DeployScenarioClientRequest property) {
deployProject(scenarioId);
try {
if (property.isIntegrationSL()) {
scenarioViewService.generateAndSaveView(scenarioId, property.getSlViewFramework());
}
return getScenario(scenarioId);
} catch (Exception e) {
updateEditMode(scenarioId, EditModeState.ON);
throw e;
}
}
private void setScenarioNodes(ProjectVersion scenarioEditVersion,
ScenarioNodeWrapper scenarioNodes) {
try {
scenarioEditVersion.setJson(objectMapper.writeValueAsString(scenarioNodes.getAll()));
} catch (JsonProcessingException exception) {
log.error("Unable to serialize nodes of scenario [{}]. Cause: [{}]",
scenarioEditVersion.getObjectId(), exception.getMessage());
throw new ScenarioPanelException(
String.format("Unable to serialize nodes of scenario [%s]. Cause: [%s]",
scenarioEditVersion.getObjectId(), exception.getMessage()));
}
}
private ScenarioEditVersion getScenarioEditVersion(String scenarioId) {
ProjectVersion scenarioProjectVersion = projectService.getNewVersionForEdit(scenarioId);
if (scenarioProjectVersion.getProject().getMetaObject().getObjectType()
!= CmMetaObjectTypeEnum.SCENARIO_PROJECT) {
throw new MetaObjectException("Invalid meta object type found");
}
return new ScenarioEditVersion(scenarioProjectVersion);
}
private InputModifier getInputModifier(ScenarioNode target) {
if (target.getIsResultNode() != null && target.getIsResultNode()) {
return new ResultNodeInputModifier();
} else if (target.getOperation() != null && Objects.equals(target.getOperation().getType(),
OperationType.__JOIN__.getId())) {
return new JoinNodeInputModifier();
}
return new DefaultInputModifier();
}
private boolean isResultNode(ScenarioNode scenarioNode) {
return
Objects.equals(scenarioNode.getOperation().getType(), OperationType.__PROJECTION__.getId())
&& Objects.equals(scenarioNode.getIsResultNode(), Boolean.TRUE);
}
private void setScenarioNodes(ScenarioEditVersion scenarioEditVersion,
List<ScenarioNode> scenarioNodes) {
try {
scenarioEditVersion.setScenarioJson(objectMapper.writeValueAsString(scenarioNodes));
} catch (JsonProcessingException exception) {
log.error("Unable to serialize nodes of scenario [{}]. Cause: [{}]",
scenarioEditVersion.getScenarioId(), exception.getMessage());
throw new ScenarioPanelException(
String.format("Unable to serialize nodes of scenario [%s]. Cause: [%s]",
scenarioEditVersion.getScenarioId(), exception.getMessage()));
}
}
private EdgeNodes getEdgesNodes(List<ScenarioNode> scenarioNodes, String source, String target) {
ScenarioNode sourceNode = null;
ScenarioNode targetNode = null;
for (ScenarioNode scenarioNode : scenarioNodes) {
if (scenarioNode.getId().equals(source)) {
sourceNode = scenarioNode;
}
if (scenarioNode.getId().equals(target)) {
targetNode = scenarioNode;
}
}
if (sourceNode == null) {
log.error("getEdgesNodes -> Source [{}] node not found", source);
throw new ScenarioUpdateException(String.format("Source [%s] node not found", source));
}
if (targetNode == null) {
log.error("getEdgesNodes -> Target [{}] node not found", target);
throw new ScenarioUpdateException(String.format("Target [%s] node not found", target));
}
return new EdgeNodes(sourceNode, targetNode);
}
private ScenarioNode findScenarioNode(List<ScenarioNode> scenarioNodes, String nodeId) {
return scenarioNodes.stream()
.filter(sn -> Objects.equals(sn.getId(), nodeId))
.findFirst()
.orElseThrow(() -> new ScenarioUpdateException(String.format("Node [{%s}] not found", nodeId)));
}
@Override
public ScenarioNodeWrapper getCurrentProject(String objectId) {
return getBlockingObject(objectId).getData();
}
@Override
public List<ExtendedMappingDto> getTargetMapping(String scenarioId, String unionNodeId, String targetOutput) {
BlockingObjectDto<ScenarioDto> scenario = getScenario(scenarioId);
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(scenario.getData().getJson());
UnionNodeOutputBuilder unionNodeOutputBuilder = new UnionNodeOutputBuilder();
return unionNodeOutputBuilder.getTargetMapping(scenarioNodes, unionNodeId, targetOutput);
}
@Override
public BlockingObjectDto<NodeDto> updateJoinType(String scenarioId, String nodeId, JoinTypeDto joinType) {
String scenarioName = null;
try {
ScenarioEditVersion scenarioEditVersion = getScenarioEditVersion(scenarioId);
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(
scenarioEditVersion.getScenarioJson());
ScenarioNode scenarioNode = findScenarioNode(scenarioNodes, nodeId);
scenarioName = scenarioNode.getName();
if (!Objects.equals(scenarioNode.getOperation().getType(), OperationType.__JOIN__.getId())) {
throw new ScenarioUpdateException(String.format("Join node [%s] not found", nodeId));
}
Join join = Optional.ofNullable(scenarioNode.getOperation().getJoin())
.orElse(Collections.emptyList())
.stream()
.filter(j -> Objects.equals(j.getId(), joinType.getJoinedNodeId()))
.findFirst()
.orElseThrow(() -> new ScenarioUpdateException(
String.format("Join for table [%s] not found", joinType.getJoinedNodeId())));
join.setType(joinType.getJoinType().getId());
if (joinType.getJoinType().equals(JoinType.__CROSS__)) {
join.setConditions(null);
}
setScenarioNodes(scenarioEditVersion, scenarioNodes);
projectService.saveAsNewVersion(scenarioId, scenarioEditVersion.getScenarioJson());
String auditMessage = "Join type '%s' updated successfully for scenario node '%s.%s' and join id '%s'";
log.info(composeLogMessageForCAPJournaling(scenarioId, scenarioName, auditMessage,
joinType.getJoinType().name(), scenarioId, nodeId,
joinType.getJoinedNodeId()));
} catch (Throwable t) {
log.error(composeLogMessageForCAPJournaling(scenarioId, scenarioName,
"Failed to update join type for scenario '%s'('%s') " +
"because of error: %s",
scenarioId, scenarioName, t.getMessage()), t);
throw t;
}
return getResponseNode(scenarioId, nodeId);
}
@Override
public BlockingObjectDto<ScenarioDto> updateScenarioProperties(String scenarioId, UpdateScenarioPropertiesRequest rq) {
ScenarioEditVersion scenario = getScenarioEditVersion(scenarioId);
scenario.getScenarioJson();
projectService.saveAsNewVersion(scenarioId, scenario.getScenarioJson(), rq);
return getScenario(scenarioId);
}
@Override
public BlockingObjectDto<ScenarioDto> rollback(String scenarioId) {
revertToPreviousVersion(scenarioId);
return getScenario(scenarioId);
}
@Override
public WindowOutputDto getWindowOutputDetails(String scenarioId, String nodeId, String outputName) {
BlockingObjectDto<ScenarioDto> scenario = getScenario(scenarioId);
List<ScenarioNode> scenarioNodes = ScenarioNodeUtils.parseScenario(scenario.getJson());
return scenarioNodes.stream()
.filter(sn -> Objects.equals(sn.getId(), nodeId))
.map(sn -> {
Window window = sn.getOperation().getWindow().stream()
.filter(w -> w.getColumn().equals(outputName))
.findFirst()
.orElseThrow(() -> new ScenarioUpdateException(
String.format("Window for output [%s] not found", outputName)));
Output windowOutput = sn.getOutput().stream()
.filter(o -> o.getName().equals(outputName))
.findFirst()
.orElseThrow(() -> new ScenarioUpdateException(
String.format("Output with name [%s] not found", outputName)));
return ScenarioNodeUtils.toWindowOutputDto(window, windowOutput);
})
.findFirst()
.orElseThrow(() -> new ScenarioUpdateException(
String.format("Node [%s] not found", nodeId)));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment