标量选择反模式 (The Scalar Select Anti-Pattern) 2025年5月14日

我编写过很多有状态的服务,它们的核心都是一个事件循环:

async for event in events_incoming:
  await process(event)

每次我都需要重构这个循环。例如,导致我写这篇文章的直接原因是这个 TigerBeetle PR。让我把这次重构记录下来,这样下次一开始就能做对了!

标量选择 (Scalar Select)

假设我正在为某种编程语言实现一个 LSP server。 主要有三个事件来源:

在这种情况下,“显而易见”的事件循环设置如下所示:

events_incoming: Stream[Event] = merge(
  events_file_system,
  events_compiler,
  events_lsp,
)
async for event in events_incoming:
  await process(event)

这里,merge 是一个操作符,它接受多个事件流,并将它们合并为一个。 这相当于一个用高阶函数编写的

loop {
  select! {
    ...
  }
}

关键观察 (Key Observation)

关键在于,事件流是进程外部的,由外部 IO 驱动。 你并不知道或无法控制用户_何时_在输入! 而且 process(event) 需要时间。 这意味着当我们完成当前事件的处理时,可能已经有多个事件“准备就绪”,已经存在于我们进程的地址空间中。 我们的“标量选择”会选择其中任意一个,这可能会产生很多开销。

影响 (Implications)

如果你不忽略在处理完前一个事件产生的延迟之后,同时有多个事件可用的事实,你可以应用以下一些具体的优化:

优先级排序

最明显的一个,我们可以选择处理事件的顺序。 对于 LSP 示例,如果你有一个代码补全请求和一个文件修改请求,你希望首先处理文件修改。 经验法则是:写入优先于读取,读取优先于接受(新客户端)。

选择性反压

作为优先级排序的一种极端形式,你可以决定完全不处理特定类型的请求,从而对特定输入施加反压,同时允许其他输入继续进行。

消除

通常,可以根据后续事件完全丢弃一个请求。 例如,如果有一个文件修改完全替换了它的文本,那么之前的所有更改都可以安全地丢弃。

合并

最后,即使不可能完全消除请求,同时处理多个请求通常也更有效。 例如,如果我们有两个增量文件修改事件(例如“在偏移量 `92` 处插入 `'hello'`”),那么首先将它们合并为一个更大的更改是有意义的。 LSP server 会在应用修改后启动一个任务来计算诊断结果。 但是如果我们有两个修改,我们希望在启动单个诊断任务之前应用这两个修改。

数据导向一切 (Data Oriented All The Things)

一旦你看到了问题(这是最难的部分),解决方案就如预期的那样:永远都要批量处理,忘记单数,for 循环下推,变成多个!

换句话说,我们希望将一次只给我们一个事件的标量选择,变成批量选择,一次性给出所有已经收到的事件。 在低负载下,我们将获得单例批次。 但是随着负载的增加,批量大小将会增加,从而以亚线性的速度增加我们的负载!

所以,类似于这样:

events_incoming: Stream[Event] = merge(
  events_file_system,
  events_compiler,
  events_lsp,
)
events_incoming_batched: Stream[List[Event]] =
  batch_stream(events_incoming)
async for event_batch in events_incoming_batched:
  batch: List[Event] = coalesce(event_batch)
  for event in batch:
    await process(event)

秘诀在于 batch_stream 函数,它会等待至少一个事件可用,但会提取所有可用的事件:

async def batch_stream(
  stream: Stream[T]
) -> Stream[List[T]]:
  while True:
    event: T = await stream.next()
    batch: List[T] = [event]
    while event := stream.next_non_blocking():
      batch.append(event)
    yield batch

消息传递时永远都要批量处理!