Skip to content

Commit 9750d92

Browse files
committed
wip
1 parent e24c084 commit 9750d92

File tree

2 files changed

+59
-16
lines changed

2 files changed

+59
-16
lines changed

data-migrator/core/src/main/java/io/camunda/migration/data/config/InterceptorConfiguration.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.camunda.migration.data.impl.interceptor.history.entity.FlowNodeTransformer;
3232
import io.camunda.migration.data.impl.interceptor.history.entity.FormTransformer;
3333
import io.camunda.migration.data.impl.interceptor.history.entity.IncidentTransformer;
34+
import io.camunda.migration.data.impl.interceptor.history.entity.JobTransformer;
3435
import io.camunda.migration.data.impl.interceptor.history.entity.ProcessDefinitionTransformer;
3536
import io.camunda.migration.data.impl.interceptor.history.entity.ProcessInstanceTransformer;
3637
import io.camunda.migration.data.impl.interceptor.history.entity.UserTaskTransformer;
@@ -357,6 +358,11 @@ public FormTransformer formTransformer() {
357358
return new FormTransformer();
358359
}
359360

361+
@Bean
362+
public JobTransformer jobTransformer() {
363+
return new JobTransformer();
364+
}
365+
360366
@Bean
361367
public IncidentTransformer incidentTransformer() {
362368
return new IncidentTransformer();

data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/migrator/JobMigrator.java

Lines changed: 53 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,19 @@
1313
import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_PROCESS_INSTANCE;
1414
import static io.camunda.migration.data.impl.util.ConverterUtil.getNextKey;
1515

16+
import io.camunda.db.rdbms.read.domain.FlowNodeInstanceDbQuery;
17+
import io.camunda.db.rdbms.write.domain.FlowNodeInstanceDbModel;
1618
import io.camunda.db.rdbms.write.domain.JobDbModel;
1719
import io.camunda.migration.data.impl.history.C7Entity;
1820
import io.camunda.migration.data.impl.history.EntitySkippedException;
19-
import io.camunda.migration.data.impl.logging.HistoryMigratorLogs;
21+
import io.camunda.migration.data.impl.persistence.IdKeyMapper;
2022
import io.camunda.search.entities.ProcessInstanceEntity;
23+
import io.camunda.search.filter.FlowNodeInstanceFilter;
24+
import java.util.Date;
25+
import java.util.List;
26+
import java.util.function.BiConsumer;
27+
import java.util.function.Consumer;
28+
import java.util.function.Function;
2129
import org.camunda.bpm.engine.history.HistoricJobLog;
2230
import org.springframework.stereotype.Service;
2331

@@ -35,23 +43,21 @@
3543
* </p>
3644
*/
3745
@Service
38-
public class JobMigrator extends BaseMigrator<HistoricJobLog, JobDbModel> {
46+
public class JobMigrator extends HistoryEntityMigrator<HistoricJobLog, JobDbModel> {
3947

40-
/**
41-
* Migrates all historic job log entries from Camunda 7 to Camunda 8.
42-
* <p>
43-
* Processes log entries in ascending timestamp order. Since tracking is done by C7 job ID, only
44-
* the first log entry encountered per job is migrated; subsequent entries for the same job are
45-
* deduplicated via the tracking table.
46-
* </p>
47-
*/
4848
@Override
49-
public void migrateAll() {
50-
fetchMigrateOrRetry(
51-
HISTORY_JOB,
52-
c7Client::getHistoricJobLog,
53-
c7Client::fetchAndHandleHistoricJobLogs
54-
);
49+
public BiConsumer<Consumer<HistoricJobLog>, Date> fetchForMigrateHandler() {
50+
return c7Client::fetchAndHandleHistoricJobLogs;
51+
}
52+
53+
@Override
54+
public Function<String, HistoricJobLog> fetchForRetryHandler() {
55+
return c7Client::getHistoricJobLog;
56+
}
57+
58+
@Override
59+
public IdKeyMapper.TYPE getType() {
60+
return HISTORY_JOB;
5561
}
5662

5763
/**
@@ -114,4 +120,35 @@ public Long migrateTransactionally(final HistoricJobLog c7JobLog) {
114120

115121
return null;
116122
}
123+
124+
/**
125+
* Finds the C8 flow node instance key by C7 activity ID and process instance ID.
126+
* <p>
127+
* Since {@link HistoricJobLog} provides only the activity ID (not the activity instance ID),
128+
* this method searches C8 flow node instances by activity ID within the migrated process
129+
* instance.
130+
* </p>
131+
*
132+
* @param activityId the C7 activity ID
133+
* @param processInstanceId the C7 process instance ID
134+
* @return the C8 flow node instance key, or {@code null} if not found
135+
*/
136+
protected Long findFlowNodeInstanceKey(String activityId, String processInstanceId) {
137+
Long processInstanceKey = dbClient.findC8KeyByC7IdAndType(processInstanceId, HISTORY_PROCESS_INSTANCE);
138+
if (processInstanceKey == null) {
139+
return null;
140+
}
141+
142+
List<FlowNodeInstanceDbModel> flowNodes = c8Client.searchFlowNodeInstances(
143+
FlowNodeInstanceDbQuery.of(builder -> builder.filter(
144+
FlowNodeInstanceFilter.of(filter -> filter.flowNodeIds(activityId).processInstanceKeys(processInstanceKey))
145+
))
146+
);
147+
148+
if (!flowNodes.isEmpty()) {
149+
return flowNodes.getFirst().flowNodeInstanceKey();
150+
} else {
151+
return null;
152+
}
153+
}
117154
}

0 commit comments

Comments
 (0)