You might not need Websockets

或许你并不需要 WebSockets

2025年4月11日

WebSockets 是构建实时应用程序的强大工具,深受开发者喜爱,但你可能出于错误的原因在使用它们。让我们探讨一下 WebSockets 的陷阱,以及如何使用普通的 HTTP 来完成同样的工作。

什么是 WebSocket?

如果你是 Web 开发新手,或者之前没有听说过 WebSocket,那么它是一种使用 HTTP 作为传输协议在客户端和服务器之间打开双向通信通道的方式。用更通俗的话说,它是一种在客户端和服务器之间保持开放通信线路的方式,以便双方可以随时发送和接收消息。 (MDN Reference)

由于其宣传方式,人们很自然地认为 WebSocket 是编排客户端和服务器之间长期数据流的最佳(有时甚至是唯一)方式,例如实时应用程序。但在实践中,你可能不想使用它们的原因有很多:

WebSocket 消息不是事务性的

我看到很多实例中,WebSockets 被用作维护某种状态对象一致性的方式。例如,你使用 socket 的发送端来表示对某个对象的修改,而使用 socket 的接收端来表示该对象被这些修改改变后的状态。这样,如果有多个客户端监听同一个对象,它们都会看到相同的状态变化,而无需刷新页面。

# Client 1
>>> { command: "increment", amount: 5 }
<<< { event: "count", value: 5 }
>>> { command: "decrement", amount: 2 }
<<< { event: "count", value: 3 }
# Client 2
<<< { event: "count", value: 5 }
<<< { event: "count", value: 3 }

但是,如果你对状态对象设置某种不变条件呢?例如,你想确保计数永远不会为负数:

<<< { event: "count", amount: 5 }
>>> { command: "decrement", amount: 6 }
<<< { error: "count cannot be negative" }

这里的问题是,修改和错误消息之间没有关联,因为错误消息将与所有其他消息在同一流中接收。我们无法可靠地说“流中接收到的下一条消息”是先前命令的结果,因为服务器可能在这期间发送了任意数量的消息。

如果我们要更新 UI 以显示错误,则必须以某种方式链接错误事件(例如,在命令和错误消息中提供关联的请求 ID):

>>> { command: "decrement", amount: 6, requestId: "123" }
<<< { error: "count cannot be negative", requestId: "123" }

这变得更加笨拙,因为现在你必须跟踪你发送的每条消息,并且必须找到一种以幂等方式将错误事件冒泡回 UI 的方法。如果你想指示服务器已收到该命令,情况也是如此。在这种情况下,你还需要处理某些难以跟踪的边缘情况:

对于应该很简单的事情,它会产生太多的未知数和复杂性。如果你正在处理需要知道是否已接收的消息,那么最好使用更具事务性的协议(如 HTTP)来表示 socket 的发送端。

( < > ) = HTTP
( <<< >>> ) = WebSocket
# Success
> POST /increment '{ value: 5 }'
< 200 OK
<<< { event: "count", value: 5 }
#- (the update message still gets sent to all connected clients)
# Failure
> POST /decrement '{ value: 6 }'
< 400 Bad Request
#- (no update gets sent because the request failed)

我们已经有效地放弃了 socket 的发送端,并用 HTTP 替换了它,这意味着我们现在依靠 WebSockets 仅表示一个数据流(接收端)。事实证明,还有其他方法可以做到这一点,而不需要全双工连接的开销。 (我们稍后会讨论这个问题)

如果你发送的消息不一定需要确认(例如心跳或键盘输入),那么 WebSockets 是一个很好的选择。因此,本文的标题是,你可能不需要 WebSockets。

你必须管理 socket 的生命周期

使用 WebSockets 时,你不仅仅是随意发送和接收消息,你的应用程序还必须响应连接的打开和关闭。这意味着处理诸如“open”和“close”(或“error”)之类的事件,决定在重新连接尝试期间做什么,以及在不再需要连接时清理资源。

例如,浏览器中 WebSocket 的基本生命周期可能如下所示:

