github home solutions discover blog about | Light Dark 无锁 Rust:如何在火中造过山车 (Lock-Free Rust: How to Build a Rollercoaster While It's on Fire) 发布于 2025-05-12 作者:Julian Goldstein

“那面旗帜上的每一针,都是对无锁线程安全的承诺。std::atomic 就是那根针。” — Betsy Ross, Federalist Papers, Draft 29 (已压制), 1782

系好安全带,戴上头盔,和你的爱人告别吧,因为接下来的几分钟,我将教你关于 Rust 中无锁数据结构的知识,超出任何一个理智的人在没有精神评估的情况下应该合理知道的范围。

所以坐下来,闭上嘴,假装你理解内存排序,因为线程安全只是一种社会结构,而 Ordering::Relaxed 只是加了额外步骤的否认。

TL;DR (太长不看)

我们将构建 LockFreeArray<T, N>,这是一个固定大小的无锁数组,用于存储堆分配的值。它使用 atomics 和一个 freelist 来跨线程插入和获取值,无需锁。你将学到:

如果你喜欢阅读并想看到更多类似的内容,请查看我们的产品 yeet 或我们的 sandbox。谢谢。

无锁:危险的十项全能

无锁编程的存在,就像人们在没有绳索的情况下徒手攀爬悬崖一样:它速度快,很优雅,但如果你做错了,绝对会要了你的命。在 yeet,我们已经成为了这方面的大师。特别是在为来自 BPF 环形缓冲区的大量事件的实时流构建高性能优先级队列时。

使用 Mutex<T>RwLock<T>,你可以获得温暖、模糊的保证——互斥、公平以及编译时安全的舒适性,但以牺牲速度为代价。另一方面,使用无锁,你可以获得速度——但保证它的安全性完全取决于你。没有等待,没有阻塞,没有帮助。

这就像乘坐过山车和在半空中建造一辆着火的过山车之间的区别——而你同时也是乘客。

Mr. Bones Wild Ride

恭喜——如果你还在阅读,你已经正式决定无视你的治疗师,抛弃 borrow checker,并直接面对并发。

认识 Atomics

你不需要全部的 atomics——只需要少数几个危险的:

AtomicPtr<T>

把它想象成原始指针,但在一个共享的闹鬼的房子里。你可以在线程之间传递所有权,但如果你搞砸了,未定义行为的幽灵会在运行时拜访你。

AtomicUsize

用于索引和 freelist 链接。它只是一个普通的数字,只不过 CPU 的竞争条件警报 24/7 全天候监控着它。

Ordering::{Acquire, Release, AcqRel, Relaxed}

这些决定了其他线程 何时 可以看到你的更改。使用错误的排序,你的写入会无序到达,就像在烤箱预热之前就送了蛋糕一样。

就是这些。三个工具。无数种在裹着 unsafe 的医院里醒来的方式。

Ouch

无锁数组:风险自负的索引

这不是你友好的邻居 Vec<T>。这是 Vec<Violence>

没有调整大小。没有边界检查。没有舒适的锁来在你线程开始竞争时握住你的手。只有原始指针,atomics,以及来自略读了一半的文档、猛灌加油站咖啡和低语“这有多难?”的自信。

我们将其称为 LockFreeArray<T, N>

把它想象成一个固定大小的并发槽缓冲区——适用于任务池、按需工作者或无法承受锁的资源槽。

它有三个方法:

new() 初始化数组并设置可用槽的 freelist。 try_insert(value: T) -> Result<usize, T> 尝试将一个值放在一个空槽中。如果成功,返回索引。如果失败,将你的值返回,温柔但坚定。 take(index: usize) -> Option<T> 移除并返回给定 index 处的值 T。如果它已经为空,则返回 None

use std::array;
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
/// Index tagging to solve the ABA problem.
fn pack(index: usize, tag: usize) -> usize {
  (tag << 32) | index
}
/// Index tagging to solve the ABA problem.
fn unpack(value: usize) -> (usize, usize) {
let index = value & 0xFFFF_FFFF;
let tag = value >> 32;
  (index, tag)
}
pub struct LockFreeArray<T: Send + Sync, const N: usize> {
slots: [AtomicPtr<T>; N],
freelist_head: AtomicUsize, // stores (tag << 32) | index
next: [AtomicUsize; N],
}
impl<T: Send + Sync, const N: usize> LockFreeArray<T, N> {
pub fn new() -> Self {
let slots = array::from_fn(|_| AtomicPtr::new(ptr::null_mut()));
let next = array::from_fn(|i| AtomicUsize::new(if i + 1 < N { i + 1 } else { N }));
Self {
      slots,
      freelist_head: AtomicUsize::new(pack(0, 0)),
      next,
    }
  }
pub fn try_insert(&self, value: T) -> Result<usize, T> {
let boxed = Box::into_raw(Box::new(value));
loop {
let old = self.freelist_head.load(Ordering::Acquire);
let (head, tag) = unpack(old);
if head == N {
let value = unsafe { *Box::from_raw(boxed) };
return Err(value);
      }
let next_index = self.next[head].load(Ordering::Relaxed);
let new = pack(next_index, tag.wrapping_add(1));
if self
        .freelist_head
        .compare_exchange(old, new, Ordering::AcqRel, Ordering::Relaxed)
        .is_ok()
      {
self.slots[head].store(boxed, Ordering::Release);
return Ok(head);
      }
// Retry if compare_exchange failed
    }
  }
pub fn take(&self, index: usize) -> Option<T> {
if index >= N {
return None;
    }
let ptr = self.slots[index].swap(ptr::null_mut(), Ordering::AcqRel);
if ptr.is_null() {
return None;
    }
let value = unsafe { *Box::from_raw(ptr) };
loop {
let old = self.freelist_head.load(Ordering::Acquire);
let (head, tag) = unpack(old);
self.next[index].store(head, Ordering::Relaxed);
let new = pack(index, tag.wrapping_add(1));
if self
        .freelist_head
        .compare_exchange(old, new, Ordering::AcqRel, Ordering::Relaxed)
        .is_ok()
      {
break;
      }
    }
    Some(value)
  }
}

内存安全只是一个建议

恭喜——你现在拥有一个 LockFreeArray<T, N>,可以在线程之间插入和删除值,而无需获取任何锁。你正在实现梦想。

此时,你可能想知道:

“为什么任何一个理性的人会故意构建如此不稳定的东西?”

两个字:现金。

无锁数据结构不仅仅是一种炫耀——它们还很快。就像 “你的 CPU 缓存正在哭泣,但你的延迟是传奇” 那样快。

Cash Money

为了说明这一点,这里有一些使用附录中的代码获得的性能基准测试:

Running 10 trials of producer-consumer workloads
Producers: 6, Consumers: 2, Array Size: 100
------------------------------------------------------
Trial    LockFreeArray (ms) Mutex<Vec<Option<T>>> (ms)    Diff (%)
1            297.616         2347.711     87.32%
2            298.942         1872.945     84.04%
3            273.726         1861.038     85.29%
4            334.799         1494.331     77.60%
5            324.014         2206.664     85.32%
6            307.311         2199.226     86.03%
7            316.705         1297.407     75.59%
8            299.890         1973.732     84.81%
9            323.294         1633.112     80.20%
10           375.024         2627.307     85.73%
Mean          315.132         1951.347     83.19%
Std Dev         25.882          386.424      3.77%
🏁 Winner: LockFreeArray (faster by 83.19% on average)

这正是我们在 product 中需要实现的速度。

Freelist:穷人的分配器

该设计的核心是一个 freelist ——一个可用槽的内部链表。我们不是每次都扫描空索引,而是从 freelist 中弹出索引,并在我们 take() 时将它们推回去。这就像 Vec<T> 有一个后巷表亲,他说:

“没有调整大小。没有边界检查。只有感觉。”

slots: [AtomicPtr<T>; N] 中的每个 slot 都包含一个指向堆分配的 TNULL 的指针。

如果值为 NULL,则该槽是空闲的。如果它为非空(如 *T),则该槽已被占用。

slots: [AtomicPtr<T>; N]
  [0]   [1]   [2]   [3]   [4]
+--------+--------+--------+--------+--------+
| ptr  | ptr  | NULL  | NULL  | NULL  |
+--------+--------+--------+--------+--------+
  |    |
  |    |
  |    |
  v    V
 ["apple"] ["banana"]

next: [AtomicUsize; N] 数组构建了链表。每个索引都指向下一个。

next: [AtomicUsize; N] (freelist linkage between slots)
  [0]   [1]   [2]   [3]   [4]
+-------+-------+-------+-------+-------+
|  1  |  2  |  3  |  4  |  5  |
+-------+-------+-------+-------+-------+
  ^
  |
freelist_head

freelist_head: AtomicUsize 是该列表的头,因此新的插入首先占用 slots[1]

freelist walk:
  [1] -> [2] -> [3] -> [4] -> END

科学走得太远了吗?

此时,如果你认为 “这似乎是个坏主意,” 恭喜——你的生存本能仍然有效。

但你就在这里。

你已经使用原始指针编写了一个内存分配器,绕过了标准库中的每个锁,现在你的 freelist 变成了一个小型的、无人监管的线程狂欢,所有权规则只是礼貌的建议。

Has Science Gone Too Far?

你不再编写 Rust。你正在召唤 Rust

而 borrow checker 呢?他几小时前就离开了。他看到了 AtomicPtr<T>,丢掉了他的剪贴板并逃走了。

try_insert:深入危险

这是核心逻辑,为了最大的清晰度 和焦虑 添加了注释:

第 1 步:装箱该值

pub fn try_insert(&self, value: T) -> Result<usize, T> {
let boxed = Box::into_raw(Box::new(value));

我们获取你良好、纯真的 T,并使用 Box::new() 将其抛到堆上。

为什么?因为在线程之间传递原始指针就像用胶带将烟花绑在 Roomba 上,递给它一把牛排刀,然后大喊“去修理路由器!”——它在技术上是运动,但绝对不是进步。

我们使用 Box::into_raw() 来剥离安全导轨,并将其变成裸指针。此时,编译器正在冒汗,borrow checker 已经逃离现场,而你全权负责确保这个东西被释放。

你猜怎么着? 如果你搞砸了,并且稍后没有 Box::from_raw() 它?

🥳 恭喜,你刚刚泄漏了内存。 并且可能泄漏了你的理智。

Leo Clap

第 2 步:获取 freelist 的头部

loop {
let old = self.freelist_head.load(Ordering::Acquire);
let (head, tag) = unpack(old); // Unpack tagged index to defend against ABA.
if head == N {
// Reclaim value from Box, avoid leak
let value = unsafe { *Box::from_raw(boxed) };
return Err(value);
    }
    ...
  }

我们使用 Ordering::Acquire 来读取 freelist_head,并确保在此 之后 的所有读取都不能 在它之前 重新排序。

如果头是 N,则表示该列表为空——没有可用的槽。因此,我们安全地释放我们装箱的值,返回 Err(value) 并哭泣。

第 3 步:查看下一个节点并尝试声明该槽。

      ...
let next_index = self.next[head].load(Ordering::Relaxed);
let new = pack(next_index, tag.wrapping_add(1)); // Pack tagged index to defend against ABA.
if self
        .freelist_head
        .compare_exchange(old, new, Ordering::AcqRel, Ordering::Relaxed)
        .is_ok()
      {
self.slots[head].store(boxed, Ordering::Release);
return Ok(head);
      }
// Retry if compare_exchange failed
      ...

如果成功,next_index 会告诉我们下一个 freelist 头 应该是什么

我们在这里不需要严格的排序;我们只是在读取一个数字。

现在是关键时刻:compare_exchange,原子操作的皇冠上的宝石。

使用 compare_exchange 就像试图租一个 Craigslist 公寓,但前提是

你带着一个手提箱出现,准备搬进来——但前提是一切都 完全 像你在模糊的 iPhone 4 照片中看到的那样。

如果有任何不对劲?没门。你走人,尊严大部分完好无损。

这就是 compare_exchange:它是 带有或有条款的希望。

“嘿,我刚刚读取到freelist_head 在索引 head 处。如果在此期间没有其他线程更改该值,我想用 next_index 替换它。”

注意:在此代码段中,我们忽略了 ABA problem 以关注 Ordering 语义。

let head = self.freelist_head.load(Ordering::Acquire);
let next_index = self.next[head].load(Ordering::Relaxed);
if self
  .freelist_head
  .compare_exchange(head, next_index, Ordering::AcqRel, Ordering::Relaxed)
  .is_ok()
{
self.slots[head].store(boxed, Ordering::Release);
return Ok(head);
}

通过为 compare_exchange 选择 Ordering::AcqRel,你告诉 CPU:

“我正在锁定这笔 Craigslist 交易的双方——我不会搬出去,直到我知道新地方是我的,并且我不会搬进来,除非上一个人的怪异动漫海报肯定消失了——所有这些都在一次原子握手中完成。”

当 compare_exchange 成功时:

Initial state:
freelist_head = 1
next[1] = 3
Thread A: reads head = 1, next_index = 3
Thread A: compare_exchange(1 -> 3) ✅ succeeds
freelist_head = 3

当 compare_exchange 失败时:

Initial state:
freelist_head = 1
Thread A: reads head = 1
Thread B: steals it and sets freelist_head = 3
Thread A: compare_exchange(1 -> 3) ❌ fails
Thread A: retries

如果你使用 Ordering::Relaxed 而不是 Ordering::AcqRel 会怎样?

// BAD IDEA: Using Relaxed for everything
self.freelist_head.compare_exchange(head, next_index, Ordering::Relaxed, Ordering::Relaxed)

这就像说:

“只要它看起来大致像我在网上看到的地方,我就会搬进这个 Craigslist 公寓。我不需要检查以前的房客是否离开了。或者冲水了。或者把狼蛛带走了。”

现在想象一下这种情况:

欢迎来到数据竞争、内存损坏和未定义行为!

Thread A             Thread B
--------             --------
load freelist_head = 1      load freelist_head = 1
load next[1] = 3         load next[1] = 3
compare_exchange succeeds    compare_exchange fails (good!)
[RELAXED] store pointer...    load slots[1] → still NULL! 😱
                 think it's empty — takes it again

因为 Thread A 没有 保证对 slots[1] 的写入发生在其他线程可以看到 freelist 头部更改之前Thread B 过早地进入了。

这就是为什么 compare_exchange 需要是 Ordering::AcqRel

第 4 步:存储指针并像传奇一样消失

self.slots[head].store(boxed, Ordering::Release);
return Ok(head);

你已经像一只浣熊拆除熊陷阱以获得一片 Cheeto 一样,完成了 compare_exchange——鲁莽、不可能,但不可否认地有效。

你已经声明了该槽!

现在是 存储你装箱的指针 的时候,以便其他线程可以看到它——但 只有 在你所做的其他一切都安全锁定的 之后

这就是 Ordering::Release 的用武之地。这就像在说:

“我已经搬进了这个 Craigslist 公寓。我擦干净了台面,清理了冰箱,并挂上了一张励志猫海报。并且 现在 我才将该房间列为已占用。”

如果没有 Ordering::Release,CPU 可能会 重新排序你的写入,在 甚至搬入你的东西 之前 将房间列为可用。这就是室友如何被复制、线程读取垃圾以及你的程序段错误的原因。

take:一次回收一个槽的内存

try_insert 是一场高风险的土地争夺战,而 take 则是清理人员:它清理一个槽,回收内存,并将索引放回 freelist 上——没有锁,没有废话。

注意:在此代码段中,我们忽略了 ABA problem 以关注 Ordering 语义。

pub fn take(&self, index: usize) -> Option<T> {
if index >= N {
return None;
  }
let ptr = self.slots[index].swap(ptr::null_mut(), Ordering::AcqRel);
if ptr.is_null() {
return None;
  }
let value = unsafe { *Box::from_raw(ptr) };
loop {
let head = self.freelist_head.load(Ordering::Acquire);
self.next[index].store(head, Ordering::Relaxed);
if self
      .freelist_head
      .compare_exchange(head, index, Ordering::AcqRel, Ordering::Relaxed)
      .is_ok()
    {
break;
    }
  }
  Some(value)
}

swap 保证了独占访问。unsafe 块回收所有权。循环将索引推回 freelist,就像一个装满蜜蜂的烤面包机——声音大、鲁莽,而且不知何故仍然是合法的。

在该 load 上使用 Ordering::Acquire 就像打开一个可疑的 Craigslist 公寓门,只是在检查猫眼、听是否有奇怪的声音以及向你的朋友发送你的位置之后,以防万一。

你说:

“我会打开这扇门——但我 需要知道它后面的所有东西都完全在它应该在的位置,然后我才能迈出一步。”

在 CPU 领域,这意味着:

在其他线程释放 freelist 头部之前发生的所有内存写入现在都保证对你可见。

因此,如果另一个线程 将索引添加到 freelist,并使用 Ordering::Release 来执行此操作,则你的 Ordering::Acquire 可确保你 看到完全更新的下一个指针——而不是来自备用时间轴的某些闹鬼的半写入,在该时间轴中,线程中途放弃并启动了一个播客。

简而言之:

Ordering::Acquire = “在知道它安全之前,我不会触摸任何东西。”

Ordering::Release = “我完成了。这是安全的。你现在可以进入。”

现在,对于 freelist_head 上的 compare_exchange,我们选择 Ordering::AcqRel

回到 Craigslist 类比:

Ordering::AcqRel 就像把你的钥匙交给下一个房客——但只是在 你之后:

只有这样,你才更新列表说:“位置已开放。来吧。”

如果你没有使用 Ordering::AcqRel 怎么办?这就像在 从消防通道上下来 的同时 大喊“全是你的!”一样。

使用 Ordering::AcqRel

Thread A:   load freelist_head = 1
       load next[1] = 2
compare_exchange(1 → 2) ✅ succeeds (AcqRel)
       store ptr to slots[1] (Release)
Thread B:   load freelist_head = 2 (sees updated head)
       load next[2] = ...
compare_exchange(2 → ...) ✅ safe, slot[1] is fully written

使用 Ordering::Relaxed

Thread A:   load freelist_head = 1
       load next[1] = 2
compare_exchange(1 → 2) ✅ succeeds (Relaxed)
       store ptr to slots[1] ← DELAYED (write not visible yet)
Thread B:   load freelist_head = 2 (sees update too soon!)
       loads slots[1] → still NULL!
       thinks it's free — tries to insert again ❌
💀 DOUBLE ALLOC / USE AFTER FREE 💀

结论:你已经太深入了,而且它很美

你凝视着无锁编程的深渊,你 原子地交换了你的理智来换取一个指针,并向前迈进 而不是眨眼。

你已经用原始指针、atomics 和感觉构建了自己的线程安全数组。你不仅仅避开了 Mutex<T>——你把它留在了角落里哭泣,而你的 freelist 在停车场里做了甜甜圈。

你现在了解:

好吧,你可以:

但最重要的是,你已经了解到 无锁并不意味着无忧无虑。这是一场在电话亭里与物理和编译器进行的刀战——而你是拿着勺子的人。

如果你喜欢这篇文章或想看到以正确的方式完成的无锁数据结构的威力,请查看我们的 product

👀 特别优惠:为了纪念浣熊:接下来的 100 个注册 安装 yeet 的用户将赢得限量版 T 恤。

更新时间:中部标准时间上午 11:46 - 还剩 77 个

附录

基准测试代码

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
mod lockfree;
use lockfree::LockFreeArray;
const ARRAY_SIZE: usize = 100;
const PRODUCERS: usize = 6;
const CONSUMERS: usize = 2;
const OPS_PER_PRODUCER: usize = 100_000;
const TRIALS: usize = 10;
fn run_lockfree_trial() -> Duration {
let array = Arc::new(Box::new(LockFreeArray::<usize, ARRAY_SIZE>::new()));
let mut handles = Vec::new();
let start = Instant::now();
for _ in 0..PRODUCERS {
let array = Arc::clone(&array);
    handles.push(thread::spawn(move || {
for i in 0..OPS_PER_PRODUCER {
while array.try_insert(i).is_err() {
          std::hint::spin_loop();
        }
      }
    }));
  }
for _ in 0..CONSUMERS {
let array = Arc::clone(&array);
    handles.push(thread::spawn(move || loop {
for i in 0..ARRAY_SIZE {
let _ = array.take(i);
      }
    }));
  }
for handle in handles.into_iter().take(PRODUCERS) {
    handle.join().unwrap();
  }
  Duration::from_secs_f64(start.elapsed().as_secs_f64())
}
fn run_mutex_trial() -> Duration {
let vec = Arc::new(Mutex::new(vec![None; ARRAY_SIZE]));
let mut handles = Vec::new();
let start = Instant::now();
for _ in 0..PRODUCERS {
let vec = Arc::clone(&vec);
    handles.push(thread::spawn(move || {
for i in 0..OPS_PER_PRODUCER {
loop {
let mut guard = vec.lock().unwrap();
if let Some(pos) = guard.iter_mut().position(|v| v.is_none()) {
            guard[pos] = Some(i);
break;
          }
        }
      }
    }));
  }
for _ in 0..CONSUMERS {
let vec = Arc::clone(&vec);
    handles.push(thread::spawn(move || loop {
let mut guard = vec.lock().unwrap();
for val in guard.iter_mut() {
let _ = val.take();
      }
    }));
  }
for handle in handles.into_iter().take(PRODUCERS) {
    handle.join().unwrap();
  }
  Duration::from_secs_f64(start.elapsed().as_secs_f64())
}
fn summarize_trials(lockfree: &[Duration], mutex: &[Duration]) {
let lf_times: Vec<f64> = lockfree.iter().map(|d| d.as_secs_f64() * 1000.0).collect();
let mx_times: Vec<f64> = mutex.iter().map(|d| d.as_secs_f64() * 1000.0).collect();
let percent_diffs: Vec<f64> = mx_times
    .iter()
    .zip(&lf_times)
    .map(|(m, l)| ((m - l) / m) * 100.0)
    .collect();
  println!(
    "{:<10} {:>20} {:>25} {:>15}",
    "Trial", "LockFreeArray (ms)", "Mutex<Vec<Option<T>>> (ms)", "Diff (%)"
  );
for (i, ((lf, mx), diff)) in lf_times
    .iter()
    .zip(&mx_times)
    .zip(&percent_diffs)
    .enumerate()
  {
    println!("{:<10} {:>20.3} {:>25.3} {:>14.2}%", i + 1, lf, mx, diff);
  }
let mean_lf = mean(&lf_times);
let mean_mx = mean(&mx_times);
let mean_diff = mean(&percent_diffs);
let std_lf = std_dev(&lf_times, mean_lf);
let std_mx = std_dev(&mx_times, mean_mx);
let std_diff = std_dev(&percent_diffs, mean_diff);
  println!(
    "{:<10} {:>20.3} {:>25.3} {:>14.2}%",
    "Mean", mean_lf, mean_mx, mean_diff
  );
  println!(
    "{:<10} {:>20.3} {:>25.3} {:>14.2}%",
    "Std Dev", std_lf, std_mx, std_diff
  );
  println!();
if mean_lf < mean_mx {
    println!(
      "🏁 Winner: LockFreeArray (faster by {:.2}% on average)",
      mean_diff
    );
  } else {
    println!(
      "🏁 Winner: Mutex<Vec<Option<T>>> (faster by {:.2}% on average)",
      -mean_diff
    );
  }
}
fn mean(data: &[f64]) -> f64 {
  data.iter().sum::<f64>() / data.len() as f64
}
fn std_dev(data: &[f64], mean: f64) -> f64 {
let variance = data.iter().map(|v| (*v - mean).powi(2)).sum::<f64>() /