订阅
本页面由 PageTurner AI 翻译(测试版)。未经项目官方认可。 发现错误? 报告问题 →
简介
订阅是客户端与服务器之间的实时事件流机制。当需要向客户端推送实时更新时,请使用订阅功能。
通过 tRPC 的订阅功能,客户端将与服务器建立并维护持久连接,并在连接中断时借助 tracked() 事件自动尝试优雅重连和恢复。
WebSockets 还是 Server-sent Events?
在 tRPC 中设置实时订阅时,您可以选择使用 WebSockets 或 Server-sent Events(SSE)。
- 关于 WebSockets,请参阅 WebSockets 文档
- 关于 SSE,请参阅 httpSubscriptionLink
若不确定如何选择,我们推荐使用 SSE 实现订阅功能,因其配置更简单且无需搭建 WebSocket 服务器。
参考项目
| Type | Example Type | Link |
|---|---|---|
| WebSockets | Bare-minimum Node.js WebSockets example | /examples/standalone-server |
| SSE | Full-stack SSE implementation | github.com/trpc/examples-next-sse-chat |
| WebSockets | Full-stack WebSockets implementation | github.com/trpc/examples-next-prisma-websockets-starter |
基础示例
完整示例请参考 我们的全栈 SSE 示例。
// server.ts
import EventEmitter, { on } from 'node:events';
import { initTRPC } from '@trpc/server';

const t = initTRPC.create();

type Post = { id: string; title: string };

// Event bus the subscription listens on; posts are expected on the 'add' event.
const ee = new EventEmitter();

export const appRouter = t.router({
  /**
   * Subscription that forwards every payload emitted on `ee` under the
   * 'add' event to the subscriber, one `Post` per yielded value.
   */
  onPostAdd: t.procedure.subscription(async function* (opts) {
    // listen for new events
    for await (const [data] of on(ee, 'add', {
      // Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
      signal: opts.signal,
    })) {
      const post = data as Post;
      yield post;
    }
  }),
});
使用 tracked() 自动追踪 ID(推荐)
当您使用 tracked() 辅助函数 yield 包含 id 的事件时,客户端在连接中断时会自动重连并发送最后已知的 ID。
初始化订阅时可发送初始 lastEventId,浏览器在接收数据时会自动更新该值。
- 对于 SSE,此机制遵循 EventSource 规范,并通过 .input() 中的 lastEventId 传递
- 对于 WebSockets,我们的 wsLink 会自动发送并更新最后已知 ID
若需基于 lastEventId 获取数据且必须捕获所有事件,请确保在从数据库获取事件之前先设置事件监听器(如我们的全栈 SSE 示例所示);否则,在根据 lastEventId 产出(yield)初始批次数据期间,新产生的事件可能会被遗漏。
import EventEmitter, { on } from 'node:events';
import { initTRPC, tracked } from '@trpc/server';
import { z } from 'zod';

/**
 * EventEmitter with a convenience method that exposes one event name as an
 * async iterable (a thin wrapper around `events.on`).
 */
class IterableEventEmitter extends EventEmitter {
  toIterable(eventName: string, opts?: { signal?: AbortSignal }) {
    return on(this, eventName, opts);
  }
}

type Post = { id: string; title: string };

const t = initTRPC.create();
const publicProcedure = t.procedure;
const router = t.router;

const ee = new IterableEventEmitter();

