Skip to content

Commit d727232

Browse files
committed
feat(rpc): automatically garbage collect observables + new event system + stats collection
All observables are not tracked via WeakRef and FinalizationRegistry, and frees them up on the server automatically when the client garbage collected them. Added new event system to track and control what the rpc kernel and connections do. Added statistics object RpcStats which collects statistics about the kernel, traffic, connections, actions, and observables.
1 parent 798dfb7 commit d727232

File tree

30 files changed

+1170
-612
lines changed

30 files changed

+1170
-612
lines changed

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
"license": "MIT",
66
"private": true,
77
"scripts": {
8-
"test": "node --max_old_space_size=3048 node_modules/jest/bin/jest.js --forceExit --no-cache",
9-
"test:coverage": "node --max_old_space_size=3048 node_modules/jest/bin/jest.js --coverage --forceExit --no-cache",
8+
"test": "node --expose-gc --max_old_space_size=3048 node_modules/jest/bin/jest.js --forceExit --no-cache",
9+
"test:coverage": "node --expose-gc --max_old_space_size=3048 node_modules/jest/bin/jest.js --coverage --forceExit --no-cache",
1010
"build": "tsc --build tsconfig.json && tsc --build tsconfig.esm.json && lerna run build",
1111
"build:esm": "tsc --build tsconfig.esm.json",
1212
"tsc": "tsc --build",

packages/app/src/app.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@ import { AppModule, RootModuleDefinition } from './module.js';
1515
import { EnvConfiguration } from './configuration.js';
1616
import {
1717
DataEventToken,
18+
DispatchArguments,
1819
EventDispatcher,
20+
EventDispatcherDispatchType,
1921
EventListener,
2022
EventListenerCallback,
21-
EventOfEventToken,
2223
EventToken,
2324
} from '@deepkit/event';
2425
import { ReceiveType, ReflectionClass, ReflectionKind } from '@deepkit/type';
@@ -292,14 +293,14 @@ export class App<T extends RootModuleDefinition> {
292293
*
293294
* order: The lower the order, the sooner the listener is called. Default is 0.
294295
*/
295-
listen<T extends EventToken<any>, DEPS extends any[]>(eventToken: T, callback: EventListenerCallback<T['event']>, order: number = 0): this {
296-
const listener: EventListener<any> = { callback, order, eventToken };
296+
listen<T extends EventToken<any>>(eventToken: T, callback: EventListenerCallback<T['event']>, order: number = 0): this {
297+
const listener: EventListener = { callback, order, eventToken };
297298
this.appModule.listeners.push(listener);
298299
return this;
299300
}
300301

301-
public async dispatch<T extends EventToken<any>>(eventToken: T, event?: EventOfEventToken<T>, injector?: InjectorContext): Promise<void> {
302-
return await this.get(EventDispatcher).dispatch(eventToken, event, injector);
302+
dispatch<T extends EventToken<any>>(eventToken: T, ...args: DispatchArguments<T>): EventDispatcherDispatchType<T> {
303+
return this.get(EventDispatcher).dispatch(eventToken, ...args);
303304
}
304305

305306
/**

packages/app/src/module.ts

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,7 @@ import { InjectorModule, InjectorModuleConfig, NormalizedProvider, ProviderWithS
1212
import { AbstractClassType, ClassType, CustomError, ExtractClassType, isClass } from '@deepkit/core';
1313
import { EventListener, EventToken } from '@deepkit/event';
1414
import { WorkflowDefinition } from '@deepkit/workflow';
15-
import {
16-
getPartialSerializeFunction,
17-
reflect,
18-
ReflectionFunction,
19-
ReflectionMethod,
20-
serializer,
21-
Type,
22-
TypeClass,
23-
} from '@deepkit/type';
15+
import { getPartialSerializeFunction, reflect, ReflectionFunction, ReflectionMethod, serializer, Type, TypeClass } from '@deepkit/type';
2416
import { ControllerConfig } from './service-container.js';
2517

2618
export type DefaultObject<T> = T extends undefined ? {} : T;
@@ -137,7 +129,7 @@ export interface ModuleDefinition {
137129
* }
138130
* ```
139131
*/
140-
listeners?: (EventListener<any> | ClassType)[];
132+
listeners?: (EventListener | ClassType)[];
141133

142134
/**
143135
* HTTP middlewares.
@@ -253,7 +245,7 @@ export function createModule<T extends CreateModuleDefinition>(options: T): AppM
253245
return new (createModuleClass(options))();
254246
}
255247

256-
export type ListenerType = EventListener<any> | ClassType;
248+
export type ListenerType = EventListener | ClassType;
257249

258250
/**
259251
* The AppModule is the base class for all modules.
@@ -409,7 +401,7 @@ export class AppModule<C extends InjectorModuleConfig = any> extends InjectorMod
409401
return this;
410402
}
411403

412-
addListener(...listener: (EventListener<any> | ClassType)[]): this {
404+
addListener(...listener: (EventListener | ClassType)[]): this {
413405
this.assertInjectorNotBuilt();
414406

415407
for (const l of listener) {

packages/broker/src/kernel.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
RpcMessage,
1818
RpcMessageBuilder,
1919
RpcMessageRouteType,
20+
RpcStats,
2021
TransportConnection,
2122
} from '@deepkit/rpc';
2223
import { Logger } from '@deepkit/logger';
@@ -52,6 +53,8 @@ import cluster from 'cluster';
5253
import { closeSync, openSync, renameSync, writeSync } from 'fs';
5354
import { snapshotState } from './snapshot.js';
5455
import { handleMessageDeduplication } from './utils.js';
56+
import { InjectorContext } from '@deepkit/injector';
57+
import { EventDispatcher } from '@deepkit/event';
5558

5659
export interface Queue {
5760
currentId: number;
@@ -66,12 +69,15 @@ export class BrokerConnection extends RpcKernelBaseConnection {
6669
protected locks = new Map<number, ProcessLock>();
6770

6871
constructor(
72+
stats: RpcStats,
6973
logger: Logger,
7074
transportConnection: TransportConnection,
71-
protected connections: RpcKernelConnections,
75+
connections: RpcKernelConnections,
76+
injector: InjectorContext,
77+
eventDispatcher: EventDispatcher,
7278
protected state: BrokerState,
7379
) {
74-
super(logger, transportConnection, connections);
80+
super(stats, logger, transportConnection, connections, injector, eventDispatcher);
7581
}
7682

7783
public close(): void {
@@ -569,6 +575,6 @@ export class BrokerKernel extends RpcKernel {
569575
protected state: BrokerState = new BrokerState;
570576

571577
createConnection(transport: TransportConnection): BrokerConnection {
572-
return new BrokerConnection(this.logger, transport, this.connections, this.state);
578+
return new BrokerConnection(this.stats, this.logger, transport, this.connections, this.injector, this.getEventDispatcher(), this.state);
573579
}
574580
}

packages/core/src/core.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,13 @@ export function mergeStack(error: Error, stack: string) {
604604
}
605605
}
606606

607+
/**
608+
* Makes sure the given value is an error. If it's not an error, it creates a new error with the given value as message.
609+
*/
610+
export function ensureError(error?: any, classType: ClassType = Error): Error {
611+
return error instanceof Error || error instanceof AggregateError ? error : new classType(error);
612+
}
613+
607614
export function collectForMicrotask<T>(callback: (args: T[]) => void): (arg: T) => void {
608615
let items: T[] = [];
609616
let taskScheduled = false;

packages/framework/src/module.ts

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,7 @@ import { DebugDIController } from './cli/debug-di.js';
1717
import { ServerStartController } from './cli/server-start.js';
1818
import { DebugController } from './debug/debug.controller.js';
1919
import { registerDebugHttpController } from './debug/http-debug.controller.js';
20-
import {
21-
http,
22-
HttpLogger,
23-
HttpModule,
24-
HttpRegExp,
25-
HttpRequest,
26-
HttpResponse,
27-
serveStaticListener,
28-
} from '@deepkit/http';
20+
import { http, HttpLogger, HttpModule, HttpRegExp, HttpRequest, HttpResponse, serveStaticListener } from '@deepkit/http';
2921
import { InjectorContext, ProviderWithScope, Token } from '@deepkit/injector';
3022
import { BrokerConfig, FrameworkConfig } from './module.config.js';
3123
import { Logger } from '@deepkit/logger';
@@ -44,15 +36,7 @@ import {
4436
} from '@deepkit/sql/commands';
4537
import { FileStopwatchStore } from './debug/stopwatch/store.js';
4638
import { DebugProfileFramesCommand } from './cli/debug-debug-frames.js';
47-
import {
48-
rpcClass,
49-
RpcHooks,
50-
RpcKernel,
51-
RpcKernelBaseConnection,
52-
RpcKernelConnection,
53-
RpcKernelSecurity,
54-
SessionState,
55-
} from '@deepkit/rpc';
39+
import { rpcClass, RpcKernel, RpcKernelBaseConnection, RpcKernelConnection, RpcKernelSecurity, SessionState } from '@deepkit/rpc';
5640
import { DebugConfigController } from './cli/app-config.js';
5741
import { Zone } from './zone.js';
5842
import { DebugBrokerBus } from './debug/broker.js';
@@ -77,7 +61,6 @@ export class FrameworkModule extends createModuleClass({
7761
ApplicationServer,
7862
WebWorkerFactory,
7963
RpcServer,
80-
RpcHooks,
8164
MigrationProvider,
8265
DebugController,
8366
BrokerServer,
@@ -158,7 +141,6 @@ export class FrameworkModule extends createModuleClass({
158141
SessionState,
159142
RpcKernelConnection,
160143
RpcKernelBaseConnection,
161-
RpcHooks,
162144

163145
BrokerDeepkitAdapter,
164146
BrokerCache,

packages/framework/src/rpc.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ export class RpcServerActionWithStopwatch extends RpcServerAction {
6666
}
6767

6868
export class RpcKernelConnectionWithStopwatch extends RpcKernelConnection {
69-
protected actionHandler = new RpcServerActionWithStopwatch(this.cache, this, this.controllers, this.injector, this.security, this.sessionState, this.logger, this.hooks);
69+
protected actionHandler = new RpcServerActionWithStopwatch(this.stats, this.cache, this, this.controllers, this.injector, this.eventDispatcher, this.security, this.sessionState, this.logger);
7070
stopwatch?: Stopwatch;
7171

7272
setStopwatch(stopwatch: Stopwatch) {

packages/framework/src/worker.ts

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,7 @@
88
* You should have received a copy of the MIT License along with this program.
99
*/
1010

11-
import {
12-
RpcKernel,
13-
RpcKernelBaseConnection,
14-
RpcKernelConnection,
15-
SessionState,
16-
TransportConnection,
17-
} from '@deepkit/rpc';
11+
import { RpcKernel, RpcKernelBaseConnection, RpcKernelConnection, SessionState, TransportConnection } from '@deepkit/rpc';
1812
import http, { Server } from 'http';
1913
import https from 'https';
2014
import type { Server as WebSocketServer, ServerOptions as WebSocketServerOptions } from 'ws';
@@ -134,11 +128,15 @@ export class RpcServer implements RpcServerInterface {
134128
}
135129
}, req);
136130

137-
ws.on('message', async (message: Uint8Array) => {
131+
ws.on('message', (message: Uint8Array) => {
138132
connection.feed(message);
139133
});
140134

141-
ws.on('close', async () => {
135+
ws.on('error', (error) => {
136+
connection.close(error);
137+
});
138+
139+
ws.on('close', () => {
142140
connection.close();
143141
});
144142
});

packages/http/tests/utils.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { HttpRouterRegistry } from '../src/router.js';
99
export function createHttpKernel(
1010
controllers: (ClassType | { module: AppModule<any>, controller: ClassType })[] | ((registry: HttpRouterRegistry) => void) = [],
1111
providers: ProviderWithScope[] = [],
12-
listeners: (EventListener<any> | ClassType)[] = [],
12+
listeners: (EventListener | ClassType)[] = [],
1313
middlewares: MiddlewareFactory[] = [],
1414
modules: AppModule<any>[] = []
1515
) {
@@ -21,7 +21,7 @@ export function createHttpKernel(
2121
export function createHttpApp(
2222
controllers: (ClassType | { module: AppModule<any>, controller: ClassType })[] | ((registry: HttpRouterRegistry) => void) = [],
2323
providers: ProviderWithScope[] = [],
24-
listeners: (EventListener<any> | ClassType)[] = [],
24+
listeners: (EventListener | ClassType)[] = [],
2525
middlewares: MiddlewareFactory[] = [],
2626
modules: AppModule<any>[] = []
2727
) {

packages/logger/src/logger.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -335,9 +335,15 @@ export class ConsoleLogger extends Logger {
335335
export class MemoryLogger extends Logger {
336336
public memory = new MemoryLoggerTransport();
337337

338-
constructor() {
339-
super([]);
340-
this.transporter.push(this.memory);
338+
constructor(
339+
transporter: LoggerTransport[] = [],
340+
formatter: LoggerFormatter[] = [],
341+
scope: string = ''
342+
) {
343+
super(transporter || [], formatter, scope);
344+
if (transporter.length === 0) {
345+
this.transporter.push(this.memory);
346+
}
341347
}
342348

343349
getOutput(): string {

0 commit comments

Comments
 (0)