HTTP Subscription Link
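httpSubscriptionLink is a terminating link that uses Server-sent Events (SSE) for subscriptions.
SSE is a good option for real-time communication, and it is somewhat simpler than setting up a WebSocket server.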
Setup
If the client environment does not support EventSource, you need to install an EventSource polyfill. For React Native specifics, see the Compatibility section.
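The check described above can be sketched as a small feature-detection helper. The name needsEventSourcePolyfill is hypothetical and not part of tRPC; the sketch only assumes that a usable EventSource is exposed as a constructor on the global object.

```ts
// Hypothetical helper (not part of tRPC): reports whether the given global
// object is missing an EventSource constructor.
function needsEventSourcePolyfill(globalObject: object): boolean {
  const candidate = (globalObject as Record<string, unknown>)['EventSource'];
  // Constructors are functions at runtime, so anything else counts as missing.
  return typeof candidate !== 'function';
}

// Typical call site: decide at startup whether a polyfill has to be loaded.
const polyfillRequired = needsEventSourcePolyfill(globalThis);
```

If polyfillRequired is true, the polyfill would have to be loaded (or passed through the link's EventSource option) before the link is created.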
To use httpSubscriptionLink, you need a splitLink that explicitly routes subscription requests over SSE.
client/index.ts

```ts
import {
  createTRPCClient,
  httpBatchLink,
  httpSubscriptionLink,
  loggerLink,
  splitLink,
} from '@trpc/client';
import type { AppRouter } from './server';

const trpcClient = createTRPCClient<AppRouter>({
  /**
   * @see https://trpc.io/docs/v11/client/links
   */
  links: [
    // adds pretty logs to your console in development and logs errors in production
    loggerLink(),
    splitLink({
      // uses the httpSubscriptionLink for subscriptions
      condition: (op) => op.type === 'subscription',
      true: httpSubscriptionLink({
        url: `/api/trpc`,
      }),
      false: httpBatchLink({
        url: `/api/trpc`,
      }),
    }),
  ],
});
```
This document covers the specifics of httpSubscriptionLink. For general guidance on subscriptions, see the subscriptions guide.
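Headers and authorization / authentication
Web apps
Same domain
In a web application, cookies are sent automatically with the request as long as the client and the server are on the same domain.
Cross-domain
If the client and the server are on different domains, you can set withCredentials: true (see the MDN documentation).
Example: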
```tsx
// [...]
httpSubscriptionLink({
  url: 'https://example.com/api/trpc',
  eventSourceOptions() {
    return {
      withCredentials: true, // <---
    };
  },
});
```
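Setting withCredentials on the client is only half of a cross-domain setup: under the browser CORS rules, the server also has to answer with a specific allowed origin (not a wildcard) and with Access-Control-Allow-Credentials: true. The following is a minimal sketch of that server-side piece. The function name and the allow-list are hypothetical, and how the headers get attached depends on your server adapter.

```ts
// Hypothetical helper: builds the CORS response headers needed for
// credentialed cross-origin requests (withCredentials: true on the client).
function credentialedCorsHeaders(
  requestOrigin: string | undefined,
  allowedOrigins: readonly string[],
): Record<string, string> {
  // Unknown or missing origins get no CORS headers, so the browser blocks them.
  if (!requestOrigin || !allowedOrigins.includes(requestOrigin)) {
    return {};
  }
  return {
    // Browsers reject "*" for credentialed requests, so echo the exact origin.
    'Access-Control-Allow-Origin': requestOrigin,
    'Access-Control-Allow-Credentials': 'true',
    // Tell caches that the response varies with the Origin request header.
    Vary: 'Origin',
  };
}

const corsHeaders = credentialedCorsHeaders('https://app.example.com', [
  'https://app.example.com',
]);
```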
Custom headers through a ponyfill
Recommended for non-web environments
You can ponyfill EventSource and use the eventSourceOptions callback to set request headers.
```tsx
import {
  createTRPCClient,
  httpBatchLink,
  httpSubscriptionLink,
  splitLink,
} from '@trpc/client';
import { EventSourcePolyfill } from 'event-source-polyfill';
import type { AppRouter } from './server';

// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
  links: [
    splitLink({
      condition: (op) => op.type === 'subscription',
      true: httpSubscriptionLink({
        url: 'http://localhost:3000',
        // ponyfill EventSource
        EventSource: EventSourcePolyfill,
        // options to pass to the EventSourcePolyfill constructor
        eventSourceOptions: async ({ op }) => {
          // you can use this to generate a signature for the operation
          // (getSignature is an application-specific helper, not shown here)
          const signature = await getSignature(op);
          return {
            headers: {
              authorization: 'Bearer supersecret',
              'x-signature': signature,
            },
          };
        },
      }),
      false: httpBatchLink({
        url: 'http://localhost:3000',
      }),
    }),
  ],
});
```
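Updating the configuration of an active connection
httpSubscriptionLink relies on EventSource for SSE, so connection problems such as network failures or bad response codes are retried automatically. However, EventSource does not re-run eventSourceOptions() or url() when it retries, which matters most when the authentication has expired since the last connection.
To work around this limitation, combine a retryLink with httpSubscriptionLink. This ensures the connection is re-established with the latest configuration, including refreshed authentication details.
Note that restarting the connection recreates the EventSource from scratch, so any previously tracked event state is lost.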
```tsx
import {
  createTRPCClient,
  httpBatchLink,
  httpSubscriptionLink,
  retryLink,
  splitLink,
} from '@trpc/client';
import {
  EventSourcePolyfill,
  EventSourcePolyfillInit,
} from 'event-source-polyfill';
import type { AppRouter } from './server';

// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
  links: [
    splitLink({
      condition: (op) => op.type === 'subscription',
      false: httpBatchLink({
        url: 'http://localhost:3000',
      }),
      true: [
        retryLink({
          retry: (opts) => {
            // opts.op.type is always 'subscription' here since we're inside a splitLink
            const code = opts.error.data?.code;
            if (!code) {
              // This shouldn't happen, as httpSubscriptionLink retries
              // internally when it receives a non-parsable response
              console.error('No error code found, retrying', opts);
              return true;
            }
            if (code === 'UNAUTHORIZED' || code === 'FORBIDDEN') {
              console.log('Retrying due to 401/403 error');
              return true;
            }
            return false;
          },
        }),
        httpSubscriptionLink({
          url: async () => {
            // calculate the latest URL if needed...
            // (getAuthenticatedUri is an application-specific helper, not shown here)
            return getAuthenticatedUri();
          },
          // ponyfill EventSource
          EventSource: EventSourcePolyfill,
          eventSourceOptions: async () => {
            // ...or maybe renew an access token
            // (auth is an application-specific object, not shown here)
            const token = await auth.getOrRenewToken();
            return {
              headers: {
                authorization: `Bearer ${token}`,
              },
            };
          },
        }),
      ],
    }),
  ],
});
```
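One possible refinement, shown here only as a sketch, is to pull the retry decision out of the link configuration into a plain function so it can be unit tested without a live connection. The name shouldReconnect and its input shape are hypothetical; the logic mirrors the retry callback in the example above.

```ts
// Hypothetical extraction of the retry decision used with retryLink.
type RetryDecisionInput = {
  // tRPC error code taken from error.data?.code, if the response carried one
  errorCode?: string;
};

function shouldReconnect({ errorCode }: RetryDecisionInput): boolean {
  // No code at all: treat as a transport-level problem and retry.
  if (!errorCode) {
    return true;
  }
  // Expired or rejected credentials: retry so that url() and
  // eventSourceOptions() run again with fresh authentication.
  return errorCode === 'UNAUTHORIZED' || errorCode === 'FORBIDDEN';
}
```

Inside the link, the retry callback would then reduce to a call such as shouldReconnect({ errorCode: opts.error.data?.code }).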
Connection params
To authenticate with EventSource, you can define connectionParams in httpSubscriptionLink. Because these params are sent as part of the URL, other authentication methods are preferred.
server/context.ts

```ts
import type { CreateHTTPContextOptions } from '@trpc/server/adapters/standalone';

export const createContext = async (opts: CreateHTTPContextOptions) => {
  // connectionParams sent by the client are available on opts.info
  const token = opts.info.connectionParams?.token;

  // [... authenticate]

  return {};
};

export type Context = Awaited<ReturnType<typeof createContext>>;
```
client/trpc.ts

```ts
import {
  createTRPCClient,
  httpBatchLink,
  httpSubscriptionLink,
  splitLink,
} from '@trpc/client';
import type { AppRouter } from './server';

// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
  links: [
    splitLink({
      condition: (op) => op.type === 'subscription',
      true: httpSubscriptionLink({
        url: 'http://localhost:3000',
        connectionParams: async () => {
          // Will be serialized as part of the URL
          return {
            token: 'supersecret',
          };
        },
      }),
      false: httpBatchLink({
        url: 'http://localhost:3000',
      }),
    }),
  ],
});
```
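To see why URL-based params are a weaker choice for secrets, the sketch below builds a URL that carries the params in a connectionParams query parameter. The helper name, the procedure path onMessage, and the JSON encoding are assumptions made for illustration; the exact serialization tRPC uses may differ.

```ts
// Hypothetical illustration of connection params ending up in the URL.
// The JSON encoding is an assumption, not necessarily what tRPC emits.
function buildSubscriptionUrl(
  baseUrl: string,
  connectionParams: Record<string, string> | null,
): string {
  const url = new URL(baseUrl);
  if (connectionParams) {
    url.searchParams.set('connectionParams', JSON.stringify(connectionParams));
  }
  return url.toString();
}

const exampleUrl = buildSubscriptionUrl(
  'http://localhost:3000/api/trpc/onMessage',
  { token: 'supersecret' },
);
// The token is now part of the request URL, so it can show up in server
// access logs, proxy logs, and browser history.
```

Headers set through eventSourceOptions are not part of the URL, which is one reason they are the preferred place for credentials when a ponyfill is available.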
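Timeout configuration
httpSubscriptionLink supports an inactivity timeout through the reconnectAfterInactivityMs option. If no messages (including ping messages) are received within the specified period, the connection is marked as "connecting" and a reconnect is attempted automatically.
The timeout is configured on the server when initializing tRPC: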
server/trpc.ts

```ts
import { initTRPC } from '@trpc/server';

export const t = initTRPC.create({
  sse: {
    client: {
      // reconnect if nothing (not even a ping) arrives for 3 seconds
      reconnectAfterInactivityMs: 3_000,
    },
  },
});
```
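The inactivity rule can be pictured as a small tracker that remembers when the last message arrived. This is only a conceptual sketch with hypothetical names; it is not how tRPC implements the timeout internally, and timestamps are passed in explicitly to keep it deterministic.

```ts
// Conceptual sketch (hypothetical, not tRPC internals): decides when a
// connection has been silent for too long.
class InactivityTracker {
  private readonly timeoutMs: number;
  private lastMessageAt: number;

  constructor(reconnectAfterInactivityMs: number, connectedAt: number) {
    this.timeoutMs = reconnectAfterInactivityMs;
    this.lastMessageAt = connectedAt;
  }

  // Call for every received message, pings included.
  recordMessage(receivedAt: number): void {
    this.lastMessageAt = receivedAt;
  }

  // True once the silence has lasted at least the configured timeout.
  shouldReconnect(now: number): boolean {
    return now - this.lastMessageAt >= this.timeoutMs;
  }
}

const tracker = new InactivityTracker(3_000, 0);
tracker.recordMessage(2_500); // a message (or ping) arrives after 2.5s
```

With these values, the tracker would ask for a reconnect only once 3 seconds have passed since the message at 2.5 seconds.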
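Server ping configuration
The server can be configured to send periodic ping messages to keep the connection alive and prevent timeout disconnects. This is particularly useful in combination with the reconnectAfterInactivityMs option.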
server/trpc.ts

```ts
import { initTRPC } from '@trpc/server';

export const t = initTRPC.create({
  sse: {
    // Maximum duration of a single SSE connection in milliseconds
    // maxDurationMs: 60_000,
    ping: {
      // Enable periodic ping messages to keep connection alive
      enabled: true,
      // Send ping message every 2s
      intervalMs: 2_000,
    },
    // client: {
    //   reconnectAfterInactivityMs: 3_000
    // }
  },
});
```
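When both options are used, the ping interval should be shorter than the inactivity timeout. Otherwise an idle but healthy connection would be torn down before the next ping arrives. The check below is a hypothetical helper for validating a configuration, not something tRPC provides.

```ts
// Hypothetical sanity check for the two timing options used together.
type SseTimingConfig = {
  pingIntervalMs: number;
  reconnectAfterInactivityMs: number;
};

function pingKeepsConnectionAlive(config: SseTimingConfig): boolean {
  // A ping must arrive before the client gives up on the connection.
  return config.pingIntervalMs < config.reconnectAfterInactivityMs;
}

// The values from the configuration above: ping every 2s, timeout after 3s.
const exampleTimingIsSafe = pingKeepsConnectionAlive({
  pingIntervalMs: 2_000,
  reconnectAfterInactivityMs: 3_000,
});
```

In practice it may be worth leaving extra headroom between the two values to absorb network jitter.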
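Compatibility (React Native)
httpSubscriptionLink uses the EventSource API, the Streams API, and AsyncIterators. None of these are natively supported by React Native, so they have to be ponyfilled.
To ponyfill EventSource, prefer a polyfill built on the networking library exposed by React Native over one built on the XMLHttpRequest API. EventSource polyfills based on XMLHttpRequest fail to reconnect after the app has been in the background. Consider using the rn-eventsource-reborn package.
The Streams API can be ponyfilled with the web-streams-polyfill package.
AsyncIterators can be polyfilled with the @azure/core-asynciterator-polyfill package.
Installation
Install the required polyfills: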
- npm: npm install rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill
- yarn: yarn add rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill
- pnpm: pnpm add rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill
- bun: bun add rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill
- deno: deno add npm:rn-eventsource-reborn npm:web-streams-polyfill npm:@azure/core-asynciterator-polyfill
Add the polyfills to your project before the link is used (for example, where you add your TRPCReact.Provider):
utils/api.tsx

```ts
import '@azure/core-asynciterator-polyfill';
import { RNEventSource } from 'rn-eventsource-reborn';
import { ReadableStream, TransformStream } from 'web-streams-polyfill';

// Only install the ponyfills when the runtime does not already provide them
globalThis.ReadableStream = globalThis.ReadableStream || ReadableStream;
globalThis.TransformStream = globalThis.TransformStream || TransformStream;
```
Once the ponyfills are in place, you can continue setting up httpSubscriptionLink as described in the Setup section.
httpSubscriptionLink options
```ts
type HTTPSubscriptionLinkOptions<
  TRoot extends AnyClientTypes,
  TEventSource extends EventSourceLike.AnyConstructor = typeof EventSource,
> = {
  /**
   * The URL to connect to (can be a function that returns a URL)
   */
  url: string | (() => string | Promise<string>);
  /**
   * Connection params that are available in `createContext()`
   * Serialized as part of the URL under the `connectionParams` query parameter
   */
  connectionParams?:
    | Record<string, string>
    | null
    | (() =>
        | Record<string, string>
        | null
        | Promise<Record<string, string> | null>);
  /**
   * Data transformer
   * @see https://trpc.io/docs/v11/data-transformers
   */
  transformer?: DataTransformerOptions;
  /**
   * EventSource ponyfill
   */
  EventSource?: TEventSource;
  /**
   * EventSource options or a callback that returns them
   */
  eventSourceOptions?:
    | EventSourceLike.InitDictOf<TEventSource>
    | ((opts: {
        op: Operation;
      }) =>
        | EventSourceLike.InitDictOf<TEventSource>
        | Promise<EventSourceLike.InitDictOf<TEventSource>>);
};
```
SSE options on the server
```ts
export interface SSEStreamProducerOptions<TValue = unknown> {
  ping?: {
    /**
     * Enable ping comments sent from the server
     * @default false
     */
    enabled: boolean;
    /**
     * Interval in milliseconds
     * @default 1000
     */
    intervalMs?: number;
  };
  /**
   * Maximum duration in milliseconds for the request before ending the stream
   * @default undefined
   */
  maxDurationMs?: number;
  /**
   * End the request immediately after data is sent
   * Only useful for serverless runtimes that do not support streaming responses
   * @default false
   */
  emitAndEndImmediately?: boolean;
  /**
   * Client-specific options - these will be sent to the client as part of the first message
   * @default {}
   */
  client?: {
    /**
     * Timeout and reconnect after inactivity in milliseconds
     * @default undefined
     */
    reconnectAfterInactivityMs?: number;
  };
}
```