export const subRouter = router({
  /**
   * Subscription that yields each post emitted on the 'add' event wrapped in
   * `tracked(post.id, post)`, so every event carries an id.
   */
  onPostAdd: publicProcedure
    .input(
      z
        .object({
          // lastEventId is the last event id that the client has received
          // On the first call, it will be whatever was passed in the initial setup
          // If the client reconnects, it will be the last event id that the client received
          lastEventId: z.string().nullish(),
        })
        .optional(),
    )
    .subscription(async function* (opts) {
      // We start by subscribing to the ee so that we don't miss any new events while fetching
      const iterable = ee.toIterable('add', {
        // Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
        signal: opts.signal,
      });

      if (opts.input?.lastEventId) {
        // [...] get the posts since the last event id and yield them
        // const items = await db.post.findMany({ ... })
        // for (const item of items) {
        //   yield tracked(item.id, item);
        // }
      }

      // listen for new events from the iterable we set up above
      for await (const [data] of iterable) {
        const post = data as Post;
        // tracking the post id ensures the client can reconnect at any time and get the latest events since this id
        yield tracked(post.id, post);
      }
    }),
});
循环拉取数据
此方案适用于需要定期检查数据库等数据源,并将新数据推送给客户端的场景。
// server.ts
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from './trpc';

// NOTE: `db` and `sleep` are used below but are not defined or imported in
// this snippet; they are assumed to be provided by the surrounding application
// (a database client and a promise-based delay helper, respectively).

export const subRouter = router({
  /**
   * Subscription that polls `db.post` in a loop and yields each post wrapped
   * in `tracked()`, using the post's `createdAt` as the event id.
   */
  onPostAdd: publicProcedure
    .input(
      z.object({
        // lastEventId is the last event id that the client has received
        // On the first call, it will be whatever was passed in the initial setup
        // If the client reconnects, it will be the last event id that the client received
        // The id is the createdAt of the post
        lastEventId: z.coerce.date().nullish(),
      }),
    )
    .subscription(async function* (opts) {
      // `opts.signal` is an AbortSignal that will be aborted when the client disconnects.
      let lastEventId = opts.input?.lastEventId ?? null;

      // We use a `while` loop that checks `!opts.signal.aborted`
      while (!opts.signal!.aborted) {
        const posts = await db.post.findMany({
          // If we have a `lastEventId`, we only fetch posts created after it.
          where: lastEventId
            ? {
                createdAt: {
                  gt: lastEventId,
                },
              }
            : undefined,
          orderBy: {
            createdAt: 'asc',
          },
        });

        for (const post of posts) {
          // `tracked` is a helper that sends an `id` with each event.
          // This allows the client to resume from the last received event upon reconnection.
          yield tracked(post.createdAt.toJSON(), post);
          lastEventId = post.createdAt;
        }

        // Wait for a bit before polling again to avoid hammering the database.
        await sleep(1_000);
      }
    }),
});
从服务器停止订阅
若需从服务器端停止订阅,只需在生成器函数中执行 return。
export const subRouter = router({
  /**
   * Demonstrates ending a subscription from the server: the generator counts
   * up from `lastEventId` (or 0) and returns once the counter exceeds 100.
   */
  onPostAdd: publicProcedure
    .input(
      z.object({
        lastEventId: z.coerce.number().min(0).optional(),
      }),
    )
    .subscription(async function* (opts) {
      let index = opts.input.lastEventId ?? 0;

      while (!opts.signal!.aborted) {
        const idx = index++;

        if (idx > 100) {
          // With this, the subscription will stop and the client will disconnect
          return;
        }

        await new Promise((resolve) => setTimeout(resolve, 10));
      }
    }),
});
在客户端,直接对订阅执行 .unsubscribe() 即可。
副作用清理
如需清理订阅产生的副作用,可采用 try...finally 模式。当订阅因任何原因终止时,tRPC 会调用生成器实例的 .return()。
import EventEmitter, { on } from 'node:events';
import { initTRPC } from '@trpc/server';

type Post = { id: string; title: string };

const t = initTRPC.create();
const publicProcedure = t.procedure;
const router = t.router;

const ee = new EventEmitter();

