WebSockets
本页面由 PageTurner AI 翻译(测试版)。未经项目官方认可。 发现错误? 报告问题 →
您可以将 WebSockets 用于与服务器的全部或部分通信,客户端配置方法请参阅 wsLink。
本文档重点说明 WebSockets 的特定使用细节。关于订阅的常规用法,请参见 订阅指南。
创建 WebSocket 服务器
bashyarn add ws
bashyarn add ws
server/wsServer.tstsimport {applyWSSHandler } from '@trpc/server/adapters/ws';import {WebSocketServer } from 'ws';import {appRouter } from './routers/app';import {createContext } from './trpc';constwss = newWebSocketServer ({port : 3001,});consthandler =applyWSSHandler ({wss ,router :appRouter ,createContext ,// Enable heartbeat messages to keep connection open (disabled by default)keepAlive : {enabled : true,// server ping message interval in millisecondspingMs : 30000,// connection is terminated if pong message is not received in this many millisecondspongWaitMs : 5000,},});wss .on ('connection', (ws ) => {console .log (`++ Connection (${wss .clients .size })`);ws .once ('close', () => {console .log (`-- Connection (${wss .clients .size })`);});});console .log ('WebSocket Server listening on ws://localhost:3001');process .on ('SIGTERM', () => {console .log ('SIGTERM');handler .broadcastReconnectNotification ();wss .close ();});
server/wsServer.tstsimport {applyWSSHandler } from '@trpc/server/adapters/ws';import {WebSocketServer } from 'ws';import {appRouter } from './routers/app';import {createContext } from './trpc';constwss = newWebSocketServer ({port : 3001,});consthandler =applyWSSHandler ({wss ,router :appRouter ,createContext ,// Enable heartbeat messages to keep connection open (disabled by default)keepAlive : {enabled : true,// server ping message interval in millisecondspingMs : 30000,// connection is terminated if pong message is not received in this many millisecondspongWaitMs : 5000,},});wss .on ('connection', (ws ) => {console .log (`++ Connection (${wss .clients .size })`);ws .once ('close', () => {console .log (`-- Connection (${wss .clients .size })`);});});console .log ('WebSocket Server listening on ws://localhost:3001');process .on ('SIGTERM', () => {console .log ('SIGTERM');handler .broadcastReconnectNotification ();wss .close ();});
设置 TRPCClient 使用 WebSockets
您可以使用链接将查询和/或变更路由到 HTTP 传输,而订阅通过 WebSockets 传输。
client.tstsximport {createTRPCClient ,createWSClient ,wsLink } from '@trpc/client';import type {AppRouter } from './server';// create persistent WebSocket connectionconstwsClient =createWSClient ({url : `ws://localhost:3001`,});// configure TRPCClient to use WebSockets transportconstclient =createTRPCClient <AppRouter >({links : [wsLink ({client :wsClient ,}),],});
client.tstsximport {createTRPCClient ,createWSClient ,wsLink } from '@trpc/client';import type {AppRouter } from './server';// create persistent WebSocket connectionconstwsClient =createWSClient ({url : `ws://localhost:3001`,});// configure TRPCClient to use WebSockets transportconstclient =createTRPCClient <AppRouter >({links : [wsLink ({client :wsClient ,}),],});
认证/连接参数
如果是 Web 应用程序,可忽略本节内容,因为 cookies 会随请求自动发送。
要在 WebSockets 中进行身份认证,您可为 createWSClient 定义 connectionParams。该参数将在客户端建立 WebSocket 连接时作为第一条消息发送。
server/context.tstsimport type {CreateWSSContextFnOptions } from '@trpc/server/adapters/ws';export constcreateContext = async (opts :CreateWSSContextFnOptions ) => {consttoken =opts .info .connectionParams ?.token ;// [... authenticate]return {};};export typeContext =Awaited <ReturnType <typeofcreateContext >>;
server/context.tstsimport type {CreateWSSContextFnOptions } from '@trpc/server/adapters/ws';export constcreateContext = async (opts :CreateWSSContextFnOptions ) => {consttoken =opts .info .connectionParams ?.token ;// [... authenticate]return {};};export typeContext =Awaited <ReturnType <typeofcreateContext >>;
client/trpc.tstsimport {createTRPCClient ,createWSClient ,wsLink } from '@trpc/client';import type {AppRouter } from './server';importsuperjson from 'superjson';constwsClient =createWSClient ({url : `ws://localhost:3000`,connectionParams : async () => {return {token : 'supersecret',};},});export consttrpc =createTRPCClient <AppRouter >({links : [wsLink ({client :wsClient ,transformer :superjson })],});
client/trpc.tstsimport {createTRPCClient ,createWSClient ,wsLink } from '@trpc/client';import type {AppRouter } from './server';importsuperjson from 'superjson';constwsClient =createWSClient ({url : `ws://localhost:3000`,connectionParams : async () => {return {token : 'supersecret',};},});export consttrpc =createTRPCClient <AppRouter >({links : [wsLink ({client :wsClient ,transformer :superjson })],});
使用 tracked() 自动追踪 ID(推荐)
若您使用 tracked() 辅助函数 yield 事件并包含 id,客户端在断开连接后将自动重连,并在重连时通过 lastEventId 参数发送最后已知的 ID。
初始化订阅时可发送初始 lastEventId,浏览器在接收数据时会自动更新该值。
若基于 lastEventId 获取数据且事件完整性至关重要,建议使用 ReadableStream 或类似模式作为中间层(如 全栈 SSE 示例 所示),避免在基于 lastEventId 批量生成时忽略新产生的事件。
tsimportEventEmitter , {on } from 'events';import {initTRPC ,tracked } from '@trpc/server';import {z } from 'zod';typePost = {id : string;title : string };constt =initTRPC .create ();constpublicProcedure =t .procedure ;constrouter =t .router ;constee = newEventEmitter ();export constsubRouter =router ({onPostAdd :publicProcedure .input (z .object ({// lastEventId is the last event id that the client has received// On the first call, it will be whatever was passed in the initial setup// If the client reconnects, it will be the last event id that the client receivedlastEventId :z .string ().nullish (),}).optional (),).subscription (async function* (opts ) {if (opts .input ?.lastEventId ) {// [...] get the posts since the last event id and yield them}// listen for new eventsfor await (const [data ] ofon (ee , 'add', {// Passing the AbortSignal from the request automatically cancels the event emitter when the subscription is abortedsignal :opts .signal ,})) {constpost =data asPost ;// tracking the post id ensures the client can reconnect at any time and get the latest events since this idyieldtracked (post .id ,post );}}),});
tsimportEventEmitter , {on } from 'events';import {initTRPC ,tracked } from '@trpc/server';import {z } from 'zod';typePost = {id : string;title : string };constt =initTRPC .create ();constpublicProcedure =t .procedure ;constrouter =t .router ;constee = newEventEmitter ();export constsubRouter =router ({onPostAdd :publicProcedure .input (z .object ({// lastEventId is the last event id that the client has received// On the first call, it will be whatever was passed in the initial setup// If the client reconnects, it will be the last event id that the client receivedlastEventId :z .string ().nullish (),}).optional (),).subscription (async function* (opts ) {if (opts .input ?.lastEventId ) {// [...] get the posts since the last event id and yield them}// listen for new eventsfor await (const [data ] ofon (ee , 'add', {// Passing the AbortSignal from the request automatically cancels the event emitter when the subscription is abortedsignal :opts .signal ,})) {constpost =data asPost ;// tracking the post id ensures the client can reconnect at any time and get the latest events since this idyieldtracked (post .id ,post );}}),});
WebSockets RPC 规范
您可以通过深入查看 TypeScript 类型定义获取更多细节:
query / mutation
请求
tsinterfaceRequestMessage {id : number | string;jsonrpc ?: '2.0';method : 'query' | 'mutation';params : {path : string;input ?: unknown; // <-- pass input of procedure, serialized by transformer};}
tsinterfaceRequestMessage {id : number | string;jsonrpc ?: '2.0';method : 'query' | 'mutation';params : {path : string;input ?: unknown; // <-- pass input of procedure, serialized by transformer};}
响应
... 如下,或错误
tsinterfaceResponseMessage {id : number | string;jsonrpc ?: '2.0';result : {type : 'data'; // always 'data' for mutation / queriesdata :TOutput ; // output from procedure};}
tsinterfaceResponseMessage {id : number | string;jsonrpc ?: '2.0';result : {type : 'data'; // always 'data' for mutation / queriesdata :TOutput ; // output from procedure};}
subscription / subscription.stop
开始订阅
tsinterfaceSubscriptionRequest {id : number | string;jsonrpc ?: '2.0';method : 'subscription';params : {path : string;input ?: unknown; // <-- pass input of procedure, serialized by transformer};}
tsinterfaceSubscriptionRequest {id : number | string;jsonrpc ?: '2.0';method : 'subscription';params : {path : string;input ?: unknown; // <-- pass input of procedure, serialized by transformer};}
取消订阅需调用 subscription.stop
tsinterfaceSubscriptionStopRequest {id : number | string; // <-- id of your created subscriptionjsonrpc ?: '2.0';method : 'subscription.stop';}
tsinterfaceSubscriptionStopRequest {id : number | string; // <-- id of your created subscriptionjsonrpc ?: '2.0';method : 'subscription.stop';}
订阅响应结构
... 如下,或错误
tsinterfaceSubscriptionResponse {id : number | string;jsonrpc ?: '2.0';result :| {type : 'data';data :TData ; // subscription emitted data}| {type : 'started'; // subscription started}| {type : 'stopped'; // subscription stopped};}
tsinterfaceSubscriptionResponse {id : number | string;jsonrpc ?: '2.0';result :| {type : 'data';data :TData ; // subscription emitted data}| {type : 'started'; // subscription started}| {type : 'stopped'; // subscription stopped};}
连接参数
若连接通过 ?connectionParams=1 初始化,首条消息必须是连接参数。
tsinterfaceConnectionParamsMessage {data :Record <string, string> | null;method : 'connectionParams';}
tsinterfaceConnectionParamsMessage {data :Record <string, string> | null;method : 'connectionParams';}
错误处理
参见 https://www.jsonrpc.org/specification#error_object 或 错误格式化。
服务端到客户端的通知
{ id: null, type: 'reconnect' }
通知客户端在关闭服务器前重新连接。通过 wssHandler.broadcastReconnectNotification() 调用。