Skip to content

Commit 349668f

Browse files
committed
feat(rpc): add utility functions to create Subject/Observable synchronously from an RPC controller
Fix also race condition in DirectClient
1 parent 9d9e29a commit 349668f

File tree

5 files changed

+224
-15
lines changed

5 files changed

+224
-15
lines changed

packages/rpc/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,4 @@ export * from './src/model.js';
2424
export * from './src/protocol.js';
2525
export * from './src/progress.js';
2626
export * from './src/transport.js';
27+
export * from './src/utils.js';

packages/rpc/src/client/client-direct.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ export class RpcDirectClientAdapter implements ClientTransportAdapter {
2828
const kernelConnection = this.rpcKernel.createConnection({
2929
writeBinary: (buffer) => {
3030
if (closed) return;
31-
connection.readBinary(buffer);
31+
setImmediate(() => {
32+
connection.readBinary(buffer);
33+
});
3234
},
3335
close: () => {
3436
closed = true;

packages/rpc/src/utils.ts

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
import { Observable, Subject } from 'rxjs';
2+
import { isFunction } from '@deepkit/core';
3+
4+
5+
/**
6+
* Create a Subject with teardown function that is called when
7+
* client disconnects or completes the subject.
8+
*
9+
* The producer is called in the next tick, no matter if the client
10+
* subscribes to the subject or not. This is fundamentally different
11+
* from Observables, where the producer is only called when the Observable
12+
* is subscribed to.
13+
*
14+
* Teardown is also called when the producer errors or completes.
15+
*
16+
* ```typescript
17+
* class Controller {
18+
* @rpc.action()
19+
* subscribeChats(channel: string): Subject<Message> {
20+
* return createSubject((subject) => {
21+
* subject.next({ text: 'hello' });
22+
* subject.next({ text: 'world' });
23+
* subject.complete();
24+
* }, () => {
25+
* // cleanup
26+
* });
27+
* }
28+
* }
29+
* ```
30+
*/
31+
export function createSubject<T>(
32+
producer: (subject: Subject<T>) => void | Promise<void>,
33+
teardown?: () => void,
34+
): Subject<T> {
35+
const subject = new Subject<T>();
36+
setImmediate(async () => {
37+
try {
38+
await producer(subject);
39+
} catch (error) {
40+
subject.error(error);
41+
}
42+
});
43+
if (teardown) subject.subscribe().add(teardown);
44+
return subject;
45+
}
46+
47+
export type InstanceProducer<T> = T | Promise<T> | (() => T) | (() => Promise<T>);
48+
49+
async function resolveProducer<T>(producer: InstanceProducer<T>): Promise<T> {
50+
if (producer instanceof Promise) {
51+
return producer;
52+
}
53+
if (isFunction(producer)) {
54+
return producer();
55+
}
56+
return producer;
57+
}
58+
59+
/**
60+
* Returns a Subject that is immediately subscribed to the given producer.
61+
* The producer can be a Promise, a function that returns a Promise, or a Subject.
62+
*
63+
* Note that the Subject will be requested from the Promise right away.
64+
*
65+
* ```typescript
66+
* const client = new RpcClient();
67+
* const controller = client.controller<MyController>('controller');
68+
*
69+
* // normally you would do this:
70+
* const subject = await controller.subscribeChats('asd');
71+
*
72+
* // but with instantSubject you can do this, allowing you to get
73+
* // a Subject in a synchronous way.
74+
* const subject = instantSubject(controller.subscribeChats('asd'));
75+
* ```
76+
*/
77+
export function instantSubject<T>(producer: InstanceProducer<Subject<T>>): Subject<T> {
78+
const subject = new Subject<T>();
79+
resolveProducer(producer).then((s) => {
80+
s.subscribe(subject);
81+
subject.subscribe().add(() => s.complete());
82+
}, (error) => {
83+
subject.error(error);
84+
});
85+
return subject;
86+
}
87+
88+
/**
89+
* Returns an Observable that, when subscribed to, calls the producer and subscribes to the returned Observable.
90+
* The producer can be a Promise, a function that returns a Promise, or an Observable.
91+
*
92+
* The Observable from the RPC on the server will be automatically subscribed and unsubscribed.
93+
*
94+
* ```typescript
95+
* const client = new RpcClient();
96+
* const controller = client.controller<MyController>('controller');
97+
*
98+
* // normally you would do this:
99+
* const observable = await controller.subscribeChats('asd');
100+
*
101+
* // but with instantObservable you can do this, allowing you to get
102+
* // an Observable in a synchronous way.
103+
* const observable = instantObservable(controller.subscribeChats('asd'));
104+
*
105+
* // or with a function, which is useful when you only want to do RPC calls
106+
* // when the Observable is actually subscribed to.
107+
* const observable = instantObservable(() => controller.subscribeChats('asd'));
108+
*/
109+
export function instantObservable<T>(producer: InstanceProducer<Observable<T>>): Observable<T> {
110+
return new Observable<T>((observer) => {
111+
resolveProducer(producer).then((o) => {
112+
o.subscribe(observer);
113+
}, (error) => {
114+
observer.error(error);
115+
});
116+
});
117+
}

packages/rpc/tests/controller.spec.ts

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -966,15 +966,13 @@ test('connection disconnect back-controller', async () => {
966966
}
967967
}
968968

969-
let actionPromise: Promise<any> | undefined;
970-
971969
class ServerController {
972970
constructor(private connection: RpcKernelConnection) {
973971
}
974972

975973
@rpc.action()
976974
async start() {
977-
actionPromise = this.connection.controller<ClientController>('client').action();
975+
return this.connection.controller<ClientController>('client').action();
978976
}
979977
}
980978

@@ -984,16 +982,14 @@ test('connection disconnect back-controller', async () => {
984982
const controller = client.controller<ServerController>('myController');
985983

986984
await client.connect();
987-
await controller.start();
988-
await expect(actionPromise).rejects.toThrow('RpcClient has no controllers registered');
985+
await expect(controller.start()).rejects.toThrow('RpcClient has no controllers registered');
989986

990987
client.registerController(ClientController, 'client');
991-
await controller.start();
992-
await expect(actionPromise).resolves.toBe(true);
988+
await expect(controller.start()).resolves.toBe(true);
993989

994-
await controller.start();
990+
const promise = controller.start();
995991
await client.disconnect();
996-
await expect(actionPromise).rejects.toThrow('Connection closed');
992+
await expect(promise).rejects.toThrow('Connection closed');
997993
});
998994

999995

packages/rpc/tests/observable.spec.ts

Lines changed: 98 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import { sleep } from '@deepkit/core';
22
import { entity } from '@deepkit/type';
33
import { expect, test } from '@jest/globals';
4-
import { BehaviorSubject, Observable, Subject, Subscription } from 'rxjs';
4+
import { BehaviorSubject, Observable, Subject, Subscription, toArray } from 'rxjs';
55
import { first, take } from 'rxjs/operators';
66
import { DirectClient } from '../src/client/client-direct.js';
77
import { rpc } from '../src/decorators.js';
88
import { RpcKernel } from '../src/server/kernel.js';
9+
import { createSubject, instantObservable, instantSubject } from '../src/utils';
910

1011
test('observable basics', async () => {
1112
@entity.name('model')
@@ -214,7 +215,7 @@ test('subject redirect of global subject', async () => {
214215
const sub = globalSubject.subscribe(subject);
215216
subject.subscribe().add(() => {
216217
completes++;
217-
sub.unsubscribe()
218+
sub.unsubscribe();
218219
});
219220
return subject;
220221
}
@@ -279,7 +280,7 @@ test('observable unsubscribes automatically when connection closes', async () =>
279280
return {
280281
unsubscribe() {
281282
unsubscribed = true;
282-
}
283+
},
283284
};
284285
});
285286
}
@@ -465,7 +466,7 @@ test('observable complete', async () => {
465466
unsubscribe() {
466467
done = true;
467468
active = false;
468-
}
469+
},
469470
};
470471
});
471472
}
@@ -487,7 +488,7 @@ test('observable complete', async () => {
487488
return {
488489
unsubscribe() {
489490
unsubscribedCalled = true;
490-
}
491+
},
491492
};
492493
});
493494
{
@@ -537,3 +538,95 @@ test('observable complete', async () => {
537538
}
538539
});
539540

