Subscriptions
Introduction
Subscriptions are a real-time event stream between the client and the server. Use them when you need to push real-time updates to the client.
With tRPC subscriptions, the client establishes and maintains a persistent connection to the server. If the connection drops, the client tries to reconnect and recover automatically, with the help of tracked() events.
WebSockets or Server-sent Events?
You can use either WebSockets or Server-sent Events (SSE) to set up real-time subscriptions in tRPC.
- For WebSockets, see the WebSockets page
- For SSE, see the httpSubscriptionLink
If you are unsure which one to use, we recommend SSE for subscriptions because it is easier to set up and does not require a WebSocket server.
Reference projects
| Type | Example Type | Link |
|---|---|---|
| WebSockets | Bare-minimum Node.js WebSockets example | /examples/standalone-server |
| SSE | Full-stack SSE implementation | github.com/trpc/examples-next-sse-chat |
| WebSockets | Full-stack WebSockets implementation | github.com/trpc/examples-next-prisma-websockets-starter |
Basic example
For a complete example, see our full-stack SSE example.
```ts
// server.ts
import EventEmitter, { on } from 'node:events';
import { initTRPC } from '@trpc/server';

const t = initTRPC.create();

type Post = { id: string; title: string };

const ee = new EventEmitter();

export const appRouter = t.router({
  onPostAdd: t.procedure.subscription(async function* (opts) {
    // listen for new events
    for await (const [data] of on(ee, 'add', {
      // Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
      signal: opts.signal,
    })) {
      const post = data as Post;
      yield post;
    }
  }),
});
```
Automatic ID tracking using tracked() (recommended)
If you yield an event using our tracked() helper and include an id, the client automatically reconnects when it gets disconnected and sends the last known ID.
You can send an initial lastEventId when starting the subscription, and it is updated automatically as the browser receives data.
- In SSE, this is part of the EventSource specification, and the value is propagated through lastEventId in your .input().
- In WebSockets, our wsLink automatically sends the last known ID and updates it as the browser receives data.
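The resume behaviour described above can be illustrated without any tRPC code. The sketch below is a simplified model, not tRPC's implementation: `TrackedEvent`, `countFrom`, and `consumeWithReconnect` are hypothetical names, and the "connection drop" is simulated by breaking out of the loop.

```typescript
// Hypothetical envelope for this sketch: an event paired with its id.
type TrackedEvent<T> = { id: string; data: T };

// "Server" side: resume counting right after the last id the client has seen.
async function* countFrom(
  lastEventId: string | null,
  limit: number,
): AsyncGenerator<TrackedEvent<{ count: number }>, void, unknown> {
  let index = lastEventId ? Number(lastEventId) : 0;
  while (index < limit) {
    index++;
    yield { id: String(index), data: { count: index } };
  }
}

// "Client" side: remember the id of every event and send it back on reconnect.
async function consumeWithReconnect(
  limit: number,
  dropAfter: number,
): Promise<number[]> {
  const received: number[] = [];
  let lastEventId: string | null = null;

  // First connection: simulate a dropped connection after `dropAfter` events.
  for await (const event of countFrom(lastEventId, limit)) {
    received.push(event.data.count);
    lastEventId = event.id;
    if (received.length === dropAfter) break;
  }

  // Reconnect: the stream continues after the last id that was received.
  for await (const event of countFrom(lastEventId, limit)) {
    received.push(event.data.count);
    lastEventId = event.id;
  }

  return received;
}
```

Because the second connection starts from the last id the client stored, the combined stream has neither gaps nor duplicates.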
If you fetch data based on the lastEventId and capturing every event is critical, make sure you set up the event listener before fetching events from your database, as is done in our full-stack SSE example. This prevents new events from being missed if they are emitted while the original batch based on lastEventId is still being yielded.
```ts
import EventEmitter, { on } from 'node:events';
import { initTRPC, tracked } from '@trpc/server';
import { z } from 'zod';

class IterableEventEmitter extends EventEmitter {
  toIterable(eventName: string, opts?: { signal?: AbortSignal }) {
    return on(this, eventName, opts);
  }
}

type Post = { id: string; title: string };

const t = initTRPC.create();
const publicProcedure = t.procedure;
const router = t.router;

const ee = new IterableEventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z
        .object({
          // lastEventId is the last event id that the client has received
          // On the first call, it will be whatever was passed in the initial setup
          // If the client reconnects, it will be the last event id that the client received
          lastEventId: z.string().nullish(),
        })
        .optional(),
    )
    .subscription(async function* (opts) {
      // We start by subscribing to the ee so that we don't miss any new events while fetching
      const iterable = ee.toIterable('add', {
        // Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
        signal: opts.signal,
      });

      if (opts.input?.lastEventId) {
        // [...] get the posts since the last event id and yield them
        // const items = await db.post.findMany({ ... })
        // for (const item of items) {
        //   yield tracked(item.id, item);
        // }
      }

      // listen for new events from the iterable we set up above
      for await (const [data] of iterable) {
        const post = data as Post;
        // tracking the post id ensures the client can reconnect at any time and get the latest events since this id
        yield tracked(post.id, post);
      }
    }),
});
```
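To see why the listener has to be registered before the backlog is fetched, here is a small self-contained sketch. It does not use tRPC or node:events; `PostFeed`, `catchUp`, and `fetchSince` are hypothetical stand-ins, and unlike a real subscription it stops listening once the backlog has been merged.

```typescript
type Post = { id: number; title: string };

// Minimal stand-in for an event emitter (hypothetical, for illustration only).
class PostFeed {
  private listeners = new Set<(post: Post) => void>();

  // Registers a listener and returns a function that removes it again.
  listen(fn: (post: Post) => void): () => void {
    this.listeners.add(fn);
    return () => {
      this.listeners.delete(fn);
    };
  }

  emit(post: Post): void {
    this.listeners.forEach((fn) => fn(post));
  }
}

async function catchUp(
  feed: PostFeed,
  fetchSince: (lastEventId: number) => Promise<Post[]>,
  lastEventId: number,
): Promise<Post[]> {
  const live: Post[] = [];
  // 1. Start listening first, so events emitted during the fetch are buffered.
  const stop = feed.listen((post) => live.push(post));
  try {
    // 2. Fetch everything the client missed while it was disconnected.
    const backlog = await fetchSince(lastEventId);
    // 3. Append buffered live events, skipping any the backlog already contains.
    const seen = new Set(backlog.map((post) => post.id));
    return [...backlog, ...live.filter((post) => !seen.has(post.id))];
  } finally {
    stop();
  }
}
```

If the listener were registered only after `fetchSince` resolved, a post created while the query was in flight would appear in neither the backlog nor the live events.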
Pull data in a loop
This recipe is useful when you want to periodically check for new data from a source such as a database and push it to the client.
```ts
// server.ts
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from './trpc';

// Assumption: `db` is your database client (a Prisma-style API is used below);
// import it from wherever your application defines it.

// Small helper that resolves after `ms` milliseconds.
const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z.object({
        // lastEventId is the last event id that the client has received
        // On the first call, it will be whatever was passed in the initial setup
        // If the client reconnects, it will be the last event id that the client received
        // The id is the createdAt of the post
        lastEventId: z.coerce.date().nullish(),
      }),
    )
    .subscription(async function* (opts) {
      // `opts.signal` is an AbortSignal that will be aborted when the client disconnects.
      let lastEventId = opts.input?.lastEventId ?? null;

      // We use a `while` loop that checks `!opts.signal.aborted`
      while (!opts.signal!.aborted) {
        const posts = await db.post.findMany({
          // If we have a `lastEventId`, we only fetch posts created after it.
          where: lastEventId
            ? {
                createdAt: {
                  gt: lastEventId,
                },
              }
            : undefined,
          orderBy: {
            createdAt: 'asc',
          },
        });

        for (const post of posts) {
          // `tracked` is a helper that sends an `id` with each event.
          // This allows the client to resume from the last received event upon reconnection.
          yield tracked(post.createdAt.toJSON(), post);
          lastEventId = post.createdAt;
        }

        // Wait for a bit before polling again to avoid hammering the database.
        await sleep(1_000);
      }
    }),
});
```
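A plain timer-based `sleep` keeps the polling loop waiting for up to a full interval after the client has disconnected. One possible refinement, shown here as a sketch rather than as part of tRPC, is a `sleep` that also accepts the AbortSignal and resolves as soon as it is aborted. The optional `signal` parameter is an assumption of this example.

```typescript
// Resolves after `ms` milliseconds, or immediately once `signal` is aborted.
function sleep(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise<void>((resolve) => {
    if (signal?.aborted) {
      resolve();
      return;
    }
    const finish = () => {
      // Clean up both the timer and the listener, whichever fired first.
      clearTimeout(timer);
      signal?.removeEventListener('abort', finish);
      resolve();
    };
    const timer = setTimeout(finish, ms);
    signal?.addEventListener('abort', finish, { once: true });
  });
}

// Usage inside a polling loop (sketch):
// while (!opts.signal.aborted) {
//   // ...fetch and yield new rows...
//   await sleep(1_000, opts.signal);
// }
```

The helper resolves rather than rejects on abort, so the surrounding `while` condition remains the single place that decides when the loop ends.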
Stopping a subscription from the server
If you need to stop a subscription from the server, simply return from the generator function.
```ts
import { z } from 'zod';
import { publicProcedure, router } from './trpc';

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z.object({
        lastEventId: z.coerce.number().min(0).optional(),
      }),
    )
    .subscription(async function* (opts) {
      let index = opts.input.lastEventId ?? 0;
      while (!opts.signal!.aborted) {
        const idx = index++;
        if (idx > 100) {
          // With this, the subscription will stop and the client will disconnect
          return;
        }
        await new Promise((resolve) => setTimeout(resolve, 10));
      }
    }),
});
```
On the client, call .unsubscribe() on the subscription.
Cleanup of side effects
If you need to clean up any side effects of your subscription, use the try...finally pattern, because tRPC invokes .return() on the generator instance when the subscription stops for any reason.
```ts
import EventEmitter, { on } from 'events';
import { initTRPC } from '@trpc/server';

type Post = { id: string; title: string };

const t = initTRPC.create();
const publicProcedure = t.procedure;
const router = t.router;

const ee = new EventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure.subscription(async function* (opts) {
    let timeout: ReturnType<typeof setTimeout> | undefined;
    try {
      for await (const [data] of on(ee, 'add', {
        signal: opts.signal,
      })) {
        timeout = setTimeout(() => console.log('Pretend like this is useful'));
        const post = data as Post;
        yield post;
      }
    } finally {
      if (timeout) clearTimeout(timeout);
    }
  }),
});
```
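The reason try...finally works here is standard JavaScript generator behaviour: calling .return() on a generator that is suspended at a yield resumes it as if a return statement had been executed at that point, which runs any enclosing finally block. The following standalone sketch shows this without tRPC; `ticker` and `runAndStop` are hypothetical names.

```typescript
// An endless generator with a cleanup hook in its finally block.
async function* ticker(
  onCleanup: () => void,
): AsyncGenerator<number, void, unknown> {
  let n = 0;
  try {
    while (true) {
      yield n++;
    }
  } finally {
    // Runs when the consumer calls .return() or breaks out of a for await loop.
    onCleanup();
  }
}

async function runAndStop(): Promise<string[]> {
  const log: string[] = [];
  const iterator = ticker(() => log.push('cleanup'));

  const first = await iterator.next();
  log.push(`value ${first.value}`);

  // Stop the generator from the outside, as happens when a subscription ends.
  await iterator.return(undefined);

  const after = await iterator.next();
  log.push(`done ${after.done}`);
  return log;
}
```

Note that finally only runs if the generator has been started; a generator that never received a .next() call has not entered the try block yet.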
Error handling
Throwing an error in a generator function propagates it to tRPC's onError() on the backend.
If the error thrown is a 5xx error, the client automatically attempts to reconnect based on the last event ID that was tracked using tracked(). For other errors, the subscription is cancelled and the error is passed to the onError() callback.
Output validation
Because subscriptions are async iterators, you have to iterate over the iterator to validate the output.
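Independent of any schema library, the idea is to wrap the source iterable in a second async generator that checks each value before passing it on. The sketch below is a minimal illustration of that wrapping; `validateEach` and `parseCount` are hypothetical helpers, not part of tRPC or Zod.

```typescript
type Count = { count: number };

// Hand-written validator standing in for a schema's parse function.
function parseCount(value: unknown): Count {
  if (
    typeof value === 'object' &&
    value !== null &&
    typeof (value as { count?: unknown }).count === 'number'
  ) {
    return { count: (value as Count).count };
  }
  throw new Error('Invalid output: expected { count: number }');
}

// Re-yields every item from `source` after running it through `parse`.
// An invalid item makes the wrapper throw, which ends the stream.
async function* validateEach<T>(
  source: AsyncIterable<unknown>,
  parse: (value: unknown) => T,
): AsyncGenerator<T, void, unknown> {
  for await (const value of source) {
    yield parse(value);
  }
}
```

Validation happens lazily, one item at a time, so an invalid value is only detected when the consumer reaches it.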
Example with Zod v4
```ts
// zAsyncIterable.ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';

function isAsyncIterable<TValue, TReturn = unknown>(
  value: unknown,
): value is AsyncIterable<TValue, TReturn> {
  return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}

const trackedEnvelopeSchema =
  z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);

/**
 * A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
 * 1. The value being validated is an async iterable.
 * 2. Each item yielded by the async iterable conforms to a specified type.
 * 3. The return value of the async iterable, if any, also conforms to a specified type.
 */
export function zAsyncIterable<
  TYieldIn,
  TYieldOut,
  TReturnIn = void,
  TReturnOut = void,
  Tracked extends boolean = false,
>(opts: {
  /**
   * Validate the value yielded by the async generator
   */
  yield: z.ZodType<TYieldOut, TYieldIn>;
  /**
   * Validate the return value of the async generator
   * @remarks not applicable for subscriptions
   */
  return?: z.ZodType<TReturnOut, TReturnIn>;
  /**
   * Whether the yielded values are tracked
   * @remarks only applicable for subscriptions
   */
  tracked?: Tracked;
}) {
  return z
    .custom<
      AsyncIterable<
        Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
        TReturnIn
      >
    >((val) => isAsyncIterable(val))
    .transform(async function* (iter) {
      const iterator = iter[Symbol.asyncIterator]();
      try {
        let next;
        while ((next = await iterator.next()) && !next.done) {
          if (opts.tracked) {
            const [id, data] = trackedEnvelopeSchema.parse(next.value);
            yield tracked(id, await opts.yield.parseAsync(data));
            continue;
          }
          yield opts.yield.parseAsync(next.value);
        }
        if (opts.return) {
          return await opts.return.parseAsync(next.value);
        }
        return;
      } finally {
        await iterator.return?.();
      }
    }) as z.ZodType<
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
      TReturnIn,
      unknown
    >,
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
      TReturnOut,
      unknown
    >
  >;
}
```
Now you can use this helper to validate the output of your subscription procedures:
```ts
// _app.ts
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from './trpc';
import { zAsyncIterable } from './zAsyncIterable';

export const appRouter = router({
  mySubscription: publicProcedure
    .input(
      z.object({
        lastEventId: z.coerce.number().min(0).optional(),
      }),
    )
    .output(
      zAsyncIterable({
        yield: z.object({
          count: z.number(),
        }),
        tracked: true,
      }),
    )
    .subscription(async function* (opts) {
      let index = opts.input.lastEventId ?? 0;
      while (true) {
        index++;
        yield tracked(String(index), {
          count: index,
        });
        await new Promise((resolve) => setTimeout(resolve, 1000));
      }
    }),
});
```