Skip to content

Commit e0b89ff

Browse files
committed
feat(hono-websocket): add Hono WebSocket adapter support
Add comprehensive Hono WebSocket integration to enable kkrpc usage with Hono framework. Includes new adapter implementation with manual message handling, support for multiple runtime environments (Bun, Deno, Cloudflare Workers), and complete test coverage covering RPC calls, concurrent connections, property access, and error handling.
1 parent 32eb19d commit e0b89ff

File tree

6 files changed

+371
-951
lines changed

6 files changed

+371
-951
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
"turbo": "^2.4.4",
2222
"typescript": "5.8.2"
2323
},
24-
"packageManager": "pnpm@10.19.0",
24+
"packageManager": "pnpm@10.20.0",
2525
"engines": {
2626
"node": ">=18"
2727
},
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
import { afterAll, beforeAll, expect, test } from "bun:test"
2+
import { Hono } from "hono"
3+
import { upgradeWebSocket, websocket } from "hono/bun"
4+
import { RPCChannel, WebSocketClientIO, createHonoWebSocketHandler } from "../mod.ts"
5+
import type { DestroyableIoInterface } from "../src/interface.ts"
6+
import { apiMethods, type API } from "./scripts/api.ts"
7+
8+
const PORT = 3002
9+
let server: ReturnType<typeof Bun.serve> | null = null
10+
11+
beforeAll(() => {
12+
// Create Hono app with WebSocket support
13+
const app = new Hono()
14+
15+
app.get("/ws", upgradeWebSocket(() => {
16+
return createHonoWebSocketHandler<API>({
17+
expose: apiMethods
18+
})
19+
}))
20+
21+
// Start server
22+
server = Bun.serve({
23+
port: PORT,
24+
fetch: app.fetch,
25+
websocket
26+
})
27+
})
28+
29+
afterAll(() => {
30+
if (server) {
31+
server.stop()
32+
}
33+
})
34+
35+
test("Hono WebSocket RPC", async () => {
36+
const clientIO = new WebSocketClientIO({
37+
url: `ws://localhost:${PORT}/ws`
38+
})
39+
40+
const clientRPC = new RPCChannel<API, API, DestroyableIoInterface>(clientIO, {
41+
expose: apiMethods
42+
})
43+
const api = clientRPC.getAPI()
44+
45+
// Test individual calls
46+
const sum = await api.add(5, 3)
47+
expect(sum).toBe(8)
48+
49+
const product = await api.math.grade2.multiply(4, 6)
50+
expect(product).toBe(24)
51+
52+
// Test concurrent calls
53+
const results = await Promise.all([
54+
api.add(10, 20),
55+
api.math.grade2.multiply(10, 20),
56+
api.add(30, 40),
57+
api.math.grade2.multiply(30, 40)
58+
])
59+
60+
expect(results).toEqual([30, 200, 70, 1200])
61+
62+
// Test multiple random calls
63+
for (let i = 0; i < 50; i++) {
64+
const a = Math.floor(Math.random() * 100)
65+
const b = Math.floor(Math.random() * 100)
66+
67+
const sum = await api.add(a, b)
68+
expect(sum).toBe(a + b)
69+
70+
const product = await api.math.grade2.multiply(a, b)
71+
expect(product).toBe(a * b)
72+
}
73+
74+
clientIO.destroy()
75+
})
76+
77+
test("Hono WebSocket concurrent connections", async () => {
78+
const numClients = 5
79+
const clients = Array.from({ length: numClients }, () => {
80+
const clientIO = new WebSocketClientIO({
81+
url: `ws://localhost:${PORT}/ws`
82+
})
83+
return {
84+
io: clientIO,
85+
rpc: new RPCChannel<{}, API, DestroyableIoInterface>(clientIO)
86+
}
87+
})
88+
89+
try {
90+
// Test concurrent calls from multiple clients
91+
const results = await Promise.all(
92+
clients.flatMap(({ rpc }) => {
93+
const api = rpc.getAPI()
94+
return [api.add(10, 20), api.math.grade2.multiply(10, 20)]
95+
})
96+
)
97+
98+
// Verify results
99+
for (let i = 0; i < results.length; i += 2) {
100+
expect(results[i]).toBe(30) // add result
101+
expect(results[i + 1]).toBe(200) // multiply result
102+
}
103+
} finally {
104+
// Cleanup
105+
clients.forEach(({ io }) => io.destroy())
106+
}
107+
})
108+
109+
test("Hono WebSocket property access", async () => {
110+
const clientIO = new WebSocketClientIO({
111+
url: `ws://localhost:${PORT}/ws`
112+
})
113+
114+
const clientRPC = new RPCChannel<{}, API, DestroyableIoInterface>(clientIO)
115+
const api = clientRPC.getAPI()
116+
117+
// Test property access
118+
const counter = await api.counter
119+
expect(counter).toBe(42)
120+
121+
const nestedValue = await api.nested.value
122+
expect(nestedValue).toBe("hello world")
123+
124+
const deepProp = await api.nested.deepObj.prop
125+
expect(deepProp).toBe(true)
126+
127+
clientIO.destroy()
128+
})
129+
130+
test("Hono WebSocket error handling", async () => {
131+
const clientIO = new WebSocketClientIO({
132+
url: `ws://localhost:${PORT}/ws`
133+
})
134+
135+
const clientRPC = new RPCChannel<{}, API, DestroyableIoInterface>(clientIO)
136+
const api = clientRPC.getAPI()
137+
138+
// Test error throwing
139+
await expect(api.throwSimpleError()).rejects.toThrow("This is a simple error")
140+
141+
await expect(api.throwCustomError()).rejects.toThrow("This is a custom error")
142+
143+
clientIO.destroy()
144+
})

