Skip to content

Commit 28bff83

Browse files
committed
feat(orm): add adapter event dispatcher and logger
1 parent e4781db commit 28bff83

File tree

8 files changed

+110
-20
lines changed

8 files changed

+110
-20
lines changed

packages/framework/src/module.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,8 +292,9 @@ export class FrameworkModule extends createModuleClass({
292292
protected setupDatabase() {
293293
for (const db of this.dbs) {
294294
this.configureProvider<DatabaseRegistry>(v => v.addDatabase(db.classType, {}, db.module));
295-
db.module.configureProvider((db: Database, eventDispatcher: EventDispatcher, stopwatch: Stopwatch) => {
296-
db.eventDispatcher = eventDispatcher;
295+
db.module.configureProvider((db: Database, eventDispatcher: EventDispatcher, logger: Logger, stopwatch: Stopwatch) => {
296+
db.setEventDispatcher(eventDispatcher);
297+
db.setLogger(logger);
297298
db.stopwatch = stopwatch;
298299
}, {}, db.classType);
299300
}

packages/mongo/src/adapter.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@ import { MongoPersistence } from './persistence.js';
2727
import { MongoClient } from './client/client.js';
2828
import { DeleteCommand } from './client/command/delete.js';
2929
import { MongoQueryResolver } from './query.resolver.js';
30-
import { MongoDatabaseTransaction } from './client/connection.js';
30+
import { MongoDatabaseTransaction, MongoDatabaseTransactionMonitor } from './client/connection.js';
3131
import { CreateIndex, CreateIndexesCommand } from './client/command/createIndexes.js';
3232
import { DropIndexesCommand } from './client/command/dropIndexes.js';
3333
import { CreateCollectionCommand } from './client/command/createCollection.js';
3434
import { entity, ReceiveType, ReflectionClass, resolveReceiveType } from '@deepkit/type';
3535
import { Command } from './client/command/command.js';
3636
import { AggregateCommand } from './client/command/aggregate.js';
37+
import { EventDispatcher } from '@deepkit/event';
38+
import { Logger } from '@deepkit/logger';
3739

3840
export class MongoDatabaseQueryFactory extends DatabaseAdapterQueryFactory {
3941
constructor(
@@ -114,6 +116,7 @@ export class MongoDatabaseAdapter extends DatabaseAdapter {
114116
public readonly client: MongoClient;
115117

116118
protected ormSequences: ReflectionClass<any>;
119+
protected transactionMonitor: MongoDatabaseTransactionMonitor = new MongoDatabaseTransactionMonitor(this.logger);
117120

118121
constructor(
119122
connection: string | MongoClient,
@@ -130,6 +133,17 @@ export class MongoDatabaseAdapter extends DatabaseAdapter {
130133
this.ormSequences = ReflectionClass.from(OrmSequence);
131134
}
132135

136+
setEventDispatcher(eventDispatcher: EventDispatcher) {
137+
super.setEventDispatcher(eventDispatcher);
138+
this.client.setEventDispatcher(eventDispatcher);
139+
}
140+
141+
setLogger(logger: Logger) {
142+
super.setLogger(logger);
143+
this.client.setLogger(logger);
144+
this.transactionMonitor.logger = logger;
145+
}
146+
133147
rawFactory(session: DatabaseSession<this>): MongoRawFactory {
134148
return new MongoRawFactory(session, this.client);
135149
}
@@ -147,7 +161,7 @@ export class MongoDatabaseAdapter extends DatabaseAdapter {
147161
}
148162

149163
createTransaction(session: DatabaseSession<this>): MongoDatabaseTransaction {
150-
return new MongoDatabaseTransaction;
164+
return new MongoDatabaseTransaction(this.transactionMonitor);
151165
}
152166

153167
isNativeForeignKeyConstraintSupported() {

packages/mongo/src/client/connection.ts

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import { AbortTransactionCommand } from './command/abortTransaction.js';
2424
import { DataEvent, EventDispatcher, EventTokenSync } from '@deepkit/event';
2525
import { IsMasterCommand } from './command/ismaster.js';
2626
import { Logger } from '@deepkit/logger';
27-
import { ConnectionOptions } from './options.js';
27+
import { CommandOptions, ConnectionOptions } from './options.js';
2828

2929
export enum MongoConnectionStatus {
3030
pending = 'pending',
@@ -585,22 +585,56 @@ export function readUint32LE(buffer: Uint8Array | ArrayBuffer, offset: number =
585585
return buffer[offset] + (buffer[offset + 1] * 2 ** 8) + (buffer[offset + 2] * 2 ** 16) + (buffer[offset + 3] * 2 ** 24);
586586
}
587587

588+
/**
589+
* This class is responsible to monitor leaking transactions and automatically
590+
* rollback them if the DatabaseSession was garbage collected without commit or rollback.
591+
*/
592+
export class MongoDatabaseTransactionMonitor {
593+
finalizer = new FinalizationRegistry<{
594+
connection: MongoConnection;
595+
lsid: { id: string };
596+
txnNumber: bigint;
597+
}>((value) => {
598+
this.logger.warn(`Leaking transaction detected. Automatically rollback transaction ${value.txnNumber}`);
599+
value.connection.execute(new AbortTransactionCommand())
600+
.catch((error) => {
601+
this.logger.error(`Could not rollback transaction ${value.txnNumber}: ${formatError(error)}`);
602+
}).finally(() => {
603+
value.connection.release();
604+
});
605+
});
606+
607+
constructor(public logger: Logger) {
608+
}
609+
}
610+
588611
export class MongoDatabaseTransaction extends DatabaseTransaction {
612+
commandOptions: CommandOptions = {};
589613
static txnNumber: bigint = 0n;
590614

591615
connection?: MongoConnection;
592616
lsid?: { id: string };
593617
txnNumber: bigint = 0n;
594618
started: boolean = false;
595619

620+
constructor(protected monitor: MongoDatabaseTransactionMonitor) {
621+
super();
622+
}
623+
624+
with(commandOptions: CommandOptions): this {
625+
Object.assign(this.commandOptions, commandOptions);
626+
return this;
627+
}
628+
596629
applyTransaction(cmd: TransactionalMessage) {
597-
if (!this.lsid) return;
630+
if (!this.lsid || !this.connection) return;
598631
cmd.lsid = this.lsid;
599632
cmd.txnNumber = this.txnNumber;
600633
cmd.autocommit = false;
601634
if (!this.started && !cmd.abortTransaction && !cmd.commitTransaction) {
602635
this.started = true;
603636
cmd.startTransaction = true;
637+
this.monitor.finalizer.register(this, { connection: this.connection, lsid: this.lsid, txnNumber: this.txnNumber });
604638
}
605639
}
606640

packages/mongo/tests/mongo.spec.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,24 @@ import {
1616
UUID,
1717
uuid,
1818
} from '@deepkit/type';
19-
import { getInstanceStateFromItem, UniqueConstraintFailure } from '@deepkit/orm';
19+
import { Database, getInstanceStateFromItem, UniqueConstraintFailure } from '@deepkit/orm';
2020
import { SimpleModel, SuperSimple } from './entities.js';
2121
import { createDatabase } from './utils.js';
2222
import { databaseFactory } from './factory.js';
23+
import { MongoDatabaseAdapter } from '../src/adapter.js';
24+
import { MemoryLogger } from '@deepkit/logger';
2325

2426
Error.stackTraceLimit = 100;
2527

28+
test('logger', async () => {
29+
const database = new Database(new MongoDatabaseAdapter('mongodb://invalid-host'));
30+
const logger = new MemoryLogger();
31+
database.setLogger(logger);
32+
await expect(database.adapter.client.connect()).rejects.toThrow('getaddrinfo ENOTFOUND invalid-host');
33+
expect(logger.memory.messageStrings[0]).toContain('getaddrinfo ENOTFOUND invalid-host');
34+
database.disconnect();
35+
});
36+
2637
test('test save undefined values', async () => {
2738
const session = await createDatabase('test save undefined values');
2839

packages/mongo/tests/replica.spec.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ describe('replica set, primary secondary', () => {
8282
expect(client.config.hosts[1].stats.commandsExecuted).toBe(3);
8383
});
8484

85-
test('transaction', async () => {
85+
test('transaction write', async () => {
8686
const client = createClient(`mongodb://primary`);
8787
const database = new Database(new MongoDatabaseAdapter(client), [User]);
8888

@@ -113,6 +113,21 @@ describe('replica set, primary secondary', () => {
113113
expect(await database.query(User).filter({ username: 'user1 changed' }).has()).toBe(true);
114114
}
115115
});
116+
117+
test('transaction read', async () => {
118+
const client = createClient(`mongodb://primary`);
119+
const database = new Database(new MongoDatabaseAdapter(client), [User]);
120+
121+
{
122+
await database.query(User).deleteMany();
123+
await database.persist(new User('user1'));
124+
125+
const session = database.createSession();
126+
session.useTransaction();
127+
const user = await session.query(User).findOne();
128+
expect(user.username).toBe('user1');
129+
}
130+
});
116131
});
117132

118133
describe.skip('local replica', () => {

packages/mongo/tsconfig.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
"module": "CommonJS",
1515
"lib": [
1616
"es2020",
17-
"es2022.error"
17+
"es2022.error",
18+
"es2021.weakref"
1819
],
1920
"esModuleInterop": true,
2021
"outDir": "./dist/cjs",

packages/orm/src/database-adapter.ts

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,11 @@ import {
1818
isClass,
1919
stringifyValueWithType,
2020
} from '@deepkit/core';
21-
import {
22-
is,
23-
isSameType,
24-
ItemChanges,
25-
PrimaryKeyFields,
26-
ReceiveType,
27-
ReflectionClass,
28-
ReflectionKind,
29-
stringifyType,
30-
Type,
31-
} from '@deepkit/type';
21+
import { is, isSameType, ItemChanges, PrimaryKeyFields, ReceiveType, ReflectionClass, ReflectionKind, stringifyType, Type } from '@deepkit/type';
3222
import { Query } from './query.js';
3323
import { DatabaseSession, DatabaseTransaction } from './database-session.js';
24+
import { EventDispatcher } from '@deepkit/event';
25+
import { Logger } from '@deepkit/logger';
3426

3527
export abstract class DatabaseAdapterQueryFactory {
3628
abstract createQuery<T extends OrmEntity>(type?: ReceiveType<T> | ClassType<T> | AbstractClassType<T> | ReflectionClass<T>): Query<T>;
@@ -109,6 +101,17 @@ export class MigrateOptions {
109101
* You can specify a more specialized adapter like MysqlDatabaseAdapter/MongoDatabaseAdapter with special API for MySQL/Mongo.
110102
*/
111103
export abstract class DatabaseAdapter {
104+
eventDispatcher: EventDispatcher = new EventDispatcher();
105+
logger: Logger = new Logger();
106+
107+
setEventDispatcher(eventDispatcher: EventDispatcher) {
108+
this.eventDispatcher = eventDispatcher;
109+
}
110+
111+
setLogger(logger: Logger) {
112+
this.logger = logger;
113+
}
114+
112115
abstract queryFactory(session: DatabaseSession<this>): DatabaseAdapterQueryFactory;
113116

114117
rawFactory(session: DatabaseSession<this>): RawFactory<any> {

packages/orm/src/database.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import { Stopwatch } from '@deepkit/stopwatch';
3232
import { getClassState, getInstanceState, getNormalizedPrimaryKey } from './identity-map.js';
3333
import { EventDispatcher, EventDispatcherUnsubscribe, EventListenerCallback, EventToken } from '@deepkit/event';
3434
import { DatabasePlugin, DatabasePluginRegistry } from './plugin/plugin.js';
35+
import { Logger } from '@deepkit/logger';
3536

3637
/**
3738
* Hydrates not completely populated item and makes it completely accessible.
@@ -174,6 +175,16 @@ export class Database<ADAPTER extends DatabaseAdapter = DatabaseAdapter> {
174175
}
175176
}
176177

178+
setEventDispatcher(eventDispatcher: EventDispatcher) {
179+
this.eventDispatcher = eventDispatcher;
180+
this.adapter.setEventDispatcher(eventDispatcher);
181+
}
182+
183+
setLogger(logger: Logger) {
184+
this.logger.logger = logger;
185+
this.adapter.setLogger(logger);
186+
}
187+
177188
registerPlugin(...plugins: DatabasePlugin[]): void {
178189
for (const plugin of plugins) {
179190
this.pluginRegistry.add(plugin);

0 commit comments

Comments
 (0)