const socket = new WebSocket("wss://example.com/socket");
socket.addEventListener("open", () => {
 console.log("Socket opened");
});
socket.addEventListener("message", (event) => {
 console.log("Received message:", event.data);
});
socket.addEventListener("error", (err) => {
 console.error("Socket error:", err);
});
socket.addEventListener("close", () => {
 console.log("Socket closed. Attempting to reconnect…");
 // Potentially restart or schedule a new socket connection here
});

在一个典型的应用程序中,你可能需要重新启动一个关闭的连接,在 socket 关闭时缓冲消息,以及使用指数退避处理重试。忽略这些步骤中的任何一个都可能导致消息丢失、笨拙的用户体验或持久连接。相比之下,使用更简单的请求/响应模型(如 HTTP),生命周期更简单:每个请求开始、完成(或失败),然后你继续前进。

WebSocket 生命周期的额外复杂性是你可能不需要它的主要原因之一——除非绝对没有基于 socket 消息传递的替代方案(部分在上一节中演示),否则你最好使用更简单的通信模式。

它使你的服务器代码更加复杂

当一个新的 WebSocket 连接被启动时,你的服务器必须处理 HTTP “upgrade” 请求握手。服务器不是完成一个普通的请求,而是检查特殊的 header 以指示 WebSocket 握手,然后将连接从 HTTP 升级到持久 socket。这意味着对于每个初始连接,服务器必须解析和验证 WebSocket header,如 “Sec-WebSocket-Key”,并用正确的 “Sec-WebSocket-Accept” header 进行响应。 (MDN Reference)

升级机制本身需要额外的管道:你需要在你的服务器上为升级事件创建一个监听器,确认请求是有效的,完成握手,然后开始广播或接收数据。这不仅增加了更多的活动部件(与标准的请求/响应流相比),而且意味着仅理解 HTTP 不足以进行调试或故障排除——现在你正在处理一个专门的连接协议。

如果你也在处理与我们上面详细描述的类似的请求/响应语义,它可能会引入更多的复杂性,因为现在你的服务器代码是用 socket 的持久性质来编写的,而不是 HTTP 的短暂性质。此外,你的应用程序将需要管理所有的边缘情况:如果客户端尝试以不支持的方式进行升级怎么办?如果握手在中途失败或超时怎么办?需要重新组装的部分数据帧呢?

虽然库和框架在底层隐藏了一些这些细节方面做得非常好,但所有这些潜在的故障点都指向一个事实:如果你真的不需要双向、始终在线的 socket 的强大功能,握手成本和扩展的错误状态可能会掩盖任何性能或实时优势。

那么,替代方案是什么?

我们在前面的章节中非常简要地提到了它,但是如果我们能够抽象出 socket 的发送端,并且只留下接收端的一个单向数据流,我们可以使用一种更简单的通信模式。

HTTP Streaming

如果你更深入地了解 HTTP 的工作原理,你会发现它实际上是一种为流式传输数据而设计的协议。如果不是这样,我们就无法在不先加载整个文件的情况下流式传输视频,或者在不下载整个页面的情况下加载大型网站。

事实证明,该数据流不必被分割成一些大数据块。我们可以使用相同的原理来表示任何任意的数据流,例如我们依赖 WebSockets 的实时更新。

这是一个在服务器端 JavaScript 中使用我们之前的计数器示例的例子,展示了它会是什么样子:

let counter = 0;
let resolvers = new Set();
// this returns a promise that resolves when the next
// value is available.
async function nextValue() {
  return new Promise((resolve) => resolvers.add(resolve));
}
// look up what an `async generator` is if you're lost
// looking at this syntax. explaining it is out of scope
// for this post.
async function* valueGenerator() {
  // (this loop gets broken when the response stream is closed.)
  while (true) {
    // every time we get the next value from the iterator,
    // we yield the return from an awaited promise that resolves
    // when the next value is available.
    yield await nextValue();
  }
}
async function processCommand(command) {
  // this is what handles our "state updates"
  counter = nextCounterValue(command);
  // for each iterator (i.e. client that called `/stream`)
  // that's waiting on a value, we resolve the promise with
  // the new value
  for (const resolver of resolvers) {
    resolver(counter);
    resolvers.delete(resolver);
  }
}
// this is the function that computes the next state
// based on the command, and enforces any invariants
// that we want to have on the state.
function nextCounterValue(command) {
  let next = counter;
  if (command.type === "increment") {
    next += command.amount;
  } else if (command.type === "decrement") {
    next -= command.amount;
  }
  if (next < 0) {
    throw new Error("count cannot be negative");
  }
  return next;
}
// we use hono/express like syntax here, but you can
// use any server framework you want.
app.post("/increment", async (req, res) => {
  try {
    const { value } = await req.json();
    processCommand({ type: "increment", amount: value });
    return new Response("OK", 200);
  } catch (error) {
    return new Response(error.message, 400);
  }
});
app.post("/decrement", async (req, res) => {
  try {
    const { value } = await req.json();
    processCommand({ type: "decrement", amount: value });
    return new Response("OK", 200);
  } catch (error) {
    return new Response(error.message, 400);
  }
});
app.get("/stream", (req, res) => {
  // We can create a stream from any async iterator, so
  // we can pass the generator function that yields counter
  // updates as they become available.
  const stream = ReadableStream.from(valueGenerator());
  return new Response(stream);
});

