Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,19 @@ public Long migrateTransactionally(HistoricIncident c7Incident) {
if (shouldMigrate(c7IncidentId, HISTORY_INCIDENT)) {
HistoryMigratorLogs.migratingHistoricIncident(c7IncidentId);
var c7ProcessInstance = findProcessInstanceByC7Id(c7Incident.getProcessInstanceId());
Long processInstanceKey = null;
Long flowNodeInstanceKey = null;

var builder = new Builder();
var processDefinitionKey = findProcessDefinitionKey(c7Incident.getProcessDefinitionId());
builder.processDefinitionKey(processDefinitionKey);
if (c7ProcessInstance != null) {
var processInstanceKey = c7ProcessInstance.processInstanceKey();
processInstanceKey = c7ProcessInstance.processInstanceKey();
builder.processInstanceKey(processInstanceKey);
if (processInstanceKey != null) {
var flowNodeInstanceKey = findFlowNodeInstanceKey(c7Incident.getActivityId(), c7Incident.getProcessInstanceId());
flowNodeInstanceKey = findFlowNodeInstanceKey(c7Incident.getActivityId(), c7Incident.getProcessInstanceId());
builder.flowNodeInstanceKey(flowNodeInstanceKey);
// .jobKey(jobDefinitionKey) // TODO when jobs are migrated

// .jobKey(jobDefinitionKey) // TODO when jobs are migrated

String c7RootProcessInstanceId = c7Incident.getRootProcessInstanceId();
if (c7RootProcessInstanceId != null && isMigrated(c7RootProcessInstanceId, HISTORY_PROCESS_INSTANCE)) {
Expand All @@ -101,32 +102,34 @@ public Long migrateTransactionally(HistoricIncident c7Incident) {
builder.rootProcessInstanceKey(rootProcessInstance.processInstanceKey());
}
}
builder.treePath(generateTreePath(processInstanceKey, flowNodeInstanceKey));
}
}

IncidentDbModel dbModel = convert(C7Entity.of(c7Incident), builder);

if (dbModel.processDefinitionKey() == null) {
throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_PROCESS_DEFINITION);
}
if (dbModel.processDefinitionKey() == null) {
throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_PROCESS_DEFINITION);
}

if (dbModel.processInstanceKey() == null) {
if (dbModel.processInstanceKey() == null) {
throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_PROCESS_INSTANCE);
}
}

if (dbModel.rootProcessInstanceKey() == null) {
throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_ROOT_PROCESS_INSTANCE);
}
if (dbModel.rootProcessInstanceKey() == null) {
throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_ROOT_PROCESS_INSTANCE);
}

if (dbModel.flowNodeInstanceKey() == null) {
if (!c7Client.hasWaitingExecution(c7Incident.getProcessInstanceId(), c7Incident.getActivityId())) { // Activities on async before waiting state will not have a flow node instance key, but should not be skipped
throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_FLOW_NODE);
if (dbModel.flowNodeInstanceKey() == null) {
if (!c7Client.hasWaitingExecution(c7Incident.getProcessInstanceId(), c7Incident.getActivityId())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ If the incident is not skipped. Then the C7 execution is completed the incident will have an inconsistent tree patch in C8. Is that a trade off we accepted?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the treePath is not a reason to skip the incidentMigration, I will add this to the documentation

// Activities on async before waiting state will not have a flow node instance key, but should not be skipped
throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_FLOW_NODE);
}
}
}

if (dbModel.jobKey() == null) {
// throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_JOB_REFERENCE); // TODO when jobs are migrated
}
if (dbModel.jobKey() == null) {
// throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_JOB_REFERENCE); // TODO when jobs are migrated
}
c8Client.insertIncident(dbModel);

return dbModel.incidentKey();
Expand All @@ -141,11 +144,9 @@ protected Long findFlowNodeInstanceKey(String activityId, String processInstance
return null;
}

