Subscriptions
Introduction
Subscriptions are a type of real-time event stream between the client and the server. Use subscriptions when you need to push real-time updates to the client.
With tRPC's subscriptions, the client establishes and maintains a persistent connection to the server. If the connection drops, the client automatically attempts to reconnect and recovers gracefully with the help of tracked() events.
WebSockets or Server-Sent Events?
You can use either WebSockets or Server-Sent Events (SSE) to set up real-time subscriptions in tRPC.
- For WebSockets, see the WebSockets page
- For SSE, see httpSubscriptionLink
If you are unsure which one to use, we recommend SSE for subscriptions: it is easier to set up and does not require running a WebSocket server.
Reference projects
| Type | Example Type | Link |
|---|---|---|
| WebSockets | Bare-minimum Node.js WebSockets example | /examples/standalone-server |
| SSE | Full-stack SSE implementation | github.com/trpc/examples-next-sse-chat |
| WebSockets | Full-stack WebSockets implementation | github.com/trpc/examples-next-prisma-websockets-starter |
Basic example
For a complete example, see our full-stack SSE example.
```ts
// server.ts
import EventEmitter, { on } from 'node:events';
import { initTRPC } from '@trpc/server';

const t = initTRPC.create();

type Post = { id: string; title: string };

const ee = new EventEmitter();

export const appRouter = t.router({
  onPostAdd: t.procedure.subscription(async function* (opts) {
    // listen for new events
    for await (const [data] of on(ee, 'add', {
      // Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
      signal: opts.signal,
    })) {
      const post = data as Post;
      yield post;
    }
  }),
});
```
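To see how events reach a generator like the one above, here is a minimal sketch that uses only `node:events`, with no tRPC involved. The `collectPosts` helper and the sample posts are made up for illustration. It shows that `on()` yields the arguments of each `emit` call as an array, which is why the subscription destructures `[data]`.

```typescript
import { EventEmitter, on } from 'node:events';

type Post = { id: string; title: string };

// Hypothetical helper: collects the first `count` posts emitted on 'add'.
async function collectPosts(emitter: EventEmitter, count: number): Promise<Post[]> {
  const posts: Post[] = [];
  // on() attaches its listener as soon as it is called and yields
  // the arguments of every emit() as an array
  for await (const [data] of on(emitter, 'add')) {
    posts.push(data as Post);
    if (posts.length >= count) break; // leaving the loop removes the listener
  }
  return posts;
}

const ee = new EventEmitter();
const collected = collectPosts(ee, 2);

// Elsewhere in the app (for example in a mutation) new posts are announced:
ee.emit('add', { id: '1', title: 'Hello' });
ee.emit('add', { id: '2', title: 'World' });
```

In a real router the emitting side would typically live in whatever procedure creates the post, while the subscription only listens.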
Automatic ID tracking with tracked() (recommended)
If you yield an event using our tracked() helper and include an id, the client will automatically reconnect when it gets disconnected and send the last known ID.
You can send an initial lastEventId when initializing the subscription, and it will be updated automatically as the browser receives data.
- For SSE, this is part of the EventSource specification and will be propagated through lastEventId in your .input().
- For WebSockets, our wsLink will automatically send the last known ID and update it as the browser receives data.
If you are fetching data based on lastEventId and capturing every event is critical, make sure you set up the event listener before fetching events from your database, as is done in our full-stack SSE example. This prevents newly emitted events from being missed while the original batch based on lastEventId is being yielded.
```ts
import EventEmitter, { on } from 'node:events';
import { initTRPC, tracked } from '@trpc/server';
import { z } from 'zod';

class IterableEventEmitter extends EventEmitter {
  toIterable(eventName: string, opts?: { signal?: AbortSignal }) {
    return on(this, eventName, opts);
  }
}

type Post = { id: string; title: string };

const t = initTRPC.create();
const publicProcedure = t.procedure;
const router = t.router;

const ee = new IterableEventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z
        .object({
          // lastEventId is the last event id that the client has received
          // On the first call, it will be whatever was passed in the initial setup
          // If the client reconnects, it will be the last event id that the client received
          lastEventId: z.string().nullish(),
        })
        .optional(),
    )
    .subscription(async function* (opts) {
      // We start by subscribing to the ee so that we don't miss any new events while fetching
      const iterable = ee.toIterable('add', {
        // Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
        signal: opts.signal,
      });

      if (opts.input?.lastEventId) {
        // [...] get the posts since the last event id and yield them
        // const items = await db.post.findMany({ ... })
        // for (const item of items) {
        //   yield tracked(item.id, item);
        // }
      }

      // listen for new events from the iterable we set up above
      for await (const [data] of iterable) {
        const post = data as Post;
        // tracking the post id ensures the client can reconnect at any time and get the latest events since this id
        yield tracked(post.id, post);
      }
    }),
});
```
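The ordering advice (listen first, then fetch) can be tried out without tRPC or a database. The sketch below is only an illustration under assumptions: `table`, `findPostsSince` and `postsSince` are hypothetical stand-ins, and ids are compared numerically for simplicity. A post is published while the slow replay query is still running, and it is still delivered because `on()` buffers events from the moment it is called.

```typescript
import { EventEmitter, on } from 'node:events';

type Post = { id: string; title: string };

// Hypothetical in-memory stand-in for a database table
const table: Post[] = [
  { id: '1', title: 'first' },
  { id: '2', title: 'second' },
];
const ee = new EventEmitter();

// Hypothetical slow query: snapshots the posts newer than lastEventId, then takes 20 ms
async function findPostsSince(lastEventId: string): Promise<Post[]> {
  const snapshot = table.filter((post) => Number(post.id) > Number(lastEventId));
  await new Promise((resolve) => setTimeout(resolve, 20));
  return snapshot;
}

async function* postsSince(lastEventId: string, signal: AbortSignal): AsyncGenerator<Post, void> {
  // 1. Attach the listener first; on() queues events until they are consumed
  const live = on(ee, 'add', { signal });
  // 2. Replay what the client missed
  for (const post of await findPostsSince(lastEventId)) yield post;
  // 3. Drain the queue and keep listening for new posts
  for await (const [data] of live) yield data as Post;
}

async function demo(): Promise<string[]> {
  const ac = new AbortController();
  const received: string[] = [];
  // A post is published while the replay query is still in flight
  setTimeout(() => {
    const post: Post = { id: '3', title: 'third' };
    table.push(post);
    ee.emit('add', post);
  }, 5);
  for await (const post of postsSince('1', ac.signal)) {
    received.push(post.id);
    if (received.length === 2) break; // stop after the replayed and the live post
  }
  return received;
}
```

If the listener were attached after the query instead, the post published at the 5 ms mark would be emitted with nobody listening and would never reach the client.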
Pull data in a loop
This recipe is useful when you want to periodically check a source (for example a database) for new data and push it to the client.
```ts
// server.ts
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from './trpc';

// Note: `db` and `sleep` are not defined in this snippet; they are assumed
// to be provided elsewhere in your app (a database client and a delay helper).

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z.object({
        // lastEventId is the last event id that the client has received
        // On the first call, it will be whatever was passed in the initial setup
        // If the client reconnects, it will be the last event id that the client received
        // The id is the createdAt of the post
        lastEventId: z.coerce.date().nullish(),
      }),
    )
    .subscription(async function* (opts) {
      // `opts.signal` is an AbortSignal that will be aborted when the client disconnects.
      let lastEventId = opts.input?.lastEventId ?? null;

      // We use a `while` loop that checks `!opts.signal.aborted`
      while (!opts.signal!.aborted) {
        const posts = await db.post.findMany({
          // If we have a `lastEventId`, we only fetch posts created after it.
          where: lastEventId
            ? {
                createdAt: {
                  gt: lastEventId,
                },
              }
            : undefined,
          orderBy: {
            createdAt: 'asc',
          },
        });

        for (const post of posts) {
          // `tracked` is a helper that sends an `id` with each event.
          // This allows the client to resume from the last received event upon reconnection.
          yield tracked(post.createdAt.toJSON(), post);
          lastEventId = post.createdAt;
        }

        // Wait for a bit before polling again to avoid hammering the database.
        await sleep(1_000);
      }
    }),
});
```
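The example above calls a `sleep` helper that is not defined on this page. One possible shape, sketched here with Node's built-in `node:timers/promises`, is a delay that also ends early when the request's AbortSignal fires, so a disconnected client does not keep the loop waiting. The `poll` generator and the `FetchSince` type are hypothetical and use plain numbers in place of posts.

```typescript
import { setTimeout as sleep } from 'node:timers/promises';

// Hypothetical data source standing in for the database query
type FetchSince = (cursor: number) => Promise<number[]>;

async function* poll(
  fetchSince: FetchSince,
  signal: AbortSignal,
  intervalMs: number,
): AsyncGenerator<number, void> {
  let cursor = 0;
  while (!signal.aborted) {
    for (const value of await fetchSince(cursor)) {
      yield value;
      cursor = value; // remember the last value that was sent
    }
    try {
      // Resolves after intervalMs, or rejects right away once the signal aborts
      await sleep(intervalMs, undefined, { signal });
    } catch {
      return; // aborted while waiting: stop polling
    }
  }
}

async function demo(): Promise<number[]> {
  const ac = new AbortController();
  const rows = [1, 2, 3];
  const seen: number[] = [];
  const source = poll(async (cursor) => rows.filter((n) => n > cursor), ac.signal, 10);
  for await (const value of source) {
    seen.push(value);
    if (value === 3) ac.abort(); // simulate the client disconnecting
  }
  return seen;
}
```

Swallowing the rejection in `catch` is a deliberate simplification here: with these arguments the only expected rejection is the abort itself.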
Stopping a subscription from the server
To stop a subscription from the server, simply return from the generator function.
```ts
export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z.object({
        lastEventId: z.coerce.number().min(0).optional(),
      }),
    )
    .subscription(async function* (opts) {
      let index = opts.input.lastEventId ?? 0;
      while (!opts.signal!.aborted) {
        const idx = index++;
        if (idx > 100) {
          // With this, the subscription will stop and the client will disconnect
          return;
        }
        await new Promise((resolve) => setTimeout(resolve, 10));
      }
    }),
});
```
On the client, you simply call .unsubscribe() on the subscription.
Cleanup of side effects
To clean up any side effects of your subscription, you can use the try...finally pattern, because trpc calls .return() on the Generator instance when the subscription stops for any reason.
```ts
import EventEmitter, { on } from 'events';
import { initTRPC } from '@trpc/server';

type Post = { id: string; title: string };

const t = initTRPC.create();
const publicProcedure = t.procedure;
const router = t.router;

const ee = new EventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure.subscription(async function* (opts) {
    let timeout: ReturnType<typeof setTimeout> | undefined;
    try {
      for await (const [data] of on(ee, 'add', {
        signal: opts.signal,
      })) {
        timeout = setTimeout(() => console.log('Pretend like this is useful'));
        const post = data as Post;
        yield post;
      }
    } finally {
      if (timeout) clearTimeout(timeout);
    }
  }),
});
```
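The reason try...finally works can be checked with a plain async generator. In the sketch below, which involves no tRPC code, the consumer calls `.return()` by hand to mimic what happens when a subscription stops; the `ticker` generator and the `log` array are hypothetical names used only for this illustration.

```typescript
const log: string[] = [];

async function* ticker(): AsyncGenerator<number, void> {
  // Stand-in for a side effect that must be undone, such as a timer or a listener
  const interval = setInterval(() => {}, 1_000);
  log.push('started');
  try {
    let n = 0;
    while (true) yield n++;
  } finally {
    // Runs when the consumer calls .return(), even though the loop never ends
    clearInterval(interval);
    log.push('cleaned up');
  }
}

async function demo(): Promise<string[]> {
  const gen = ticker();
  await gen.next(); // run the generator up to its first yield
  await gen.return(undefined); // what a consumer does when it stops early
  return log;
}
```

Calling `.return()` on a generator that is paused at a `yield` inside a `try` block resumes it as if a `return` statement had been placed there, so the `finally` block runs before the generator completes.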
Error handling
Throwing an error in a generator function propagates it to trpc's onError() on the backend.
If the thrown error is a 5xx error, the client will automatically attempt to reconnect based on the last event ID tracked with tracked(). For other errors, the subscription is cancelled and the error is propagated to the onError() callback.
Output validation
Because subscriptions are async iterators, you have to iterate through them to validate the outputs.
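As a library-free illustration of what "iterating through them" means, the sketch below wraps an async iterable in a second generator that checks every item as it passes through. The `isCount` guard and the `validated` wrapper are hypothetical; they are not tRPC or Zod APIs.

```typescript
type Count = { count: number };

// Hypothetical hand-written check; a schema library would normally do this
function isCount(value: unknown): value is Count {
  return (
    typeof value === 'object' &&
    value !== null &&
    typeof (value as { count?: unknown }).count === 'number'
  );
}

// Wraps an async iterable and validates each item as it is yielded
async function* validated(source: AsyncIterable<unknown>): AsyncGenerator<Count, void> {
  for await (const item of source) {
    if (!isCount(item)) throw new TypeError('Invalid subscription output');
    yield item;
  }
}

async function* mixed(): AsyncGenerator<unknown, void> {
  yield { count: 1 };
  yield { count: 'two' }; // fails validation once it is reached
}

async function demo(): Promise<{ ok: number[]; error: string | null }> {
  const ok: number[] = [];
  try {
    for await (const item of validated(mixed())) ok.push(item.count);
    return { ok, error: null };
  } catch (err) {
    return { ok, error: (err as Error).message };
  }
}
```

Note that validation is lazy: the first item is delivered before the invalid second item is ever inspected, so a consumer can receive valid data and then an error.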
Example with Zod v4
```ts
// zAsyncIterable.ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';

function isAsyncIterable<TValue, TReturn = unknown>(
  value: unknown,
): value is AsyncIterable<TValue, TReturn> {
  return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}

const trackedEnvelopeSchema =
  z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);

/**
 * A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
 * 1. The value being validated is an async iterable.
 * 2. Each item yielded by the async iterable conforms to a specified type.
 * 3. The return value of the async iterable, if any, also conforms to a specified type.
 */
export function zAsyncIterable<
  TYieldIn,
  TYieldOut,
  TReturnIn = void,
  TReturnOut = void,
  Tracked extends boolean = false,
>(opts: {
  /**
   * Validate the value yielded by the async generator
   */
  yield: z.ZodType<TYieldOut, TYieldIn>;
  /**
   * Validate the return value of the async generator
   * @remarks not applicable for subscriptions
   */
  return?: z.ZodType<TReturnOut, TReturnIn>;
  /**
   * Whether the yielded values are tracked
   * @remarks only applicable for subscriptions
   */
  tracked?: Tracked;
}) {
  return z
    .custom<
      AsyncIterable<
        Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
        TReturnIn
      >
    >((val) => isAsyncIterable(val))
    .transform(async function* (iter) {
      const iterator = iter[Symbol.asyncIterator]();
      try {
        let next;
        while ((next = await iterator.next()) && !next.done) {
          if (opts.tracked) {
            const [id, data] = trackedEnvelopeSchema.parse(next.value);
            yield tracked(id, await opts.yield.parseAsync(data));
            continue;
          }
          yield opts.yield.parseAsync(next.value);
        }
        if (opts.return) {
          return await opts.return.parseAsync(next.value);
        }
        return;
      } finally {
        await iterator.return?.();
      }
    }) as z.ZodType<
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
      TReturnIn,
      unknown
    >,
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
      TReturnOut,
      unknown
    >
  >;
}
```
Now you can use this helper to validate the outputs of your subscription procedures:
```ts
// _app.ts
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from './trpc';
import { zAsyncIterable } from './zAsyncIterable';

export const appRouter = router({
  mySubscription: publicProcedure
    .input(
      z.object({
        lastEventId: z.coerce.number().min(0).optional(),
      }),
    )
    .output(
      zAsyncIterable({
        yield: z.object({
          count: z.number(),
        }),
        tracked: true,
      }),
    )
    .subscription(async function* (opts) {
      let index = opts.input.lastEventId ?? 0;
      while (true) {
        index++;
        yield tracked(String(index), {
          count: index,
        });

        await new Promise((resolve) => setTimeout(resolve, 1000));
      }
    }),
});
```