Aller au contenu principal
Version : 11.x

WebSockets

Traduction Bêta Non Officielle

Cette page a été traduite par PageTurner AI (bêta). Non approuvée officiellement par le projet. Vous avez trouvé une erreur ? Signaler un problème →

Vous pouvez utiliser WebSockets pour tout ou partie de la communication avec votre serveur. Consultez wsLink pour la configuration côté client.

astuce

Ce document détaille l'utilisation spécifique des WebSockets. Pour une vue d'ensemble des abonnements, référez-vous à notre guide sur les abonnements.

Création d'un serveur WebSocket

bash
yarn add ws
bash
yarn add ws
server/wsServer.ts
ts
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import { WebSocketServer } from 'ws';
import { appRouter } from './routers/app';
import { createContext } from './trpc';
 
const wss = new WebSocketServer({
port: 3001,
});
const handler = applyWSSHandler({
wss,
router: appRouter,
createContext,
// Enable heartbeat messages to keep connection open (disabled by default)
keepAlive: {
enabled: true,
// server ping message interval in milliseconds
pingMs: 30000,
// connection is terminated if pong message is not received in this many milliseconds
pongWaitMs: 5000,
},
});
 
wss.on('connection', (ws) => {
console.log(`++ Connection (${wss.clients.size})`);
ws.once('close', () => {
console.log(`-- Connection (${wss.clients.size})`);
});
});
console.log('WebSocket Server listening on ws://localhost:3001');
 
process.on('SIGTERM', () => {
console.log('SIGTERM');
handler.broadcastReconnectNotification();
wss.close();
});
server/wsServer.ts
ts
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import { WebSocketServer } from 'ws';
import { appRouter } from './routers/app';
import { createContext } from './trpc';
 
const wss = new WebSocketServer({
port: 3001,
});
const handler = applyWSSHandler({
wss,
router: appRouter,
createContext,
// Enable heartbeat messages to keep connection open (disabled by default)
keepAlive: {
enabled: true,
// server ping message interval in milliseconds
pingMs: 30000,
// connection is terminated if pong message is not received in this many milliseconds
pongWaitMs: 5000,
},
});
 
wss.on('connection', (ws) => {
console.log(`++ Connection (${wss.clients.size})`);
ws.once('close', () => {
console.log(`-- Connection (${wss.clients.size})`);
});
});
console.log('WebSocket Server listening on ws://localhost:3001');
 
process.on('SIGTERM', () => {
console.log('SIGTERM');
handler.broadcastReconnectNotification();
wss.close();
});

Configuration de TRPCClient pour utiliser WebSockets

astuce

Vous pouvez utiliser des Links pour router les queries/mutations via HTTP et les abonnements via WebSockets.

client.ts
tsx
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from './server';
 
// create persistent WebSocket connection
const wsClient = createWSClient({
url: `ws://localhost:3001`,
});
 
// configure TRPCClient to use WebSockets transport
const client = createTRPCClient<AppRouter>({
links: [
wsLink({
client: wsClient,
}),
],
});
client.ts
tsx
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from './server';
 
// create persistent WebSocket connection
const wsClient = createWSClient({
url: `ws://localhost:3001`,
});
 
// configure TRPCClient to use WebSockets transport
const client = createTRPCClient<AppRouter>({
links: [
wsLink({
client: wsClient,
}),
],
});

Authentification / paramètres de connexion

astuce

Dans une application web, vous pouvez ignorer cette section car les cookies sont envoyés avec la requête.

Pour vous authentifier via WebSockets, définissez des connectionParams dans createWSClient. Ils seront envoyés comme premier message lors de l'établissement de la connexion.

server/context.ts
ts
import type { CreateWSSContextFnOptions } from '@trpc/server/adapters/ws';
 
export const createContext = async (opts: CreateWSSContextFnOptions) => {
const token = opts.info.connectionParams?.token;
const token: string | undefined
 
// [... authenticate]
 
return {};
};
 
export type Context = Awaited<ReturnType<typeof createContext>>;
server/context.ts
ts
import type { CreateWSSContextFnOptions } from '@trpc/server/adapters/ws';
 
export const createContext = async (opts: CreateWSSContextFnOptions) => {
const token = opts.info.connectionParams?.token;
const token: string | undefined
 
// [... authenticate]
 
return {};
};
 
export type Context = Awaited<ReturnType<typeof createContext>>;
client/trpc.ts
ts
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from './server';
import superjson from 'superjson';
 
const wsClient = createWSClient({
url: `ws://localhost:3000`,
 
connectionParams: async () => {
return {
token: 'supersecret',
};
},
});
export const trpc = createTRPCClient<AppRouter>({
links: [wsLink({ client: wsClient, transformer: superjson })],
});
client/trpc.ts
ts
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from './server';
import superjson from 'superjson';
 
const wsClient = createWSClient({
url: `ws://localhost:3000`,
 
connectionParams: async () => {
return {
token: 'supersecret',
};
},
});
export const trpc = createTRPCClient<AppRouter>({
links: [wsLink({ client: wsClient, transformer: superjson })],
});

Suivi automatique d'ID avec tracked() (recommandé)

Si vous utilisez yield avec notre helper tracked() en incluant un id, le client se reconnectera automatiquement après une déconnexion et renverra le dernier ID connu via lastEventId.

Vous pouvez envoyer un lastEventId initial lors de l'initialisation de l'abonnement, qui sera automatiquement mis à jour au fur et à mesure que le navigateur reçoit des données.

