Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.camunda.migration.data.impl.history.ProcessInstanceMigrator;
import io.camunda.migration.data.impl.history.UserTaskMigrator;
import io.camunda.migration.data.impl.history.VariableMigrator;
import io.camunda.migration.data.impl.history.AuditLogMigrator;
import io.camunda.migration.data.impl.clients.DbClient;
import io.camunda.migration.data.impl.util.ExceptionUtils;
import io.camunda.migration.data.impl.util.PrintUtils;
Expand Down Expand Up @@ -63,6 +64,9 @@ public class HistoryMigrator {
@Autowired
protected DecisionInstanceMigrator decisionInstanceMigrator;

@Autowired
protected AuditLogMigrator auditLogMigrator;

@Autowired
protected DbClient dbClient;

Expand Down Expand Up @@ -106,6 +110,7 @@ public void migrate() {
migrateDecisionRequirementsDefinitions();
migrateDecisionDefinitions();
migrateDecisionInstances();
migrateAuditLogs();
}

public void migrateProcessDefinitions() {
Expand Down Expand Up @@ -144,6 +149,10 @@ public void migrateDecisionInstances() {
decisionInstanceMigrator.migrate();
}

public void migrateAuditLogs() {
auditLogMigrator.migrate();
}

public void setRequestedEntityTypes(List<TYPE> requestedEntityTypes) {
this.requestedEntityTypes = requestedEntityTypes;
}
Expand All @@ -159,5 +168,6 @@ public void setMode(MigratorMode mode) {
processInstanceMigrator.setMode(mode);
userTaskMigrator.setMode(mode);
variableMigrator.setMode(mode);
auditLogMigrator.setMode(mode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ protected void migrate() {
if (skipReason == null && shouldStartProcessInstance(c7ProcessInstanceId)) {
startProcessInstance(c7ProcessInstanceId, createTime);
} else if (isUnknown(c7ProcessInstanceId)) {
dbClient.insert(c7ProcessInstanceId, null, createTime, TYPE.RUNTIME_PROCESS_INSTANCE, skipReason);
dbClient.insert(c7ProcessInstanceId, (Long) null, createTime, TYPE.RUNTIME_PROCESS_INSTANCE, skipReason);
} else {
dbClient.updateSkipReason(c7ProcessInstanceId, TYPE.RUNTIME_PROCESS_INSTANCE, skipReason);
}
Expand Down Expand Up @@ -131,7 +131,7 @@ protected void handleVariableInterceptorException(VariableInterceptorException e
RuntimeMigratorLogs.stacktrace(e);

if (MIGRATE.equals(mode)) {
dbClient.insert(c7ProcessInstanceId, null, createTime, TYPE.RUNTIME_PROCESS_INSTANCE, e.getMessage());
dbClient.insert(c7ProcessInstanceId, (Long) null, createTime, TYPE.RUNTIME_PROCESS_INSTANCE, e.getMessage());
} else if (RETRY_SKIPPED.equals(mode)) {
dbClient.updateSkipReason(c7ProcessInstanceId, TYPE.RUNTIME_PROCESS_INSTANCE, e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.camunda.migration.data.impl.interceptor.PrimitiveVariableTransformer;
import io.camunda.migration.data.impl.interceptor.SpinJsonVariableTransformer;
import io.camunda.migration.data.impl.interceptor.SpinXmlVariableTransformer;
import io.camunda.migration.data.impl.interceptor.history.entity.AuditLogTransformer;
import io.camunda.migration.data.impl.interceptor.history.entity.DecisionDefinitionTransformer;
import io.camunda.migration.data.impl.interceptor.history.entity.DecisionInstanceTransformer;
import io.camunda.migration.data.impl.interceptor.history.entity.DecisionRequirementsDefinitionTransformer;
Expand Down Expand Up @@ -367,4 +368,9 @@ public UserTaskTransformer userTaskTransformer() {
public VariableTransformer variableTransformer() {
return new VariableTransformer();
}

@Bean
public AuditLogTransformer auditLogTransformer() {
return new AuditLogTransformer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.camunda.migration.data.impl.clients.DbClient;
import io.camunda.migration.data.impl.VariableService;
import io.camunda.migration.data.impl.RuntimeValidator;
import io.camunda.migration.data.impl.history.AuditLogMigrator;
import io.camunda.migration.data.impl.identity.AuthorizationManager;
import io.camunda.migration.data.impl.history.DecisionDefinitionMigrator;
import io.camunda.migration.data.impl.history.DecisionInstanceMigrator;
Expand Down Expand Up @@ -86,6 +87,7 @@
ProcessInstanceMigrator.class,
UserTaskMigrator.class,
VariableMigrator.class,
AuditLogMigrator.class,
SchemaShutdownCleaner.class
})
@Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ protected MigratorConstants() {}
* collide with actual Zeebe partition keys during migration.
*/
public static int C7_HISTORY_PARTITION_ID = 4095;
public static int C7_AUDIT_LOG_ENTITY_VERSION = -4095;

public static final String LEGACY_ID_VAR_NAME = "legacyId";
public static final String C8_DEFAULT_TENANT = "<default>";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.camunda.bpm.engine.history.HistoricProcessInstance;
import org.camunda.bpm.engine.history.HistoricTaskInstance;
import org.camunda.bpm.engine.history.HistoricVariableInstance;
import org.camunda.bpm.engine.history.UserOperationLogEntry;
import org.camunda.bpm.engine.identity.Tenant;
import org.camunda.bpm.engine.identity.TenantQuery;
import org.camunda.bpm.engine.impl.AuthorizationQueryImpl;
Expand All @@ -49,6 +50,7 @@
import org.camunda.bpm.engine.impl.HistoricProcessInstanceQueryImpl;
import org.camunda.bpm.engine.impl.HistoricTaskInstanceQueryImpl;
import org.camunda.bpm.engine.impl.HistoricVariableInstanceQueryImpl;
import org.camunda.bpm.engine.impl.UserOperationLogQueryImpl;
import org.camunda.bpm.engine.impl.ProcessDefinitionQueryImpl;
import org.camunda.bpm.engine.impl.TenantQueryImpl;
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;
Expand Down Expand Up @@ -495,6 +497,35 @@ public void fetchAndHandleHistoricFlowNodes(Consumer<HistoricActivityInstance> c
.callback(callback);
}

/**
* Processes historic user operation log entries with pagination using the provided callback consumer.
*/
public void fetchAndHandleUserOperationLogEntries(Consumer<UserOperationLogEntry> callback, Date timestampAfter) {
UserOperationLogQueryImpl query = (UserOperationLogQueryImpl) historyService.createUserOperationLogQuery()
.orderByTimestamp()
.asc()
.orderByOperationId()
.asc();

if (timestampAfter != null) {
query.afterTimestamp(timestampAfter);
}

new Pagination<UserOperationLogEntry>()
.pageSize(properties.getPageSize())
.query(query)
.maxCount(query::count)
.callback(callback);
}

/**
* Gets a single user operation log entry by ID.
*/
public UserOperationLogEntry getUserOperationLogEntry(String c7Id) {
var query = historyService.createUserOperationLogQuery().operationId(c7Id);
return callApi(query::singleResult, format(FAILED_TO_FETCH_HISTORIC_ELEMENT, "UserOperationLogEntry", c7Id));
}

/**
* Processes tenant entities with pagination using the provided callback consumer.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_FETCH_PROCESS_DEFINITION_XML;
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_FETCH_VARIABLE;
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_FIND_PROCESS_INSTANCE_BY_KEY;
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_AUDIT_LOG;
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_DECISION_DEFINITION;
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_DECISION_INSTANCE;
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_DECISION_INSTANCE_INPUT;
Expand All @@ -33,6 +34,7 @@
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_SEARCH_FLOW_NODE_INSTANCES;
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_SEARCH_PROCESS_DEFINITIONS;
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_SEARCH_PROCESS_INSTANCE;
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_SEARCH_USER_TASKS;
import static io.camunda.migration.data.impl.util.ConverterUtil.getTenantId;
import static io.camunda.migration.data.impl.util.ExceptionUtils.callApi;

Expand All @@ -56,6 +58,8 @@
import io.camunda.db.rdbms.read.domain.FlowNodeInstanceDbQuery;
import io.camunda.db.rdbms.read.domain.ProcessDefinitionDbQuery;
import io.camunda.db.rdbms.read.domain.ProcessInstanceDbQuery;
import io.camunda.db.rdbms.read.domain.UserTaskDbQuery;
import io.camunda.db.rdbms.sql.AuditLogMapper;
import io.camunda.db.rdbms.sql.DecisionDefinitionMapper;
import io.camunda.db.rdbms.sql.DecisionInstanceMapper;
import io.camunda.db.rdbms.sql.DecisionRequirementsMapper;
Expand All @@ -65,6 +69,7 @@
import io.camunda.db.rdbms.sql.ProcessInstanceMapper;
import io.camunda.db.rdbms.sql.UserTaskMapper;
import io.camunda.db.rdbms.sql.VariableMapper;
import io.camunda.db.rdbms.write.domain.AuditLogDbModel;
import io.camunda.db.rdbms.write.domain.DecisionDefinitionDbModel;
import io.camunda.db.rdbms.write.domain.DecisionInstanceDbModel;
import io.camunda.db.rdbms.write.domain.DecisionRequirementsDbModel;
Expand Down Expand Up @@ -133,6 +138,9 @@ public class C8Client {
@Autowired(required = false)
protected DecisionRequirementsMapper decisionRequirementsMapper;

@Autowired(required = false)
protected AuditLogMapper auditLogMapper;

/**
* Creates a new process instance with the given BPMN process ID and variables.
*/
Expand Down Expand Up @@ -332,13 +340,27 @@ public void insertFlowNodeInstance(FlowNodeInstanceDbModel dbModel) {
callApi(() -> flowNodeInstanceMapper.insert(new BatchInsertDto(List.of(dbModel))), FAILED_TO_INSERT_FLOW_NODE_INSTANCE);
}

/**
* Inserts an AuditLog into the database.
*/
public void insertAuditLog(AuditLogDbModel dbModel) {
callApi(() -> auditLogMapper.insert(new BatchInsertDto(List.of(dbModel))), FAILED_TO_INSERT_AUDIT_LOG);
}

/**
* Searches for FlowNodeInstances matching the query.
*/
public List<FlowNodeInstanceDbModel> searchFlowNodeInstances(FlowNodeInstanceDbQuery query) {
return callApi(() -> flowNodeInstanceMapper.search(query), FAILED_TO_SEARCH_FLOW_NODE_INSTANCES);
}

/**
* Searches for User tasks matching the query.
*/
public List<UserTaskDbModel> searchUserTasks(UserTaskDbQuery query){
return callApi(() -> userTaskMapper.search(query), FAILED_TO_SEARCH_USER_TASKS);
}

/**
* Searches for ProcessDefinitions matching the query.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ public List<String> findAllC7Ids() {
* Updates a record by setting the key for an existing ID and type.
*/
public void updateC8KeyByC7IdAndType(String c7Id, Long c8Key, TYPE type) {
updateC8KeyByC7IdAndType(c7Id, (c8Key == null) ? null : c8Key.toString(), type);
}

/**
* Updates a record by setting the key for an existing ID and type.
*/
public void updateC8KeyByC7IdAndType(String c7Id, String c8Key, TYPE type) {
DbClientLogs.updatingC8KeyForC7Id(c7Id, c8Key);
var model = createIdKeyDbModel(c7Id, null, c8Key, type);
callApi(() -> idKeyMapper.updateC8KeyByC7IdAndType(model), FAILED_TO_UPDATE_KEY + c8Key);
Expand All @@ -115,7 +122,7 @@ public void updateSkipReason(String c7Id, TYPE type, String skipReason) {
}

/**
* Inserts a new process instance record into the mapping table.
* Inserts a new record of the given type into the mapping table.
*/
public void insert(String c7Id, Long c8Key, Date createTime, TYPE type) {
insert(c7Id, c8Key, createTime, type, null);
Expand All @@ -129,9 +136,16 @@ public void insert(String c7Id, Long c8Key, TYPE type) {
}

/**
* Inserts a new process instance record into the mapping table.
* Inserts a new record of the given type into the mapping table.
*/
public void insert(String c7Id, Long c8Key, Date createTime, TYPE type, String skipReason) {
insert(c7Id, (c8Key == null) ? null : c8Key.toString(), createTime, type, skipReason);
}

/**
* Inserts a new record of the given type into the mapping table.
*/
public void insert(String c7Id, String c8Key, Date createTime, TYPE type, String skipReason) {
String finalSkipReason = properties.getSaveSkipReason() ? skipReason : null;
DbClientLogs.insertingRecord(c7Id, createTime, null, finalSkipReason);
var model = createIdKeyDbModel(c7Id, createTime, c8Key, type, finalSkipReason);
Expand Down Expand Up @@ -201,7 +215,7 @@ protected void deleteByC7Id(String c7Id) {
/**
* Creates a new IdKeyDbModel instance with the provided parameters including skip reason.
*/
protected IdKeyDbModel createIdKeyDbModel(String c7Id, Date createTime, Long c8Key, TYPE type, String skipReason) {
protected IdKeyDbModel createIdKeyDbModel(String c7Id, Date createTime, String c8Key, TYPE type, String skipReason) {
var keyIdDbModel = new IdKeyDbModel();
keyIdDbModel.setC7Id(c7Id);
keyIdDbModel.setCreateTime(createTime);
Expand All @@ -214,7 +228,7 @@ protected IdKeyDbModel createIdKeyDbModel(String c7Id, Date createTime, Long c8K
/**
* Creates a new IdKeyDbModel instance with the provided parameters.
*/
protected IdKeyDbModel createIdKeyDbModel(String c7Id, Date createTime, Long c8Key, TYPE type) {
protected IdKeyDbModel createIdKeyDbModel(String c7Id, Date createTime, String c8Key, TYPE type) {
return createIdKeyDbModel(c7Id, createTime, c8Key, type, null);
}
}
Loading