Skip to content

Commit e24c084

Browse files
Copilotyanavasileva
andcommitted
Initial plan
feat(history): migrate async-before/after job logs from C7 to C8 related to #5331 - Add HISTORY_JOB type to IdKeyMapper.TYPE with HistoricJobLogEventEntity class mapping - Create JobMigrator: migrates C7 HistoricJobLog entries tracked by job ID (deduplicates multiple log entries per job) - Create JobTransformer: EntityInterceptor mapping C7 job log fields to C8 JobDbModel (state, retries, error, timing, etc.) - Add fetchAndHandleHistoricJobLogs() and getHistoricJobLog() to C7Client - Add insertJob() to C8Client - Add of(HistoricJobLog) factory method to C7Entity using getJobId() as tracking key - Add HistoricJobLog constructor to EntitySkippedException - Update IncidentMigrator: resolve jobKey for failedJob incidents from HISTORY_JOB tracking table; skip incident if the referenced job was explicitly skipped - Update HistoryMigrator: call migrateJobs() before migrateIncidents() - Enable previously-disabled test shouldNotMigrateIncidentsWhenJobIsSkipped (changed HISTORY_FLOW_NODE to HISTORY_JOB) - Add HistoryJobTest integration tests covering deduplication and incident FK population - Add asyncBeforeUserTaskProcess.bpmn for testing successfully-completed async-before jobs Co-authored-by: yanavasileva <26868499+yanavasileva@users.noreply.github.com> fix(history): update IncidentMigrator Javadoc to reflect job key validation Co-authored-by: yanavasileva <26868499+yanavasileva@users.noreply.github.com>
1 parent f203bfc commit e24c084

File tree

15 files changed

+530
-10
lines changed

15 files changed

+530
-10
lines changed

