fix(worker): deduplicate delivered dl status #9966
djabarovgeorge wants to merge 17 commits into next
…r for accurate status updates
Hey there and thank you for opening this pull request! 👋 We require pull request titles to follow specific formatting rules and it looks like your proposed title needs to be adjusted.
Your PR title is:
Requirements:
Expected format:
Details: PR title must end with 'fixes TICKET-ID' (e.g., 'fixes NOV-123') or include ticket ID in branch name
…ance notification handling
…ove trace creation for in-app notifications
…nhance notification handling with prefetched data
…pdate related references for clarity
…gic in WorkflowRunService
…IVERED transition
Walkthrough
Registers `WorkflowRunService` in analytics providers and propagates a new `currentJob` context through delivery lifecycle updates. Adds feature flags `IS_DELIVERY_LIFECYCLE_TRANSITION_ENABLED` and `IS_WORKFLOW_RUN_TREND_FROM_TRACES_ENABLED` to enable transition-based lifecycle handling and trace-based trend charts. Introduces `DeliveryLifecycleEventType`, adds `lastEmittedDeliveryEvent` to `NotificationEntity`, and adds `NotificationRepository.tryDeliveryLifecycleTransition`. Extends `updateDeliveryLifecycle` signatures and tracing (new trace types, allow empty `TraceStatus`), adds prefetched-data support to workflow-run repository methods, and adds trace-based workflow-run trend retrieval.
🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
libs/application-generic/src/services/workflow-run.service.ts (1)
450-475: ⚠️ Potential issue | 🟡 Minor
Missing return statement in catch block causes implicit undefined return.
The `getDeliveryLifecycle` method logs the error but doesn't return anything in the catch block, causing an implicit `undefined` return. This should either re-throw the error or return a defined fallback.
🐛 Proposed fix
     } catch (error) {
       this.logger.error(
         {
           error: error instanceof Error ? error.message : 'Unknown error',
           notificationId,
         },
         'Failed to get workflow run delivery lifecycle'
       );
+
+      return {
+        deliveryLifecycleStatus: DeliveryLifecycleStatusEnum.PENDING,
+        deliveryLifecycleDetail: undefined,
+      };
     }
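The implicit `undefined` described in this comment can be reproduced in isolation. The following is a minimal synchronous sketch, not the service's actual code: `DeliveryLifecycle`, `fetchLifecycle`, `getLifecycleUnsafe`, and `getLifecycleWithFallback` are hypothetical names, and a plain string stands in for `DeliveryLifecycleStatusEnum`.

```typescript
// Hypothetical stand-ins; the real service, enum, and logger are not reproduced here.
type DeliveryLifecycle = { status: string; detail?: string };

// Simulates a lookup that always fails.
function fetchLifecycle(notificationId: string): DeliveryLifecycle {
  throw new Error(`lookup failed for ${notificationId}`);
}

// Collects log messages instead of calling a real logger.
const logged: string[] = [];

// Mirrors the flagged shape: the catch block logs and then falls through.
function getLifecycleUnsafe(id: string): DeliveryLifecycle | undefined {
  try {
    return fetchLifecycle(id);
  } catch (error) {
    logged.push(error instanceof Error ? error.message : 'Unknown error');
    // No return here, so the caller receives undefined.
  }
}

// Mirrors the proposed fix: the catch block returns a defined fallback.
function getLifecycleWithFallback(id: string): DeliveryLifecycle {
  try {
    return fetchLifecycle(id);
  } catch (error) {
    logged.push(error instanceof Error ? error.message : 'Unknown error');
    return { status: 'pending', detail: undefined };
  }
}
```

Callers of the first variant must handle `undefined` themselves, while the second always yields an object. Re-throwing in the catch block, the other option the comment mentions, would surface the failure to the caller instead of masking it as a pending status.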
🤖 Fix all issues with AI agents
In `@libs/dal/src/repositories/notification/notification.repository.ts`:
- Around line 12-29: DELIVERY_LIFECYCLE_ORDER currently gives
DeliveryLifecycleStatusEnum.INTERACTED an order of 3 but TERMINAL_STATUSES also
includes DeliveryLifecycleStatusEnum.INTERACTED, causing terminal-transition
logic to block expected progress; fix by removing
DeliveryLifecycleStatusEnum.INTERACTED from the TERMINAL_STATUSES array (or if
INTERACTED must be terminal, change its value in DELIVERY_LIFECYCLE_ORDER to -1)
so the transition checks that use DELIVERY_LIFECYCLE_ORDER and TERMINAL_STATUSES
behave consistently.
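One way to keep an order table and a terminal set consistent is to make them disjoint, as the first option in the prompt above suggests. The sketch below is an assumption-laden illustration, not the repository's code: `Status`, `ORDER`, `TERMINAL`, and `canTransition` are hypothetical names, only the rank of 3 for the interacted status comes from the review comment, and the other ranks and the membership of the terminal set are assumed.

```typescript
// Hypothetical string union standing in for DeliveryLifecycleStatusEnum.
type Status =
  | 'pending' | 'sent' | 'delivered' | 'interacted'
  | 'errored' | 'skipped' | 'canceled' | 'merged';

// Assumed ranks; only interacted = 3 is stated in the review comment.
const ORDER: Partial<Record<Status, number>> = {
  pending: 0,
  sent: 1,
  delivered: 2,
  interacted: 3,
};

// Assumed terminal set, with 'interacted' deliberately left out so that
// no status appears in both tables.
const TERMINAL: ReadonlySet<Status> = new Set<Status>([
  'errored', 'skipped', 'canceled', 'merged',
]);

// Decides whether `next` may be emitted after `last` (undefined = nothing emitted yet).
function canTransition(last: Status | undefined, next: Status): boolean {
  if (last === undefined) return true;   // first event is always allowed
  if (TERMINAL.has(last)) return false;  // terminal statuses are final
  if (TERMINAL.has(next)) return true;   // any non-terminal status may end
  // Forward-only movement; a repeat of the same status has equal rank and is rejected.
  return (ORDER[next] ?? -1) > (ORDER[last] ?? -1);
}
```

Under these assumptions a repeated `delivered` event is rejected because its rank is not greater than the last emitted one, which is the kind of deduplication the PR title refers to.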
🧹 Nitpick comments (4)
libs/application-generic/src/services/workflow-run.service.ts (2)
50-53: Prefer interface over type for `WorkflowForTrace`.
As per coding guidelines for TypeScript files, interfaces should be preferred over types.
🔧 Suggested change
-export type WorkflowForTrace = {
-  name: string;
-  triggers?: Array<{ identifier?: string }>;
-};
+export interface WorkflowForTrace {
+  name: string;
+  triggers?: Array<{ identifier?: string }>;
+}
510-519: Extract duplicated `deliveryLifecycleEventTypeMap` to module-level constant.
The same map is defined twice: in `createWorkflowRunTraceUpdate` (lines 510-519) and `createDeliveryLifecycleTrace` (lines 684-693). Consider extracting to a module-level constant to avoid duplication.
♻️ Suggested refactor
Add near line 27 (after imports):
const DELIVERY_LIFECYCLE_EVENT_TYPE_MAP: Record<DeliveryLifecycleStatusEnum, EventType> = {
  [DeliveryLifecycleStatusEnum.PENDING]: 'workflow_run_delivery_pending',
  [DeliveryLifecycleStatusEnum.SENT]: 'workflow_run_delivery_sent',
  [DeliveryLifecycleStatusEnum.ERRORED]: 'workflow_run_delivery_errored',
  [DeliveryLifecycleStatusEnum.SKIPPED]: 'workflow_run_delivery_skipped',
  [DeliveryLifecycleStatusEnum.CANCELED]: 'workflow_run_delivery_canceled',
  [DeliveryLifecycleStatusEnum.MERGED]: 'workflow_run_delivery_merged',
  [DeliveryLifecycleStatusEnum.DELIVERED]: 'workflow_run_delivery_delivered',
  [DeliveryLifecycleStatusEnum.INTERACTED]: 'workflow_run_delivery_interacted',
};
Then use `DELIVERY_LIFECYCLE_EVENT_TYPE_MAP` in both methods.
Also applies to: 684-693
libs/dal/src/repositories/notification/notification.schema.ts (1)
73-76: Consider adding a compound index for `tryDeliveryLifecycleTransition` queries if it becomes a performance bottleneck.
The `tryDeliveryLifecycleTransition` method queries by `_id`, `_organizationId`, `_environmentId`, and `lastEmittedDeliveryLifecycle`. Existing indexes focus on `_environmentId` with `createdAt` and other fields, so they don't align with this query pattern. However, since this method is the only consumer of this field and the `$or` condition includes operators like `$exists` and `$in` (which limit index effectiveness), a dedicated compound index may not provide significant gains. Monitor query performance if call volume increases.
libs/application-generic/src/services/analytic-logs/workflow-run/workflow-run.repository.ts (1)
282-288: Unsafe type cast may hide type mismatches.
The cast `workflow as NotificationTemplateEntity` is applied to a value that may only contain `Pick<NotificationTemplateEntity, 'name' | 'triggers'>`. While `mapNotificationToWorkflowRun` currently only uses `name` and `triggers` (via `getTriggerIdentifier`), this cast obscures this contract and could lead to runtime errors if the method is extended to use other fields.
Consider narrowing the parameter type of `mapNotificationToWorkflowRun` to only require the fields it actually uses:
♻️ Suggested approach
   private mapNotificationToWorkflowRun(
     notification: QueryNotificationEntity,
-    workflow: NotificationTemplateEntity,
+    workflow: Pick<NotificationTemplateEntity, 'name' | 'triggers'>,
     options: IWorkflowRunOptions
   ): WorkflowRunInsertData {
And update `getTriggerIdentifier`:
-  private getTriggerIdentifier(template: NotificationTemplateEntity): string {
+  private getTriggerIdentifier(template: Pick<NotificationTemplateEntity, 'name' | 'triggers'>): string {
This removes the need for the unsafe cast.
…ivery lifecycle events
…yLifecycleEventType and streamline notification transitions
…ure flag support for optimized transitions
LaunchDarkly flag references
🔍 2 flags added or modified
…enhance chart components for better visualization
…on logic to prevent duplicate status traces
…nWorkflow flag for improved status management
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
apps/worker/src/app/workflow/usecases/run-job/run-job.usecase.ts (1)
392-404: ⚠️ Potential issue | 🟠 Major
Avoid emitting COMPLETED when an error occurred and `shouldQueueNextJob` is false.
With the new ERROR mapping in `SetJobAsFailed`, this branch can emit a conflicting COMPLETED status before the worker records ERROR.
🛠️ Suggested fix
-    } else if (!isJobExtendedToSubscriberSchedule) {
-      // Update workflow run status based on step runs when halting on step failure
-      await this.workflowRunService.updateDeliveryLifecycle({
-        workflowStatus: WorkflowRunStatusEnum.COMPLETED,
-        notificationId: job._notificationId,
-        environmentId: job._environmentId,
-        organizationId: job._organizationId,
-        _subscriberId: job._subscriberId,
-        notification,
-        currentJob: { type: job.type, _id: job._id },
-      });
+    } else if (!isJobExtendedToSubscriberSchedule) {
+      if (!error) {
+        // Update workflow run status based on step runs when halting on step failure
+        await this.workflowRunService.updateDeliveryLifecycle({
+          workflowStatus: WorkflowRunStatusEnum.COMPLETED,
+          notificationId: job._notificationId,
+          environmentId: job._environmentId,
+          organizationId: job._organizationId,
+          _subscriberId: job._subscriberId,
+          notification,
+          currentJob: { type: job.type, _id: job._id },
+        });
+      }
       // Remove the attachments if the job should not be queued
       await this.storageHelperService.deleteAttachments(job.payload?.attachments);
     }
🧹 Nitpick comments (1)
apps/worker/src/app/workflow/services/standard.worker.ts (1)
192-214: Rename the local flag to match `isLastJobFailed` for clarity.
This avoids the old name lingering in a new semantics path.
♻️ Suggested tweak
-    let isLastJobInWorkflow = false;
+    let isLastJobFailed = false;
@@
-      isLastJobInWorkflow = !hasNextJob || shouldHaltOnFailure;
+      isLastJobFailed = !hasNextJob || shouldHaltOnFailure;
@@
-        SetJobAsFailedCommand.create({ ...minimalData, isLastJobFailed: isLastJobInWorkflow }),
+        SetJobAsFailedCommand.create({ ...minimalData, isLastJobFailed }),
…ling to reflect completed state when last job fails
…unRepository for improved error logging
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In
`@libs/application-generic/src/services/analytic-logs/step-run/step-run.repository.ts`:
- Around line 71-79: The code currently stringifies job._userId to produce
userIdString which yields "undefined"/"null" when absent and then passes a user
field to featureFlagsService.getFlag, skewing user-targeted bucketing; update
the call in getFlag (FeatureFlagsKeysEnum.IS_STEP_RUN_LOGS_WRITE_ENABLED) so
that you only include the user property when job._userId is present — compute
organizationIdString and environmentIdString as before, derive userId only if
job._userId != null (or typeof !== "undefined"), and build the payload so user:
{ _id: userIdString } is omitted entirely when there is no userId rather than
passing a stringified "undefined"/"null".
+ const organizationIdString = job._organizationId?.toString() || String(job._organizationId);
+ const environmentIdString = job._environmentId?.toString() || String(job._environmentId);
+ const userIdString = job._userId?.toString() || String(job._userId);
+
  const isEnabled = await this.featureFlagsService.getFlag({
    key: FeatureFlagsKeysEnum.IS_STEP_RUN_LOGS_WRITE_ENABLED,
-   organization: { _id: job._organizationId },
-   environment: { _id: job._environmentId },
-   user: { _id: job._userId },
+   organization: { _id: organizationIdString },
+   environment: { _id: environmentIdString },
+   user: { _id: userIdString },
Avoid passing "undefined"/"null" user IDs to the flag service.
If job._userId is absent, current stringification yields "undefined"/"null", which can skew bucketing for user-targeted flags. Prefer omitting the user field when there is no user ID.
Suggested fix
- const organizationIdString = job._organizationId?.toString() || String(job._organizationId);
- const environmentIdString = job._environmentId?.toString() || String(job._environmentId);
- const userIdString = job._userId?.toString() || String(job._userId);
+ const organizationIdString = String(job._organizationId);
+ const environmentIdString = String(job._environmentId);
+ const userIdString = job._userId ? String(job._userId) : undefined;
const isEnabled = await this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_STEP_RUN_LOGS_WRITE_ENABLED,
organization: { _id: organizationIdString },
environment: { _id: environmentIdString },
- user: { _id: userIdString },
+ ...(userIdString ? { user: { _id: userIdString } } : {}),
defaultValue: false,
  });
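The stringification pitfall and the conditional-spread fix can be checked in isolation. This is a minimal sketch under stated assumptions: `JobIds`, `FlagContext`, `naiveId`, and `buildFlagContext` are hypothetical names, IDs are modelled as plain strings, and the real `featureFlagsService.getFlag` call is not invoked.

```typescript
// Hypothetical shapes; the real job entity and flag-service types are not reproduced here.
type JobIds = {
  _organizationId: string;
  _environmentId: string;
  _userId?: string | null;
};

type FlagContext = {
  organization: { _id: string };
  environment: { _id: string };
  user?: { _id: string };
};

// The flagged pattern: optional chaining short-circuits to undefined for a
// missing value, so the String(...) fallback runs and yields a literal string.
function naiveId(value?: string | null): string {
  return value?.toString() || String(value);
}

// The suggested pattern: only attach `user` when an ID is actually present.
function buildFlagContext(job: JobIds): FlagContext {
  const userId = job._userId != null ? String(job._userId) : undefined;

  return {
    organization: { _id: String(job._organizationId) },
    environment: { _id: String(job._environmentId) },
    // Spreading an empty object leaves the `user` key out entirely.
    ...(userId ? { user: { _id: userId } } : {}),
  };
}
```

With this shape, `naiveId(undefined)` produces the string `"undefined"`, whereas `buildFlagContext` returns a context with no `user` key for a job without a user ID, so the flag service never sees a placeholder user.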
…r for accurate status updates
What changed? Why was the change needed?
Screenshots
Related enterprise PR
Special notes for your reviewer