A Visual Journey Through Async Rust
Async Rust 可视化之旅
可视化
所有的可视化代码都可以在这里找到。
为了可视化时间的流逝,我们将绘制一些形状。一个简单的正弦波就足够了。首先,我们创建一个异步计算 N
个正弦函数值的 future。在每次迭代中,我们计算一个值并 yield_now().await
,将执行权让给其他 future。我们将这些计算出的值发送到一个 channel,稍后使用 matplotlib 和 pyo3 以图形方式显示结果。
/// 每个计算的正弦值都是我们跟踪的样本
struct Sample {
fut_name: String,
value: f32,
t: u128
}
async fn produce_sin(run_start: Instant, fut_name: String, tx: mpsc::UnboundedSender<Sample>) {
for i in 1..N {
let t = run_start.elapsed().as_micros();
let value = sin(i);
tx.send(Sample { fut_name, value, t }).unwrap();
// 让出执行权,以便其他 future 可以执行它们的操作
tokio::task::yield_now().await;
}
}
现在,让我们创建几个这样的 future,并排查看两个正弦波的计算结果:
#[tokio::main]
async fn main() {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let mut futs = Vec::new();
let run_start = Instant::now();
futs.push(produce_sin(run_start, "fut1", tx.clone()).boxed());
futs.push(produce_sin(run_start, "fut2", tx.clone()).boxed());
futures::future::join_all(futs).await;
drop(tx);
plot_samples(rx).await;
}
我们得到的是:
很好!我们设法以有史以来最复杂的方式绘制了两个正弦波! 而且 两张图是彼此平行绘制的。难道 Tokio future 实际上是并行的,而不仅仅是并发的吗?让我们仔细看看。
首先,让我们将计算正弦值的时间添加到我们生成的 sample 中,并更新绘图代码以在图形中显示此持续时间。
struct Sample {
fut_name: String,
value: f32,
start_t: u128,
end_t: u128
}
async fn produce_sin(run_start: Instant, fut_name: String, tx: mpsc::UnboundedSender<Sample>) {
for i in 1..N {
let start_t = run_start.elapsed().as_micros();
let value = sin(i);
let end_t = run_start.elapsed().as_micros();
tx.send(Sample { fut_name, value, start_t, end_t }).unwrap();
tokio::task::yield_now().await;
}
}
这是我们现在看到的:
蓝色块表示我们计算正弦值所花费的时间。换句话说,这是我们的 future 由 runtime 执行的时间,也就是被 polled 的时间。请注意,为了便于可视化,我们的 sin()
函数内置了 100 微秒的同步睡眠。这有助于使时序更加突出和统一。
现在,让我们稍微放大到前几个 sample:
啊哈!正弦值的计算是交替进行的!当第一个 future 在其调用时计算正弦值时,另一个 future 处于空闲状态,或者更正确的说法是,第二个 future 让出 了执行权,让另一个 future 继续其工作。
这正是 concurrency (并发) 和 parallelism (并行) 之间的区别。如果 async runtime 以 并行 方式运行 future,我们将会看到所有蓝色块一个接一个地排列在下方,或多或少在相同的时间范围内。
CPU 密集型代码
在 async 编程中,“CPU 密集型代码会阻塞 async executor”是一个常识。从 Node.js 和 Python 的 asyncio 到 Rust 的 async executor,CPU 绑定的代码都是魔鬼。它会减慢甚至挂起不同的并发 async 操作。那么,“CPU 密集型”到底有多密集呢?计算 sin()
是否密集? SHA1 呢? JSON 解析呢?
让我们看看 CPU 密集型代码如何影响我们的可视化。我们将定义一个新的 sin_high_cpu()
方法。此正弦生成方法比常规 f32::sin
的 CPU 密集程度更高;事实上,它需要 500 微秒。
让我们添加一个使用 sin_high_cpu()
生成正弦值的 future:
#[tokio::main]
async fn main() {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let mut futs = Vec::new();
let run_start = Instant::now();
futs.push(produce_sin(run_start, "fut0", tx.clone()).boxed());
futs.push(produce_sin(run_start, "fut1", tx.clone()).boxed());
futs.push(produce_sin_high_cpu(run_start, "high cpu", tx.clone()).boxed());
futures::future::join_all(futs).await;
drop(tx);
plot_samples(rx).await;
}
这是生成的图:
正如预期的那样,produce_sin_high_cpu()
完成需要更长的时间,几乎 500 毫秒,而前一个示例中为 160 毫秒。但是请注意,其他 future,即使用常规 sin()
方法的 future,也花费了相同的时间。
放大的视图显示了发生了什么:
CPU 繁重的 future 占用了所有的 CPU。当它工作时,其他两个 future 等待它让出,并且无法执行它们的快速操作。这演示了 CPU 密集型代码的效果。即使是 500 微秒的“短”操作也会影响其他并发运行的 future。
这个测试现实吗?你的 async 代码是否循环并执行小的 CPU 绑定任务?想想处理大量多个请求的 HTTP 服务器、解析传入和传出的 JSON 字符串、发送和接收数据库查询结果等。另一个常见的例子是消息队列消费者。这是一个重要的见解:使用单个 Tokio task 会限制多核利用率。 你的机器可能具有多个可用核心,但你需要利用它们。一种方法是生成 Tokio task。让我们接下来检查一下。
Task
与 Node.js 不同,Rust 的 Tokio 允许我们生成一个新的 Task
并在其中运行 future。在多线程 runtime中,Tokio 将创建 worker threads,这些线程将托管和运行 task。默认情况下,Tokio 的线程池将为 CPU 的每个核心维护一个线程,此外还有主线程。我在一个 4 核的 GitHub workspace 上工作,因此我有 1+4 个可用线程。
让我们在 Tokio task 中生成 produce_sin_high_cpu
,看看它如何影响绘图:
#[tokio::main]
async fn main() {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let mut futs = Vec::new();
let run_start = Instant::now();
futs.push(produce_sin(run_start, "fut0", tx.clone()).boxed());
futs.push(produce_sin(run_start, "fut1", tx.clone()).boxed());
futs.push(
tokio::spawn(produce_sin_high_cpu(run_start, "spawned", tx.clone()).boxed())
.map(|_| ()) // 我们需要将返回值替换为 () 以匹配其他 future
.boxed(),
);
futures::future::join_all(futs).await;
drop(tx);
plot_samples(rx).await;
}
这是生成的图:
并放大:
太棒了!我们的前两个 future 再次生成快速正弦波,而 CPU 绑定的 sin_high_cpu
受到限制,不会影响代码的其余部分。请注意,当他们不需要与 sin_high_cpu
竞争 CPU 时间时,前两个 future 现在完成 N
个值的计算速度要快得多。
这是一个重要的教训:生成一个新的 task 使我们能够轻松地利用 CPU 的多个核心,并让“生成”的 future 与其他两个 future 并行 执行。
正如你们中更敏锐的人可能已经注意到的那样,我根据执行代码的线程,以不同的颜色对执行时间进行了着色。“fut0”和“fut1”在主线程(蓝色)上执行,而“生成”的 future 在某个 worker thread(绿松石色)上执行。
更多 Task
好的,所以如果 Tokio 的多线程 runtime 有 1+4 个可用线程,那么如果我们生成多个 task 会发生什么?让我们为绘图添加更多 sin_high_cpu
task:
#[tokio::main]
async fn main() {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let mut futs = Vec::new();
let run_start = Instant::now();
futs.push(produce_sin(run_start, "fut0", tx.clone()).boxed());
for i in 1..7 {
let fut_name = format!("spawned{i}");
futs.push(
tokio::spawn(produce_sin_heavy(run_start, fut_name, tx.clone()).boxed())
.map(|_| ())
.boxed(),
);
}
futures::future::join_all(futs).await;
drop(tx);
plot_samples(rx).await;
}
这是生成的图:
并放大:
Tokio 在线程之间协调 task。每个 future polling 可能会在不同的线程中运行。由于我们没有足够的可用线程,我们的 CPU 绑定问题再次显现,有时一个 task 可能会阻碍其他 task 上的工作。有意思。生成新的 task 可以提高并行性,但有一个硬性限制。这绝对是我们应该记住的事情。在我们的示例中,看看“spawned2” future。请注意,此 future 和 task 如何与棕色和灰色线程上的“spawned4”和“spawned5”竞争。
Spawn Blocking
Tokio 工具带中另一个相关的工具是 tokio::task::spawn_blocking()。 spawn_blocking()
将在专用的线程池(blocking 池)中生成一个(非 async)代码块。此线程池维护一个更大的线程池。
让我们添加一个在 spawn_blocking
调用下运行 sin_heavy()
的正弦生成器版本:
async fn produce_sin_heavy_blocking(
run_start: Instant,
fut_name: String,
tx: mpsc::UnboundedSender<Sample>,
) {
for i in 1..N {
let start = run_start.elapsed().as_micros();
let tx = tx.clone();
let (t_id, value) = tokio::task::spawn_blocking(move || {
let value = sin_heavy(i);
let t_id = thread_id::get();
(t_id, value)
})
.await
.unwrap();
let end = run_start.elapsed().as_micros();
let sample = Sample { fut_name, value, start, end, thread_id: t_id };
tx.send(sample).unwrap();
tokio::task::yield_now().await;
}
}
并将这些添加到我们的可视化中:
for i in 1..7 {
futs.push(
produce_sin_heavy_blocking(run_start, format!("spawn_blocking{i}"), tx.clone())
.boxed(),
);
}
这是生成的图:
并放大:
我不知道你怎么想,但我发现这个放大的图非常令人满意。每个正弦计算都是在空闲线程上完成的,并且我们已经达到了峰值效率。当然,我们需要记住,我们并没有凭空增加 CPU 核心。毕竟,其中一些线程运行在同一核心上,上下文切换和共享资源。
事实上,spawn_blocking
并非始终是 CPU 繁重任务的最佳选择。通常,我们希望为这类代码分配有限数量的线程。Async: What is blocking? 是 Alice Ryhl 的一篇不错的文章,其中详细介绍了并调查了一些替代方案。
我们学到了什么
像这样可视化 future runtime 真的让人顿悟。并发和并行执行之间的区别显而易见,并且多核利用率变得直观。
现在更容易推理 Tokio 的行为,即使对它的代码库不是很熟悉。Rust 中的 async 编程很微妙,但我希望这篇文章能帮助你更好地掌握它。
附录:运行演示代码
演示代码使用一些 Python 代码来绘制可视化。要运行它:
- 确保你已安装 rye
- 安装 Python 依赖项:
rye sync
- 激活 Python 虚拟环境:
source .venv/bin/activate
- 运行代码:
cargo run