Skip to content

Commit 2122a31

Browse files
rebase; refactor; tests
1 parent 24ee7e0 commit 2122a31

File tree

14 files changed

+766
-368
lines changed

14 files changed

+766
-368
lines changed

docs/src/flows/rdp_matrix.md

Lines changed: 0 additions & 39 deletions
This file was deleted.

src/country_workspace/contrib/hope/push/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,12 @@
33
create_rdp_core,
44
dedup_existing_rdp_core,
55
push_existing_rdp_core,
6-
dedup_engine_status_or_error,
76
)
87

98
__all__ = [
109
"CreateRdpConfig",
1110
"PushExistingRdpConfig",
1211
"create_rdp_core",
13-
"dedup_engine_status_or_error",
1412
"dedup_existing_rdp_core",
1513
"push_existing_rdp_core",
1614
]
Lines changed: 22 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,25 @@
11
from collections.abc import Callable, Iterator
22
from functools import partial
33
from typing import Any
4-
from requests.exceptions import RequestException
5-
from uuid import UUID
64
from django.db import transaction, IntegrityError
75

86
from country_workspace.contrib.hope.exceptions import HopePushError
9-
from country_workspace.contrib.hope.constants import IMAGES_TO_DEDUPLICATE_BULK_BATCH_SIZE
107
from country_workspace.models import AsyncJob, Rdp
118
from country_workspace.workspaces.models import CountryIndividual
12-
from country_workspace.contrib.dedup_engine.client import Status as DedupClientStatus, make_client
139
from country_workspace.contrib.dedup_engine.response import Status as DedupResponseStatus
1410

1511

16-
from .processor import PushProcessor
12+
from .processor import PushProcessor, DedupProcessor
1713
from .config import CreateRdpConfig, PushWorkflowConfig
1814
from .repository import (
1915
individuals_by_pks,
20-
individuals_for_rdp,
2116
individuals_by_household_pks,
2217
households,
2318
rdp_for_push,
24-
rdp_for_dedup,
2519
preflight_errors,
2620
workflow_config_for_rdp,
2721
)
22+
from .transport import dedup_api
2823

2924

3025
def create_rdp_records(config: CreateRdpConfig, job_id: int) -> Rdp:
@@ -100,99 +95,13 @@ def create_rdp_core(job: AsyncJob) -> dict[str, Any]:
10095
return {"rdp_id": rdp.id, "rdp_str": str(rdp)}
10196

10297

