Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.camunda.migration.data.impl.history.migrator.FlowNodeMigrator;
import io.camunda.migration.data.impl.history.migrator.FormMigrator;
import io.camunda.migration.data.impl.history.migrator.IncidentMigrator;
import io.camunda.migration.data.impl.history.migrator.JobMigrator;
import io.camunda.migration.data.impl.history.migrator.ProcessDefinitionMigrator;
import io.camunda.migration.data.impl.history.migrator.ProcessInstanceMigrator;
import io.camunda.migration.data.impl.history.migrator.UserTaskMigrator;
Expand Down Expand Up @@ -62,6 +63,9 @@ public class HistoryMigrator {
@Autowired
protected IncidentMigrator incidentMigrator;

@Autowired
protected JobMigrator jobMigrator;

@Autowired
protected DecisionRequirementsMigrator decisionRequirementsMigrator;

Expand All @@ -85,6 +89,7 @@ public class HistoryMigrator {
flowNodeMigrator,
userTaskMigrator,
variableMigrator,
jobMigrator,
incidentMigrator,
decisionRequirementsMigrator,
decisionDefinitionMigrator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.camunda.migration.data.impl.interceptor.history.entity.FlowNodeTransformer;
import io.camunda.migration.data.impl.interceptor.history.entity.FormTransformer;
import io.camunda.migration.data.impl.interceptor.history.entity.IncidentTransformer;
import io.camunda.migration.data.impl.interceptor.history.entity.JobTransformer;
import io.camunda.migration.data.impl.interceptor.history.entity.ProcessDefinitionTransformer;
import io.camunda.migration.data.impl.interceptor.history.entity.ProcessInstanceTransformer;
import io.camunda.migration.data.impl.interceptor.history.entity.UserTaskTransformer;
Expand Down Expand Up @@ -357,6 +358,11 @@ public FormTransformer formTransformer() {
return new FormTransformer();
}

@Bean
public JobTransformer jobTransformer() {
return new JobTransformer();
}

@Bean
public IncidentTransformer incidentTransformer() {
return new IncidentTransformer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.camunda.migration.data.impl.history.migrator.DecisionInstanceMigrator;
import io.camunda.migration.data.impl.history.migrator.DecisionRequirementsMigrator;
import io.camunda.migration.data.impl.history.migrator.FormMigrator;
import io.camunda.migration.data.impl.history.migrator.JobMigrator;
import io.camunda.migration.data.impl.history.migrator.ProcessDefinitionMigrator;
import io.camunda.migration.data.impl.history.migrator.AuditLogMigrator;
import io.camunda.migration.data.impl.identity.AuthorizationManager;
Expand Down Expand Up @@ -75,6 +76,7 @@
DecisionInstanceMigrator.class,
DecisionRequirementsMigrator.class,
FlowNodeMigrator.class,
JobMigrator.class,
IncidentMigrator.class,
FormMigrator.class,
ProcessDefinitionMigrator.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.camunda.bpm.engine.history.HistoricActivityInstance;
import org.camunda.bpm.engine.history.HistoricDecisionInstance;
import org.camunda.bpm.engine.history.HistoricIncident;
import org.camunda.bpm.engine.history.HistoricJobLog;
import org.camunda.bpm.engine.history.HistoricProcessInstance;
import org.camunda.bpm.engine.history.HistoricTaskInstance;
import org.camunda.bpm.engine.history.HistoricVariableInstance;
Expand All @@ -56,6 +57,7 @@
import org.camunda.bpm.engine.impl.HistoricActivityInstanceQueryImpl;
import org.camunda.bpm.engine.impl.HistoricDecisionInstanceQueryImpl;
import org.camunda.bpm.engine.impl.HistoricIncidentQueryImpl;
import org.camunda.bpm.engine.impl.HistoricJobLogQueryImpl;
import org.camunda.bpm.engine.impl.HistoricProcessInstanceQueryImpl;
import org.camunda.bpm.engine.impl.HistoricTaskInstanceQueryImpl;
import org.camunda.bpm.engine.impl.HistoricVariableInstanceQueryImpl;
Expand Down Expand Up @@ -570,6 +572,39 @@ public UserOperationLogEntry getUserOperationLogEntry(String c7Id) {
return callApi(query::singleResult, format(FAILED_TO_FETCH_HISTORIC_ELEMENT, "UserOperationLogEntry", c7Id));
}

/**
* Fetches the first historic job log entry for the given job ID, ordered by timestamp ascending.
* Used for retry mode, where the job ID is the tracking key.
*/
public HistoricJobLog getHistoricJobLog(String jobId) {
HistoricJobLogQueryImpl query = (HistoricJobLogQueryImpl) historyService.createHistoricJobLogQuery()
.jobId(jobId)
.orderByTimestamp()
.asc()
.orderByJobId()
.asc();
List<HistoricJobLog> results = callApi(query::list,
format(FAILED_TO_FETCH_HISTORIC_ELEMENT, "HistoricJobLog with jobId", jobId));
return results.isEmpty() ? null : results.getFirst();
}

/**
* Processes historic job log entries with pagination using the provided callback consumer.
* The {@code ignoredCreatedAfter} parameter is not supported by the query and is ignored;
* deduplication is handled via the tracking table.
*/
public void fetchAndHandleHistoricJobLogs(Consumer<HistoricJobLog> callback, Date ignoredCreatedAfter) {
HistoricJobLogQueryImpl query = (HistoricJobLogQueryImpl) historyService.createHistoricJobLogQuery()
.orderByTimestamp()
.asc()
.orderByJobId()
.asc();

new Pagination<HistoricJobLog>().pageSize(properties.getPageSize())
.query(query)
.callback(callback);
}

/**
* Processes tenant entities with pagination using the provided callback consumer.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_DECISION_REQUIREMENTS;
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_FLOW_NODE_INSTANCE;
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_INCIDENT;
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_JOB;
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_PROCESS_DEFINITION;
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_PROCESS_INSTANCE;
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_USER_TASK;
Expand Down Expand Up @@ -68,6 +69,7 @@
import io.camunda.db.rdbms.sql.FlowNodeInstanceMapper;
import io.camunda.db.rdbms.sql.FormMapper;
import io.camunda.db.rdbms.sql.IncidentMapper;
import io.camunda.db.rdbms.sql.JobMapper;
import io.camunda.db.rdbms.sql.ProcessDefinitionMapper;
import io.camunda.db.rdbms.sql.ProcessInstanceMapper;
import io.camunda.db.rdbms.sql.UserTaskMapper;
Expand All @@ -79,6 +81,7 @@
import io.camunda.db.rdbms.write.domain.FlowNodeInstanceDbModel;
import io.camunda.db.rdbms.write.domain.FormDbModel;
import io.camunda.db.rdbms.write.domain.IncidentDbModel;
import io.camunda.db.rdbms.write.domain.JobDbModel;
import io.camunda.db.rdbms.write.domain.ProcessDefinitionDbModel;
import io.camunda.db.rdbms.write.domain.ProcessInstanceDbModel;
import io.camunda.db.rdbms.write.domain.UserTaskDbModel;
Expand Down Expand Up @@ -148,6 +151,9 @@ public class C8Client {
@Autowired(required = false)
protected FormMapper formMapper;

@Autowired(required = false)
protected JobMapper jobMapper;

/**
* Creates a new process instance with the given BPMN process ID and variables.
*/
Expand Down Expand Up @@ -444,4 +450,11 @@ public void insertForm(FormDbModel dbModel) {
callApi(() -> formMapper.insert(dbModel), "Failed to insert form");
}

/**
* Inserts a Job into the database.
*/
public void insertJob(JobDbModel dbModel) {
callApi(() -> jobMapper.insert(new BatchInsertDto(List.of(dbModel))), FAILED_TO_INSERT_JOB);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.camunda.bpm.engine.history.HistoricActivityInstance;
import org.camunda.bpm.engine.history.HistoricDecisionInstance;
import org.camunda.bpm.engine.history.HistoricIncident;
import org.camunda.bpm.engine.history.HistoricJobLog;
import org.camunda.bpm.engine.history.HistoricProcessInstance;
import org.camunda.bpm.engine.history.HistoricTaskInstance;
import org.camunda.bpm.engine.history.HistoricVariableInstance;
Expand Down Expand Up @@ -42,6 +43,7 @@ public static C7Entity<?> of(Object c7) {
case HistoricDecisionInstance c7DecisionInstance -> of(c7DecisionInstance);
case HistoricActivityInstance c7ActivityInstance -> of(c7ActivityInstance);
case HistoricIncident c7Incident -> of(c7Incident);
case HistoricJobLog c7JobLog -> of(c7JobLog);
case HistoricProcessInstance c7ProcessInstance -> of(c7ProcessInstance);
case HistoricTaskInstance c7TaskInstance -> of(c7TaskInstance);
case HistoricVariableInstance c7VariableInstance -> of(c7VariableInstance);
Expand Down Expand Up @@ -104,6 +106,10 @@ public static C7Entity<HistoricVariableInstance> of(HistoricVariableInstance c7E
return new C7Entity<>(c7Entity.getId(), c7Entity.getCreateTime(), c7Entity);
}

public static C7Entity<HistoricJobLog> of(HistoricJobLog c7Entity) {
return new C7Entity<>(c7Entity.getJobId(), c7Entity.getTimestamp(), c7Entity);
}

public String getId() {
return id;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.camunda.bpm.engine.history.HistoricActivityInstance;
import org.camunda.bpm.engine.history.HistoricDecisionInstance;
import org.camunda.bpm.engine.history.HistoricIncident;
import org.camunda.bpm.engine.history.HistoricJobLog;
import org.camunda.bpm.engine.history.HistoricProcessInstance;
import org.camunda.bpm.engine.history.HistoricTaskInstance;
import org.camunda.bpm.engine.history.HistoricVariableInstance;
Expand Down Expand Up @@ -47,6 +48,10 @@ public EntitySkippedException(HistoricIncident c7Incident, String message) {
this(C7Entity.of(c7Incident), message);
}

public EntitySkippedException(HistoricJobLog c7JobLog, String message) {
this(C7Entity.of(c7JobLog), message);
}

public EntitySkippedException(HistoricProcessInstance c7ProcessInstance, String message) {
this(C7Entity.of(c7ProcessInstance), message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
package io.camunda.migration.data.impl.history.migrator;

import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_FLOW_NODE;
import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_JOB_REFERENCE;
import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_PROCESS_DEFINITION;
import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_PROCESS_INSTANCE;
import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_ROOT_PROCESS_INSTANCE;
import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_INCIDENT;
import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_JOB;
import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_PROCESS_INSTANCE;
import static org.camunda.bpm.engine.runtime.Incident.FAILED_JOB_HANDLER_TYPE;

import io.camunda.db.rdbms.read.domain.FlowNodeInstanceDbQuery;
import io.camunda.db.rdbms.write.domain.FlowNodeInstanceDbModel;
Expand Down Expand Up @@ -66,12 +69,10 @@ public IdKeyMapper.TYPE getType() {
* <li>Process instance key missing - skipped with {@code SKIP_REASON_MISSING_PROCESS_INSTANCE_KEY}</li>
* <li>Process definition not yet migrated - skipped with {@code SKIP_REASON_MISSING_PROCESS_DEFINITION}</li>
* <li>Root process instance not yet migrated (when part of a process hierarchy) - skipped with {@code SKIP_REASON_MISSING_ROOT_PROCESS_INSTANCE}</li>
* <li>Referenced job explicitly skipped - skipped with {@code SKIP_REASON_MISSING_JOB_REFERENCE}</li>
* <li>Interceptor error during conversion - skipped with the exception message</li>
* </ul>
*
* <p><strong>Note:</strong> Flow node instance and job reference validations are currently disabled
* pending resolution of known issues. See code comments for details.
*
* @param c7Incident the historic incident from Camunda 7 to be migrated
* @throws EntityInterceptorException if an error occurs during entity conversion (handled internally, entity marked as skipped)
*/
Expand All @@ -91,8 +92,6 @@ public Long migrateTransactionally(HistoricIncident c7Incident) {
if (processInstanceKey != null) {
var flowNodeInstanceKey = findFlowNodeInstanceKey(c7Incident.getActivityId(), c7Incident.getProcessInstanceId());
builder.flowNodeInstanceKey(flowNodeInstanceKey);
// .jobKey(jobDefinitionKey) // TODO when jobs are migrated


String c7RootProcessInstanceId = c7Incident.getRootProcessInstanceId();
if (c7RootProcessInstanceId != null && isMigrated(c7RootProcessInstanceId, HISTORY_PROCESS_INSTANCE)) {
Expand All @@ -104,6 +103,8 @@ public Long migrateTransactionally(HistoricIncident c7Incident) {
}
}

resolveJobKey(c7Incident, builder);

IncidentDbModel dbModel = convert(C7Entity.of(c7Incident), builder);

if (dbModel.processDefinitionKey() == null) {
Expand All @@ -124,8 +125,8 @@ public Long migrateTransactionally(HistoricIncident c7Incident) {
}
}

if (dbModel.jobKey() == null) {
// throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_JOB_REFERENCE); // TODO when jobs are migrated
if (isFailedJobIncident(c7Incident) && dbModel.jobKey() == null) { // nope, only async
throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_JOB_REFERENCE);
}
c8Client.insertIncident(dbModel);

Expand Down Expand Up @@ -154,5 +155,47 @@ protected Long findFlowNodeInstanceKey(String activityId, String processInstance
}
}

/**
* Resolves and sets the job key on the incident builder for {@code failedJob} incidents.
* <p>
* If the incident is a {@code failedJob} incident and the associated C7 job has been tracked
* in the migration table:
* <ul>
* <li>If the job was migrated (has a C8 key), sets the {@code jobKey} on the builder.</li>
* <li>If the job was explicitly skipped (null C8 key), throws an
* {@link EntitySkippedException} to skip this incident as well.</li>
* </ul>
* If the job is not yet tracked (job migration may not have run or the job type is not
* tracked), the incident proceeds without a job key.
* </p>
*
* @param c7Incident the Camunda 7 incident
* @param builder the incident builder to set the job key on
* @throws EntitySkippedException if the associated job was explicitly skipped
*/
protected void resolveJobKey(final HistoricIncident c7Incident, final Builder builder) {
if (!isFailedJobIncident(c7Incident)) {
return;
}
final String c7JobId = c7Incident.getConfiguration();
if (c7JobId == null) {
return;
}
if (dbClient.checkExistsByC7IdAndType(c7JobId, HISTORY_JOB)) {
final Long jobKey = dbClient.findC8KeyByC7IdAndType(c7JobId, HISTORY_JOB);
builder.jobKey(jobKey);
}
}

/**
* Returns true if this incident was caused by a job failure.
*
* @param c7Incident the Camunda 7 incident
* @return true for {@code failedJob} incident type
*/
protected boolean isFailedJobIncident(final HistoricIncident c7Incident) {
return FAILED_JOB_HANDLER_TYPE.equals(c7Incident.getIncidentType());
}

}

Loading
Loading