标量选择反模式 (The Scalar Select Anti-Pattern)
文章探讨了“标量选择反模式”,即在处理异步事件时,每次只处理一个事件的低效问题。作者指出,这种模式在处理来自文件系统、编译器和客户端请求等多个来源的事件时,容易导致性能瓶颈。核心观点是:**应采用批量处理方式,将多个事件合并处理,以提高效率。** 通过优先级排序、选择性反压、消除冗余和合并事件等优化手段,并使用 `batch_stream` 函数实现批量处理,可以有效提升系统性能。
标量选择反模式 (The Scalar Select Anti-Pattern) 2025年5月14日
我编写过很多有状态的服务,它们的核心都是一个事件循环:
async for event in events_incoming:
await process(event)
每次我都需要重构这个循环。例如,导致我写这篇文章的直接原因是这个 TigerBeetle PR。让我把这次重构记录下来,这样下次一开始就能做对了!
标量选择 (Scalar Select)
假设我正在为某种编程语言实现一个 LSP server。 主要有三个事件来源:
- 来自用户输入代码或 git 操作的文件修改,
- 来自客户端的请求(“给我补全建议!”),
- 来自在后台运行的编译任务的输出,包含错误消息等。
在这种情况下,“显而易见”的事件循环设置如下所示:
events_incoming: Stream[Event] = merge(
events_file_system,
events_compiler,
events_lsp,
)
async for event in events_incoming:
await process(event)
这里,merge
是一个操作符,它接受多个事件流,并将它们合并为一个。 这相当于一个用高阶函数编写的
loop {
select! {
...
}
}
关键观察 (Key Observation)
关键在于,事件流是进程外部的,由外部 IO 驱动。 你并不知道或无法控制用户_何时_在输入!
而且 process(event)
需要时间。 这意味着当我们完成当前事件的处理时,可能已经有多个事件“准备就绪”,已经存在于我们进程的地址空间中。 我们的“标量选择”会选择其中任意一个,这可能会产生很多开销。
影响 (Implications)
如果你不忽略在处理完前一个事件产生的延迟之后,同时有多个事件可用的事实,你可以应用以下一些具体的优化:
优先级排序
最明显的一个,我们可以选择处理事件的顺序。 对于 LSP 示例,如果你有一个代码补全请求和一个文件修改请求,你希望首先处理文件修改。 经验法则是:写入优先于读取,读取优先于接受(新客户端)。
选择性反压
作为优先级排序的一种极端形式,你可以决定完全不处理特定类型的请求,从而对特定输入施加反压,同时允许其他输入继续进行。
消除
通常,可以根据后续事件完全丢弃一个请求。 例如,如果有一个文件修改完全替换了它的文本,那么之前的所有更改都可以安全地丢弃。
合并
最后,即使不可能完全消除请求,同时处理多个请求通常也更有效。 例如,如果我们有两个增量文件修改事件(例如“在偏移量 `92` 处插入 `'hello'`”),那么首先将它们合并为一个更大的更改是有意义的。 LSP server 会在应用修改后启动一个任务来计算诊断结果。 但是如果我们有两个修改,我们希望在启动单个诊断任务之前应用这两个修改。
数据导向一切 (Data Oriented All The Things)
一旦你看到了问题(这是最难的部分),解决方案就如预期的那样:永远都要批量处理,忘记单数,将 for
循环下推,变成多个!
换句话说,我们希望将一次只给我们一个事件的标量选择,变成批量选择,一次性给出所有已经收到的事件。 在低负载下,我们将获得单例批次。 但是随着负载的增加,批量大小将会增加,从而以亚线性的速度增加我们的负载!
所以,类似于这样:
events_incoming: Stream[Event] = merge(
events_file_system,
events_compiler,
events_lsp,
)
events_incoming_batched: Stream[List[Event]] =
batch_stream(events_incoming)
async for event_batch in events_incoming_batched:
batch: List[Event] = coalesce(event_batch)
for event in batch:
await process(event)
秘诀在于 batch_stream
函数,它会等待至少一个事件可用,但会提取所有可用的事件:
async def batch_stream(
stream: Stream[T]
) -> Stream[List[T]]:
while True:
event: T = await stream.next()
batch: List[T] = [event]
while event := stream.next_non_blocking():
batch.append(event)
yield batch
消息传递时永远都要批量处理!