103-
def dedup_engine_status_or_error(program_code: str) -> DedupClientStatus:
104-
"""Fetch DedupEngine status or raise HopePushError."""
105-
try:
106-
with make_client(program_code) as client:
107-
return client.status()
108-
except (RequestException, ValueError) as e:
109-
raise HopePushError({"errors": [f"DedupEngine: status() failed: {e}"]}) from e
110-
111-
112-
def dedup_engine_approve_or_error(program_code: str) -> None:
113-
"""Approve DedupEngine results or raise HopePushError."""
114-
try:
115-
with make_client(program_code) as client:
116-
client.approve()
117-
except (RequestException, ValueError) as e:
118-
raise HopePushError({"errors": [f"DedupEngine: approve() failed: {e}"]}) from e
119-
120-
121-
def _parse_uuid(value: object) -> UUID | None:
122-
"""Return UUID parsed from value or None if parsing fails."""
123-
if value is None:
124-
return None
125-
try:
126-
return UUID(str(value))
127-
except (TypeError, ValueError):
128-
return None
129-
130-
131-
def _collect_dedup_images(*, rdp: Rdp) -> tuple[list[dict[str, str]], int]:
132-
"""Collect DedupEngine images payload (reference_pk, filename) from RDP individuals."""
133-
rows = individuals_for_rdp(rdp=rdp).values_list("pk", "flex_fields__photo")
134-
135-
images: list[dict[str, str]] = []
136-
skipped = 0
137-
for pk, photo in rows.iterator(chunk_size=IMAGES_TO_DEDUPLICATE_BULK_BATCH_SIZE * 5):
138-
if isinstance(photo, str) and (photo := photo.strip()):
139-
images.append({"reference_pk": str(pk), "filename": photo})
140-
else:
141-
skipped += 1
142-
return images, skipped
143-
144-
145-
def _dedup_engine_create_set_or_error(*, program_code: str) -> UUID:
146-
"""Create a DedupEngine deduplication set and return its UUID."""
147-
try:
148-
with make_client(program_code) as client:
149-
ds_id = client.create_deduplication_set(settings={})
150-
except (RequestException, ValueError, KeyError, TypeError) as e:
151-
raise HopePushError({"errors": [f"DedupEngine: create_deduplication_set() failed: {e}"]}) from e
152-
153-
if (dedup_uuid := _parse_uuid(ds_id)) is None:
154-
raise HopePushError({"errors": [f"DedupEngine: invalid deduplication_set_id={ds_id!r}"]})
155-
return dedup_uuid
156-
157-
158-
def _dedup_engine_upload_and_process_or_error(*, program_code: str, images: list[dict[str, str]]) -> None:
159-
"""Upload images to DedupEngine and start processing."""
160-
try:
161-
with make_client(program_code) as client:
162-
client.create_images(images)
163-
client.process()
164-
except (RequestException, ValueError, KeyError, TypeError) as e:
165-
raise HopePushError({"errors": [f"DedupEngine: create_images()/process() failed: {e}"]}) from e
166-
167-
16898
def dedup_existing_rdp_core(job: AsyncJob) -> dict[str, Any]:
16999
"""Run DedupEngine deduplication for an existing RDP."""
170-
rdp = rdp_for_dedup(pk=job.config["rdp_id"])
171-
if rdp.status != Rdp.PushStatus.PENDING:
172-
raise HopePushError({"errors": [f"RDP: can not run dedup in status={rdp.status}"]})
173-
if rdp.dedup_run_state == rdp.DedupRunState.APPROVED:
174-
raise HopePushError({"errors": ["RDP: can not run dedup after approval"]})
175-
176-
images, skipped = _collect_dedup_images(rdp=rdp)
177-
total = {
178-
"rdp_id": rdp.pk,
179-
"program": rdp.program.code,
180-
"skipped_no_photo": skipped,
181-
"images_sent": len(images),
182-
}
183-
if not images:
184-
return total | {"deduplication_set_id": None}
185-
186-
deduplication_set_id = _dedup_engine_create_set_or_error(program_code=rdp.program.code)
187-
188-
Rdp.objects.filter(pk=rdp.pk).update(
189-
deduplication_set_id=deduplication_set_id,
190-
dedup_run_state=rdp.DedupRunState.IN_PROGRESS,
191-
)
192-
193-
_dedup_engine_upload_and_process_or_error(program_code=rdp.program.code, images=images)
194-
195-
return total | {"deduplication_set_id": str(deduplication_set_id)}
100+
processor = DedupProcessor(rdp_id=job.config["rdp_id"])
101+
processor.run()
102+
if processor.has_errors:
103+
raise HopePushError(processor.total)
104+
return processor.total
196105

197106

198107
def push_existing_rdp_core(job: AsyncJob) -> dict[str, Any]:
@@ -203,25 +112,24 @@ def push_existing_rdp_core(job: AsyncJob) -> dict[str, Any]:
203112
imported_by_email = getattr(job.owner, "email", "") or getattr(rdp.pushed_by, "email", "")
204113

205114
config: PushWorkflowConfig = workflow_config_for_rdp(rdp=rdp, imported_by_email=imported_by_email)
206-
processor = PushProcessor(config)
207-
for step in steps(processor, config):
115+
hope_processor = PushProcessor(config)
116+
for step in steps(hope_processor, config):
208117
step()
209-
if processor.total["errors"]:
210-
complete_rdp(rdp.id, Rdp.PushStatus.FAILURE, processor.hope_rdi_id or "N/A")
211-
raise HopePushError(processor.total)
212-
213-
program_code = rdp.program.code
214-
if rdp.dedup_run_state == Rdp.DedupRunState.IN_PROGRESS:
215-
try:
216-
if dedup_engine_status_or_error(program_code).status == DedupResponseStatus.SUCCESS:
217-
dedup_engine_approve_or_error(program_code)
118+
if hope_processor.has_errors:
119+
complete_rdp(rdp.id, Rdp.PushStatus.FAILURE, hope_processor.hope_rdi_id or "N/A")
120+
raise HopePushError(hope_processor.total)
121+
122+
if rdp.program.biometric_deduplication_enabled:
123+
with dedup_api(rdp.program.code, hope_processor._err) as de:
124+
if (de_status := de.status()) is None:
125+
raise HopePushError(hope_processor.total)
126+
if de_status.status == DedupResponseStatus.SUCCESS:
127+
if not de.approve():
128+
raise HopePushError(hope_processor.total)
218129
Rdp.objects.filter(pk=rdp.pk).update(dedup_run_state=Rdp.DedupRunState.APPROVED)
219-
except HopePushError:
220-
complete_rdp(rdp.id, Rdp.PushStatus.FAILURE, processor.hope_rdi_id or "N/A")
221-
raise
222130