然后,我们可以在浏览器端使用 Stream API 来读取传入的数据,并根据服务器发送的内容更新我们的 UI。

const response = await fetch("/stream");
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
  // wait for the next chunk of data
  // (will only come when a state update is made)
  const { done, value } = await reader.read();
  // when the server is done sending data, we break out of the loop
  if (done) break;
  // decode the chunk since data gets encoded over the network
  const chunk = decoder.decode(value);
  // update the UI with the new state
  updateUI(chunk);
}

通过这种设置,我们完全消除了对 WebSockets 的需求,同时仍然保持多个客户端之间的实时更新!

额外福利:使用 eventkit 使其变得简单

这是一个有点无耻的宣传,但这是我的帖子,所以你只能忍受它。

我一直在开发一个名为 eventkit 的库,它可以轻松地组合和观察异步数据流。如果你熟悉 observable 模式或 RxJS,它非常相似,但具有更好的副作用管理,并且使用 generator 构建。

为了更多地讨论计数器示例,这里是如何使用 eventkit 来实现相同的功能:

// server.ts
import { Stream, AsyncObservable } from "eventkit";

let counter = 0;
const stateUpdates$ = new Stream();

// when a new value is pushed into the stream,
// we update the counter
stateUpdates$.subscribe((value) => {
  counter = value;
});

function nextCounterValue(command) {
  let next = counter;
  if (command.type === "increment") {
    next += command.amount;
  } else if (command.type === "decrement") {
    next -= command.amount;
  }
  if (next < 0) {
    throw new Error("count cannot be negative");
  }
  return next;
}

app.post("/increment", async (req, res) => {
  try {
    const { value } = await req.json();
    const next = nextCounterValue({ type: "increment", amount: value });
    stateUpdates$.push(next);
    return new Response("OK", 200);
  } catch (error) {
    return new Response(error.message, 400);
  }
});

app.post("/decrement", async (req, res) => {
  try {
    const { value } = await req.json();
    const next = nextCounterValue({ type: "decrement", amount: value });
    stateUpdates$.push(next);
    return new Response("OK", 200);
  } catch (error) {
    return new Response(error.message, 400);
  }
});

app.get("/stream", (req, res) => {
  // We can use the `Stream` class as an async iterator
  // to create a stream from it in the exact same way.
  const stream = ReadableSteam.from(stateUpdates$);
  return new Response(stream);
});
// client.ts
import { AsyncObservable, map } from "eventkit";

const response = await fetch("/stream");
const decoder = new TextDecoder();
const counter$ = AsyncObservable.from(response.body);

counter$
  .pipe(map((value) => decoder.decode(value)))
  .subscribe(updateUI);

我在构建它时了解了 Stream API 的功能,并且认为它是你的下一个实时/基于事件的应用程序的绝佳选择。如果你有不同的看法,请打开一个 issue 并告诉我为什么。

如果我不告诉你至少去看看它,我就不是一个好的项目维护者。 我们还编写了一个单独的 HTTP Streaming 指南,如果你有兴趣,它会更深入地探讨这个主题。

感谢阅读这堵文字墙!如果你有任何问题/意见,我可以在 X/Twitter 上找到我。我还在那里发布更多精神分裂的漫谈,所以如果你喜欢这种东西,我会很感激你的关注。