11from collections .abc import Callable , Iterator
22from functools import partial
33from typing import Any
4- from requests .exceptions import RequestException
5- from uuid import UUID
64from django .db import transaction , IntegrityError
75
86from country_workspace .contrib .hope .exceptions import HopePushError
9- from country_workspace .contrib .hope .constants import IMAGES_TO_DEDUPLICATE_BULK_BATCH_SIZE
107from country_workspace .models import AsyncJob , Rdp
118from country_workspace .workspaces .models import CountryIndividual
12- from country_workspace .contrib .dedup_engine .client import Status as DedupClientStatus , make_client
139from country_workspace .contrib .dedup_engine .response import Status as DedupResponseStatus
1410
1511
16- from .processor import PushProcessor
12+ from .processor import HopeProcessor , DedupProcessor
1713from .config import CreateRdpConfig , PushWorkflowConfig
1814from .repository import (
1915 individuals_by_pks ,
20- individuals_for_rdp ,
2116 individuals_by_household_pks ,
2217 households ,
2318 rdp_for_push ,
24- rdp_for_dedup ,
2519 preflight_errors ,
2620 workflow_config_for_rdp ,
2721)
22+ from .transport import dedup_api
2823
2924
3025def create_rdp_records (config : CreateRdpConfig , job_id : int ) -> Rdp :
@@ -64,7 +59,7 @@ def mark_rdp_beneficiaries_removed(rdp: Rdp, is_master_detail: bool) -> None:
6459 rdp .individuals .update (removed = True )
6560
6661
67- def steps (processor : PushProcessor , config : PushWorkflowConfig ) -> Iterator [Callable [[], None ]]:
62+ def steps (processor : HopeProcessor , config : PushWorkflowConfig ) -> Iterator [Callable [[], None ]]:
6863 """Yield the ordered workflow callables; each step appends errors to processor.total."""
6964 pks = config ["pks" ]
7065
@@ -100,99 +95,13 @@ def create_rdp_core(job: AsyncJob) -> dict[str, Any]:
10095 return {"rdp_id" : rdp .id , "rdp_str" : str (rdp )}
10196
10297
103- def dedup_engine_status_or_error (program_code : str ) -> DedupClientStatus :
104- """Fetch DedupEngine status or raise HopePushError."""
105- try :
106- with make_client (program_code ) as client :
107- return client .status ()
108- except (RequestException , ValueError ) as e :
109- raise HopePushError ({"errors" : [f"DedupEngine: status() failed: { e } " ]}) from e
110-
111-
112- def dedup_engine_approve_or_error (program_code : str ) -> None :
113- """Approve DedupEngine results or raise HopePushError."""
114- try :
115- with make_client (program_code ) as client :
116- client .approve ()
117- except (RequestException , ValueError ) as e :
118- raise HopePushError ({"errors" : [f"DedupEngine: approve() failed: { e } " ]}) from e
119-
120-
121- def _parse_uuid (value : object ) -> UUID | None :
122- """Return UUID parsed from value or None if parsing fails."""
123- if value is None :
124- return None
125- try :
126- return UUID (str (value ))
127- except (TypeError , ValueError ):
128- return None
129-
130-
131- def _collect_dedup_images (* , rdp : Rdp ) -> tuple [list [dict [str , str ]], int ]:
132- """Collect DedupEngine images payload (reference_pk, filename) from RDP individuals."""
133- rows = individuals_for_rdp (rdp = rdp ).values_list ("pk" , "flex_fields__photo" )
134-
135- images : list [dict [str , str ]] = []
136- skipped = 0
137- for pk , photo in rows .iterator (chunk_size = IMAGES_TO_DEDUPLICATE_BULK_BATCH_SIZE * 5 ):
138- if isinstance (photo , str ) and (photo := photo .strip ()):
139- images .append ({"reference_pk" : str (pk ), "filename" : photo })
140- else :
141- skipped += 1
142- return images , skipped
143-
144-
145- def _dedup_engine_create_set_or_error (* , program_code : str ) -> UUID :
146- """Create a DedupEngine deduplication set and return its UUID."""
147- try :
148- with make_client (program_code ) as client :
149- ds_id = client .create_deduplication_set (settings = {})
150- except (RequestException , ValueError , KeyError , TypeError ) as e :
151- raise HopePushError ({"errors" : [f"DedupEngine: create_deduplication_set() failed: { e } " ]}) from e
152-
153- if (dedup_uuid := _parse_uuid (ds_id )) is None :
154- raise HopePushError ({"errors" : [f"DedupEngine: invalid deduplication_set_id={ ds_id !r} " ]})
155- return dedup_uuid
156-
157-
158- def _dedup_engine_upload_and_process_or_error (* , program_code : str , images : list [dict [str , str ]]) -> None :
159- """Upload images to DedupEngine and start processing."""
160- try :
161- with make_client (program_code ) as client :
162- client .create_images (images )
163- client .process ()
164- except (RequestException , ValueError , KeyError , TypeError ) as e :
165- raise HopePushError ({"errors" : [f"DedupEngine: create_images()/process() failed: { e } " ]}) from e
166-
167-
16898def dedup_existing_rdp_core (job : AsyncJob ) -> dict [str , Any ]:
16999 """Run DedupEngine deduplication for an existing RDP."""
170- rdp = rdp_for_dedup (pk = job .config ["rdp_id" ])
171- if rdp .status != Rdp .PushStatus .PENDING :
172- raise HopePushError ({"errors" : [f"RDP: can not run dedup in status={ rdp .status } " ]})
173- if rdp .dedup_run_state == rdp .DedupRunState .APPROVED :
174- raise HopePushError ({"errors" : ["RDP: can not run dedup after approval" ]})
175-
176- images , skipped = _collect_dedup_images (rdp = rdp )
177- total = {
178- "rdp_id" : rdp .pk ,
179- "program" : rdp .program .code ,
180- "skipped_no_photo" : skipped ,
181- "images_sent" : len (images ),
182- }
183- if not images :
184- return total | {"deduplication_set_id" : None }
185-
186- deduplication_set_id = _dedup_engine_create_set_or_error (program_code = rdp .program .code )
187-
188- Rdp .objects .filter (pk = rdp .pk ).update (
189- deduplication_set_id = deduplication_set_id ,
190- dedup_run_state = rdp .DedupRunState .IN_PROGRESS ,
191- )
192-
193- _dedup_engine_upload_and_process_or_error (program_code = rdp .program .code , images = images )
194-
195- return total | {"deduplication_set_id" : str (deduplication_set_id )}
100+ processor = DedupProcessor (rdp_id = job .config ["rdp_id" ])
101+ processor .run ()
102+ if processor .has_errors :
103+ raise HopePushError (processor .total )
104+ return processor .total
196105
197106
198107def push_existing_rdp_core (job : AsyncJob ) -> dict [str , Any ]:
@@ -203,25 +112,24 @@ def push_existing_rdp_core(job: AsyncJob) -> dict[str, Any]:
203112 imported_by_email = getattr (job .owner , "email" , "" ) or getattr (rdp .pushed_by , "email" , "" )
204113
205114 config : PushWorkflowConfig = workflow_config_for_rdp (rdp = rdp , imported_by_email = imported_by_email )
206- processor = PushProcessor (config )
207- for step in steps (processor , config ):
115+ hope_processor = HopeProcessor (config )
116+ for step in steps (hope_processor , config ):
208117 step ()
209- if processor .total ["errors" ]:
210- complete_rdp (rdp .id , Rdp .PushStatus .FAILURE , processor .hope_rdi_id or "N/A" )
211- raise HopePushError (processor .total )
212-
213- program_code = rdp .program .code
214- if rdp .dedup_run_state == Rdp .DedupRunState .IN_PROGRESS :
215- try :
216- if dedup_engine_status_or_error (program_code ).status == DedupResponseStatus .SUCCESS :
217- dedup_engine_approve_or_error (program_code )
118+ if hope_processor .has_errors :
119+ complete_rdp (rdp .id , Rdp .PushStatus .FAILURE , hope_processor .hope_rdi_id or "N/A" )
120+ raise HopePushError (hope_processor .total )
121+
122+ if rdp .program .biometric_deduplication_enabled :
123+ with dedup_api (rdp .program .code , hope_processor ._err ) as de :
124+ if (de_status := de .status ()) is None :
125+ raise HopePushError (hope_processor .total )
126+ if de_status .status == DedupResponseStatus .SUCCESS :
127+ if not de .approve ():
128+ raise HopePushError (hope_processor .total )
218129 Rdp .objects .filter (pk = rdp .pk ).update (dedup_run_state = Rdp .DedupRunState .APPROVED )
219- except HopePushError :
220- complete_rdp (rdp .id , Rdp .PushStatus .FAILURE , processor .hope_rdi_id or "N/A" )
221- raise
222130
223131 with transaction .atomic ():
224- updated = complete_rdp (rdp .id , Rdp .PushStatus .SUCCESS , processor .hope_rdi_id )
132+ updated = complete_rdp (rdp .id , Rdp .PushStatus .SUCCESS , hope_processor .hope_rdi_id )
225133 mark_rdp_beneficiaries_removed (updated , config ["master_detail" ])
226134
227- return processor .total
135+ return hope_processor .total
0 commit comments