List<FlowNodeInstanceDbModel> flowNodes = c8Client.searchFlowNodeInstances(
FlowNodeInstanceDbQuery.of(builder -> builder.filter(
FlowNodeInstanceFilter.of(filter -> filter.flowNodeIds(activityId).processInstanceKeys(processInstanceKey))
))
);
List<FlowNodeInstanceDbModel> flowNodes = c8Client.searchFlowNodeInstances(FlowNodeInstanceDbQuery.of(
builder -> builder.filter(FlowNodeInstanceFilter.of(
filter -> filter.flowNodeIds(activityId).processInstanceKeys(processInstanceKey)))));

if (!flowNodes.isEmpty()) {
return flowNodes.getFirst().flowNodeInstanceKey();
Expand All @@ -154,5 +155,18 @@ protected Long findFlowNodeInstanceKey(String activityId, String processInstance
}
}

/**
* Generates a tree path for incidents in the format: PI_processInstanceKey/FNI_elementInstanceKey (if the
* elementInstanceKey exists, otherwise PI_processInstanceKey)
*
* @param processInstanceKey the process instance key
* @param elementInstanceKey the flow node instance key
* @return the tree path string
*/
public static String generateTreePath(Long processInstanceKey, Long elementInstanceKey) {
return elementInstanceKey == null ?
"PI_" + processInstanceKey :
"PI_" + processInstanceKey + "/FNI_" + elementInstanceKey;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public void execute(HistoricIncident entity, Builder builder) {
.errorType(determineErrorType(entity))
.errorMessage(entity.getIncidentMessage())
.creationDate(convertDate(entity.getCreateTime()))
.treePath(null)
.errorMessageHash(null)
.partitionId(C7_HISTORY_PARTITION_ID)
.jobKey(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.camunda.db.rdbms.sql.FlowNodeInstanceMapper;
import io.camunda.db.rdbms.sql.PurgeMapper;
import io.camunda.db.rdbms.write.domain.FlowNodeInstanceDbModel;
import io.camunda.db.rdbms.write.domain.IncidentDbModel;
import io.camunda.db.rdbms.write.service.RdbmsPurger;
import io.camunda.migration.data.HistoryMigrator;
import io.camunda.migration.data.MigratorMode;
Expand All @@ -29,6 +30,7 @@
import io.camunda.migration.data.impl.clients.DbClient;
import io.camunda.migration.data.impl.util.ConverterUtil;
import io.camunda.migration.data.qa.AbstractMigratorTest;
import io.camunda.migration.data.qa.extension.RdbmsQueryExtension;
import io.camunda.migration.data.qa.util.WithSpringProfile;
import io.camunda.search.entities.AuditLogEntity;
import io.camunda.search.entities.DecisionDefinitionEntity;
Expand Down Expand Up @@ -64,6 +66,7 @@
import org.camunda.bpm.engine.impl.util.ClockUtil;
import org.camunda.bpm.engine.task.Task;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
Expand Down Expand Up @@ -93,6 +96,10 @@ public abstract class HistoryMigrationAbstractTest extends AbstractMigratorTest
@Autowired
protected FlowNodeInstanceMapper flowNodeInstanceMapper;

@RegisterExtension
@Autowired
protected RdbmsQueryExtension rdbmsQuery = new RdbmsQueryExtension();

// C7 ---------------------------------------

@Autowired
Expand Down Expand Up @@ -232,6 +239,20 @@ public List<IncidentEntity> searchHistoricIncidents(String processDefinitionId)
.items();
}

public List<IncidentDbModel> searchIncidentsByProcessInstanceKeyAndReturnAsDbModel(Long processInstanceKey) {
String sql = "SELECT INCIDENT_KEY, TREE_PATH, PROCESS_INSTANCE_KEY, FLOW_NODE_INSTANCE_KEY " +
"FROM INCIDENT WHERE PROCESS_INSTANCE_KEY = ?";

return rdbmsQuery.query(sql, (rs, rowNum) -> {
IncidentDbModel.Builder builder = new IncidentDbModel.Builder();
builder.incidentKey(rs.getLong("INCIDENT_KEY"));
builder.treePath(rs.getString("TREE_PATH"));
builder.processInstanceKey(rs.getLong("PROCESS_INSTANCE_KEY"));
builder.flowNodeInstanceKey(rs.getLong("FLOW_NODE_INSTANCE_KEY"));
return builder.build();
}, processInstanceKey);
}

public List<VariableEntity> searchHistoricVariables(String varName) {
return rdbmsService.getVariableReader()
.search(VariableQuery.of(queryBuilder ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,19 @@
import static io.camunda.search.entities.IncidentEntity.ErrorType.RESOURCE_NOT_FOUND;
import static io.camunda.search.entities.IncidentEntity.ErrorType.UNHANDLED_ERROR_EVENT;
import static io.camunda.search.entities.IncidentEntity.ErrorType.UNKNOWN;
import static io.camunda.search.entities.FlowNodeInstanceEntity.FlowNodeType.END_EVENT;
import static io.camunda.search.entities.FlowNodeInstanceEntity.FlowNodeType.START_EVENT;
import static io.camunda.search.entities.FlowNodeInstanceEntity.FlowNodeType.USER_TASK;
import static org.assertj.core.api.Assertions.assertThat;
import static org.camunda.bpm.engine.impl.incident.IncidentHandling.createIncident;
import static org.camunda.bpm.engine.impl.jobexecutor.ExecuteJobHelper.executeJob;

import io.camunda.migration.data.MigratorMode;
import io.camunda.db.rdbms.write.domain.FlowNodeInstanceDbModel;
import io.camunda.db.rdbms.write.domain.IncidentDbModel;
import io.camunda.migration.data.qa.history.HistoryMigrationAbstractTest;
import io.camunda.search.entities.IncidentEntity;
import java.util.Collections;
import io.camunda.search.entities.ProcessInstanceEntity;
import java.util.List;
import org.camunda.bpm.engine.history.HistoricIncident;
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;
Expand Down Expand Up @@ -52,7 +59,9 @@ public void shouldMigrateIncidentTenant() {
// then
List<IncidentEntity> incidentsDefaultTenant = searchHistoricIncidents("incidentProcessId");
List<IncidentEntity> incidentsTenant1 = searchHistoricIncidents("incidentProcessId2");
assertThat(incidentsDefaultTenant).singleElement().extracting(IncidentEntity::tenantId).isEqualTo(C8_DEFAULT_TENANT);
assertThat(incidentsDefaultTenant).singleElement()
.extracting(IncidentEntity::tenantId)
.isEqualTo(C8_DEFAULT_TENANT);
assertThat(incidentsTenant1).singleElement().extracting(IncidentEntity::tenantId).isEqualTo("tenant1");
}

Expand Down Expand Up @@ -110,13 +119,19 @@ public void shouldMigrateIncidentForNestedProcessInstance() {
deployer.deployCamunda7Process("callActivityProcess.bpmn");
deployer.deployCamunda7Process("calledActivitySubprocess.bpmn");
ProcessInstance parentProcess = runtimeService.startProcessInstanceByKey("callingProcessId");
ProcessInstance childProcess = runtimeService.createProcessInstanceQuery().processDefinitionKey("calledProcessInstanceId").singleResult();
ProcessInstance childProcess = runtimeService.createProcessInstanceQuery()
.processDefinitionKey("calledProcessInstanceId")
.singleResult();
createIncident("userTaskId"); // create incident in child's task

HistoricIncident c7ChildIncident = historyService.createHistoricIncidentQuery().processInstanceId(childProcess.getProcessInstanceId()).singleResult();
HistoricIncident c7ChildIncident = historyService.createHistoricIncidentQuery()
.processInstanceId(childProcess.getProcessInstanceId())
.singleResult();
assertThat(c7ChildIncident).isNotNull();

HistoricIncident c7ParentIncident = historyService.createHistoricIncidentQuery().processInstanceId(parentProcess.getProcessInstanceId()).singleResult();
HistoricIncident c7ParentIncident = historyService.createHistoricIncidentQuery()
.processInstanceId(parentProcess.getProcessInstanceId())
.singleResult();
assertThat(c7ParentIncident).isNotNull();

// when
Expand All @@ -127,7 +142,8 @@ public void shouldMigrateIncidentForNestedProcessInstance() {
// child incident is migrated
List<IncidentEntity> childIncidents = searchHistoricIncidents(childProcess.getProcessDefinitionKey());
assertThat(childIncidents).hasSize(1);
assertOnIncidentBasicFields(childIncidents.getFirst(), c7ChildIncident, childProcess, parentProcess, UNKNOWN, false);
assertOnIncidentBasicFields(childIncidents.getFirst(), c7ChildIncident, childProcess, parentProcess, UNKNOWN,
false);

// parent incident is migrated
List<IncidentEntity> parentIncidents = searchHistoricIncidents(parentProcess.getProcessDefinitionKey());
Expand Down Expand Up @@ -282,7 +298,8 @@ public void shouldMigrateIncidentWithDecisionEvaluationErrorType() {
// given
deployer.deployCamunda7Process("ruleTaskProcess.bpmn");
deployer.deployCamunda7Decision("mappingFailureDmn.dmn");
ProcessInstance c7ProcessInstance = runtimeService.startProcessInstanceByKey("ruleTaskProcessId", Collections.singletonMap("input", "single entry list"));
ProcessInstance c7ProcessInstance = runtimeService.startProcessInstanceByKey("ruleTaskProcessId",
Collections.singletonMap("input", "single entry list"));
triggerIncident(c7ProcessInstance.getId());

HistoricIncident c7Incident = historyService.createHistoricIncidentQuery()
Expand Down Expand Up @@ -368,19 +385,91 @@ public void shouldMigrateIncidentWithNoJobRetriesErrorType() {
assertOnIncidentBasicFields(c8Incident, c7Incident, c7ProcessInstance, null, JOB_NO_RETRIES, true);
}

protected void assertOnIncidentBasicFields(IncidentEntity c8Incident, HistoricIncident c7Incident, ProcessInstance c7ChildInstance, ProcessInstance c7ParentInstance) {
@Test
public void shouldGenerateTreePathForIncidentsWithFlowNodeInstanceKey() {
// given
deployer.deployCamunda7Process("userTaskProcessAsyncAfter.bpmn");
ProcessInstance c7ProcessInstance = runtimeService.startProcessInstanceByKey("userTaskProcessId");

createIncident("userTaskId");
String userTaskId = taskService.createTaskQuery().taskDefinitionKey("userTaskId").singleResult().getId();
taskService.complete(userTaskId);

HistoricIncident c7Incident = historyService.createHistoricIncidentQuery()
.processInstanceId(c7ProcessInstance.getId())
.singleResult();
assertThat(c7Incident).isNotNull();

// when
historyMigrator.migrate();

// then
List<ProcessInstanceEntity> processInstances = searchHistoricProcessInstances("userTaskProcessId");
assertThat(processInstances).hasSize(1);
Long processInstanceKey = processInstances.getFirst().processInstanceKey();
List<FlowNodeInstanceDbModel> flowNodes = searchFlowNodeInstancesByName("UserTaskName");
assertThat(flowNodes).hasSize(1);
Long flownodeInstanceKey = flowNodes.getFirst().flowNodeInstanceKey();

List<IncidentDbModel> incidents = searchIncidentsByProcessInstanceKeyAndReturnAsDbModel(processInstanceKey);
assertThat(incidents).singleElement()
.extracting(IncidentDbModel::treePath)
.isNotNull()
.isEqualTo("PI_" + processInstanceKey + "/FNI_" + flownodeInstanceKey);
}

@Test
public void shouldGenerateTreePathForIncidentsWithoutFlowNodeInstanceKey() {
// given
deployer.deployCamunda7Process("incidentProcess.bpmn");
ProcessInstance c7ProcessInstance = runtimeService.startProcessInstanceByKey("incidentProcessId");
triggerIncident(c7ProcessInstance.getId());

HistoricIncident c7Incident = historyService.createHistoricIncidentQuery()
.processInstanceId(c7ProcessInstance.getId())
.singleResult();
assertThat(c7Incident).isNotNull();

// when
historyMigrator.migrate();

// then
List<ProcessInstanceEntity> processInstances = searchHistoricProcessInstances("incidentProcessId");
assertThat(processInstances).hasSize(1);
Long processInstanceKey = processInstances.getFirst().processInstanceKey();

List<IncidentDbModel> incidents = searchIncidentsByProcessInstanceKeyAndReturnAsDbModel(processInstanceKey);
assertThat(incidents).singleElement()
.extracting(IncidentDbModel::treePath)
.isNotNull()
.isEqualTo("PI_" + processInstanceKey);
}

protected void assertOnIncidentBasicFields(IncidentEntity c8Incident,
HistoricIncident c7Incident,
ProcessInstance c7ChildInstance,
ProcessInstance c7ParentInstance) {
assertOnIncidentBasicFields(c8Incident, c7Incident, c7ChildInstance, c7ParentInstance, UNKNOWN, false);
}

protected void assertOnIncidentBasicFields(IncidentEntity c8Incident, HistoricIncident c7Incident, ProcessInstance c7ChildInstance, ProcessInstance c7ParentInstance, IncidentEntity.ErrorType errorType, boolean waitingExecution) {
protected void assertOnIncidentBasicFields(IncidentEntity c8Incident,
HistoricIncident c7Incident,
ProcessInstance c7ChildInstance,
ProcessInstance c7ParentInstance,
IncidentEntity.ErrorType errorType,
boolean waitingExecution) {
// specific values
assertThat(c8Incident.tenantId()).isEqualTo(C8_DEFAULT_TENANT);
assertThat(c8Incident.processDefinitionId()).isEqualTo(prefixDefinitionId(c7ChildInstance.getProcessDefinitionKey()));
assertThat(c8Incident.processDefinitionId()).isEqualTo(
prefixDefinitionId(c7ChildInstance.getProcessDefinitionKey()));
assertThat(c8Incident.flowNodeId()).isEqualTo(c7Incident.getActivityId());
assertThat(c8Incident.state()).isEqualTo(IncidentEntity.IncidentState.RESOLVED);
assertThat(c8Incident.errorMessage()).isEqualTo(c7Incident.getIncidentMessage());
assertThat(c8Incident.processInstanceKey()).isEqualTo(findMigratedProcessInstanceKey(c7ChildInstance.getProcessDefinitionKey()));
String expectedRootProcessKey = c7ParentInstance != null ? c7ParentInstance.getProcessDefinitionKey() : c7ChildInstance.getProcessDefinitionKey();
assertThat(c8Incident.processInstanceKey()).isEqualTo(
findMigratedProcessInstanceKey(c7ChildInstance.getProcessDefinitionKey()));
String expectedRootProcessKey = c7ParentInstance != null ?
c7ParentInstance.getProcessDefinitionKey() :
c7ChildInstance.getProcessDefinitionKey();
assertThat(c8Incident.rootProcessInstanceKey()).isEqualTo(findMigratedProcessInstanceKey(expectedRootProcessKey));
assertThat(c8Incident.errorType()).isEqualTo(errorType);

Expand All @@ -400,10 +489,7 @@ protected void assertOnIncidentBasicFields(IncidentEntity c8Incident, HistoricIn
}

protected void executeJob(ProcessInstance c7ProcessInstance) {
Job job = managementService
.createJobQuery()
.processInstanceId(c7ProcessInstance.getId())
.singleResult();
Job job = managementService.createJobQuery().processInstanceId(c7ProcessInstance.getId()).singleResult();

if (job != null) {
managementService.executeJob(job.getId());
Expand Down