Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
3ae999d
feat(worker): add currentJob details to job processing and status upd…
djabarovgeorge Feb 3, 2026
90b8847
feat(workflow): enhance trace creation logic with currentJob paramete…
djabarovgeorge Feb 3, 2026
1c89aab
feat(workflow): implement delivery lifecycle transition logic and enh…
djabarovgeorge Feb 3, 2026
7d9d223
feat(workflow): simplify delivery lifecycle transition logic and impr…
djabarovgeorge Feb 4, 2026
bd3ee8a
feat(workflow): integrate WorkflowRunService into shared module and e…
djabarovgeorge Feb 4, 2026
5e1a285
refactor(workflow): rename TemplateForTrace to WorkflowForTrace and u…
djabarovgeorge Feb 4, 2026
96020b0
refactor(workflow): streamline notification and workflow retrieval lo…
djabarovgeorge Feb 4, 2026
378ef54
feat(workflow): add synthetic SENT status for in-app channel upon DEL…
djabarovgeorge Feb 4, 2026
cd98e76
refactor(delivery): remove delivery lifecycle constants and integrate…
djabarovgeorge Feb 4, 2026
37cc8b2
feat(workflow): introduce DeliveryLifecycleEventType for managing del…
djabarovgeorge Feb 4, 2026
cf59aaa
refactor(workflow): update delivery lifecycle handling to use Deliver…
djabarovgeorge Feb 4, 2026
65563c8
refactor(workflow): enhance delivery lifecycle update logic with feat…
djabarovgeorge Feb 4, 2026
b6f7738
feat(workflow): implement trend chart data retrieval from traces and …
djabarovgeorge Feb 5, 2026
787ebd2
refactor(workflow): update job failure handling and workflow completi…
djabarovgeorge Feb 5, 2026
6003801
refactor(workflow): update job failure handling to include isLastJobI…
djabarovgeorge Feb 5, 2026
7b529d4
fix(workflow): correct workflow status assignment in job failure hand…
djabarovgeorge Feb 5, 2026
a82b8d3
refactor(workflow): enhance job failure handling by integrating StepR…
djabarovgeorge Feb 5, 2026
04b349e
refactor(analytic-logs): simplify organization, environment, and user…
djabarovgeorge Feb 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
import { Injectable } from '@nestjs/common';
import { InstrumentUsecase, PinoLogger, WorkflowRunRepository } from '@novu/application-generic';
import {
FeatureFlagsService,
InstrumentUsecase,
PinoLogger,
TraceLogRepository,
WorkflowRunRepository,
} from '@novu/application-generic';
import { FeatureFlagsKeysEnum } from '@novu/shared';
import { WorkflowRunsTrendDataPointDto } from '../../dtos/get-charts.response.dto';
import { BuildWorkflowRunsTrendChartCommand } from './build-workflow-runs-trend-chart.command';

