Saltar al contenido principal
Versión: 11.x

WebSockets

Traducción Beta No Oficial

Esta página fue traducida por PageTurner AI (beta). No está respaldada oficialmente por el proyecto. ¿Encontraste un error? Reportar problema →

Puedes usar WebSockets para toda o parte de la comunicación con tu servidor. Consulta wsLink para configurarlo en el cliente.

consejo

Este documento detalla aspectos específicos de WebSockets. Para uso general de suscripciones, visita nuestra guía de suscripciones.

Creación de un servidor WebSocket

bash
yarn add ws
bash
yarn add ws
server/wsServer.ts
ts
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import { WebSocketServer } from 'ws';
import { appRouter } from './routers/app';
import { createContext } from './trpc';
 
const wss = new WebSocketServer({
port: 3001,
});
const handler = applyWSSHandler({
wss,
router: appRouter,
createContext,
// Enable heartbeat messages to keep connection open (disabled by default)
keepAlive: {
enabled: true,
// server ping message interval in milliseconds
pingMs: 30000,
// connection is terminated if pong message is not received in this many milliseconds
pongWaitMs: 5000,
},
});
 
wss.on('connection', (ws) => {
console.log(`++ Connection (${wss.clients.size})`);
ws.once('close', () => {
console.log(`-- Connection (${wss.clients.size})`);
});
});
console.log('WebSocket Server listening on ws://localhost:3001');
 
process.on('SIGTERM', () => {
console.log('SIGTERM');
handler.broadcastReconnectNotification();
wss.close();
});
server/wsServer.ts
ts
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import { WebSocketServer } from 'ws';
import { appRouter } from './routers/app';
import { createContext } from './trpc';
 
const wss = new WebSocketServer({
port: 3001,
});
const handler = applyWSSHandler({
wss,
router: appRouter,
createContext,
// Enable heartbeat messages to keep connection open (disabled by default)
keepAlive: {
enabled: true,
// server ping message interval in milliseconds
pingMs: 30000,
// connection is terminated if pong message is not received in this many milliseconds
pongWaitMs: 5000,
},
});
 
wss.on('connection', (ws) => {
console.log(`++ Connection (${wss.clients.size})`);
ws.once('close', () => {
console.log(`-- Connection (${wss.clients.size})`);
});
});
console.log('WebSocket Server listening on ws://localhost:3001');
 
process.on('SIGTERM', () => {
console.log('SIGTERM');
handler.broadcastReconnectNotification();
wss.close();
});

Configurar TRPCClient para usar WebSockets

consejo

Puedes usar Links para enrutar queries y/o mutations a transporte HTTP, y suscripciones a través de WebSockets.

client.ts
tsx
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from './server';
 
// create persistent WebSocket connection
const wsClient = createWSClient({
url: `ws://localhost:3001`,
});
 
// configure TRPCClient to use WebSockets transport
const client = createTRPCClient<AppRouter>({
links: [
wsLink({
client: wsClient,
}),
],
});
client.ts
tsx
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from './server';
 
// create persistent WebSocket connection
const wsClient = createWSClient({
url: `ws://localhost:3001`,
});
 
// configure TRPCClient to use WebSockets transport
const client = createTRPCClient<AppRouter>({
links: [
wsLink({
client: wsClient,
}),
],
});

Autenticación / parámetros de conexión

consejo

En aplicaciones web puedes omitir esta sección, ya que las cookies se envían automáticamente con la solicitud.

Para autenticación con WebSockets, define connectionParams en createWSClient. Esto se enviará como primer mensaje cuando el cliente establezca la conexión WebSocket.

server/context.ts
ts
import type { CreateWSSContextFnOptions } from '@trpc/server/adapters/ws';
 
export const createContext = async (opts: CreateWSSContextFnOptions) => {
const token = opts.info.connectionParams?.token;
const token: string | undefined
 
// [... authenticate]
 
return {};
};
 
export type Context = Awaited<ReturnType<typeof createContext>>;
server/context.ts
ts
import type { CreateWSSContextFnOptions } from '@trpc/server/adapters/ws';
 
export const createContext = async (opts: CreateWSSContextFnOptions) => {
const token = opts.info.connectionParams?.token;
const token: string | undefined
 
// [... authenticate]
 
return {};
};
 
export type Context = Awaited<ReturnType<typeof createContext>>;
client/trpc.ts
ts
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from './server';
import superjson from 'superjson';
 
const wsClient = createWSClient({
url: `ws://localhost:3000`,
 
connectionParams: async () => {
return {
token: 'supersecret',
};
},
});
export const trpc = createTRPCClient<AppRouter>({
links: [wsLink({ client: wsClient, transformer: superjson })],
});
client/trpc.ts
ts
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from './server';
import superjson from 'superjson';
 
const wsClient = createWSClient({
url: `ws://localhost:3000`,
 
connectionParams: async () => {
return {
token: 'supersecret',
};
},
});
export const trpc = createTRPCClient<AppRouter>({
links: [wsLink({ client: wsClient, transformer: superjson })],
});

Seguimiento automático de ID con tracked() (recomendado)

Si usas yield con nuestro helper tracked() incluyendo un id, el cliente se reconectará automáticamente al desconectarse y enviará el último ID conocido como parte del input lastEventId.

Puedes enviar un lastEventId inicial al iniciar la suscripción, que se actualizará automáticamente cuando el navegador reciba datos.

información

