Skip to content

Commit f29a687

Browse files
author
Arsen P
committed
Enhance transaction management in Kobo asset import
- Updated `import_asset` to ensure each submission is processed in its own transaction, allowing for partial success and rollback of only failed submissions.
- Added detailed comments to clarify transaction handling and watermark persistence for recovery.
- Introduced a new test to verify that earlier submissions remain intact when a later submission fails, ensuring data integrity during import processes.

These changes improve the robustness of the import functionality and enhance error handling in the Kobo sync process.
1 parent 1c94ade commit f29a687

File tree

2 files changed

+88
-6
lines changed

2 files changed

+88
-6
lines changed

src/country_workspace/contrib/kobo/sync.py

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,8 @@ def import_asset(
293293
for submission in submissions_iterator:
294294
current_submission = submission
295295
originating_id = get_kobo_originating_id(asset.uid, str(submission.id))
296+
# One transaction per submission: if this block fails, only this
297+
# submission is rolled back; previously committed data stays.
296298
with transaction.atomic():
297299
household = create_household(batch, submission, config, id_generator, originating_id)
298300
individuals = create_individuals(batch, household, submission, config, originating_id)
@@ -319,6 +321,8 @@ def import_asset(
319321
raise ImportError(error_msg) from e
320322

321323
finally:
324+
# Persist watermark so the next run (or retry) resumes after the last
325+
# successful submission; runs even when an exception is raised.
322326
if last_successful_id > last_id:
323327
SyncLog.objects.update_or_create(
324328
name=sync_log_name,
@@ -347,8 +351,22 @@ def id_generator() -> int:
347351

348352

349353
def import_data(job: AsyncJob) -> ImportResult:
354+
"""
355+
Import Kobo asset data for a job.
356+
357+
Transaction management:
358+
- Batch setup (create or lock batch, set status to LOADING) runs in a single
359+
atomic block so the batch is visible and locked before import starts.
360+
- import_asset() runs outside that block; it commits each submission in its
361+
own transaction (see import_asset). That way, partial progress is persisted
362+
and the watermark (SyncLog) can be updated per submission.
363+
- On success, batch status is updated to COMPLETE in a final atomic block.
364+
- On incomplete (timebox or reschedule), a new AsyncJob is queued; no batch
365+
status change until the run that completes.
366+
"""
350367
config: Config = job.config
351368

369+
# Create or lock the batch and set status to LOADING in one transaction.
352370
with transaction.atomic():
353371
batch_id = getattr(job, "batch_id", None)
354372
if batch_id:
@@ -381,25 +399,28 @@ def import_data(job: AsyncJob) -> ImportResult:
381399
timebox_minutes = getattr(constance_config, "KOBO_IMPORT_TIMEBOX_MINUTES", 0) or 0
382400
timebox_seconds = int(timebox_minutes * 60) if timebox_minutes > 0 else None
383401

384-
import_result = import_asset(
402+
# import_asset commits each submission in its own transaction; on error it
403+
# raises after updating the watermark so the next run can resume.
404+
asset_import_result = import_asset(
385405
batch,
386406
asset,
387407
config,
388408
id_generator,
389409
timebox_seconds=timebox_seconds,
390410
)
391-
household_counter += import_result["households"]
392-
individual_counter += import_result["individuals"]
411+
household_counter += asset_import_result["households"]
412+
individual_counter += asset_import_result["individuals"]
393413

394-
if import_result["completed"] and config.get("validate_after_import"):
414+
if asset_import_result["completed"] and config.get("validate_after_import"):
395415
create_validation_jobs(
396416
description=f"Validate records for batch {batch.pk}",
397417
owner=job.owner,
398418
program=job.program,
399419
queryset=batch.household_set.all().prefetch_related("members"), # type: ignore[attr-defined]
400420
)
401421

402-
if import_result["completed"]:
422+
if asset_import_result["completed"]:
423+
# Mark batch complete in a dedicated transaction.
403424
with transaction.atomic():
404425
Batch.objects.select_for_update().filter(pk=batch.pk).update(status=Batch.BatchStatus.COMPLETE)
405426
else:
@@ -417,5 +438,5 @@ def import_data(job: AsyncJob) -> ImportResult:
417438
return ImportResult(
418439
households=household_counter,
419440
individuals=individual_counter,
420-
completed=import_result["completed"],
441+
completed=asset_import_result["completed"],
421442
)

tests/contrib/kobo/test_kobo_sync.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
DataCheckerFactory,
3636
FieldsetFactory,
3737
FlexFieldFactory,
38+
HouseholdFactory,
39+
IndividualFactory,
3840
SyncLogFactory,
3941
)
4042
from testutils.factories.smart_fields import DataCheckerFieldsetFactory
@@ -392,6 +394,65 @@ def test_import_asset_with_error(mocker: MockerFixture, config: Config) -> None:
392394
assert sync_log.last_id == "101"
393395

394396

397+
@pytest.mark.django_db
398+
def test_import_asset_on_error_persists_previous_data(mocker: MockerFixture, config: Config) -> None:
399+
"""
400+
When import_asset fails on a later submission, earlier data are persisted:
401+
each submission runs in its own transaction, so a failure rolls back only
402+
that submission; we assert the first household (and its individuals) remain
403+
in the DB and the watermark is updated for recovery.
404+
"""
405+
batch = BatchFactory()
406+
program_ct = ContentType.objects.get_for_model(Program)
407+
SyncLogFactory(
408+
name="kobo_test_asset_uid",
409+
content_type=program_ct,
410+
object_id=batch.program.id,
411+
last_id="100",
412+
)
413+
414+
id_generator_mock = mocker.Mock(name="id_generator")
415+
416+
def create_household_real(batch, submission, config, id_generator, originating_id):
417+
return HouseholdFactory(batch=batch, individuals=[])
418+
419+
def create_individuals_real(batch, household, submission, config, originating_id):
420+
return [IndividualFactory(batch=batch, household=household)]
421+
422+
mocker.patch(
423+
"country_workspace.contrib.kobo.sync.create_household",
424+
side_effect=create_household_real,
425+
)
426+
mocker.patch(
427+
"country_workspace.contrib.kobo.sync.create_individuals",
428+
side_effect=create_individuals_real,
429+
)
430+
set_roles_and_relationships_mock = mocker.patch("country_workspace.contrib.kobo.sync.set_roles_and_relationships")
431+
set_roles_and_relationships_mock.side_effect = [None, ValueError("fail on second")]
432+
433+
submission_1 = Mock(spec=dict)
434+
submission_1.id = 101
435+
submission_2 = Mock(spec=dict)
436+
submission_2.id = 102
437+
asset_mock = Mock()
438+
asset_mock.uid = "test_asset_uid"
439+
asset_mock.submissions = Mock(return_value=iter([submission_1, submission_2]))
440+
441+
with pytest.raises(ImportError, match=r"Successfully imported.*at submission 102"):
442+
import_asset(batch, asset_mock, config, id_generator_mock)
443+
444+
batch.refresh_from_db()
445+
assert batch.household_set.count() == 1
446+
assert batch.household_set.first().members.count() == 1
447+
448+
sync_log = SyncLog.objects.get(
449+
name="kobo_test_asset_uid",
450+
content_type=program_ct,
451+
object_id=batch.program.id,
452+
)
453+
assert sync_log.last_id == "101"
454+
455+
395456
@pytest.mark.django_db
396457
def test_import_asset_no_new_submissions(mocker: MockerFixture, config: Config) -> None:
397458
batch = BatchFactory()

0 commit comments

Comments (0)