541+
test('createSubject', async () => {
542+
let teardowns = 0;
543+
544+
class Controller {
545+
@rpc.action()
546+
async subscribeChats() {
547+
return createSubject<string>((subject) => {
548+
subject.next('hello');
549+
subject.next('world');
550+
}, () => teardowns++);
551+
}
552+
}
553+
554+
const kernel = new RpcKernel();
555+
kernel.registerController(Controller, 'myController');
556+
557+
const client = new DirectClient(kernel);
558+
const controller = client.controller<Controller>('myController');
559+
560+
{
561+
const o = await controller.subscribeChats();
562+
expect(o).toBeInstanceOf(Subject);
563+
const values = await o.pipe(take(2), toArray()).toPromise();
564+
expect(values).toEqual(['hello', 'world']);
565+
o.complete();
566+
await sleep(0);
567+
expect(teardowns).toBe(1);
568+
}
569+
570+
{
571+
const s = instantSubject(controller.subscribeChats());
572+
const values = await s.pipe(take(2), toArray()).toPromise();
573+
expect(values).toEqual(['hello', 'world']);
574+
s.complete();
575+
await sleep(0);
576+
expect(teardowns).toBe(2);
577+
}
578+
579+
{
580+
const s = instantSubject(() => controller.subscribeChats());
581+
const values = await s.pipe(take(2), toArray()).toPromise();
582+
expect(values).toEqual(['hello', 'world']);
583+
s.complete();
584+
await sleep(0);
585+
expect(teardowns).toBe(3);
586+
}
587+
});
588+
589+
test('instantObservable', async () => {
590+
let teardowns = 0;
591+
592+
class Controller {
593+
@rpc.action()
594+
subscribeChats(channel: string) {
595+
return new Observable<string>((subject) => {
596+
subject.next(channel);
597+
subject.next('hello');
598+
subject.next('world');
599+
return () => teardowns++;
600+
});
601+
}
602+
}
603+
604+
const kernel = new RpcKernel();
605+
kernel.registerController(Controller, 'myController');
606+
607+
const client = new DirectClient(kernel);
608+
const controller = client.controller<Controller>('myController');
609+
610+
{
611+
const o = await controller.subscribeChats('a');
612+
expect(o).toBeInstanceOf(Observable);
613+
expect(teardowns).toBe(0);
614+
const values = await o.pipe(take(3), toArray()).toPromise();
615+
expect(teardowns).toBe(1);
616+
expect(values).toEqual(['a', 'hello', 'world']);
617+
}
618+
619+
{
620+
expect(teardowns).toBe(1);
621+
const values = await instantObservable(controller.subscribeChats('b')).pipe(take(3), toArray()).toPromise();
622+
expect(teardowns).toBe(2);
623+
expect(values).toEqual(['b', 'hello', 'world']);
624+
}
625+
626+
{
627+
expect(teardowns).toBe(2);
628+
const values = await instantObservable(() => controller.subscribeChats('b')).pipe(take(3), toArray()).toPromise();
629+
expect(teardowns).toBe(3);
630+
expect(values).toEqual(['b', 'hello', 'world']);
631+
}
632+
});

0 commit comments

Comments
 (0)