Si obtienes datos basados en lastEventId y es crítico capturar todos los eventos, considera usar ReadableStream o patrones similares como intermediarios, como en nuestro ejemplo SSE full-stack, para evitar ignorar eventos nuevos mientras se procesa el lote original basado en lastEventId.

ts
import EventEmitter, { on } from 'events';
import { initTRPC, tracked } from '@trpc/server';
import { z } from 'zod';
 
type Post = { id: string; title: string };
 
const t = initTRPC.create();
const publicProcedure = t.procedure;
const router = t.router;
 
const ee = new EventEmitter();
 
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z
.object({
// lastEventId is the last event id that the client has received
// On the first call, it will be whatever was passed in the initial setup
// If the client reconnects, it will be the last event id that the client received
lastEventId: z.string().nullish(),
})
.optional(),
)
.subscription(async function* (opts) {
if (opts.input?.lastEventId) {
// [...] get the posts since the last event id and yield them
}
// listen for new events
for await (const [data] of on(ee, 'add', {
// Passing the AbortSignal from the request automatically cancels the event emitter when the subscription is aborted
signal: opts.signal,
})) {
const post = data as Post;
// tracking the post id ensures the client can reconnect at any time and get the latest events since this id
yield tracked(post.id, post);
}
}),
});
ts
import EventEmitter, { on } from 'events';
import { initTRPC, tracked } from '@trpc/server';
import { z } from 'zod';
 
type Post = { id: string; title: string };
 
const t = initTRPC.create();
const publicProcedure = t.procedure;
const router = t.router;
 
const ee = new EventEmitter();
 
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z
.object({
// lastEventId is the last event id that the client has received
// On the first call, it will be whatever was passed in the initial setup
// If the client reconnects, it will be the last event id that the client received
lastEventId: z.string().nullish(),
})
.optional(),
)
.subscription(async function* (opts) {
if (opts.input?.lastEventId) {
// [...] get the posts since the last event id and yield them
}
// listen for new events
for await (const [data] of on(ee, 'add', {
// Passing the AbortSignal from the request automatically cancels the event emitter when the subscription is aborted
signal: opts.signal,
})) {
const post = data as Post;
// tracking the post id ensures the client can reconnect at any time and get the latest events since this id
yield tracked(post.id, post);
}
}),
});

Especificación RPC para WebSockets

Puedes explorar más detalles en las definiciones de TypeScript:

query / mutation

Solicitud

ts
interface RequestMessage {
id: number | string;
jsonrpc?: '2.0';
method: 'query' | 'mutation';
params: {
path: string;
input?: unknown; // <-- pass input of procedure, serialized by transformer
};
}
ts
interface RequestMessage {
id: number | string;
jsonrpc?: '2.0';
method: 'query' | 'mutation';
params: {
path: string;
input?: unknown; // <-- pass input of procedure, serialized by transformer
};
}

Respuesta

... o un error a continuación.

ts
interface ResponseMessage {
id: number | string;
jsonrpc?: '2.0';
result: {
type: 'data'; // always 'data' for mutation / queries
data: TOutput; // output from procedure
};
}
ts
interface ResponseMessage {
id: number | string;
jsonrpc?: '2.0';
result: {
type: 'data'; // always 'data' for mutation / queries
data: TOutput; // output from procedure
};
}

subscription / subscription.stop

Iniciar una suscripción

ts
interface SubscriptionRequest {
id: number | string;
jsonrpc?: '2.0';
method: 'subscription';
params: {
path: string;
input?: unknown; // <-- pass input of procedure, serialized by transformer
};
}
ts
interface SubscriptionRequest {
id: number | string;
jsonrpc?: '2.0';
method: 'subscription';
params: {
path: string;
input?: unknown; // <-- pass input of procedure, serialized by transformer
};
}

Para cancelar una suscripción, llama subscription.stop

ts
interface SubscriptionStopRequest {
id: number | string; // <-- id of your created subscription
jsonrpc?: '2.0';
method: 'subscription.stop';
}
ts
interface SubscriptionStopRequest {
id: number | string; // <-- id of your created subscription
jsonrpc?: '2.0';
method: 'subscription.stop';
}

Formato de respuesta de suscripción

... o un error a continuación.

ts
interface SubscriptionResponse {
id: number | string;
jsonrpc?: '2.0';
result:
| {
type: 'data';
data: TData; // subscription emitted data
}
| {
type: 'started'; // subscription started
}
| {
type: 'stopped'; // subscription stopped
};
}
ts
interface SubscriptionResponse {
id: number | string;
jsonrpc?: '2.0';
result:
| {
type: 'data';
data: TData; // subscription emitted data
}
| {
type: 'started'; // subscription started
}
| {
type: 'stopped'; // subscription stopped
};
}

Parámetros de conexión

Si la conexión se inicializa con ?connectionParams=1, el primer mensaje deben ser los parámetros de conexión.

ts
interface ConnectionParamsMessage {
data: Record<string, string> | null;
method: 'connectionParams';
}
ts
interface ConnectionParamsMessage {
data: Record<string, string> | null;
method: 'connectionParams';
}

Errores

Consulta https://www.jsonrpc.org/specification#error_object o Formato de errores.

Notificaciones de servidor a cliente

{ id: null, type: 'reconnect' }

Indica a los clientes que se reconecten antes de apagar el servidor. Se invoca mediante wssHandler.broadcastReconnectNotification().