export const subRouter = router({
  /**
   * Subscription that forwards posts from the 'add' event and shows how to
   * clean up a side effect (a pending timer) with `try...finally`.
   */
  onPostAdd: publicProcedure.subscription(async function* (opts) {
    // Handle of the most recently scheduled timer; cleared in `finally`.
    let timeout: ReturnType<typeof setTimeout> | undefined;

    try {
      for await (const [data] of on(ee, 'add', {
        signal: opts.signal,
      })) {
        timeout = setTimeout(() => console.log('Pretend like this is useful'));
        const post = data as Post;
        yield post;
      }
    } finally {
      // Runs however the generator ends, so the last timer never outlives the subscription.
      if (timeout) clearTimeout(timeout);
    }
  }),
});
错误处理
在生成器函数中抛出错误会传递至后端 tRPC 的 onError()。
若抛出 5xx 错误,客户端将基于通过 tracked() 追踪的最后事件 ID 自动重连。其他错误将导致订阅取消并触发 onError() 回调。
输出验证
由于订阅是异步迭代器,必须通过迭代器遍历才能验证输出结果。
Zod v4 使用示例
// zAsyncIterable.ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';

/**
 * Type guard: true when `value` is a non-null object that has a
 * `Symbol.asyncIterator` property.
 */
function isAsyncIterable<TValue, TReturn = unknown>(
  value: unknown,
): value is AsyncIterable<TValue, TReturn> {
  return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}

// Schema that accepts any value recognised by `isTrackedEnvelope`.
const trackedEnvelopeSchema =
  z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);

/**
 * A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
 * 1. The value being validated is an async iterable.
 * 2. Each item yielded by the async iterable conforms to a specified type.
 * 3. The return value of the async iterable, if any, also conforms to a specified type.
 */
export function zAsyncIterable<
  TYieldIn,
  TYieldOut,
  TReturnIn = void,
  TReturnOut = void,
  Tracked extends boolean = false,
>(opts: {
  /**
   * Validate the value yielded by the async generator
   */
  yield: z.ZodType<TYieldOut, TYieldIn>;
  /**
   * Validate the return value of the async generator
   * @remarks not applicable for subscriptions
   */
  return?: z.ZodType<TReturnOut, TReturnIn>;
  /**
   * Whether the yielded values are tracked
   * @remarks only applicable for subscriptions
   */
  tracked?: Tracked;
}) {
  return z
    .custom<
      AsyncIterable<
        Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
        TReturnIn
      >
    >((val) => isAsyncIterable(val))
    .transform(async function* (iter) {
      const iterator = iter[Symbol.asyncIterator]();

      try {
        let next;
        while ((next = await iterator.next()) && !next.done) {
          if (opts.tracked) {
            // Tracked values arrive as an [id, data] envelope: validate the
            // data part and re-wrap it under the same id.
            const [id, data] = trackedEnvelopeSchema.parse(next.value);
            yield tracked(id, await opts.yield.parseAsync(data));
            continue;
          }
          yield opts.yield.parseAsync(next.value);
        }

        // `next.done` is true here, so `next.value` is the iterator's return value.
        if (opts.return) {
          return await opts.return.parseAsync(next.value);
        }
        return;
      } finally {
        // Always close the source iterator, including on early exit or error.
        await iterator.return?.();
      }
    }) as z.ZodType<
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
      TReturnIn,
      unknown
    >,
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
      TReturnOut,
      unknown
    >
  >;
}
现在你可以使用这个辅助函数来验证订阅过程的输出:
// _app.ts
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from './trpc';
import { zAsyncIterable } from './zAsyncIterable';

export const appRouter = router({
  /**
   * Subscription whose output is validated with `zAsyncIterable`: once per
   * second it yields a tracked `{ count }` object, counting up from
   * `lastEventId` (or 0).
   */
  mySubscription: publicProcedure
    .input(
      z.object({
        lastEventId: z.coerce.number().min(0).optional(),
      }),
    )
    .output(
      zAsyncIterable({
        yield: z.object({
          count: z.number(),
        }),
        tracked: true,
      }),
    )
    .subscription(async function* (opts) {
      let index = opts.input.lastEventId ?? 0;

      while (true) {
        index++;
        // The stringified counter doubles as the event id.
        yield tracked(String(index), {
          count: index,
        });

        await new Promise((resolve) => setTimeout(resolve, 1000));
      }
    }),
});