223131
with transaction.atomic():
224-
updated = complete_rdp(rdp.id, Rdp.PushStatus.SUCCESS, processor.hope_rdi_id)
132+
updated = complete_rdp(rdp.id, Rdp.PushStatus.SUCCESS, hope_processor.hope_rdi_id)
225133
mark_rdp_beneficiaries_removed(updated, config["master_detail"])
226134

227-
return processor.total
135+
return hope_processor.total

src/country_workspace/contrib/hope/push/processor.py

Lines changed: 101 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,53 @@
33
from functools import cached_property
44
from itertools import batched
55
from typing import Any
6+
from uuid import UUID
67

78
from django.db.models import QuerySet
89

910
from country_workspace.contrib.hope.constants import PUSH_BATCH_SIZE
1011
from country_workspace.workspaces.models import CountryHousehold, CountryIndividual
1112

13+
from country_workspace.contrib.hope.constants import IMAGES_TO_DEDUPLICATE_BULK_BATCH_SIZE
14+
from country_workspace.models import Rdp
15+
from .repository import individuals_for_rdp, rdp_for_dedup
16+
17+
1218
from .config import ROLE_FIELDS, Serializer, ERROR_CONFIG, PushWorkflowConfig
1319
from .mappings import load_mapping_from_api, map_members, map_role_value
1420
from .repository import serializer_for_program, preflight_errors
15-
from .transport import HopeApi
21+
from .transport import HopeApi, dedup_api
22+
23+
24+
class ProcessorBase:
25+
"""Shared processor primitives."""
26+
27+
def __init__(self) -> None:
28+
self.total: dict[str, Any] = {"errors": []}
29+
30+
@property
31+
def has_errors(self) -> bool:
32+
"""Return True when at least one error was collected."""
33+
return bool(self.total["errors"])
34+
35+
def _err(self, msg: str) -> None:
36+
"""Append an error into total['errors']; truncate long text; cap the list with a marker."""
37+
errors: list[str] = self.total["errors"]
38+
if errors and errors[-1] == ERROR_CONFIG.MARKER:
39+
return
40+
if len(errors) >= ERROR_CONFIG.MAX_ERRORS - 1:
41+
errors.append(ERROR_CONFIG.MARKER)
42+
return
43+
if len(msg) > ERROR_CONFIG.MAX_ERROR_LEN:
44+
msg = f"{msg[: ERROR_CONFIG.MAX_ERROR_LEN - 1]}…"
45+
errors.append(msg)
1646

1747

18-
class PushProcessor:
48+
class PushProcessor(ProcessorBase):
1949
"""Push pipeline: validate, prepare, send and track results via Hope API."""
2050

2151
def __init__(self, config: PushWorkflowConfig) -> None:
52+
super().__init__()
2253
self.api = HopeApi(co_slug=config["co_slug"], err=self._err)
2354
self.batch_name: str = config["batch_name"]
2455
self.hope_rdi_id: str | None = None
@@ -29,7 +60,6 @@ def __init__(self, config: PushWorkflowConfig) -> None:
2960
self.program_hope_id: str = config["program_hope_id"]
3061
self.queryset: QuerySet | None = None
3162
self.rdp_id: int | None = config.get("rdp_id")
32-
self.total: dict[str, Any] = {"errors": []}
3363

3464
@cached_property
3565
def serializer(self) -> Serializer:
@@ -85,18 +115,6 @@ def run_with(self, qs: QuerySet, step: Callable) -> None:
85115
with self._using_qs(qs):
86116
step()
87117