packages/kkrpc/mod.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
* - deno
1111
* - websocket
1212
* - http
13+
* - hono-websocket
1314
* - RPC Channel
1415
* - serialization
1516
*
@@ -21,6 +22,7 @@ export * from "./src/adapters/node.ts"
2122
export * from "./src/adapters/websocket.ts"
2223
export * from "./src/adapters/http.ts"
2324
export * from "./src/adapters/tauri.ts"
25+
export * from "./src/adapters/hono-websocket.ts"
2426
export * from "./src/interface.ts"
2527
export * from "./src/channel.ts"
2628
export * from "./src/utils.ts"

packages/kkrpc/package.json

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,14 +114,18 @@
114114
"peerDependencies": {
115115
"typescript": "^5.0.0",
116116
"socket.io": "^4.0.0",
117-
"socket.io-client": "^4.0.0"
117+
"socket.io-client": "^4.0.0",
118+
"hono": "^4.0.0"
118119
},
119120
"peerDependenciesMeta": {
120121
"socket.io": {
121122
"optional": true
122123
},
123124
"socket.io-client": {
124125
"optional": true
126+
},
127+
"hono": {
128+
"optional": true
125129
}
126130
},
127131
"dependencies": {
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
import type { DestroyableIoInterface, IoCapabilities, IoMessage } from "../interface.ts"
2+
import { RPCChannel } from "../channel.ts"
3+
4+
/**
5+
* Options for creating a Hono WebSocket handler
6+
*/
7+
export interface HonoWebSocketOptions<API extends Record<string, any>> {
8+
/** The API implementation to expose on the server */
9+
expose: API
10+
/** Optional serialization options */
11+
serialization?: {
12+
version: "json" | "superjson"
13+
}
14+
}
15+
16+
/**
17+
* WebSocket IO adapter specifically for Hono that processes messages manually
18+
* This is needed because Hono handles messages through callbacks rather than native onmessage
19+
*/
20+
class HonoWebSocketIO implements DestroyableIoInterface {
21+
name = "hono-websocket-io"
22+
private messageQueue: string[] = []
23+
private resolveRead: ((value: string | null) => void) | null = null
24+
capabilities: IoCapabilities = {
25+
structuredClone: false,
26+
transfer: false
27+
}
28+
29+
constructor(private ws: WebSocket) {}
30+
31+
/**
32+
* Manually feed a message from Hono's onMessage callback
33+
*/
34+
feedMessage(message: string): void {
35+
const DESTROY_SIGNAL = "__DESTROY__"
36+
37+
if (message === DESTROY_SIGNAL) {
38+
this.destroy()
39+
return
40+
}
41+
42+
if (this.resolveRead) {
43+
this.resolveRead(message)
44+
this.resolveRead = null
45+
} else {
46+
this.messageQueue.push(message)
47+
}
48+
}
49+
50+
async read(): Promise<string | null> {
51+
if (this.messageQueue.length > 0) {
52+
return this.messageQueue.shift() ?? null
53+
}
54+
55+
return new Promise((resolve) => {
56+
this.resolveRead = resolve
57+
})
58+
}
59+
60+
async write(message: string | IoMessage): Promise<void> {
61+
if (typeof message !== "string") {
62+
throw new Error("HonoWebSocketIO only supports string messages")
63+
}
64+
this.ws.send(message)
65+
}
66+
67+
destroy(): void {
68+
this.ws.close()
69+
}
70+
71+
signalDestroy(): void {
72+
this.write("__DESTROY__")
73+
}
74+
}
75+
76+
/**
77+
* Creates a Hono WebSocket handler that integrates kkrpc with Hono's upgradeWebSocket
78+
*
79+
* This function works with Hono's upgradeWebSocket from:
80+
* - hono/bun
81+
* - hono/deno
82+
* - hono/cloudflare-workers
83+
*
84+
* @example
85+
* ```ts
86+
* // Bun example
87+
* import { Hono } from 'hono'
88+
* import { upgradeWebSocket, websocket } from 'hono/bun'
89+
* import { createHonoWebSocketHandler } from 'kkrpc'
90+
*
91+
* const app = new Hono()
92+
*
93+
* app.get('/ws', upgradeWebSocket(() => {
94+
* return createHonoWebSocketHandler({
95+
* expose: myAPI
96+
* })
97+
* }))
98+
*
99+
* Bun.serve({
100+
* fetch: app.fetch,
101+
* websocket
102+
* })
103+
* ```
104+
*
105+
* @example
106+
* ```ts
107+
* // Deno example
108+
* import { Hono } from 'hono'
109+
* import { upgradeWebSocket } from 'hono/deno'
110+
* import { createHonoWebSocketHandler } from 'kkrpc'
111+
*
112+
* const app = new Hono()
113+
*
114+
* app.get('/ws', upgradeWebSocket(() => {
115+
* return createHonoWebSocketHandler({
116+
* expose: myAPI
117+
* })
118+
* }))
119+
*
120+
* Deno.serve({ fetch: app.fetch, port: 8000 })
121+
* ```
122+
*/
123+
export function createHonoWebSocketHandler<API extends Record<string, any>>(
124+
options: HonoWebSocketOptions<API>
125+
): {
126+
onMessage(event: MessageEvent, ws: any): void
127+
onClose(): void
128+
onError?(event: Event, ws: any): void
129+
onOpen?(event: Event, ws: any): void
130+
} {
131+
let serverIO: HonoWebSocketIO | null = null
132+
let rpc: RPCChannel<API, API> | null = null
133+
134+
return {
135+
onOpen(_event: Event, ws: any) {
136+
// Create the IO adapter and RPC channel when connection opens
137+
// Hono passes different WebSocket types depending on runtime (WebSocket or WSContext)
138+
// Extract the actual WebSocket if it's wrapped
139+
const actualWs = (ws as any).raw || ws
140+
serverIO = new HonoWebSocketIO(actualWs)
141+
rpc = new RPCChannel<API, API>(serverIO, {
142+
expose: options.expose,
143+
serialization: options.serialization
144+
})
145+
},
146+
onMessage(event: MessageEvent, _ws: any) {
147+
// Convert message to string if needed
148+
let message = event.data
149+
if (typeof message === "object" && message !== null && "toString" in message) {
150+
message = message.toString("utf-8")
151+
} else if (typeof message !== "string") {
152+
message = String(message)
153+
}
154+
155+
// Feed the message to the IO adapter for processing
156+
if (serverIO) {
157+
serverIO.feedMessage(message)
158+
}
159+
},
160+
onClose() {
161+
if (serverIO) {
162+
serverIO.destroy()
163+
serverIO = null
164+
rpc = null
165+
}
166+
},
167+
onError(event: Event, _ws: any) {
168+
console.error("Hono WebSocket error:", event)
169+
if (serverIO) {
170+
serverIO.destroy()
171+
serverIO = null
172+
rpc = null
173+
}
174+
}
175+
}
176+
}

0 commit comments

Comments
 (0)