サブスクリプション
このページは PageTurner AI で翻訳されました(ベータ版)。プロジェクト公式の承認はありません。 エラーを見つけましたか? 問題を報告 →
はじめに
サブスクリプションはクライアントとサーバー間のリアルタイムイベントストリームです。クライアントにリアルタイム更新をプッシュする必要がある場合に使用します。
tRPCのサブスクリプションでは、クライアントはサーバーへの永続的な接続を確立・維持し、tracked()イベントの助けを借りて切断時に自動的に再接続を試み、正常に復旧します。
WebSocketとServer-sent Eventsの選択
tRPCでリアルタイムサブスクリプションを設定するには、WebSocketまたはServer-sent Events(SSE)のいずれかを使用できます。
-
WebSocketについてはWebSocketページを参照
-
SSEについてはhttpSubscriptionLinkを参照
どちらを使用するか迷った場合は、サブスクリプションにSSEを使用することをお勧めします。設定が簡単でWebSocketサーバーのセットアップが不要だからです。
参考プロジェクト
| Type | Example Type | Link |
|---|---|---|
| WebSockets | Bare-minimum Node.js WebSockets example | /examples/standalone-server |
| SSE | Full-stack SSE implementation | github.com/trpc/examples-next-sse-chat |
| WebSockets | Full-stack WebSockets implementation | github.com/trpc/examples-next-prisma-websockets-starter |
基本例
完全な例は当社のフルスタックSSEサンプルをご覧ください。
server.tstsimportEventEmitter , {on } from 'node:events';import {initTRPC } from '@trpc/server';constt =initTRPC .create ();typePost = {id : string;title : string };constee = newEventEmitter ();export constappRouter =t .router ({onPostAdd :t .procedure .subscription (async function* (opts ) {// listen for new eventsfor await (const [data ] ofon (ee , 'add', {// Passing the AbortSignal from the request automatically cancels the event emitter when the request is abortedsignal :opts .signal ,})) {constpost =data asPost ;yieldpost ;}}),});
server.tstsimportEventEmitter , {on } from 'node:events';import {initTRPC } from '@trpc/server';constt =initTRPC .create ();typePost = {id : string;title : string };constee = newEventEmitter ();export constappRouter =t .router ({onPostAdd :t .procedure .subscription (async function* (opts ) {// listen for new eventsfor await (const [data ] ofon (ee , 'add', {// Passing the AbortSignal from the request automatically cancels the event emitter when the request is abortedsignal :opts .signal ,})) {constpost =data asPost ;yieldpost ;}}),});
tracked()を使用したIDの自動追跡(推奨)
tracked()ヘルパーを使用してidを含むイベントをyieldすると、クライアントは切断時に自動的に再接続し、最後に確認されたIDを送信します。
サブスクリプションの初期化時にlastEventIdを送信すると、ブラウザがデータを受信するたびに自動的に更新されます。
-
SSEの場合、これは
EventSource仕様の一部であり、.input()のlastEventId経由で伝播されます。 -
WebSocketの場合、当社の
wsLinkが最後に確認されたIDを自動送信し、ブラウザがデータを受信すると更新します。
lastEventIdに基づいてデータを取得する場合で、すべてのイベントの捕捉が重要なときは、当社のフルスタックSSEサンプルで行われているように、データベースからイベントを取得する前にイベントリスナーを設定してください。これにより、lastEventIdに基づく元のバッチをyieldしている間に新しく発生したイベントが無視されるのを防げます。
tsimportEventEmitter , {on } from 'node:events';import {initTRPC ,tracked } from '@trpc/server';import {z } from 'zod';classIterableEventEmitter extendsEventEmitter {toIterable (eventName : string,opts ?: {signal ?:AbortSignal }) {returnon (this,eventName ,opts );}}typePost = {id : string;title : string };constt =initTRPC .create ();constpublicProcedure =t .procedure ;constrouter =t .router ;constee = newIterableEventEmitter ();export constsubRouter =router ({onPostAdd :publicProcedure .input (z .object ({// lastEventId is the last event id that the client has received// On the first call, it will be whatever was passed in the initial setup// If the client reconnects, it will be the last event id that the client receivedlastEventId :z .string ().nullish (),}).optional (),).subscription (async function* (opts ) {// We start by subscribing to the ee so that we don't miss any new events while fetchingconstiterable =ee .toIterable ('add', {// Passing the AbortSignal from the request automatically cancels the event emitter when the request is abortedsignal :opts .signal ,});if (opts .input ?.lastEventId ) {// [...] get the posts since the last event id and yield them// const items = await db.post.findMany({ ... })// for (const item of items) {// yield tracked(item.id, item);// }}// listen for new events from the iterable we set up abovefor await (const [data ] ofiterable ) {constpost =data asPost ;// tracking the post id ensures the client can reconnect at any time and get the latest events since this idyieldtracked (post .id ,post );}}),});
tsimportEventEmitter , {on } from 'node:events';import {initTRPC ,tracked } from '@trpc/server';import {z } from 'zod';classIterableEventEmitter extendsEventEmitter {toIterable (eventName : string,opts ?: {signal ?:AbortSignal }) {returnon (this,eventName ,opts );}}typePost = {id : string;title : string };constt =initTRPC .create ();constpublicProcedure =t .procedure ;constrouter =t .router ;constee = newIterableEventEmitter ();export constsubRouter =router ({onPostAdd :publicProcedure .input (z .object ({// lastEventId is the last event id that the client has received// On the first call, it will be whatever was passed in the initial setup// If the client reconnects, it will be the last event id that the client receivedlastEventId :z .string ().nullish (),}).optional (),).subscription (async function* (opts ) {// We start by subscribing to the ee so that we don't miss any new events while fetchingconstiterable =ee .toIterable ('add', {// Passing the AbortSignal from the request automatically cancels the event emitter when the request is abortedsignal :opts .signal ,});if (opts .input ?.lastEventId ) {// [...] get the posts since the last event id and yield them// const items = await db.post.findMany({ ... })// for (const item of items) {// yield tracked(item.id, item);// }}// listen for new events from the iterable we set up abovefor await (const [data ] ofiterable ) {constpost =data asPost ;// tracking the post id ensures the client can reconnect at any time and get the latest events since this idyieldtracked (post .id ,post );}}),});
ループでのデータ取得
この方法は、データベースなどのソースから定期的に新しいデータをチェックし、クライアントにプッシュしたい場合に有用です。
server.tstsimport {tracked } from '@trpc/server';import {z } from 'zod';import {publicProcedure ,router } from './trpc';export constsubRouter =router ({onPostAdd :publicProcedure .input (z .object ({// lastEventId is the last event id that the client has received// On the first call, it will be whatever was passed in the initial setup// If the client reconnects, it will be the last event id that the client received// The id is the createdAt of the postlastEventId :z .coerce .date ().nullish (),}),).subscription (async function* (opts ) {// `opts.signal` is an AbortSignal that will be aborted when the client disconnects.letlastEventId =opts .input ?.lastEventId ?? null;// We use a `while` loop that checks `!opts.signal.aborted`while (!opts .signal !.aborted ) {constposts = awaitdb .post .findMany ({// If we have a `lastEventId`, we only fetch posts created after it.where :lastEventId ? {createdAt : {gt :lastEventId ,},}:undefined ,orderBy : {createdAt : 'asc',},});for (constpost ofposts ) {// `tracked` is a helper that sends an `id` with each event.// This allows the client to resume from the last received event upon reconnection.yieldtracked (post .createdAt .toJSON (),post );lastEventId =post .createdAt ;}// Wait for a bit before polling again to avoid hammering the database.awaitsleep (1_000);}}),});
server.tstsimport {tracked } from '@trpc/server';import {z } from 'zod';import {publicProcedure ,router } from './trpc';export constsubRouter =router ({onPostAdd :publicProcedure .input (z .object ({// lastEventId is the last event id that the client has received// On the first call, it will be whatever was passed in the initial setup// If the client reconnects, it will be the last event id that the client received// The id is the createdAt of the postlastEventId :z .coerce .date ().nullish (),}),).subscription (async function* (opts ) {// `opts.signal` is an AbortSignal that will be aborted when the client disconnects.letlastEventId =opts .input ?.lastEventId ?? null;// We use a `while` loop that checks `!opts.signal.aborted`while (!opts .signal !.aborted ) {constposts = awaitdb .post .findMany ({// If we have a `lastEventId`, we only fetch posts created after it.where :lastEventId ? {createdAt : {gt :lastEventId ,},}:undefined ,orderBy : {createdAt : 'asc',},});for (constpost ofposts ) {// `tracked` is a helper that sends an `id` with each event.// This allows the client to resume from the last received event upon reconnection.yieldtracked (post .createdAt .toJSON (),post );lastEventId =post .createdAt ;}// Wait for a bit before polling again to avoid hammering the database.awaitsleep (1_000);}}),});
サーバー側からのサブスクリプション停止
サーバーからサブスクリプションを停止する必要がある場合は、ジェネレーター関数内で単にreturnします。
tsexport constsubRouter =router ({onPostAdd :publicProcedure .input (z .object ({lastEventId :z .coerce .number ().min (0).optional (),}),).subscription (async function* (opts ) {letindex =opts .input .lastEventId ?? 0;while (!opts .signal !.aborted ) {constidx =index ++;if (idx > 100) {// With this, the subscription will stop and the client will disconnectreturn;}await newPromise ((resolve ) =>setTimeout (resolve , 10));}}),});
tsexport constsubRouter =router ({onPostAdd :publicProcedure .input (z .object ({lastEventId :z .coerce .number ().min (0).optional (),}),).subscription (async function* (opts ) {letindex =opts .input .lastEventId ?? 0;while (!opts .signal !.aborted ) {constidx =index ++;if (idx > 100) {// With this, the subscription will stop and the client will disconnectreturn;}await newPromise ((resolve ) =>setTimeout (resolve , 10));}}),});
クライアント側では、サブスクリプションを.unsubscribe()するだけです。
副作用のクリーンアップ
サブスクリプションの副作用をクリーンアップする必要がある場合は、try...finallyパターンを使用できます。trpcはサブスクリプションが何らかの理由で停止するとジェネレーターインスタンスの.return()を呼び出すためです。
tsimportEventEmitter , {on } from 'events';import {initTRPC } from '@trpc/server';typePost = {id : string;title : string };constt =initTRPC .create ();constpublicProcedure =t .procedure ;constrouter =t .router ;constee = newEventEmitter ();export constsubRouter =router ({onPostAdd :publicProcedure .subscription (async function* (opts ) {lettimeout :ReturnType <typeofsetTimeout > | undefined;try {for await (const [data ] ofon (ee , 'add', {signal :opts .signal ,})) {timeout =setTimeout (() =>console .log ('Pretend like this is useful'));constpost =data asPost ;yieldpost ;}} finally {if (timeout )clearTimeout (timeout );}}),});
tsimportEventEmitter , {on } from 'events';import {initTRPC } from '@trpc/server';typePost = {id : string;title : string };constt =initTRPC .create ();constpublicProcedure =t .procedure ;constrouter =t .router ;constee = newEventEmitter ();export constsubRouter =router ({onPostAdd :publicProcedure .subscription (async function* (opts ) {lettimeout :ReturnType <typeofsetTimeout > | undefined;try {for await (const [data ] ofon (ee , 'add', {signal :opts .signal ,})) {timeout =setTimeout (() =>console .log ('Pretend like this is useful'));constpost =data asPost ;yieldpost ;}} finally {if (timeout )clearTimeout (timeout );}}),});
エラー処理
ジェネレーター関数内でエラーをスローすると、バックエンドのtrpcのonError()に伝播します。
スローされたエラーが5xxエラーの場合、クライアントはtracked()で追跡された最後のイベントIDに基づいて自動的に再接続を試みます。それ以外のエラーの場合、サブスクリプションはキャンセルされonError()コールバックに伝播します。
出力検証
サブスクリプションは非同期イテレータであるため、出力を検証するにはイテレータを処理する必要があります。
Zod v4の使用例
zAsyncIterable.tstsimport type {TrackedEnvelope } from '@trpc/server';import {isTrackedEnvelope ,tracked } from '@trpc/server';import {z } from 'zod';functionisAsyncIterable <TValue ,TReturn = unknown>(value : unknown,):value isAsyncIterable <TValue ,TReturn > {return !!value && typeofvalue === 'object' &&Symbol .asyncIterator invalue ;}consttrackedEnvelopeSchema =z .custom <TrackedEnvelope <unknown>>(isTrackedEnvelope );/*** A Zod schema helper designed specifically for validating async iterables. This schema ensures that:* 1. The value being validated is an async iterable.* 2. Each item yielded by the async iterable conforms to a specified type.* 3. The return value of the async iterable, if any, also conforms to a specified type.*/export functionzAsyncIterable <TYieldIn ,TYieldOut ,TReturnIn = void,TReturnOut = void,Tracked extends boolean = false,>(opts : {/*** Validate the value yielded by the async generator*/yield :z .ZodType <TYieldOut ,TYieldIn >;/*** Validate the return value of the async generator* @remarks not applicable for subscriptions*/return ?:z .ZodType <TReturnOut ,TReturnIn >;/*** Whether the yielded values are tracked* @remarks only applicable for subscriptions*/tracked ?:Tracked ;}) {returnz .custom <AsyncIterable <Tracked extends true ?TrackedEnvelope <TYieldIn > :TYieldIn ,TReturnIn >>((val ) =>isAsyncIterable (val )).transform (async function* (iter ) {constiterator =iter [Symbol .asyncIterator ]();try {letnext ;while ((next = awaititerator .next ()) && !next .done ) {if (opts .tracked ) {const [id ,data ] =trackedEnvelopeSchema .parse (next .value );yieldtracked (id , awaitopts .yield .parseAsync (data ));continue;}yieldopts .yield .parseAsync (next .value );}if (opts .return ) {return awaitopts .return .parseAsync (next .value );}return;} finally {awaititerator .return ?.();}}) asz .ZodType <AsyncIterable <Tracked extends true ?TrackedEnvelope <TYieldIn > :TYieldIn ,TReturnIn ,unknown>,AsyncIterable <Tracked extends true ?TrackedEnvelope <TYieldOut > :TYieldOut ,TReturnOut ,unknown>>;}
zAsyncIterable.tstsimport type {TrackedEnvelope } from '@trpc/server';import {isTrackedEnvelope ,tracked } from '@trpc/server';import {z } from 'zod';functionisAsyncIterable <TValue ,TReturn = unknown>(value : unknown,):value isAsyncIterable <TValue ,TReturn > {return !!value && typeofvalue === 'object' &&Symbol .asyncIterator invalue ;}consttrackedEnvelopeSchema =z .custom <TrackedEnvelope <unknown>>(isTrackedEnvelope );/*** A Zod schema helper designed specifically for validating async iterables. This schema ensures that:* 1. The value being validated is an async iterable.* 2. Each item yielded by the async iterable conforms to a specified type.* 3. The return value of the async iterable, if any, also conforms to a specified type.*/export functionzAsyncIterable <TYieldIn ,TYieldOut ,TReturnIn = void,TReturnOut = void,Tracked extends boolean = false,>(opts : {/*** Validate the value yielded by the async generator*/yield :z .ZodType <TYieldOut ,TYieldIn >;/*** Validate the return value of the async generator* @remarks not applicable for subscriptions*/return ?:z .ZodType <TReturnOut ,TReturnIn >;/*** Whether the yielded values are tracked* @remarks only applicable for subscriptions*/tracked ?:Tracked ;}) {returnz .custom <AsyncIterable <Tracked extends true ?TrackedEnvelope <TYieldIn > :TYieldIn ,TReturnIn >>((val ) =>isAsyncIterable (val )).transform (async function* (iter ) {constiterator =iter [Symbol .asyncIterator ]();try {letnext ;while ((next = awaititerator .next ()) && !next .done ) {if (opts .tracked ) {const [id ,data ] =trackedEnvelopeSchema .parse (next .value );yieldtracked (id , awaitopts .yield .parseAsync (data ));continue;}yieldopts .yield .parseAsync (next .value );}if (opts .return ) {return awaitopts .return .parseAsync (next .value );}return;} finally {awaititerator .return ?.();}}) asz .ZodType <AsyncIterable <Tracked extends true ?TrackedEnvelope <TYieldIn > :TYieldIn ,TReturnIn ,unknown>,AsyncIterable <Tracked extends true ?TrackedEnvelope <TYieldOut > :TYieldOut ,TReturnOut ,unknown>>;}
このヘルパーを使用して、サブスクリプションプロシージャの出力を検証できるようになります:
_app.tstsimport {tracked } from '@trpc/server';import {z } from 'zod';import {publicProcedure ,router } from './trpc';import {zAsyncIterable } from './zAsyncIterable';export constappRouter =router ({mySubscription :publicProcedure .input (z .object ({lastEventId :z .coerce .number ().min (0).optional (),}),).output (zAsyncIterable ({yield :z .object ({count :z .number (),}),tracked : true,}),).subscription (async function* (opts ) {letindex =opts .input .lastEventId ?? 0;while (true) {index ++;yieldtracked (String (index ), {count :index ,});await newPromise ((resolve ) =>setTimeout (resolve , 1000));}}),});
_app.tstsimport {tracked } from '@trpc/server';import {z } from 'zod';import {publicProcedure ,router } from './trpc';import {zAsyncIterable } from './zAsyncIterable';export constappRouter =router ({mySubscription :publicProcedure .input (z .object ({lastEventId :z .coerce .number ().min (0).optional (),}),).output (zAsyncIterable ({yield :z .object ({count :z .number (),}),tracked : true,}),).subscription (async function* (opts ) {letindex =opts .input .lastEventId ?? 0;while (true) {index ++;yieldtracked (String (index ), {count :index ,});await newPromise ((resolve ) =>setTimeout (resolve , 1000));}}),});