88-
def _err(self, msg: str) -> None:
89-
"""Append an error into total['errors']; truncate long text; cap the list with a marker."""
90-
errors: list[str] = self.total["errors"]
91-
if errors and errors[-1] == ERROR_CONFIG.MARKER:
92-
return
93-
if len(errors) >= ERROR_CONFIG.MAX_ERRORS - 1:
94-
errors.append(ERROR_CONFIG.MARKER)
95-
return
96-
if len(msg) > ERROR_CONFIG.MAX_ERROR_LEN:
97-
msg = f"{msg[: ERROR_CONFIG.MAX_ERROR_LEN - 1]}…"
98-
errors.append(msg)
99-
100118
def _prepare_households_batch(self, batch: Iterable[CountryHousehold]) -> tuple[list[int], list[dict]]:
101119
"""Return (ids, payload) for a households batch: roles mapped, members resolved."""
102120
ids, rows = [], []
@@ -207,3 +225,71 @@ def _using_qs(self, qs: QuerySet) -> Iterator[None]:
207225
yield
208226
finally:
209227
self.queryset = prev
228+
229+
230+
class DedupProcessor(ProcessorBase):
231+
"""Dedup pipeline: collect images, create set, upload and start processing."""
232+
233+
def __init__(self, *, rdp_id: int) -> None:
234+
super().__init__()
235+
self.rdp = rdp_for_dedup(pk=rdp_id)
236+
self.program_code = self.rdp.program.code
237+
238+
def run(self) -> None:
239+
"""Execute dedup workflow; collect errors in total."""
240+
if self.rdp.status != Rdp.PushStatus.PENDING:
241+
self._err(f"RDP: can not run dedup in status={self.rdp.status}")
242+
if self.rdp.dedup_run_state == Rdp.DedupRunState.APPROVED:
243+
self._err("RDP: can not run dedup after approval")
244+
if self.has_errors:
245+
return
246+
247+
self.total |= {"rdp_id": self.rdp.pk, "program": self.program_code}
248+
249+
images = self._collect_images()
250+
self.total["images_sent"] = len(images)
251+
252+
if not images:
253+
self.total["deduplication_set_id"] = None
254+
return
255+
256+
ds_id = self._deduplicate(images)
257+
self.total["deduplication_set_id"] = str(ds_id) if ds_id else None
258+
259+
def _collect_images(self) -> list[dict[str, str]]:
260+
"""Collect DedupEngine images payload (reference_pk, filename) from RDP individuals."""
261+
rows = individuals_for_rdp(rdp=self.rdp).values_list("originating_id", "flex_fields__photo")
262+
263+
images: list[dict[str, str]] = []
264+
for pk, photo in rows.iterator(chunk_size=IMAGES_TO_DEDUPLICATE_BULK_BATCH_SIZE * 5):
265+
if isinstance(photo, str) and (photo := photo.strip()):
266+
images.append({"reference_pk": str(pk), "filename": photo})
267+
return images
268+
269+
def _deduplicate(self, images: list[dict[str, str]]) -> UUID | None:
270+
"""Run remote DedupEngine steps; return deduplication_set_id UUID on success."""
271+
with dedup_api(self.program_code, self._err) as api:
272+
if (raw := api.create_deduplication_set(settings={})) is None:
273+
self._err("DedupEngine: create_deduplication_set() failed")
274+
return None
275+
276+
try:
277+
ds_id = UUID(str(raw))
278+
except (TypeError, ValueError) as e:
279+
self._err(f"DedupEngine: create_deduplication_set returned invalid UUID {raw!r}: {e}")
280+
return None
281+
282+
Rdp.objects.filter(pk=self.rdp.pk).update(
283+
deduplication_set_id=ds_id,
284+
dedup_run_state=Rdp.DedupRunState.IN_PROGRESS,
285+
)
286+
287+
if not api.create_images(images):
288+
self._err("DedupEngine: create_images() failed")
289+
return None
290+
291+
if not api.process():
292+
self._err("DedupEngine: process() failed")
293+
return None
294+
295+
return ds_id

0 commit comments

Comments
 (0)