info

Si vous récupérez des données basées sur lastEventId et que la capture de tous les événements est critique, utilisez des ReadableStream ou un pattern similaire comme intermédiaire, comme dans notre exemple SSE full-stack, pour éviter l'ignorance d'événements émis pendant le traitement du batch initial basé sur lastEventId.

ts
import EventEmitter, { on } from 'events';
import { initTRPC, tracked } from '@trpc/server';
import { z } from 'zod';
 
type Post = { id: string; title: string };
 
const t = initTRPC.create();
const publicProcedure = t.procedure;
const router = t.router;
 
const ee = new EventEmitter();
 
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z
.object({
// lastEventId is the last event id that the client has received
// On the first call, it will be whatever was passed in the initial setup
// If the client reconnects, it will be the last event id that the client received
lastEventId: z.string().nullish(),
})
.optional(),
)
.subscription(async function* (opts) {
if (opts.input?.lastEventId) {
// [...] get the posts since the last event id and yield them
}
// listen for new events
for await (const [data] of on(ee, 'add', {
// Passing the AbortSignal from the request automatically cancels the event emitter when the subscription is aborted
signal: opts.signal,
})) {
const post = data as Post;
// tracking the post id ensures the client can reconnect at any time and get the latest events since this id
yield tracked(post.id, post);
}
}),
});
ts
import EventEmitter, { on } from 'events';
import { initTRPC, tracked } from '@trpc/server';
import { z } from 'zod';
 
type Post = { id: string; title: string };
 
const t = initTRPC.create();
const publicProcedure = t.procedure;
const router = t.router;
 
const ee = new EventEmitter();
 
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z
.object({
// lastEventId is the last event id that the client has received
// On the first call, it will be whatever was passed in the initial setup
// If the client reconnects, it will be the last event id that the client received
lastEventId: z.string().nullish(),
})
.optional(),
)
.subscription(async function* (opts) {
if (opts.input?.lastEventId) {
// [...] get the posts since the last event id and yield them
}
// listen for new events
for await (const [data] of on(ee, 'add', {
// Passing the AbortSignal from the request automatically cancels the event emitter when the subscription is aborted
signal: opts.signal,
})) {
const post = data as Post;
// tracking the post id ensures the client can reconnect at any time and get the latest events since this id
yield tracked(post.id, post);
}
}),
});

Spécification RPC pour WebSockets

Pour plus de détails, explorez les définitions TypeScript :

query / mutation

Requête

ts
interface RequestMessage {
id: number | string;
jsonrpc?: '2.0';
method: 'query' | 'mutation';
params: {
path: string;
input?: unknown; // <-- pass input of procedure, serialized by transformer
};
}
ts
interface RequestMessage {
id: number | string;
jsonrpc?: '2.0';
method: 'query' | 'mutation';
params: {
path: string;
input?: unknown; // <-- pass input of procedure, serialized by transformer
};
}

Réponse

... ci-dessous, ou une erreur.

ts
interface ResponseMessage {
id: number | string;
jsonrpc?: '2.0';
result: {
type: 'data'; // always 'data' for mutation / queries
data: TOutput; // output from procedure
};
}
ts
interface ResponseMessage {
id: number | string;
jsonrpc?: '2.0';
result: {
type: 'data'; // always 'data' for mutation / queries
data: TOutput; // output from procedure
};
}

subscription / subscription.stop

Démarrer un abonnement

ts
interface SubscriptionRequest {
id: number | string;
jsonrpc?: '2.0';
method: 'subscription';
params: {
path: string;
input?: unknown; // <-- pass input of procedure, serialized by transformer
};
}
ts
interface SubscriptionRequest {
id: number | string;
jsonrpc?: '2.0';
method: 'subscription';
params: {
path: string;
input?: unknown; // <-- pass input of procedure, serialized by transformer
};
}

Pour annuler un abonnement, appelez subscription.stop

ts
interface SubscriptionStopRequest {
id: number | string; // <-- id of your created subscription
jsonrpc?: '2.0';
method: 'subscription.stop';
}
ts
interface SubscriptionStopRequest {
id: number | string; // <-- id of your created subscription
jsonrpc?: '2.0';
method: 'subscription.stop';
}

Format de réponse d'abonnement

... ci-dessous, ou une erreur.

ts
interface SubscriptionResponse {
id: number | string;
jsonrpc?: '2.0';
result:
| {
type: 'data';
data: TData; // subscription emitted data
}
| {
type: 'started'; // subscription started
}
| {
type: 'stopped'; // subscription stopped
};
}
ts
interface SubscriptionResponse {
id: number | string;
jsonrpc?: '2.0';
result:
| {
type: 'data';
data: TData; // subscription emitted data
}
| {
type: 'started'; // subscription started
}
| {
type: 'stopped'; // subscription stopped
};
}

Paramètres de connexion

Si la connexion est initialisée avec ?connectionParams=1, le premier message doit contenir les paramètres de connexion.

ts
interface ConnectionParamsMessage {
data: Record<string, string> | null;
method: 'connectionParams';
}
ts
interface ConnectionParamsMessage {
data: Record<string, string> | null;
method: 'connectionParams';
}

Erreurs

Voir https://www.jsonrpc.org/specification#error_object ou Formatage des erreurs.

Notifications du serveur au client

{ id: null, type: 'reconnect' }

Demande aux clients de se reconnecter avant l'arrêt du serveur. Appelé via wssHandler.broadcastReconnectNotification().