data-migrator/core/src/main/java/io/camunda/migration/data/HistoryMigrator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.camunda.migration.data.impl.history.migrator.FlowNodeMigrator;
1919
import io.camunda.migration.data.impl.history.migrator.FormMigrator;
2020
import io.camunda.migration.data.impl.history.migrator.IncidentMigrator;
21+
import io.camunda.migration.data.impl.history.migrator.JobMigrator;
2122
import io.camunda.migration.data.impl.history.migrator.ProcessDefinitionMigrator;
2223
import io.camunda.migration.data.impl.history.migrator.ProcessInstanceMigrator;
2324
import io.camunda.migration.data.impl.history.migrator.UserTaskMigrator;
@@ -62,6 +63,9 @@ public class HistoryMigrator {
6263
@Autowired
6364
protected IncidentMigrator incidentMigrator;
6465

66+
@Autowired
67+
protected JobMigrator jobMigrator;
68+
6569
@Autowired
6670
protected DecisionRequirementsMigrator decisionRequirementsMigrator;
6771

@@ -85,6 +89,7 @@ public class HistoryMigrator {
8589
flowNodeMigrator,
8690
userTaskMigrator,
8791
variableMigrator,
92+
jobMigrator,
8893
incidentMigrator,
8994
decisionRequirementsMigrator,
9095
decisionDefinitionMigrator,

data-migrator/core/src/main/java/io/camunda/migration/data/impl/clients/C7Client.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.camunda.bpm.engine.history.HistoricActivityInstance;
4343
import org.camunda.bpm.engine.history.HistoricDecisionInstance;
4444
import org.camunda.bpm.engine.history.HistoricIncident;
45+
import org.camunda.bpm.engine.history.HistoricJobLog;
4546
import org.camunda.bpm.engine.history.HistoricProcessInstance;
4647
import org.camunda.bpm.engine.history.HistoricTaskInstance;
4748
import org.camunda.bpm.engine.history.HistoricVariableInstance;
@@ -56,6 +57,7 @@
5657
import org.camunda.bpm.engine.impl.HistoricActivityInstanceQueryImpl;
5758
import org.camunda.bpm.engine.impl.HistoricDecisionInstanceQueryImpl;
5859
import org.camunda.bpm.engine.impl.HistoricIncidentQueryImpl;
60+
import org.camunda.bpm.engine.impl.HistoricJobLogQueryImpl;
5961
import org.camunda.bpm.engine.impl.HistoricProcessInstanceQueryImpl;
6062
import org.camunda.bpm.engine.impl.HistoricTaskInstanceQueryImpl;
6163
import org.camunda.bpm.engine.impl.HistoricVariableInstanceQueryImpl;
@@ -570,6 +572,39 @@ public UserOperationLogEntry getUserOperationLogEntry(String c7Id) {
570572
return callApi(query::singleResult, format(FAILED_TO_FETCH_HISTORIC_ELEMENT, "UserOperationLogEntry", c7Id));
571573
}
572574

575+
/**
576+
* Fetches the first historic job log entry for the given job ID, ordered by timestamp ascending.
577+
* Used for retry mode, where the job ID is the tracking key.
578+
*/
579+
public HistoricJobLog getHistoricJobLog(String jobId) {
580+
HistoricJobLogQueryImpl query = (HistoricJobLogQueryImpl) historyService.createHistoricJobLogQuery()
581+
.jobId(jobId)
582+
.orderByTimestamp()
583+
.asc()
584+
.orderByJobId()
585+
.asc();
586+
List<HistoricJobLog> results = callApi(query::list,
587+
format(FAILED_TO_FETCH_HISTORIC_ELEMENT, "HistoricJobLog with jobId", jobId));
588+
return results.isEmpty() ? null : results.getFirst();
589+
}
590+
591+
/**
592+
* Processes historic job log entries with pagination using the provided callback consumer.
593+
* The {@code ignoredCreatedAfter} parameter is not supported by the query and is ignored;
594+
* deduplication is handled via the tracking table.
595+
*/
596+
public void fetchAndHandleHistoricJobLogs(Consumer<HistoricJobLog> callback, Date ignoredCreatedAfter) {
597+
HistoricJobLogQueryImpl query = (HistoricJobLogQueryImpl) historyService.createHistoricJobLogQuery()
598+
.orderByTimestamp()
599+
.asc()
600+
.orderByJobId()
601+
.asc();
602+
603+
new Pagination<HistoricJobLog>().pageSize(properties.getPageSize())
604+
.query(query)
605+
.callback(callback);
606+
}
607+
573608
/**
574609
* Processes tenant entities with pagination using the provided callback consumer.
575610
*/

data-migrator/core/src/main/java/io/camunda/migration/data/impl/clients/C8Client.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_DECISION_REQUIREMENTS;
2525
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_FLOW_NODE_INSTANCE;
2626
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_INCIDENT;
27+
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_JOB;
2728
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_PROCESS_DEFINITION;
2829
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_PROCESS_INSTANCE;
2930
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_USER_TASK;
@@ -68,6 +69,7 @@
6869
import io.camunda.db.rdbms.sql.FlowNodeInstanceMapper;
6970
import io.camunda.db.rdbms.sql.FormMapper;
7071
import io.camunda.db.rdbms.sql.IncidentMapper;
72+
import io.camunda.db.rdbms.sql.JobMapper;
7173
import io.camunda.db.rdbms.sql.ProcessDefinitionMapper;
7274
import io.camunda.db.rdbms.sql.ProcessInstanceMapper;
7375
import io.camunda.db.rdbms.sql.UserTaskMapper;
@@ -79,6 +81,7 @@
7981
import io.camunda.db.rdbms.write.domain.FlowNodeInstanceDbModel;
8082
import io.camunda.db.rdbms.write.domain.FormDbModel;
8183
import io.camunda.db.rdbms.write.domain.IncidentDbModel;
84+
import io.camunda.db.rdbms.write.domain.JobDbModel;
8285
import io.camunda.db.rdbms.write.domain.ProcessDefinitionDbModel;
8386
import io.camunda.db.rdbms.write.domain.ProcessInstanceDbModel;
8487
import io.camunda.db.rdbms.write.domain.UserTaskDbModel;
@@ -148,6 +151,9 @@ public class C8Client {
148151
@Autowired(required = false)
149152
protected FormMapper formMapper;
150153

154+
@Autowired(required = false)
155+
protected JobMapper jobMapper;
156+
151157
/**
152158
* Creates a new process instance with the given BPMN process ID and variables.
153159
*/
@@ -444,4 +450,11 @@ public void insertForm(FormDbModel dbModel) {
444450
callApi(() -> formMapper.insert(dbModel), "Failed to insert form");
445451
}
446452

453+
/**
454+
* Inserts a Job into the database.
455+
*/
456+
public void insertJob(JobDbModel dbModel) {
457+
callApi(() -> jobMapper.insert(new BatchInsertDto(List.of(dbModel))), FAILED_TO_INSERT_JOB);
458+
}
459+
447460
}

data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/C7Entity.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.camunda.bpm.engine.history.HistoricActivityInstance;
1414
import org.camunda.bpm.engine.history.HistoricDecisionInstance;
1515
import org.camunda.bpm.engine.history.HistoricIncident;
16+
import org.camunda.bpm.engine.history.HistoricJobLog;
1617
import org.camunda.bpm.engine.history.HistoricProcessInstance;
1718
import org.camunda.bpm.engine.history.HistoricTaskInstance;
1819
import org.camunda.bpm.engine.history.HistoricVariableInstance;
@@ -42,6 +43,7 @@ public static C7Entity<?> of(Object c7) {
4243
case HistoricDecisionInstance c7DecisionInstance -> of(c7DecisionInstance);
4344
case HistoricActivityInstance c7ActivityInstance -> of(c7ActivityInstance);
4445
case HistoricIncident c7Incident -> of(c7Incident);
46+
case HistoricJobLog c7JobLog -> of(c7JobLog);
4547
case HistoricProcessInstance c7ProcessInstance -> of(c7ProcessInstance);
4648
case HistoricTaskInstance c7TaskInstance -> of(c7TaskInstance);
4749
case HistoricVariableInstance c7VariableInstance -> of(c7VariableInstance);
@@ -104,6 +106,10 @@ public static C7Entity<HistoricVariableInstance> of(HistoricVariableInstance c7E
104106
return new C7Entity<>(c7Entity.getId(), c7Entity.getCreateTime(), c7Entity);
105107
}
106108

109+
public static C7Entity<HistoricJobLog> of(HistoricJobLog c7Entity) {
110+
return new C7Entity<>(c7Entity.getJobId(), c7Entity.getTimestamp(), c7Entity);
111+
}
112+
107113
public String getId() {
108114
return id;
109115
}

data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/EntitySkippedException.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.camunda.bpm.engine.history.HistoricActivityInstance;
1212
import org.camunda.bpm.engine.history.HistoricDecisionInstance;
1313
import org.camunda.bpm.engine.history.HistoricIncident;
14+
import org.camunda.bpm.engine.history.HistoricJobLog;
1415
import org.camunda.bpm.engine.history.HistoricProcessInstance;
1516
import org.camunda.bpm.engine.history.HistoricTaskInstance;
1617
import org.camunda.bpm.engine.history.HistoricVariableInstance;
@@ -47,6 +48,10 @@ public EntitySkippedException(HistoricIncident c7Incident, String message) {
4748
this(C7Entity.of(c7Incident), message);
4849
}
4950

51+
public EntitySkippedException(HistoricJobLog c7JobLog, String message) {
52+
this(C7Entity.of(c7JobLog), message);
53+
}
54+
5055
public EntitySkippedException(HistoricProcessInstance c7ProcessInstance, String message) {
5156
this(C7Entity.of(c7ProcessInstance), message);
5257
}

data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/migrator/IncidentMigrator.java

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88
package io.camunda.migration.data.impl.history.migrator;
99

1010
import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_FLOW_NODE;
11+
import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_JOB_REFERENCE;
1112
import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_PROCESS_DEFINITION;
1213
import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_PROCESS_INSTANCE;
1314
import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_ROOT_PROCESS_INSTANCE;
1415
import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_INCIDENT;
16+
import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_JOB;
1517
import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_PROCESS_INSTANCE;
1618

1719
import io.camunda.db.rdbms.read.domain.FlowNodeInstanceDbQuery;
@@ -66,12 +68,10 @@ public IdKeyMapper.TYPE getType() {
6668
* <li>Process instance key missing - skipped with {@code SKIP_REASON_MISSING_PROCESS_INSTANCE_KEY}</li>
6769
* <li>Process definition not yet migrated - skipped with {@code SKIP_REASON_MISSING_PROCESS_DEFINITION}</li>
6870
* <li>Root process instance not yet migrated (when part of a process hierarchy) - skipped with {@code SKIP_REASON_MISSING_ROOT_PROCESS_INSTANCE}</li>
71+
* <li>Referenced job explicitly skipped - skipped with {@code SKIP_REASON_MISSING_JOB_REFERENCE}</li>
6972
* <li>Interceptor error during conversion - skipped with the exception message</li>
7073
* </ul>
7174
*
72-
* <p><strong>Note:</strong> Flow node instance and job reference validations are currently disabled
73-
* pending resolution of known issues. See code comments for details.
74-
*
7575
* @param c7Incident the historic incident from Camunda 7 to be migrated
7676
* @throws EntityInterceptorException if an error occurs during entity conversion (handled internally, entity marked as skipped)
7777
*/
@@ -104,6 +104,8 @@ public Long migrateTransactionally(HistoricIncident c7Incident) {
104104
}
105105
}
106106

107+
resolveJobKey(c7Incident, builder);
108+
107109
IncidentDbModel dbModel = convert(C7Entity.of(c7Incident), builder);
108110

109111
if (dbModel.processDefinitionKey() == null) {
@@ -154,5 +156,51 @@ protected Long findFlowNodeInstanceKey(String activityId, String processInstance
154156
}
155157
}
156158

159+
/**
160+
* Resolves and sets the job key on the incident builder for {@code failedJob} incidents.
161+
* <p>
162+
* If the incident is a {@code failedJob} incident and the associated C7 job has been tracked
163+
* in the migration table:
164+
* <ul>
165+
* <li>If the job was migrated (has a C8 key), sets the {@code jobKey} on the builder.</li>
166+
* <li>If the job was explicitly skipped (null C8 key), throws an
167+
* {@link EntitySkippedException} to skip this incident as well.</li>
168+
* </ul>
169+
* If the job is not yet tracked (job migration may not have run or the job type is not
170+
* tracked), the incident proceeds without a job key.
171+
* </p>
172+
*
173+
* @param c7Incident the Camunda 7 incident
174+
* @param builder the incident builder to set the job key on
175+
* @throws EntitySkippedException if the associated job was explicitly skipped
176+
*/
177+
protected void resolveJobKey(final HistoricIncident c7Incident, final Builder builder) {
178+
if (!isFailedJobIncident(c7Incident)) {
179+
return;
180+
}
181+
final String c7JobId = c7Incident.getConfiguration();
182+
if (c7JobId == null) {
183+
return;
184+
}
185+
if (dbClient.checkExistsByC7IdAndType(c7JobId, HISTORY_JOB)) {
186+
final Long jobKey = dbClient.findC8KeyByC7IdAndType(c7JobId, HISTORY_JOB);
187+
if (jobKey != null) {
188+
builder.jobKey(jobKey);
189+
} else {
190+
throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_JOB_REFERENCE);
191+
}
192+
}
193+
}
194+
195+
/**
196+
* Returns true if this incident was caused by a job failure.
197+
*
198+
* @param c7Incident the Camunda 7 incident
199+
* @return true for {@code failedJob} incident type
200+
*/
201+
protected boolean isFailedJobIncident(final HistoricIncident c7Incident) {
202+
return "failedJob".equals(c7Incident.getIncidentType());
203+
}
204+
157205
}
158206

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
3+
* one or more contributor license agreements. See the NOTICE file distributed
4+
* with this work for additional information regarding copyright ownership.
5+
* Licensed under the Camunda License 1.0. You may not use this file
6+
* except in compliance with the Camunda License 1.0.
7+
*/
8+
package io.camunda.migration.data.impl.history.migrator;
9+
10+
import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_PROCESS_INSTANCE;
11+
import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.logMigratingJob;
12+
import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_JOB;
13+
import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_PROCESS_INSTANCE;
14+
import static io.camunda.migration.data.impl.util.ConverterUtil.getNextKey;
15+
16+
import io.camunda.db.rdbms.write.domain.JobDbModel;
17+
import io.camunda.migration.data.impl.history.C7Entity;
18+
import io.camunda.migration.data.impl.history.EntitySkippedException;
19+
import io.camunda.migration.data.impl.logging.HistoryMigratorLogs;
20+
import io.camunda.search.entities.ProcessInstanceEntity;
21+
import org.camunda.bpm.engine.history.HistoricJobLog;
22+
import org.springframework.stereotype.Service;
23+
24+
/**
25+
* Service class responsible for migrating historic job log entries from Camunda 7 to Camunda 8.
26+
* <p>
27+
* Job logs in Camunda 7 record lifecycle events (creation, execution, failure, deletion) for each
28+
* job. This migrator converts C7 job log entries to C8 {@link JobDbModel} records, tracking each
29+
* C7 job by its job ID so that only one C8 record is created per C7 job.
30+
* </p>
31+
* <p>
32+
* The C8 job key is stored in the migration tracking table keyed by the C7 job ID. This allows
33+
* the {@link IncidentMigrator} to look up the C8 job key when migrating {@code failedJob}
34+
* incidents.
35+
* </p>
36+
*/
37+
@Service
38+
public class JobMigrator extends BaseMigrator<HistoricJobLog, JobDbModel> {
39+
40+
/**
41+
* Migrates all historic job log entries from Camunda 7 to Camunda 8.
42+
* <p>
43+
* Processes log entries in ascending timestamp order. Since tracking is done by C7 job ID, only
44+
* the first log entry encountered per job is migrated; subsequent entries for the same job are
45+
* deduplicated via the tracking table.
46+
* </p>
47+
*/
48+
@Override
49+
public void migrateAll() {
50+
fetchMigrateOrRetry(
51+
HISTORY_JOB,
52+
c7Client::getHistoricJobLog,
53+
c7Client::fetchAndHandleHistoricJobLogs
54+
);
55+
}
56+
57+
/**
58+
* Migrates a single historic job log entry from Camunda 7 to Camunda 8.
59+
* <p>
60+
* Uses the C7 job ID as the tracking key, ensuring that only one C8 job record is created
61+
* per C7 job across multiple log entries.
62+
* </p>
63+
*
64+
* <p>Skip scenarios:
65+
* <ul>
66+
* <li>Job already tracked in the migration table - silently skipped</li>
67+
* <li>Process instance not yet migrated - skipped with
68+
* {@code SKIP_REASON_MISSING_PROCESS_INSTANCE}</li>
69+
* </ul>
70+
*
71+
* @param c7JobLog the Camunda 7 historic job log entry to migrate
72+
* @return the C8 job key as a string, or {@code null} if already migrated
73+
*/
74+
@Override
75+
public Long migrateTransactionally(final HistoricJobLog c7JobLog) {
76+
final String c7JobId = c7JobLog.getJobId();
77+
if (shouldMigrate(c7JobId, HISTORY_JOB)) {
78+
logMigratingJob(c7JobId);
79+
80+
final var jobKey = getNextKey();
81+
final var builder = new JobDbModel.Builder().jobKey(jobKey);
82+
83+
final String c7ProcessInstanceId = c7JobLog.getProcessInstanceId();
84+
final ProcessInstanceEntity processInstance = findProcessInstanceByC7Id(c7ProcessInstanceId);
85+
if (processInstance != null) {
86+
builder.processInstanceKey(processInstance.processInstanceKey());
87+
88+
final var processDefinitionKey = findProcessDefinitionKey(c7JobLog.getProcessDefinitionId());
89+
builder.processDefinitionKey(processDefinitionKey);
90+
91+
final String c7RootProcessInstanceId = c7JobLog.getRootProcessInstanceId();
92+
if (c7RootProcessInstanceId != null && isMigrated(c7RootProcessInstanceId, HISTORY_PROCESS_INSTANCE)) {
93+
final ProcessInstanceEntity rootProcessInstance = findProcessInstanceByC7Id(c7RootProcessInstanceId);
94+
if (rootProcessInstance != null && rootProcessInstance.processInstanceKey() != null) {
95+
builder.rootProcessInstanceKey(rootProcessInstance.processInstanceKey());
96+
}
97+
}
98+
99+
final Long elementInstanceKey = findFlowNodeInstanceKey(
100+
c7JobLog.getActivityId(), c7ProcessInstanceId);
101+
builder.elementInstanceKey(elementInstanceKey);
102+
}
103+
104+
final JobDbModel dbModel = convert(C7Entity.of(c7JobLog), builder);
105+
106+
if (dbModel.processInstanceKey() == null) {
107+
throw new EntitySkippedException(c7JobLog, SKIP_REASON_MISSING_PROCESS_INSTANCE);
108+
}
109+
110+
c8Client.insertJob(dbModel);
111+
112+
return jobKey;
113+
}
114+
115+
return null;
116+
}
117+
}

0 commit comments

Comments
 (0)