或许你并不需要 WebSockets
或许你并不需要 WebSockets
2025年4月11日
WebSockets 是构建实时应用程序的强大工具,深受开发者喜爱,但你可能出于错误的原因在使用它们。让我们探讨一下 WebSockets 的陷阱,以及如何使用普通的 HTTP 来完成同样的工作。
什么是 WebSocket?
如果你是 Web 开发新手,或者之前没有听说过 WebSocket,那么它是一种使用 HTTP 作为传输协议在客户端和服务器之间打开双向通信通道的方式。用更通俗的话说,它是一种在客户端和服务器之间保持开放通信线路的方式,以便双方可以随时发送和接收消息。 (MDN Reference)
由于其宣传方式,人们很自然地认为 WebSocket 是编排客户端和服务器之间长期数据流的最佳(有时甚至是唯一)方式,例如实时应用程序。但在实践中,你可能不想使用它们的原因有很多:
WebSocket 消息不是事务性的
我看到很多实例中,WebSockets 被用作维护某种状态对象一致性的方式。例如,你使用 socket 的发送端来表示对某个对象的修改,而使用 socket 的接收端来表示该对象被这些修改改变后的状态。这样,如果有多个客户端监听同一个对象,它们都会看到相同的状态变化,而无需刷新页面。
# Client 1
>>> { command: "increment", amount: 5 }
<<< { event: "count", value: 5 }
>>> { command: "decrement", amount: 2 }
<<< { event: "count", value: 3 }
# Client 2
<<< { event: "count", value: 5 }
<<< { event: "count", value: 3 }
但是,如果你对状态对象设置某种不变条件呢?例如,你想确保计数永远不会为负数:
<<< { event: "count", amount: 5 }
>>> { command: "decrement", amount: 6 }
<<< { error: "count cannot be negative" }
这里的问题是,修改和错误消息之间没有关联,因为错误消息将与所有其他消息在同一流中接收。我们无法可靠地说“流中接收到的下一条消息”是先前命令的结果,因为服务器可能在这期间发送了任意数量的消息。
如果我们要更新 UI 以显示错误,则必须以某种方式链接错误事件(例如,在命令和错误消息中提供关联的请求 ID):
>>> { command: "decrement", amount: 6, requestId: "123" }
<<< { error: "count cannot be negative", requestId: "123" }
这变得更加笨拙,因为现在你必须跟踪你发送的每条消息,并且必须找到一种以幂等方式将错误事件冒泡回 UI 的方法。如果你想指示服务器已收到该命令,情况也是如此。在这种情况下,你还需要处理某些难以跟踪的边缘情况:
- 如果 socket 在服务器可以处理命令之前关闭了怎么办?
- 如果由于某种原因你从未在 socket 上收到响应消息怎么办?
- 如果你正在处理大量的并发请求怎么办?
对于应该很简单的事情,它会产生太多的未知数和复杂性。如果你正在处理需要知道是否已接收的消息,那么最好使用更具事务性的协议(如 HTTP)来表示 socket 的发送端。
( < > ) = HTTP
( <<< >>> ) = WebSocket
# Success
> POST /increment '{ value: 5 }'
< 200 OK
<<< { event: "count", value: 5 }
#- (the update message still gets sent to all connected clients)
# Failure
> POST /decrement '{ value: 6 }'
< 400 Bad Request
#- (no update gets sent because the request failed)
我们已经有效地放弃了 socket 的发送端,并用 HTTP 替换了它,这意味着我们现在依靠 WebSockets 仅表示一个数据流(接收端)。事实证明,还有其他方法可以做到这一点,而不需要全双工连接的开销。 (我们稍后会讨论这个问题)
如果你发送的消息不一定需要确认(例如心跳或键盘输入),那么 WebSockets 是一个很好的选择。因此,本文的标题是,你可能不需要 WebSockets。
你必须管理 socket 的生命周期
使用 WebSockets 时,你不仅仅是随意发送和接收消息,你的应用程序还必须响应连接的打开和关闭。这意味着处理诸如“open”和“close”(或“error”)之类的事件,决定在重新连接尝试期间做什么,以及在不再需要连接时清理资源。
例如,浏览器中 WebSocket 的基本生命周期可能如下所示:
const socket = new WebSocket("wss://example.com/socket");
socket.addEventListener("open", () => {
console.log("Socket opened");
});
socket.addEventListener("message", (event) => {
console.log("Received message:", event.data);
});
socket.addEventListener("error", (err) => {
console.error("Socket error:", err);
});
socket.addEventListener("close", () => {
console.log("Socket closed. Attempting to reconnect…");
// Potentially restart or schedule a new socket connection here
});
在一个典型的应用程序中,你可能需要重新启动一个关闭的连接,在 socket 关闭时缓冲消息,以及使用指数退避处理重试。忽略这些步骤中的任何一个都可能导致消息丢失、笨拙的用户体验或持久连接。相比之下,使用更简单的请求/响应模型(如 HTTP),生命周期更简单:每个请求开始、完成(或失败),然后你继续前进。
WebSocket 生命周期的额外复杂性是你可能不需要它的主要原因之一——除非绝对没有基于 socket 消息传递的替代方案(部分在上一节中演示),否则你最好使用更简单的通信模式。
它使你的服务器代码更加复杂
当一个新的 WebSocket 连接被启动时,你的服务器必须处理 HTTP “upgrade” 请求握手。服务器不是完成一个普通的请求,而是检查特殊的 header 以指示 WebSocket 握手,然后将连接从 HTTP 升级到持久 socket。这意味着对于每个初始连接,服务器必须解析和验证 WebSocket header,如 “Sec-WebSocket-Key”,并用正确的 “Sec-WebSocket-Accept” header 进行响应。 (MDN Reference)
升级机制本身需要额外的管道:你需要在你的服务器上为升级事件创建一个监听器,确认请求是有效的,完成握手,然后开始广播或接收数据。这不仅增加了更多的活动部件(与标准的请求/响应流相比),而且意味着仅理解 HTTP 不足以进行调试或故障排除——现在你正在处理一个专门的连接协议。
如果你也在处理与我们上面详细描述的类似的请求/响应语义,它可能会引入更多的复杂性,因为现在你的服务器代码是用 socket 的持久性质来编写的,而不是 HTTP 的短暂性质。此外,你的应用程序将需要管理所有的边缘情况:如果客户端尝试以不支持的方式进行升级怎么办?如果握手在中途失败或超时怎么办?需要重新组装的部分数据帧呢?
虽然库和框架在底层隐藏了一些这些细节方面做得非常好,但所有这些潜在的故障点都指向一个事实:如果你真的不需要双向、始终在线的 socket 的强大功能,握手成本和扩展的错误状态可能会掩盖任何性能或实时优势。
那么,替代方案是什么?
我们在前面的章节中非常简要地提到了它,但是如果我们能够抽象出 socket 的发送端,并且只留下接收端的一个单向数据流,我们可以使用一种更简单的通信模式。
HTTP Streaming
如果你更深入地了解 HTTP 的工作原理,你会发现它实际上是一种为流式传输数据而设计的协议。如果不是这样,我们就无法在不先加载整个文件的情况下流式传输视频,或者在不下载整个页面的情况下加载大型网站。
事实证明,该数据流不必被分割成一些大数据块。我们可以使用相同的原理来表示任何任意的数据流,例如我们依赖 WebSockets 的实时更新。
这是一个在服务器端 JavaScript 中使用我们之前的计数器示例的例子,展示了它会是什么样子:
let counter = 0;
let resolvers = new Set();
// this returns a promise that resolves when the next
// value is available.
async function nextValue() {
return new Promise((resolve) => resolvers.add(resolve));
}
// look up what an `async generator` is if you're lost
// looking at this syntax. explaining it is out of scope
// for this post.
async function* valueGenerator() {
// (this loop gets broken when the response stream is closed.)
while (true) {
// every time we get the next value from the iterator,
// we yield the return from an awaited promise that resolves
// when the next value is available.
yield await nextValue();
}
}
async function processCommand(command) {
// this is what handles our "state updates"
counter = nextCounterValue(command);
// for each iterator (i.e. client that called `/stream`)
// that's waiting on a value, we resolve the promise with
// the new value
for (const resolver of resolvers) {
resolver(counter);
resolvers.delete(resolver);
}
}
// this is the function that computes the next state
// based on the command, and enforces any invariants
// that we want to have on the state.
function nextCounterValue(command) {
let next = counter;
if (command.type === "increment") {
next += command.amount;
} else if (command.type === "decrement") {
next -= command.amount;
}
if (next < 0) {
throw new Error("count cannot be negative");
}
return next;
}
// we use hono/express like syntax here, but you can
// use any server framework you want.
app.post("/increment", async (req, res) => {
try {
const { value } = await req.json();
processCommand({ type: "increment", amount: value });
return new Response("OK", 200);
} catch (error) {
return new Response(error.message, 400);
}
});
app.post("/decrement", async (req, res) => {
try {
const { value } = await req.json();
processCommand({ type: "decrement", amount: value });
return new Response("OK", 200);
} catch (error) {
return new Response(error.message, 400);
}
});
app.get("/stream", (req, res) => {
// We can create a stream from any async iterator, so
// we can pass the generator function that yields counter
// updates as they become available.
const stream = ReadableStream.from(valueGenerator());
return new Response(stream);
});
然后,我们可以在浏览器端使用 Stream API 来读取传入的数据,并根据服务器发送的内容更新我们的 UI。
const response = await fetch("/stream");
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
// wait for the next chunk of data
// (will only come when a state update is made)
const { done, value } = await reader.read();
// when the server is done sending data, we break out of the loop
if (done) break;
// decode the chunk since data gets encoded over the network
const chunk = decoder.decode(value);
// update the UI with the new state
updateUI(chunk);
}
通过这种设置,我们完全消除了对 WebSockets 的需求,同时仍然保持多个客户端之间的实时更新!
额外福利:使用 eventkit 使其变得简单
这是一个有点无耻的宣传,但这是我的帖子,所以你只能忍受它。
我一直在开发一个名为 eventkit 的库,它可以轻松地组合和观察异步数据流。如果你熟悉 observable 模式或 RxJS,它非常相似,但具有更好的副作用管理,并且使用 generator 构建。
为了更多地讨论计数器示例,这里是如何使用 eventkit 来实现相同的功能:
// server.ts
import { Stream, AsyncObservable } from "eventkit";
let counter = 0;
const stateUpdates$ = new Stream();
// when a new value is pushed into the stream,
// we update the counter
stateUpdates$.subscribe((value) => {
counter = value;
});
function nextCounterValue(command) {
let next = counter;
if (command.type === "increment") {
next += command.amount;
} else if (command.type === "decrement") {
next -= command.amount;
}
if (next < 0) {
throw new Error("count cannot be negative");
}
return next;
}
app.post("/increment", async (req, res) => {
try {
const { value } = await req.json();
const next = nextCounterValue({ type: "increment", amount: value });
stateUpdates$.push(next);
return new Response("OK", 200);
} catch (error) {
return new Response(error.message, 400);
}
});
app.post("/decrement", async (req, res) => {
try {
const { value } = await req.json();
const next = nextCounterValue({ type: "decrement", amount: value });
stateUpdates$.push(next);
return new Response("OK", 200);
} catch (error) {
return new Response(error.message, 400);
}
});
app.get("/stream", (req, res) => {
// We can use the `Stream` class as an async iterator
// to create a stream from it in the exact same way.
const stream = ReadableSteam.from(stateUpdates$);
return new Response(stream);
});
// client.ts
import { AsyncObservable, map } from "eventkit";
const response = await fetch("/stream");
const decoder = new TextDecoder();
const counter$ = AsyncObservable.from(response.body);
counter$
.pipe(map((value) => decoder.decode(value)))
.subscribe(updateUI);
我在构建它时了解了 Stream API 的功能,并且认为它是你的下一个实时/基于事件的应用程序的绝佳选择。如果你有不同的看法,请打开一个 issue 并告诉我为什么。
如果我不告诉你至少去看看它,我就不是一个好的项目维护者。 我们还编写了一个单独的 HTTP Streaming 指南,如果你有兴趣,它会更深入地探讨这个主题。
感谢阅读这堵文字墙!如果你有任何问题/意见,我可以在 X/Twitter 上找到我。我还在那里发布更多精神分裂的漫谈,所以如果你喜欢这种东西,我会很感激你的关注。