@Injectable()
export class BuildWorkflowRunsTrendChart {
constructor(
private workflowRunRepository: WorkflowRunRepository,
private traceLogRepository: TraceLogRepository,
private featureFlagsService: FeatureFlagsService,
private logger: PinoLogger
) {
this.logger.setContext(BuildWorkflowRunsTrendChart.name);
Expand All @@ -16,6 +25,81 @@ export class BuildWorkflowRunsTrendChart {
async execute(command: BuildWorkflowRunsTrendChartCommand): Promise<WorkflowRunsTrendDataPointDto[]> {
const { environmentId, organizationId, startDate, endDate, workflowIds } = command;

const isTraceBasedEnabled = await this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_WORKFLOW_RUN_TREND_FROM_TRACES_ENABLED,
defaultValue: false,
organization: { _id: organizationId },
environment: { _id: environmentId },
});

if (isTraceBasedEnabled) {
return this.buildChartFromTraces(startDate, endDate, environmentId, organizationId, workflowIds);
}

return this.buildChartFromWorkflowRuns(startDate, endDate, environmentId, organizationId, workflowIds);
}

private async buildChartFromTraces(
startDate: Date,
endDate: Date,
environmentId: string,
organizationId: string,
workflowIds?: string[]
): Promise<WorkflowRunsTrendDataPointDto[]> {
const workflowRuns = await this.traceLogRepository.getWorkflowRunsTrendData(
environmentId,
organizationId,
startDate,
endDate,
workflowIds
);

const dataByDate = new Map<string, WorkflowRunsTrendDataPointDto>();

const currentDate = new Date(startDate);
while (currentDate <= endDate) {
const dateKey = currentDate.toISOString().split('T')[0];
dataByDate.set(dateKey, {
timestamp: dateKey,
processing: 0,
completed: 0,
error: 0,
});
currentDate.setDate(currentDate.getDate() + 1);
}

for (const workflowRun of workflowRuns) {
const existingDataPoint = dataByDate.get(workflowRun.date);
if (existingDataPoint) {
const count = parseInt(workflowRun.count, 10);
const updatedDataPoint = { ...existingDataPoint };

switch (workflowRun.event_type) {
case 'workflow_run_status_processing':
updatedDataPoint.processing += count;
break;
case 'workflow_run_status_completed':
updatedDataPoint.completed += count;
break;
case 'workflow_run_status_error':
updatedDataPoint.error += count;
break;
}

dataByDate.set(workflowRun.date, updatedDataPoint);
}
}

return Array.from(dataByDate.values());
}

private async buildChartFromWorkflowRuns(
startDate: Date,
endDate: Date,
environmentId: string,
organizationId: string,
workflowIds?: string[]
): Promise<WorkflowRunsTrendDataPointDto[]> {
const workflowRuns = await this.workflowRunRepository.getWorkflowRunsTrendData(
environmentId,
organizationId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export type InteractionChartData = {

export type WorkflowRunsChartData = {
date: string;
processing: number;
processing?: number;
completed: number;
error: number;
timestamp: string;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { FeatureFlagsKeysEnum } from '@novu/shared';
import { useCallback, useMemo } from 'react';
import { Line, LineChart, XAxis } from 'recharts';
import { type WorkflowRunsTrendDataPoint } from '../../../api/activity';
import { useFeatureFlag } from '../../../hooks/use-feature-flag';

import { ChartConfig, ChartContainer, ChartTooltip, NovuTooltip } from '../../primitives/chart';
import { Skeleton } from '../../primitives/skeleton';
Expand All @@ -10,7 +12,7 @@ import { generateDummyWorkflowRunsData } from './chart-dummy-data';
import { type WorkflowRunsChartData } from './chart-types';
import { ChartWrapper } from './chart-wrapper';

const chartConfig = {
const legacyChartConfig = {
success: {
label: 'Success',
color: '#34d399',
Expand All @@ -25,6 +27,17 @@ const chartConfig = {
},
} satisfies ChartConfig;

const finalStatusChartConfig = {
success: {
label: 'Success',
color: '#34d399',
},
error: {
label: 'Error',
color: '#ef4444',
},
} satisfies ChartConfig;

function WorkflowRunsTrendChartSkeleton() {
return (
<div className="h-[160px] w-full relative px-4">
Expand Down Expand Up @@ -62,7 +75,7 @@ type WorkflowRunsTrendChartProps = {
error?: Error | null;
};

export function WorkflowRunsTrendChart({ data, isLoading, error }: WorkflowRunsTrendChartProps) {
function LegacyWorkflowRunsTrendChart({ data, isLoading, error }: WorkflowRunsTrendChartProps) {
const chartData = useMemo(() => {
return data?.map((dataPoint) => ({
date: new Date(dataPoint.timestamp).toLocaleDateString('en-US', {
Expand All @@ -77,16 +90,17 @@ export function WorkflowRunsTrendChart({ data, isLoading, error }: WorkflowRunsT
}, [data]);

const hasDataChecker = useCallback(
createDateBasedHasDataChecker<WorkflowRunsChartData>((dataPoint: WorkflowRunsChartData) => {
return (dataPoint.completed || 0) > 0 || (dataPoint.processing || 0) > 0 || (dataPoint.error || 0) > 0;
}),
createDateBasedHasDataChecker<WorkflowRunsChartData>(
(dataPoint: WorkflowRunsChartData) =>
(dataPoint.completed || 0) > 0 || (dataPoint.processing || 0) > 0 || (dataPoint.error || 0) > 0
),
[]
);

const renderChart = useCallback((data: WorkflowRunsChartData[], includeTooltip = true) => {
const renderChart = useCallback((chartDataToRender: WorkflowRunsChartData[], includeTooltip = true) => {
return (
<ChartContainer config={chartConfig} className="h-[160px] w-full">
<LineChart accessibilityLayer data={data}>
<ChartContainer config={legacyChartConfig} className="h-[160px] w-full">
<LineChart accessibilityLayer data={chartDataToRender}>
<XAxis
dataKey="date"
axisLine={{ stroke: '#e5e7eb', strokeDasharray: '3 3', strokeWidth: 1 }}
Expand All @@ -109,9 +123,75 @@ export function WorkflowRunsTrendChart({ data, isLoading, error }: WorkflowRunsT
}, []);

const renderEmptyState = useCallback(
(dummyData: WorkflowRunsChartData[]) => {
return renderChart(dummyData, false);
},
(dummyData: WorkflowRunsChartData[]) => renderChart(dummyData, false),
[renderChart]
);

return (
<ChartWrapper
title="Workflow runs"
data={chartData}
isLoading={isLoading}
error={error}
hasDataChecker={hasDataChecker}
loadingSkeleton={<WorkflowRunsTrendChartSkeleton />}
dummyDataGenerator={generateDummyWorkflowRunsData}
emptyStateRenderer={renderEmptyState}
infoTooltip={ANALYTICS_TOOLTIPS.WORKFLOW_RUNS_TREND}
emptyStateTitle="Not enough data to show"
emptyStateTooltip={ANALYTICS_TOOLTIPS.INSUFFICIENT_DATE_RANGE}
>
{renderChart}
</ChartWrapper>
);
}

function FinalStatusWorkflowRunsTrendChart({ data, isLoading, error }: WorkflowRunsTrendChartProps) {
const chartData = useMemo(() => {
return data?.map((dataPoint) => ({
date: new Date(dataPoint.timestamp).toLocaleDateString('en-US', {
month: 'short',
day: 'numeric',
}),
completed: dataPoint.completed,
error: dataPoint.error,
timestamp: dataPoint.timestamp,
}));
}, [data]);

const hasDataChecker = useCallback(
createDateBasedHasDataChecker<WorkflowRunsChartData>(
(dataPoint: WorkflowRunsChartData) => (dataPoint.completed || 0) > 0 || (dataPoint.error || 0) > 0
),
[]
);

const renderChart = useCallback((chartDataToRender: WorkflowRunsChartData[], includeTooltip = true) => {
return (
<ChartContainer config={finalStatusChartConfig} className="h-[160px] w-full">
<LineChart accessibilityLayer data={chartDataToRender}>
<XAxis
dataKey="date"
axisLine={{ stroke: '#e5e7eb', strokeDasharray: '3 3', strokeWidth: 1 }}
tickLine={false}
tick={{ fontSize: 10, fill: '#99a0ae', textAnchor: 'middle' }}
tickFormatter={(value, index) => {
if (index % 2 === 0) return value;

return '';
}}
domain={['dataMin', 'dataMax']}
/>
{includeTooltip && <ChartTooltip cursor={false} content={<NovuTooltip showTotal={false} />} />}
<Line dataKey="completed" name="Completed" stroke="#34d399" strokeWidth={2} dot={false} type="monotone" />
<Line dataKey="error" name="Error" stroke="#ef4444" strokeWidth={2} dot={false} type="monotone" />
</LineChart>
</ChartContainer>
);
}, []);

const renderEmptyState = useCallback(
(dummyData: WorkflowRunsChartData[]) => renderChart(dummyData, false),
[renderChart]
);

Expand All @@ -133,3 +213,13 @@ export function WorkflowRunsTrendChart({ data, isLoading, error }: WorkflowRunsT
</ChartWrapper>
);
}

export function WorkflowRunsTrendChart(props: WorkflowRunsTrendChartProps) {
const isFinalStatusOnly = useFeatureFlag(FeatureFlagsKeysEnum.IS_WORKFLOW_RUN_TREND_FROM_TRACES_ENABLED);

if (isFinalStatusOnly) {
return <FinalStatusWorkflowRunsTrendChart {...props} />;
}

return <LegacyWorkflowRunsTrendChart {...props} />;
}
2 changes: 2 additions & 0 deletions apps/worker/src/app/shared/shared.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
UpdateSubscriberChannel,
UpdateTenant,
WorkflowRunRepository,
WorkflowRunService,
} from '@novu/application-generic';
import {
ControlValuesRepository,
Expand Down Expand Up @@ -94,6 +95,7 @@ const ANALYTICS_PROVIDERS = [

// Services
clickHouseService,
WorkflowRunService,
];

const PROVIDERS = [
Expand Down
5 changes: 4 additions & 1 deletion apps/worker/src/app/workflow/services/standard.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,10 @@ export class StandardWorker extends StandardWorkerService {
isLastJobInWorkflow = !hasNextJob || shouldHaltOnFailure;
}

await this.setJobAsFailed.execute(SetJobAsFailedCommand.create({ ...minimalData, isLastJobInWorkflow }), error);
await this.setJobAsFailed.execute(
SetJobAsFailedCommand.create({ ...minimalData, isLastJobFailed: isLastJobInWorkflow }),
error
);
}

if (shouldHandleLastFailedJob) {
Expand Down
Loading
Loading