跳转至

原子操作与 C++ 内存模型

难度:⭐⭐⭐⭐⭐ | 建议用时:2 周 | 前置要求:C++语言核心/类型系统与值类别推导 值类别与类型系统、C++语言核心/RAII与智能指针 RAII、C++语言核心/错误处理与异常安全 错误处理、线程管理与互斥同步 线程管理与互斥同步


前置自测

答不出两题以上,建议先回到 线程管理与互斥同步 复习线程生命周期、互斥锁、条件变量和传感器队列。 原子操作不是“更快的 mutex”,而是 C++ 对跨线程可见性、指令重排和单个内存位置并发访问给出的低层接口。 如果没有 线程管理与互斥同步 的互斥同步基础,直接看 memory_order 很容易把它误解成性能开关。

  1. 两个线程同时读写同一个普通 bool stop,即使只是一写一读,为什么仍然是 data race?
  2. std::mutex 保护的是一段临界区,std::atomic<int> 保护的是什么?
  3. counter.fetch_add(1, std::memory_order_relaxed) 为什么可以用于统计计数,却不能用于发布一整帧位姿数据?
  4. volatile bool stop 为什么不能作为线程退出标志?
  5. 生产者先写 pose,再写 ready=true;消费者看到 ready=true 后读 pose。这中间至少需要什么同步关系?
  6. SPSC 队列里的 SPSC 分别是什么意思?如果变成两个生产者,原来的实现还能直接用吗?

本章目标

学完本章后,你应该能做到:

  1. 区分 data racerace condition 和普通逻辑竞态。
  2. 用 happens-before 解释为什么某些跨线程读写安全,某些跨线程读写未定义。
  3. 正确使用 std::atomic<bool>std::atomic<int>std::atomic<std::shared_ptr<T>>atomic_flag
  4. 判断什么时候需要 mutex,什么时候可以使用 atomic,什么时候应避免手写无锁结构。
  5. 解释 relaxedreleaseacquireacq_relseq_cst 的工程含义。
  6. 用 release-acquire 实现“发布数据,再发布标志”的消息传递。
  7. 用 relaxed 原子实现统计计数,而不误用它发布复杂状态。
  8. 读懂 CAS 循环和 compare_exchange_weak 的重试逻辑。
  9. 解释 ABA、false sharing、cache line 抖动为什么会让”无锁”并不一定更快。
  10. 为 Mini SLAM Concurrent Pipeline 增加原子停止标志、帧计数器和最新状态发布。
  11. 了解 C++23/26 并发新特性的方向:std::atomic<std::shared_ptr<T>>(C++20)的实际使用、hazard pointer 与 RCU 的工程价值。

知识树

原子操作与 C++ 内存模型
├── 为什么 mutex 不总是合适 ⭐⭐
│   ├── 停止标志 / 计数器 / 最新状态
│   └── atomic 保护单对象 vs mutex 保护临界区
├── data race vs race condition ⭐⭐⭐
│   ├── C++ 语言层 UB vs 业务逻辑错误
│   └── call_once / CAS 保护一次性操作
├── C++ 内存模型 ⭐⭐⭐⭐
│   ├── 三层不可见性(编译器 / CPU / 缓存)
│   ├── happens-before 关系
│   └── sequenced-before / synchronizes-with
├── memory_order 六种语义 ⭐⭐⭐⭐⭐
│   ├── relaxed:只保证原子性
│   ├── release / acquire:发布-获取配对
│   ├── acq_rel:读改写的双向屏障
│   ├── seq_cst:全序一致
│   └── consume:数据依赖(实践中退化为 acquire)
├── CAS 与无锁编程 ⭐⭐⭐⭐
│   ├── compare_exchange_weak/strong
│   ├── CAS 循环的重试逻辑
│   └── ABA 问题与解决方案
├── 工程陷阱 ⭐⭐⭐⭐
│   ├── false sharing 与 cache line 对齐
│   ├── volatile 不是同步工具
│   └── 无锁不一定更快
├── SPSC 队列与双缓冲 ⭐⭐⭐⭐
│   ├── ring buffer + release-acquire
│   └── 不可变快照发布
├── C++20/23/26 并发演进 ⭐⭐⭐
│   ├── atomic<shared_ptr>(C++20)
│   ├── hazard pointer(C++26 提案)
│   └── RCU(C++26 提案)
└── 并发测试与验证 ⭐⭐⭐⭐
    ├── ThreadSanitizer 实战
    └── 并发测试的维度覆盖

本章在课程中的位置

线程管理与互斥同步 讲的是线程和锁。 那里我们把共享数据包进 mutex 保护的临界区:

{
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push_back(frame);
}

这条路径稳定、可读、适合绝大多数共享数据。 但机器人系统里还有另一类问题:

场景 用 mutex 的问题 更合适的工具
停止标志 每次循环加锁太重 std::atomic<bool>
帧计数器 只需要原子递增 std::atomic<std::uint64_t>
最新状态发布 读者只要最近一帧 不可变快照或带生命周期协议的缓冲
SPSC 数据通道 单生产者单消费者假设明确 ring buffer + 原子下标
性能统计 只关心最终计数 relaxed 原子

本章不是要替代 线程管理与互斥同步。 它要回答一个更底层的问题:

当多个线程不通过 mutex 进入同一个临界区时,C++ 如何定义它们看到的内存?

这一章与机器人部署关系很紧。 很多桌面 x86 机器上“偶然能跑”的并发代码,到了 ARM 平台、Jetson、移动端或者高优化编译选项下就会暴露问题。 原因不是 ARM 更“脆弱”,而是 C++ 语言本来就没有承诺普通变量跨线程可见。

本质洞察:atomic 的本质不是“速度更快”,而是“把某个内存位置的跨线程访问提升为 C++ 语言承认的同步事件”。没有这个承认,编译器和 CPU 都可以做你没有预料到的重排、缓存和优化。


18.1 为什么 mutex 不总是合适:停止标志、计数器与最新状态 ⭐⭐

工程问题:每次循环都加锁会把简单信号变重

考虑一个 Tracking 线程:

while (!stop_requested) {
    processOneFrame();
}

主线程想让它退出:

stop_requested = true;

如果 stop_requested 是普通 bool,两个线程一个写、一个读,没有同步。 这不是“偶尔读到旧值”的小问题,而是 data race。 C++ 标准层面直接进入未定义行为。

线程管理与互斥同步 的直接做法是用 mutex:

std::mutex stop_mutex;
bool stop_requested = false;

bool shouldStop() {
    std::lock_guard<std::mutex> lock(stop_mutex);
    return stop_requested;
}

void requestStop() {
    std::lock_guard<std::mutex> lock(stop_mutex);
    stop_requested = true;
}

这能保证正确。 但如果 Tracking 循环每帧都要检查几十次停止标志,加锁会引入不必要的临界区成本。 停止标志只有一个独立布尔值,不需要保护一组复杂不变量。 这正是 atomic 的适用场景。

std::atomic<bool> stop_requested{false};

while (!stop_requested.load(std::memory_order_relaxed)) {
    processOneFrame();
}

stop_requested.store(true, std::memory_order_relaxed);

这里使用 relaxed 是因为停止标志只表达“是否退出”。 它不负责发布其他数据。 如果主线程还要先写配置、再用 stop 标志通知工作线程读取配置,那就不是这个例子。

反面失败:把所有共享变量都换成 atomic

atomic 只让单个原子对象的访问合法化。 它不会自动保护多个变量之间的关系。

下面这个结构表示位姿:

struct Pose2D {
    double x = 0.0;
    double y = 0.0;
    double yaw = 0.0;
};

如果把每个字段都改成 atomic:

struct AtomicPose2D {
    std::atomic<double> x{0.0};
    std::atomic<double> y{0.0};
    std::atomic<double> yaw{0.0};
};

读者可能看到:

x   来自第 100 帧
y   来自第 101 帧
yaw 来自第 100 帧

每个字段单独安全,不代表三者组成的位姿一致。 位姿的不变量是“三个字段来自同一次发布”。 这不是三个 atomic 能自然表达的。

更稳的方式是:

  1. 用 mutex 保护整个 Pose2D
  2. 或用双缓冲发布完整快照。
  3. 或用 seqlock 验证读取期间没有写入。

抽象不变量:atomic 保护一个对象,不保护一段算法

mutex 的抽象对象是临界区。 atomic 的抽象对象是单个原子对象。

工具 保护对象 适合表达
std::mutex 多个语句组成的临界区 队列、map、状态机、复杂不变量
std::atomic<T> 一个原子对象的读改写 标志、计数器、索引、版本号
condition_variable 等待条件 队列非空、状态变化
SPSC queue 特定拓扑的数据传递 单生产者单消费者
双缓冲 最新完整快照 位姿、状态、控制命令

判断是否能用 atomic,不是看“数据类型能不能原子化”,而是看“要保护的不变量是否只落在一个原子对象上”。

规则推导:从停止标志到版本号

停止标志只需要一个布尔值:

std::atomic<bool> stop{false};

帧计数器只需要一个整数递增:

std::atomic<std::uint64_t> frame_count{0};

void onFrame() {
    frame_count.fetch_add(1, std::memory_order_relaxed);
}

最新状态通常需要两个概念:

  1. 状态内容。
  2. 哪个状态是当前可读版本。

这时可以把“当前版本”做成 atomic,把状态内容放在两个普通缓冲区里:

struct RobotState {
    double x = 0.0;
    double y = 0.0;
    double yaw = 0.0;
};

std::array<RobotState, 2> states;
std::atomic<int> active_index{0};

写者写非活动缓冲区,写完后发布索引。 读者先读索引,再读取对应缓冲区。 如果只有一个写者,并且状态对象较小,这种模式可以避免读写同一对象。 但要想完全严格,还需要处理读者读取期间写者是否复用同一缓冲区的问题。 第 18.9 节会把这个问题展开。

工程边界:atomic 不是低成本万能药

atomic 可能比普通变量慢很多。 它会影响编译器优化,也可能触发 CPU 缓存一致性流量。 如果多个核心频繁写同一个 atomic,cache line 会在核心之间来回迁移。 这叫 false sharing 或 cache line bouncing 的相关问题。

工程上应先问:

问题 倾向选择
是否保护多个字段的一致性? mutex / 双缓冲 / seqlock
是否只记录一个独立计数? atomic relaxed
是否用一个标志发布其他数据? atomic release-acquire
是否需要等待队列非空? mutex + condition_variable
是否要求固定拓扑的高速通道? SPSC queue
是否读写频率不高且代码要清楚? mutex

代码验证:停止标志的最小模型

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

class CountingLoop {
public:
    void start() {
        thread_ = std::thread([this] { run(); });
    }

    void requestStop() {
        stop_.store(true, std::memory_order_relaxed);
    }

    void join() {
        if (thread_.joinable()) {
            thread_.join();
        }
    }

    std::uint64_t count() const {
        return count_.load(std::memory_order_relaxed);
    }

private:
    void run() {
        while (!stop_.load(std::memory_order_relaxed)) {
            count_.fetch_add(1, std::memory_order_relaxed);
        }
    }

    std::atomic<bool> stop_{false};
    std::atomic<std::uint64_t> count_{0};
    std::thread thread_;
};

int main() {
    CountingLoop counter;
    counter.start();
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    counter.requestStop();
    counter.join();
    std::cout << counter.count() << "\n";
}

这个例子里 stop_count_ 都不发布复杂数据。 relaxed 足够。 如果在 requestStop() 前写入一个配置对象,并希望工作线程看到它,就必须重新设计同步关系。

atomic 与 mutex 的关系可以类比"签名"与"保险箱"。保险箱(mutex)保护一整批文件——打开保险箱才能读写任何文件,关上保险箱才能让别人用。签名(atomic)只保护一张纸条——你可以在不开保险箱的情况下安全地传递这张纸条。如果你需要传递的是一张简单的 Yes/No 纸条(停止标志),用签名比开保险箱更轻量。但如果你需要传递一整套文件(位姿的 x/y/yaw),每张纸单独签名不能保证整套文件来自同一批次——还是需要保险箱。

⚠️ 编程陷阱:把多字段结构体的每个字段都改成 atomic 错误做法struct AtomicPose { atomic<double> x; atomic<double> y; atomic<double> yaw; }; 现象:读者可能看到 x 来自第 100 帧、y 来自第 101 帧——三个字段不构成一致快照。 根本原因:每个 atomic 只保护自己那个字段的原子性。三个 atomic 的读写之间没有关联,不能保证一起被观察到。 正确做法:用 mutex 保护整个结构体;或用双缓冲发布完整快照;或用 atomic<shared_ptr<const Pose>> 发布不可变对象。

💡 概念误区:认为 atomic 是"更快的 mutex" 新手想法:"atomic 不需要加锁解锁,所以比 mutex 快,应该尽量用 atomic 替代 mutex。" 实际上:atomic 和 mutex 解决不同层次的问题。atomic 保护单个原子对象的读改写;mutex 保护多个语句组成的临界区。把复杂不变量拆成多个 atomic 不是"更快的 mutex",而是"丢失了不变量保护"。 正确理解:先问"要保护的不变量是否只落在一个原子对象上"。是则考虑 atomic,否则用 mutex。

练习

  1. [分析题]:列出 3 个适合用 atomic 的场景和 3 个不适合的场景。对每个不适合的场景,解释为什么 mutex 更合适。
  2. [代码题]:实现一个 CountingLoop 类,用 atomic<bool> stop 控制退出,用 atomic<uint64_t> count 统计循环次数。验证 relaxed 内存序对这两个独立用途是否足够。
  3. [跨章综合题]:线程管理与互斥同步 的 MessageQueue 用 mutex + condition_variable 实现阻塞队列。本节的 atomic 标志用于停止控制。讨论:一个完整的 SLAM 流水线中,哪些同步用 mutex,哪些用 atomic?画出同步工具分布图。

18.2 Data Race 与 Race Condition:名字相近,本质不同 ⭐⭐⭐

工程问题:有些竞态是未定义行为,有些是业务逻辑错误

两个术语经常混用:

术语 C++ 层含义 后果
data race 多线程并发访问同一内存位置,至少一个写,且没有同步 未定义行为
race condition 程序结果依赖线程时序 可能是合法但逻辑错误

data race 是语言层问题。 一旦出现,编译器不再保证程序行为。

race condition 是逻辑层问题。 即使所有访问都用 mutex 或 atomic,逻辑仍可能错。

反面失败:用 atomic 消除 data race 后仍有逻辑竞态

下面的抢占式初始化没有 data race:

std::atomic<bool> initialized{false};
Resource resource;

void initIfNeeded() {
    if (!initialized.load(std::memory_order_acquire)) {
        resource.initialize();
        initialized.store(true, std::memory_order_release);
    }
}

如果两个线程同时看到 initialized == false,它们可能都调用 resource.initialize()。 atomic 让 initialized 的读写合法,但没有保护 resource.initialize() 的“只执行一次”不变量。 更进一步,release/acquire 只为 initialized 这个原子量建立同步——它并不保护 resource 本身。两个线程同时执行 resource.initialize() 就是对 resource 的并发写(没有任何 mutex 或 atomic 保护它),这本身又是一个 data race(未定义行为),而不只是“初始化了两次”的逻辑问题。 正确工具可能是 std::call_once

std::once_flag init_flag;
Resource resource;

void initIfNeeded() {
    std::call_once(init_flag, [] {
        resource.initialize();
    });
}

抽象不变量:C++ 先禁止 data race,再允许你处理逻辑竞态

可以把并发问题分成两层:

第 1 层:程序是否有定义?
  - 没有 data race
  - 对象生命周期有效
  - 同步关系足够

第 2 层:程序逻辑是否符合需求?
  - 是否只初始化一次
  - 是否读到同一帧状态
  - 是否满足实时约束
  - 是否不会丢掉关键数据

atomic 主要帮你通过第 1 层。 第 2 层仍要靠设计。

规则推导:什么情况下出现 data race

C++ 中 data race 的核心条件是:

  1. 两个 evaluation 位于不同线程。
  2. 它们访问同一个 memory location。
  3. 至少一个访问是写。
  4. 这些访问不是 atomic。
  5. 它们之间没有 happens-before。

例如:

bool stop = false;

void threadA() {
    stop = true;
}

void threadB() {
    while (!stop) {
        // work
    }
}

这是 data race。 把 stop 改成 atomic 后,访问本身合法:

std::atomic<bool> stop{false};

但如果 stop 同时承载其他对象的发布语义,就需要选择合适的内存序。

工程边界:mutex 也可能有逻辑竞态

下面的代码每个函数内部都加锁:

class Queue {
public:
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return data_.empty();
    }

    Frame pop() {
        std::lock_guard<std::mutex> lock(mutex_);
        Frame frame = data_.front();
        data_.pop_front();
        return frame;
    }

private:
    mutable std::mutex mutex_;
    std::deque<Frame> data_;
};

调用端写:

if (!queue.empty()) {
    Frame frame = queue.pop();
}

两个函数都无 data race。 但 empty()pop() 之间队列可能被另一个线程改变。 这是 race condition。 更好的接口是把检查和弹出合并:

std::optional<Frame> tryPop();

这说明并发接口设计不能只看每个函数是否加锁。 要看一个业务动作需要的完整不变量在哪里被保护。

代码验证:ThreadSanitizer 检查 data race

最小 data race:

#include <thread>

int counter = 0;

int main() {
    std::thread a([] { ++counter; });
    std::thread b([] { ++counter; });
    a.join();
    b.join();
}

用 ThreadSanitizer:

g++ -std=c++20 -fsanitize=thread -O1 -g race.cpp -o race
./race

改成 atomic:

#include <atomic>
#include <thread>

std::atomic<int> counter{0};

int main() {
    std::thread a([] { counter.fetch_add(1, std::memory_order_relaxed); });
    std::thread b([] { counter.fetch_add(1, std::memory_order_relaxed); });
    a.join();
    b.join();
}

data race 消失。 但这只能说明计数器访问合法,不能说明整个程序逻辑一定正确。

data race 和 race condition 的区别就像"违章驾驶"和"路线选错"。data race 是违反交通规则(C++ 语言规则)——闯红灯(无同步访问共享内存),交警(编译器)有权不再保证任何行为。race condition 是在合法驾驶的前提下,因为走了错误路线(业务逻辑时序不对)而到达错误目的地。消除 data race 让程序行为有定义,但不保证业务逻辑正确。

⚠️ 编程陷阱:用 atomic 消除 data race 后仍有逻辑竞态 错误做法:两个线程同时检查 if (!initialized.load()) 后都执行 resource.initialize()现象:资源被初始化两次,可能导致数据覆盖、资源泄漏或不一致状态。 根本原因:atomic 让 initialized 的读写合法,但不能把"检查+初始化"变成原子操作。两个线程都可能在对方 store 之前读到 false正确做法:使用 std::call_once 或 CAS 循环保证"只执行一次"语义。

🧠 思维陷阱:认为每个函数加锁就没有竞态 新手想法:"empty() 加锁、pop() 加锁,两个函数都线程安全,组合使用也安全。" 实际上:每个函数单独无 data race,但 if (!q.empty()) q.pop() 在两次调用之间有间隙。另一个线程可能在间隙中清空队列。这是 race condition。 正确思维:并发接口应把"检查+操作"合并为原子接口(如 tryPop() -> optional<T>),而不是让调用者拼接两个独立的安全操作。

练习

  1. [分析题]:区分以下场景中哪些是 data race,哪些是 race condition:(a) 两线程无锁写同一 int;(b) 两线程各自加锁后都执行 if (empty()) return + pop();(c) 两线程用 atomic<bool> 竞争初始化资源。
  2. [代码题]:编写一个最小 data race 程序,用 ThreadSanitizer 检测。然后改为 atomic,验证 data race 消失。
  3. [跨章综合题]:线程管理与互斥同步 的 MessageQueueempty() 检查和 pop() 合并在同一个锁保护的 pop() 中。解释这种设计如何同时消除 data race 和 race condition。

18.3 C++ 内存模型:为什么需要 happens-before ⭐⭐⭐⭐

什么是"可见性"?——不是定义,而是一个真实的故障场景

在进入形式化的 happens-before 定义之前,先用一个具体场景理解"可见性"问题的直觉。

假设 Tracking 线程计算出了新的位姿 pose,然后设置标志 ready = true。可视化线程不断检查 ready,看到 true 后读取 pose 并渲染。在你的脑中,这个过程是确定的:先写 pose,再写 ready;看到 ready 为 true 时,pose 一定已经写好了。

但在真实的多核 CPU 上,这个直觉可能是错的。线程 A 写了一个 flag,线程 B 为什么可能看不到? 答案藏在 CPU 的物理结构中:

Store buffer(存储缓冲区)。当 Tracking 线程执行 pose = new_pose 时,这次写入不是直接写进内存——它首先进入 CPU 核心的 store buffer(一个几十个条目的硬件队列)。Store buffer 的存在是为了让 CPU 不用等待慢速的 cache/内存写入就能继续执行下一条指令——这让单核性能提升了 10-30%。但代价是:store buffer 中的数据只对**本核心**可见。可视化线程运行在另一个核心上,它的 load 指令读取的是自己核心的 cache 或内存——而 Tracking 线程写入的 pose 可能还停留在 Tracking 核心的 store buffer 中,根本没有传播过来。

Cache(缓存)。即使 pose 已经从 store buffer 刷入了 Tracking 核心的 L1 cache,可视化线程核心的 L1 cache 中可能还缓存着 pose 的旧值。缓存一致性协议(MESI)最终会让所有核心看到一致的数据,但这个"最终"可能是几百纳秒甚至更久——在这段时间内,两个核心看到的 pose 值可能不同。

编译器重排。更隐蔽的是,编译器可能把 pose = new_pose; ready = true; 重排为 ready = true; pose = new_pose;。在单线程中两者等价(poseready 之间没有数据依赖),但在多线程中,可视化线程可能先看到 ready == true,然后读到 pose 的旧值——因为新的 pose 还没被写入。

这三层"不可见性"叠加在一起,构成了并发编程中最反直觉的现象:一个线程先写了数据再写了标志,另一个线程可能先看到标志更新而数据还是旧的。 C++ 内存模型和 memory_order 的全部意义,就是让程序员能够控制这些不可见性——告诉编译器和 CPU"这些写入的顺序必须被其他核心观察到"。

工程问题:C++ 内存模型为什么存在——从三层重排理解可见性问题

在理解 happens-before 之前,需要先理解它要解决的问题:为什么一个线程写入的数据,另一个线程可能看不到?答案涉及三个不同层次的"不可见性",每层都有独立的机制:

第一层:编译器重排。 编译器为了优化性能(如减少流水线停顿、利用寄存器),可以重排没有数据依赖的语句。例如 data = 42; ready = true; 可能被优化为 ready = true; data = 42;——在单线程中两者等价(因为 dataready 没有直接依赖),但在多线程中,消费者可能先看到 ready == true 再看到 data 仍是旧值。GCC 的 -O2 以上就可能做这种重排。

第二层:CPU 乱序执行与 store buffer。 现代 CPU 有 store buffer——写入指令的结果先进入 store buffer 而非直接写入 L1 cache。这让 CPU 不必等待写入完成就能继续执行后续指令,大幅提升吞吐。但代价是:同一核心上后续的 load 可以从 store buffer 中转发(store-to-load forwarding),而其他核心看不到 store buffer 中的值——它们只能看到已经从 store buffer 刷入 cache 的值。这意味着不同核心可能以不同顺序观察到同一线程的写入。

第三层:缓存一致性协议的延迟。 即使写入已经从 store buffer 刷入了本核心的 cache,缓存一致性协议(如 MESI)仍需要时间把这次写入传播到其他核心的 cache。在 x86 的 TSO(Total Store Order)模型下,所有 store 的顺序在全局可见时被保留(TSO 本身就较强),但在 ARM 和 POWER 架构上,不同核心看到不同 store 的顺序可能不一致(它们使用更弱的内存模型以换取更低的功耗和更高的可扩展性)。

这三层合在一起解释了一个关键事实:"一个线程先写了 data 再写了 ready"这个源代码级别的顺序,不能保证另一个线程观察到同样的顺序。 C++ 内存模型的作用就是在这些物理现实之上建立语言级的保证:通过 memory_order_release 告诉编译器和 CPU "不要把之前的写入重排到这个 store 之后",通过 memory_order_acquire 告诉编译器和 CPU "不要把之后的读取重排到这个 load 之前"。

考虑生产者:

data = 42;
ready = true;

消费者:

if (ready) {
    use(data);
}

人类直觉是:看到 ready == true 就说明 data == 42 已经可见。 C++ 不会对普通变量自动给出这个保证——三层重排中的任何一层都可能破坏这个直觉。

反面失败:只让标志 atomic,但内存序不发布数据

int data = 0;
std::atomic<bool> ready{false};

void producer() {
    data = 42;
    ready.store(true, std::memory_order_relaxed);
}

void consumer() {
    while (!ready.load(std::memory_order_relaxed)) {
    }
    assert(data == 42);
}

ready 没有 data race。 但 data 的可见性没有通过 ready 建立同步关系。 这里需要 release-acquire:

void producer() {
    data = 42;
    ready.store(true, std::memory_order_release);
}

void consumer() {
    while (!ready.load(std::memory_order_acquire)) {
    }
    assert(data == 42);
}

release store 与 acquire load 读到同一个值时,会建立 synchronizes-with。 于是 producer 中 data = 42 happens-before consumer 中读取 data

抽象不变量:happens-before 关系的完整形式化定义

C++ 并发里最重要的不是”哪条语句先写在源代码里”,而是能否建立 happens-before。这个概念是 C++ 内存模型的核心,值得从基础开始完整推导。

C++ 内存模型为什么存在? 这个问题的答案涉及三个不同层次的”重排”,每个层次都可能让多线程程序的行为偏离程序员的直觉:

  1. 编译器重排:编译器为了优化性能,可以重排没有数据依赖的指令。例如 a = 1; b = 2; 可能被重排为 b = 2; a = 1;,因为在单线程中两者等价。但如果另一个线程用 b 作为 a 已写入的信号,重排就破坏了通信语义。
  2. CPU 乱序执行:现代 CPU 有超标量流水线和乱序执行引擎(OoO execution),指令可能以与程序顺序不同的顺序完成。CPU 保证单核心上的”仿佛顺序执行”(通过 ROB reorder buffer),但不保证其他核心观察到的写入顺序。
  3. 缓存一致性延迟:即使 CPU 按程序顺序执行了写入,写入的值首先进入本核心的 store buffer,然后通过缓存一致性协议(如 MESI/MOESI)传播到其他核心的 cache。这个传播不是瞬时的,不同核心可能在不同时间看到同一次写入。

C++ 内存模型([intro.multithread], [atomics.order])的作用是:在这三层物理现实之上,建立一套语言级的可见性保证体系。程序员不需要了解具体硬件的重排规则,只需要使用 C++ 提供的同步原语(atomic、mutex、thread join 等),就能在所有符合标准的平台上得到正确的跨线程可见性。

三个基础关系的严格定义

名称 含义 建立方式
sequenced-before 同一线程内,由 C++ 语言规则确定的求值先后顺序 分号分隔的完整表达式之间、函数调用的实参与函数体之间
synchronizes-with 跨线程的同步关系——一个线程的操作”发布”给另一个线程 atomic 的 release store 与读到该值的 acquire load 之间;mutex 的 unlock 与后续 lock 之间;thread 的完成与 join 返回之间
happens-before 可见性保证——如果操作 A happens-before 操作 B,则 A 的效果对 B 可见 sequenced-before 和 synchronizes-with 的传递闭包

happens-before 的传递闭包是关键。如果操作 A sequenced-before 操作 B(同一线程内的先后),且操作 B synchronizes-with 操作 C(跨线程同步),且操作 C sequenced-before 操作 D(同一线程内的先后),那么 A happens-before D——尽管 A 和 D 在不同线程中。这条传递链就是”写入对读取可见”的语言级证明。

为什么默认用 seq_cst 而不是 relaxed? 初学者容易被 memory_order_relaxed 的”高性能”吸引而过早使用它。正确的思考顺序是:先用 seq_cst(默认)写出正确代码,然后在性能测量证明它是瓶颈时,基于同步链分析减弱到 release-acquirerelaxed。这遵循 Knuth 的原则:”premature optimization is the root of all evil”。seq_cst 提供最强的顺序保证——所有 seq_cst 操作形成一个全局一致的顺序,最接近程序员的顺序直觉。relaxed 只保证单个原子对象的原子性,不建立任何跨线程可见性关系。从 seq_cstrelaxed 的每一步减弱都是在用增加的推理难度换取可能的性能收益——这个权衡只应在有数据支持时做出。

可以把它画成:

Producer thread:
data = 42
  sequenced-before
ready.store(true, release)
  synchronizes-with
ready.load(acquire) == true
  sequenced-before
read data

结论:data = 42 happens-before read data

这条链就是“消费者能看到生产者写入”的语言级证明。

acquire/release 语义的深层直觉——"门"模型

前面已经介绍了 acquire 和 release 的形式化定义。但工程实践中,一个好的直觉模型比形式化定义更有用——因为写代码时你需要快速判断"这里需要什么内存序"。

"门"模型:把每个 release store 想象成一扇从内向外关闭的门,把每个 acquire load 想象成一扇从外向内打开的门。

  • release store = 关门:门内侧(时间上在 release store 之前)的所有写操作,不能穿过这扇门跑到外侧(时间上在 release store 之后)。换句话说,release store 是一个"写屏障"——它保证"在我发布这个标志之前,我做的所有修改都已经对外可见"。
  • acquire load = 开门:门外侧(时间上在 acquire load 之前)的所有读操作,不能穿过这扇门跑到内侧(时间上在 acquire load 之后)。换句话说,acquire load 是一个"读屏障"——它保证"在我获取到这个标志之后,我读到的数据一定是发布者在发布标志之前写好的"。

当 acquire load 读到了 release store 写入的值时,两扇门"对接"——门内的所有写入(生产者侧)对门内的所有读取(消费者侧)可见。

这个模型也解释了为什么 relaxed 不够:relaxed 没有门——写入和读取可以自由地越过原子操作的位置,因此一个线程看到 ready == true 不代表它能看到 data == 42

一个更具体的比喻:release-acquire 像邮局的挂号信系统。寄件人(生产者)把信件内容(payload)放入信封,在邮局盖上日期戳后寄出(release store)。收件人(消费者)在邮局签收信件(acquire load),签收动作确认了信件内容已经到达。邮局系统保证:信封里的内容一定是在盖日期戳之前放入的,收件人在签收之后读到的内容一定完整。如果用普通邮寄(relaxed),邮局不保证信封和内容的对应关系——收件人可能收到空信封,或者信件到了但内容还没写好。

规则推导:mutex 的 unlock/lock 也建立同步

线程管理与互斥同步 中的 mutex 本质上也建立 happens-before:

std::mutex mutex;
int data = 0;

void producer() {
    std::lock_guard<std::mutex> lock(mutex);
    data = 42;
}

void consumer() {
    std::lock_guard<std::mutex> lock(mutex);
    use(data);
}

同一个 mutex 的 unlock synchronizes-with 后续成功 lock。 所以 mutex 不只是“互斥”,也是“可见性同步”。

atomic 的内存序让你在不进入临界区的情况下建立部分同步关系。 但这也意味着你必须自己说清楚同步链。

工程边界:不要把 happens-before 当成时间先后

happens-before 是语言关系,不是墙上时钟时间。 一个操作在真实时间上先发生,不代表另一个线程一定能看到。 如果没有同步,编译器和 CPU 可以让可见性顺序与直觉不同。

机器人日志里经常看到:

T=10.000 Mapping wrote pose
T=10.001 Visualization read ready flag

这不能证明 Visualization 一定看到 pose。 只有同步关系能证明。

代码验证:发布配置对象

#include <atomic>
#include <cassert>
#include <thread>

struct Config {
    int max_iterations = 0;
    double threshold = 0.0;
};

Config config;
std::atomic<bool> ready{false};

void producer() {
    config.max_iterations = 20;
    config.threshold = 0.1;
    ready.store(true, std::memory_order_release);
}

void consumer() {
    while (!ready.load(std::memory_order_acquire)) {
    }
    assert(config.max_iterations == 20);
    assert(config.threshold == 0.1);
}

int main() {
    std::thread a(producer);
    std::thread b(consumer);
    a.join();
    b.join();
}

这里 config 不是 atomic。 它能被安全读取,是因为所有写入都发生在 release store 之前,而 consumer 的 acquire load 读到了这个 release store 发布的值。

本质洞察:happens-before 不是"时间先后",而是"可见性保证"。一个操作在墙上时钟上先发生,不代表另一个线程能看到它的效果。只有通过 synchronizes-with(atomic release/acquire、mutex unlock/lock、thread join 等)建立的 happens-before 链,才能在 C++ 语义层证明"写入对读取可见"。日志时间戳不能替代同步关系。

happens-before 就像快递追踪系统。卖家发货(release store)并在系统中标记"已发出"。买家查看快递状态(acquire load),看到"已发出"后才去取件。"已发出"状态是同步点——它保证包裹的内容(payload 数据)在标记之前已经装好。如果卖家只是把包裹放在门口但没有在系统中标记(relaxed store),买家查看系统时看不到这个包裹,即使包裹在物理上已经存在。

如果所有 atomic 操作都用 relaxed 会怎样?relaxed 只保证单个原子对象的原子性——读写不会被撕裂。但它不建立任何跨线程的可见性关系。生产者写完 data = 42 后用 relaxedready = true,消费者用 relaxed 读到 ready == true 后读 data——但 data 的写入可能对消费者不可见!编译器可能重排 dataready 的写入顺序,CPU 的 store buffer 可能延迟 data 的发布。只有 release-acquire 对才能阻止这种重排。

⚠️ 编程陷阱:用 relaxed 内存序发布数据 错误做法data = 42; ready.store(true, memory_order_relaxed); 现象:在 x86 上"偶然正确"(因为 TSO 内存模型较强),在 ARM/Jetson 上出现 ready == truedata != 42 的情况。 根本原因relaxed 不阻止编译器和 CPU 重排 dataready 的写入。消费者看到 ready 更新不代表看到 data 更新。 正确做法:生产者用 memory_order_release,消费者用 memory_order_acquire

💡 概念误区:认为"在我的 x86 机器上能跑"就证明代码正确 新手想法:"这段并发代码在我的 Intel 笔记本上跑了一万次都没问题,应该是正确的。" 实际上:x86 的 TSO(Total Store Order)内存模型比 C++ 标准要求的更强。许多错误代码在 x86 上"偶然正确",但在 ARM、POWER 或 RISC-V 上立刻暴露。机器人系统经常部署在 ARM 平台(Jetson、树莓派、嵌入式 SBC),x86 上的"通过"不能移植。 正确理解:C++ 正确性按 C++ 内存模型证明,不按特定硬件行为证明。

练习

  1. [分析题]:画出 release-acquire 消息传递的 happens-before 链。标注 sequenced-before 和 synchronizes-with 关系。
  2. [代码题]:编写一个"发布配置"的程序:生产者写配置后 release store 标志;消费者 acquire load 标志后读配置。用 assert 验证配置可见。然后故意改成 relaxed,讨论在 ARM 上可能的表现。
  3. [跨章综合题]:线程管理与互斥同步 的 mutex unlock/lock 也建立 synchronizes-with。解释为什么 mutex 本质上也是一种 release-acquire 机制。两者在使用复杂度和性能上有什么权衡?

18.4 std::atomic<T> 对象模型:原子读写、交换与 CAS ⭐⭐⭐⭐

std::atomic<T> 的设计原理:从硬件原子操作到语言层抽象

std::atomic<T> 不是"给普通变量加了锁"——它是 C++ 对硬件原子操作指令的直接抽象。理解它的设计需要从硬件层面出发。

硬件原子操作的物理基础。 现代 CPU 对基本数据类型(通常是机器字长以内的整数和指针)提供硬件级的原子读写支持。x86 保证对齐的 32 位和 64 位读写天然是原子的——一次 MOV 指令完成整个读或写,不会被另一个核心的操作"拦腰截断"。这意味着另一个核心要么看到完整的旧值,要么看到完整的新值,不会看到"半旧半新"的撕裂值(torn read/write)。但注意:这种原子性只保证单次读写操作不被撕裂,不保证可见性顺序——后者需要内存序来控制。

为什么 std::atomic 不可拷贝? std::atomic<int> b = a; 被删除不是技术限制——编译器完全可以生成"原子读 a,然后原子写 b"的代码。问题在于语义:这个"复制"不是原子的。在"读 a"和"写 b"之间,a 的值可能已经被另一个线程修改了。b 得到的是 a 的一个"快照",而不是 a 的"副本"——两者的区别在于快照可能立刻过时。禁止拷贝是为了防止程序员误以为"复制了一个 atomic 就得到了一个同步的副本"。正确做法是显式地 b.store(a.load())——这让"我读了 a 的当前值并写入 b"的语义更清晰。

is_lock_free() 的含义。 并非所有 std::atomic<T> 都能用硬件原子指令实现。对于大型结构体(如 struct { double x, y, z; }),硬件没有单条指令能原子地读写 24 字节。此时编译器会退而使用内部 mutex——atomic<BigStruct> 的每次 loadstore 都变成了加锁、读写、解锁。is_lock_free() 让你检查某个 atomic<T> 是否真正使用了硬件原子指令。在性能敏感的场景中(如实时循环中的状态标志),应确认 static_assert(std::atomic<T>::is_always_lock_free) 或在运行时检查。非 lock-free 的 atomic 可能比直接使用 mutex 更慢(因为 atomic 的内部 mutex 不支持条件等待,也不支持 try_lock)。

atomic 的操作分类。 std::atomic<T> 提供的操作可以按复杂度分为三级:

级别 操作 语义 典型用途
一级 load() / store() 原子读 / 原子写 停止标志、状态查询
二级 exchange() 原子写并返回旧值 一次性状态切换
三级 compare_exchange_weak/strong() 比较并条件性写入 CAS 循环、状态机

一级操作最简单也最高效——通常只是一条 MOV 指令加可能的内存屏障。二级操作需要 x86 的 XCHG 指令或 ARM 的 SWP 指令。三级操作需要 CMPXCHG(x86)或 LDXR/STXR 循环(ARM),是最复杂也最灵活的原子操作。

工程问题:原子对象不是普通变量加锁

std::atomic<T> 是一个特殊对象。 它的拷贝构造和拷贝赋值通常被删除,因为复制一个正在并发访问的同步对象没有清晰语义。

std::atomic<int> a{1};
// std::atomic<int> b = a;  // 不允许

要读写它,使用成员函数:

int value = a.load();
a.store(2);
int old = a.exchange(3);

这让读写语义显式化。 也让你可以指定内存序。

反面失败:把 atomic 当成普通字段随便放进可拷贝状态

struct Stats {
    std::atomic<int> dropped_frames{0};
};

std::vector<Stats> stats;
// stats.push_back(Stats{});  // 可能因为不可拷贝/不可移动而失败

包含 atomic 的类型通常不再自然满足普通 value type 语义。 如果要放进容器,常见做法是:

  1. 预先构造固定数量对象。
  2. std::unique_ptr<Stats> 存储。
  3. 把 atomic 统计字段放到单独的监控对象中。
  4. 自定义移动逻辑,但必须非常谨慎。

抽象不变量:atomic 操作是对单个对象的不可分割访问

atomic 保证的是单个原子操作不可被其他线程观察成“执行到一半”。

操作 含义 常见场景
load 原子读取 停止标志、当前索引
store 原子写入 发布标志、切换索引
exchange 原子替换并返回旧值 状态切换、一次性标志
fetch_add 原子加法并返回旧值 计数器
compare_exchange 条件替换 无锁结构、状态机

不可分割不等于“自动有序”。 是否发布其他数据取决于 memory order。

规则推导:exchange 适合一次性接管状态

假设一个后台保存线程只允许启动一次:

std::atomic<bool> saving{false};

bool tryStartSaving() {
    const bool was_saving =
        saving.exchange(true, std::memory_order_acq_rel);
    return !was_saving;
}

void finishSaving() {
    saving.store(false, std::memory_order_release);
}

exchange 是读改写操作。 它在一个原子步骤中读取旧值并写入新值。 如果两个线程同时调用 tryStartSaving(),只有一个会看到旧值为 false

CAS:compare-and-swap 的基本形态

CAS 的逻辑是:

如果当前值等于 expected:
    写入 desired
    返回 true
否则:
    把当前值写回 expected
    返回 false

C++ 写法:

std::atomic<int> state{0};

bool transitionZeroToOne() {
    int expected = 0;
    return state.compare_exchange_strong(
        expected,
        1,
        std::memory_order_acq_rel,
        std::memory_order_acquire);
}

失败时 expected 会变成实际读到的值。 这让循环可以基于新值继续计算。

weakstrong

compare_exchange_weak 允许“伪失败”。 也就是说,即使当前值等于 expected,也可能返回 false。 它通常用于循环:

void increment(std::atomic<int>& value) {
    int old = value.load(std::memory_order_relaxed);
    int desired = 0;

    do {
        desired = old + 1;
    } while (!value.compare_exchange_weak(
        old,
        desired,
        std::memory_order_relaxed,
        std::memory_order_relaxed));
}

注意循环中没有手动重新 load。 失败时 old 已经被更新为当前值。 这里使用 relaxed,因为这个计数器没有发布任何其他数据。 它只要求“加一”这个读改写操作本身不可被撕裂。 如果 CAS 成功后还要让其他线程看到某个普通对象的写入,才需要把成功内存序提升到 releaseacq_rel,并在读取侧配对 acquire

这也是内存序教学中最容易混淆的地方:原子操作有两个层次。 第一层是“这个原子变量本身不会数据竞争”,relaxed 已经满足。 第二层是“它是否还承担发布其他内存写入的职责”,这才需要 release/acquire。

工程边界:atomic shared_ptr 的版本边界

C++20 提供 std::atomic<std::shared_ptr<T>> 特化。 它适合发布不可变配置快照:

std::atomic<std::shared_ptr<const Config>> current_config;

void publish(std::shared_ptr<const Config> config) {
    current_config.store(std::move(config), std::memory_order_release);
}

std::shared_ptr<const Config> loadConfig() {
    return current_config.load(std::memory_order_acquire);
}

这不是实时热路径的默认选择。 shared_ptr 的引用计数本身也有成本。 实时循环更常用预分配缓冲、双缓冲或显式生命周期管理。

代码验证:一次性状态切换

#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

int main() {
    std::atomic<bool> started{false};
    std::atomic<int> winners{0};

    auto task = [&] {
        bool expected = false;
        if (started.compare_exchange_strong(
                expected, true, std::memory_order_acq_rel)) {
            winners.fetch_add(1, std::memory_order_relaxed);
        }
    };

    std::vector<std::thread> threads;
    for (int i = 0; i < 8; ++i) {
        threads.emplace_back(task);
    }
    for (auto& thread : threads) {
        thread.join();
    }

    std::cout << winners.load(std::memory_order_relaxed) << "\n";
}

预期输出是 1。 CAS 把”检查是否未开始”和”标记已开始”合成一个原子动作。

⚠️ 编程陷阱:包含 atomic 的类型放进 std::vector 错误做法std::vector<Stats> stats; stats.push_back(Stats{});Stats 包含 atomic<int> 成员)。 现象:编译报错——atomic 不可拷贝不可移动,导致包含它的类型也不满足容器的元素要求。 根本原因:复制一个正在被并发访问的同步对象没有清晰语义,所以 std::atomic 的拷贝构造和拷贝赋值被删除。 正确做法:预先构造固定数量对象(std::array)、用 std::unique_ptr<Stats> 存储、或把 atomic 统计字段放到单独的监控对象中。

💡 概念误区:认为 compare_exchange_weak 的”伪失败”是 bug 新手想法:”weak 版本即使值相等也可能返回 false?这不是实现有 bug 吗?” 实际上:在 LL/SC(Load-Linked/Store-Conditional)架构上,如果在 LL 和 SC 之间有缓存行被替换(即使值没变),SC 也会失败。weak 暴露了这个硬件行为,让循环在大多数情况下生成更高效的机器码。strong 版本会在内部加循环保证不伪失败,但在循环 CAS 场景下这个内部循环是冗余的。 正确理解:在循环中用 weak(更高效),在单次判断中用 strong(语义更清晰)。

练习

  1. [分析题]:解释 exchangecompare_exchange_strong 的区别。exchange 适合什么场景?compare_exchange 适合什么场景?
  2. [代码题]:用 CAS 实现一个线程安全的”只执行一次”保护器(类似 std::call_once 的简化版)。8 个线程同时尝试启动,验证只有 1 个成功。
  3. [跨章综合题]:线程管理与互斥同步 用 mutex 保护 MessageQueue 的入队和出队。讨论:能否用 CAS 循环实现无锁版本的 push?如果可以,需要什么假设(如 SPSC)?如果不可以,瓶颈在哪里?

18.5 Memory Order:不要先背枚举,先看消息传递 ⭐⭐⭐⭐⭐

工程问题:同一个 atomic 标志可能有不同语义

std::atomic<bool> flag 可以表示很多事:

语义 是否发布其他数据 常用内存序
停止请求 不发布 relaxed
配置已准备好 发布配置对象 release-acquire
状态机切换 可能发布状态 acq_rel
全局调试顺序 想要最强直觉 seq_cst

所以不能说“atomic bool 就用 relaxed”。 要看它是不是同步其他数据。

反面失败:把 memory order 当性能按钮

下面的注释是错误思路:

ready.store(true, std::memory_order_relaxed);  // 更快

如果 ready 是发布数据的标志,relaxed 就是错误。 性能不是第一问题。 先建立正确同步,再考虑是否能减弱内存序。

抽象不变量:memory_order 六种选项的语义——从最强到最弱逐级放松

内存序描述的不是"这个操作有多快",而是"这个原子操作如何参与跨线程的顺序关系"。理解内存序的最好方式**不是背一张六行的表格**,而是从最强的 seq_cst 出发,逐步放松——每放松一级,明确回答三个问题:放弃了什么保证?换来了什么性能?什么场景下这个放弃是安全的?

这就像从"全副武装"开始卸装备:先穿全套盔甲(seq_cst),确认安全后脱掉头盔(release-acquire),再确认安全后脱掉护甲(relaxed)。每脱一件都要确认"不穿这件在我的场景中不会受伤"。如果不确定,就保持穿着——seq_cst 的"性能损失"在绝大多数场景中完全可以忽略(x86 上 seq_cst store 的额外开销是一条 MFENCE 指令,约 20-40ns),而用错内存序的代价是"程序在 ARM 上偶发崩溃且无法复现"。

seq_cst(顺序一致性)——最强保证,默认选择。 所有 seq_cst 操作在所有线程中形成单一的全局顺序(total order)。这意味着如果线程 A 先执行 seq_cst store X 再执行 seq_cst store Y,那么任何线程看到 Y 时一定也看到 X。这最接近程序员的顺序直觉——"代码写在前面的先发生"。代价是在弱内存模型平台(ARM、POWER)上可能需要额外的内存屏障(memory fence),带来性能开销。在 x86 的 TSO 模型上,store 已经天然有 release 语义,seq_cst 的额外开销主要是 store 后的 MFENCE 指令。

release(发布语义)——"门的关闭"。 release store 保证:当前线程中,这个 store 之前的所有内存写入(包括非 atomic 的普通变量写入)不会被重排到这个 store 之后。直觉理解:release store 像一扇门的关闭——门内(之前)的操作不能逃出门外(之后)。它的作用是"发布"——告诉其他线程"在你看到我发布的值之后,我之前做的所有修改你都能看到"。

acquire(获取语义)——"门的打开"。 acquire load 保证:当前线程中,这个 load 之后的所有内存读取不会被重排到这个 load 之前。直觉理解:acquire load 像一扇门的打开——门外(之前)的操作不能跑进门内(之后)。它的作用是"获取"——"我读到了你发布的值,现在我能安全地读取你之前修改的所有数据"。

acquire/release 配对的直觉解释。 把 release 想象成"把一个包裹封好并发出",acquire 想象成"收到包裹并打开"。release 保证包裹里的所有物品(之前的写入)在封好之前已经放入;acquire 保证收到包裹后才开始检查物品。两者配对形成 synchronizes-with 关系,建立 happens-before 链。

acq_rel(获取-发布语义)。 同时具有 acquire 和 release 语义。用于读-改-写操作(fetch_addcompare_exchange 等),这些操作既要"获取"当前值(看到其他线程之前的写入),又要"发布"新值(让后续读者看到自己之前的写入)。

relaxed(松散语义)——最弱保证。 只保证这个原子对象本身的读写是原子的(不会被撕裂),不建立任何跨线程的可见性关系。编译器和 CPU 可以自由地把这个操作与其他操作重排。适用场景:操作只涉及一个独立的原子变量,不需要通过这个变量"传递"其他数据的可见性。典型例子是统计计数器——多个线程各自递增同一个 atomic 计数器,最终只关心总数,不关心计数器更新的顺序或与其他数据的关系。

consume——理论上的数据依赖优化。 consume 的设计意图是只对与 load 值有数据依赖的后续操作建立同步,而不是对所有后续操作。这在某些架构上可以避免不必要的屏障。但实际上,consume 长期以来无法被编译器正确实现——几乎所有编译器都把 consume 提升为 acquire。C++ 标准委员会目前建议不使用 consume

内存序 保证 典型用途
seq_cst 所有 seq_cst 操作形成单一全序 默认安全选择、调试优先
acq_rel 同时获取和发布 读改写状态机
release 之前的写入不能越过发布点 生产者发布数据
acquire 之后的读取不能跑到获取点之前 消费者获取数据
relaxed 只保证原子性 计数器、独立统计
consume 理论上仅同步数据依赖链 不推荐使用

规则推导:release-acquire 消息传递

生产者:

payload = makePayload();
ready.store(true, std::memory_order_release);

消费者:

while (!ready.load(std::memory_order_acquire)) {
}
use(payload);

这条模式的推导是:

  1. payload = makePayload() sequenced-before release store。
  2. acquire load 读到 release store 写入的 true
  3. release store synchronizes-with acquire load。
  4. 因此 payload 写入 happens-before payload 读取。

relaxed 计数器

统计处理帧数:

std::atomic<std::uint64_t> processed_frames{0};

void onFrameProcessed() {
    processed_frames.fetch_add(1, std::memory_order_relaxed);
}

这里没有通过计数器发布其他对象。 读取端只是想知道大概处理了多少帧。 relaxed 合适。

如果读取端根据计数器判断“第 N 帧数据已经写好”,就不再是纯统计。 此时要重新设计同步。

seq_cst 的直觉

默认的 atomic 操作使用 seq_cst。 它提供最强的跨线程顺序直觉:

flag.store(true);  // seq_cst
if (flag.load()) { // seq_cst
}

所有 seq_cst 操作形成一个全局顺序。 这通常更容易写对。 缺点是可能限制优化,尤其在弱内存模型平台上成本更明显。

教学和初版工程里,可以先用 seq_cst 写正确。 当性能测量证明它是瓶颈,再基于同步链减弱到 release-acquire 或 relaxed。

⚠️ 编程陷阱:把 memory order 当性能旋钮 错误做法:不分析同步需求,直接把所有 atomic 操作改成 relaxed 以"提升性能"。 现象:在 x86 上可能看不到问题(TSO 较强);部署到 ARM/Jetson 后出现数据不一致、断言失败或偶发崩溃。 根本原因:内存序描述的是"这个 atomic 操作如何参与顺序关系",不是"这个操作有多快"。错误的内存序不是"慢",是"语义错误"。 正确做法:先用 seq_cst 写正确代码。分析同步链后,只在能证明安全的地方减弱到 release-acquire 或 relaxed。

🧠 思维陷阱:认为"同一个 atomic 变量总是用同一种内存序" 新手想法:"stop 标志全部用 relaxed 就好了。" 实际上:同一个 atomic<bool> 在不同语义下需要不同内存序。如果 stop 只表示"是否退出"且不发布其他数据,relaxed 足够。如果 stop 标志之前还写了 shutdown reason,希望读到 stop == true 后能看到 reason,就需要 release-acquire。 正确思维:每次使用 atomic 操作时,问"这个操作是否需要同步其他数据?"

练习

  1. [分析题]:对比 relaxedrelease-acquireseq_cst 三种内存序的保证强度。各给一个机器人场景的例子。
  2. [代码题]:编写一个 release-acquire 消息传递程序:生产者写 MapChunk 后 release store published 标志;消费者 acquire load 后读 MapChunk。用 assert 验证数据可见。
  3. [跨章综合题]:线程管理与互斥同步 的 mutex 本质上提供 seq_cst 级别的同步(unlock/lock 建立全序)。讨论:如果把所有 mutex 保护的场景都改成 release-acquire atomic,代码复杂度和正确性验证难度会如何变化?

工程边界:不同平台暴露不同 bug

x86 的 TSO 内存模型较强。 很多错误代码在 x86 上长期不炸。 ARM 和 POWER 更弱,移动机器人常用的 Jetson、ARM SBC、嵌入式平台更容易暴露问题。

但 C++ 正确性不能以“在我的机器上跑过”为准。 跨平台代码要按 C++ 内存模型证明。

代码验证:三种内存序用途

#include <atomic>
#include <cassert>
#include <string>
#include <thread>

struct MapChunk {
    std::string name;
    int points = 0;
};

MapChunk chunk;
std::atomic<bool> published{false};
std::atomic<int> debug_counter{0};

void producer() {
    chunk.name = "local_map";
    chunk.points = 1000;
    debug_counter.fetch_add(1, std::memory_order_relaxed);
    published.store(true, std::memory_order_release);
}

void consumer() {
    while (!published.load(std::memory_order_acquire)) {
    }
    assert(chunk.name == "local_map");
    assert(chunk.points == 1000);
}

int main() {
    std::thread a(producer);
    std::thread b(consumer);
    a.join();
    b.join();
}

debug_counter 是独立统计,用 relaxed。 published 发布 chunk,用 release-acquire。


18.6 CAS、ABA 与无锁结构的边界 ⭐⭐⭐⭐

工程问题:CAS 能构造无锁结构,但也能构造难查 bug

无锁栈、无锁队列、状态机常用 CAS。 但 CAS 只比较一个内存位置的当前值。 如果值从 A 变成 B 又变回 A,CAS 可能误以为“没有变化”。 这就是 ABA。

反面失败:指针 CAS 的 ABA

想象一个无锁栈头指针:

head -> A -> B -> C

线程 1 读取 head == A,准备把 head 改成 B。 它被切走。

线程 2 执行:

pop A
pop B
push A

现在 head 又是 A。 线程 1 的 CAS 看到 head 还是 A,于是把 head 改成它之前读到的 B。 但 B 可能已经不在栈中,甚至已释放。

这就是 ABA 问题。

CAS 的理论基础:从硬件原语到软件抽象

CAS(Compare-And-Swap)不是 C++ 发明的抽象概念——它直接对应 CPU 硬件提供的原子指令。理解 CAS 的硬件基础有助于理解它的能力边界和性能特征。

硬件层面的 CAS 指令。 x86 架构提供 CMPXCHG(Compare and Exchange)指令,ARM 提供 LDXR/STXR(Load-Exclusive/Store-Exclusive)指令对。两者的语义相同但实现机制不同:

  • x86 CMPXCHG:这是一条真正的原子指令。CPU 在执行期间锁住该内存地址对应的 cache line(或者在跨 cache line 时锁总线),确保比较和交换在一个不可分割的操作中完成。由于锁 cache line 需要与其他核心协调,频繁的 CAS 操作会产生大量的缓存一致性流量。

  • ARM LDXR/STXR(LL/SC):ARM 使用"加载链接/条件存储"模型。LDXR 加载值并设置一个"独占监视器"标志。后续的 STXR 检查独占监视器——如果自 LDXR 以来没有其他核心写过该地址,STXR 成功并写入新值;否则 STXR 失败,软件层面需要重试。这种机制的优势是不需要锁 cache line,但代价是 STXR 可能"虚假失败"(spurious failure)——即使值没有被其他线程改变,任何对该 cache line 的干扰(如缓存驱逐、中断、上下文切换)都可能导致独占标志被清除。

compare_exchange_weak vs compare_exchange_strong 的硬件根源。 C++ 标准提供两个 CAS 接口正是因为硬件差异。在 ARM 上,LL/SC 的虚假失败使得 compare_exchange_strong 需要在内部循环重试——一次 strong CAS 可能执行多次 LDXR/STXR。而 weak 版本允许虚假失败,调用者自己写重试循环。当 CAS 本身就在循环中使用时(如 CAS 状态机),weak 版本避免了双重循环,性能更好。在 x86 上,CMPXCHG 不会虚假失败,两个版本生成的机器码通常相同。

CAS 循环的一般模式。 几乎所有使用 CAS 的算法都遵循同一个模式:"读取当前值 -> 计算期望的新值 -> CAS 尝试写入 -> 失败则重试"。这个模式的正确性依赖于一个隐含假设:从读取到 CAS 之间的计算是可重试的。如果计算有副作用(如分配内存、修改其他变量),重试时这些副作用会重复执行,可能导致错误。CAS 循环最适合纯计算的场景(如原子递增、状态机转移),不适合带复杂副作用的场景。

抽象不变量:CAS 比较的是位模式,不比较历史

CAS 只知道:

当前值 == expected ?

它不知道这个值中途是否变化过。 解决 ABA 常见方法:

方法 思路 代价
tagged pointer 指针加版本号 需要额外位或双宽 CAS
hazard pointer 读者声明自己正在使用的节点 实现复杂
epoch reclamation 延迟回收节点 需要全局 epoch
SPSC 限定 简化拓扑,避免多写者竞争 适用范围窄
mutex 放弃无锁复杂性 可能足够好

机器人项目里,除非真的在高频热路径证明 mutex 不够,否则不要轻易手写 MPMC 无锁结构。 SPSC、双缓冲、原子标志已经覆盖很多实际需求。

规则推导:状态机 CAS 比指针结构更容易验证

状态机只有少量枚举值:

enum class Mode : int {
    Idle = 0,
    Tracking = 1,
    Relocalizing = 2,
    Shutdown = 3
};

std::atomic<Mode> mode{Mode::Idle};

Idle 切到 Tracking

bool tryStartTracking() {
    Mode expected = Mode::Idle;
    return mode.compare_exchange_strong(
        expected,
        Mode::Tracking,
        std::memory_order_acq_rel,
        std::memory_order_acquire);
}

这种 CAS 比无锁链表更容易推理,因为没有节点生命周期和 ABA 回收问题。

工程边界:lock-free 不等于 wait-free

常见术语:

术语 含义
blocking 可能因锁或等待被挂起
lock-free 系统整体总有线程能推进
wait-free 每个线程在有限步内完成
obstruction-free 无竞争时能完成

很多 std::atomic<T> 不一定 lock-free。 可以检查:

std::atomic<std::uint64_t> value{0};
bool ok = value.is_lock_free();

即使 lock-free,也可能因为 CAS 重试过多导致某个线程长时间不成功。

代码验证:CAS 状态机

#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

enum class Mode : int {
    Idle,
    Tracking,
    Shutdown
};

int main() {
    std::atomic<Mode> mode{Mode::Idle};
    std::atomic<int> started{0};

    auto start = [&] {
        Mode expected = Mode::Idle;
        if (mode.compare_exchange_strong(
                expected,
                Mode::Tracking,
                std::memory_order_acq_rel,
                std::memory_order_acquire)) {
            started.fetch_add(1, std::memory_order_relaxed);
        }
    };

    std::vector<std::thread> threads;
    for (int i = 0; i < 8; ++i) {
        threads.emplace_back(start);
    }
    for (auto& t : threads) {
        t.join();
    }

    std::cout << started.load(std::memory_order_relaxed) << "\n";
}

这类状态机适合 CAS。 如果状态里还包含复杂资源所有权,先考虑 mutex 或显式队列。

ABA 问题可以类比邮局信箱。你看到信箱里有信件 A,准备取走。但在你伸手之前,邮递员取走了 A,放入了 B,然后 B 被取走,又放回了 A。你回来看到"还是 A",以为什么都没变——但实际上信箱经历了一轮替换。CAS 只比较当前值,不比较历史。如果 A 是指针,中间可能涉及内存释放和重新分配,"同一个指针值"可能指向完全不同的对象。

⚠️ 编程陷阱:手写无锁 MPMC 队列 错误做法:没有充分测试就在生产代码中使用自己实现的多生产者多消费者无锁队列。 现象:在低负载下正常,高负载或特定线程数下出现数据丢失、重复消费或崩溃。 根本原因:MPMC 无锁结构需要处理 ABA、内存回收(hazard pointer/epoch)、伪共享等大量细节。99% 的手写实现都有 bug。 正确做法:机器人项目优先用 mutex + queue。只有在 profiler 证明 mutex 是瓶颈、且拓扑可以简化为 SPSC 时,才考虑无锁结构。使用经过充分测试的库(如 cameron314/readerwriterqueue)比手写更安全。

💡 概念误区:认为 lock-free 等于 wait-free 等于"不阻塞" 新手想法:"lock-free 就是永远不等待,性能最好。" 实际上:lock-free 只保证系统整体总有线程能推进,但某个具体线程可能因为 CAS 重试而长时间不成功。wait-free 保证每个线程在有限步内完成,这比 lock-free 更强但也更难实现。而且 std::atomic<T> 不一定是 lock-free——可以用 value.is_lock_free() 检查。 正确理解:lock-free 是进度保证,不是性能保证。CAS 重试在高竞争下可能比 mutex 更慢。

练习

  1. [分析题]:解释 ABA 问题为什么在无锁栈中特别危险。tagged pointer 如何通过版本号避免 ABA?
  2. [代码题]:用 CAS 实现一个简单的 Idle -> Tracking -> Shutdown 状态机。验证只有合法的状态转换能成功。
  3. [跨章综合题]:线程管理与互斥同步 的 deadlock 是 mutex 的经典问题;本节的 ABA 是 CAS 的经典问题。对比两者:"使用更底层的工具"在什么意义上更危险?从"bug 种类"和"调试难度"两个维度讨论。

18.7 False Sharing:atomic 也会拖慢系统 ⭐⭐⭐⭐

缓存一致性协议的理论基础:为什么硬件共享单位不是变量

理解 false sharing 需要先理解现代 CPU 的缓存架构和缓存一致性协议。这些硬件机制直接决定了原子操作的真实性能代价。

CPU 缓存的层次结构。 现代 CPU 的内存访问延迟差距极大:L1 cache 约 1-4 个时钟周期(约 1ns),L2 cache 约 10-20 个周期(约 4-8ns),L3 cache 约 30-70 个周期(约 12-30ns),主内存约 200-300 个周期(约 80-120ns)。为了弥补 CPU 执行速度和内存延迟之间的巨大鸿沟,CPU 在多级缓存中保存最近使用的数据。缓存的基本管理单位不是字节或变量,而是**缓存行(cache line)**——通常是 64 字节。一次缓存未命中(cache miss)会加载整条 64 字节的缓存行,即使程序只需要其中 4 个字节。

MESI 协议的状态转换。 在多核 CPU 中,同一个内存地址的数据可能同时存在于多个核心的缓存中。缓存一致性协议(最基本的是 MESI 协议)确保所有核心看到一致的数据。MESI 为每条缓存行维护四种状态:

状态 含义 转换条件
**M**odified(已修改) 只有当前核心有最新数据,主内存中的版本是旧的 当前核心写入
**E**xclusive(独占) 只有当前核心有数据,和主内存一致 读取且无其他核心缓存
**S**hared(共享) 多个核心有相同的副本 其他核心也在读取
**I**nvalid(无效) 当前缓存行数据无效 其他核心写入了该地址

当核心 A 想要写一个处于 Shared 状态的缓存行时,它必须先向所有持有该缓存行副本的核心发送"无效化"消息(invalidate),等待所有核心确认后才能将状态改为 Modified 并执行写入。这个"无效化 -> 确认"的往返通信是 false sharing 的性能代价的根源。

false sharing 的定量影响。 在两个核心频繁交替写入同一条缓存行时,每次写入都触发一次无效化-确认往返。如果两个核心在不同的芯片组上(NUMA 架构),这个往返可能需要 40-100ns。在紧密循环中,这意味着每秒只能完成约 1000-2500 万次写入,而单核心写本地缓存行可以达到每秒数十亿次。性能差距可达 100 倍以上。这就是为什么看似"每个线程只访问自己的变量"的代码可能比单线程更慢——硬件不按变量边界管理缓存,而按缓存行边界管理。

工程问题:不同变量也可能抢同一条 cache line

CPU 缓存以 cache line 为单位。 常见 cache line 是 64 字节。 如果两个线程分别频繁写两个 atomic,但这两个 atomic 落在同一条 cache line 上,缓存一致性协议仍会让这条 cache line 在核心之间来回迁移。

这叫 false sharing:

struct Counters {
    std::atomic<int> tracking_frames{0};
    std::atomic<int> mapping_frames{0};
};

Tracking 线程只写 tracking_frames。 Mapping 线程只写 mapping_frames。 逻辑上互不共享。 硬件上可能共享同一 cache line。

反面失败:把大量 atomic 统计量紧密排布

struct Metrics {
    std::atomic<std::uint64_t> imu_count{0};
    std::atomic<std::uint64_t> lidar_count{0};
    std::atomic<std::uint64_t> dropped_count{0};
    std::atomic<std::uint64_t> loop_count{0};
};

如果四个线程分别高频写四个字段,它们可能互相拖慢。 这不是锁竞争,却是缓存一致性竞争。

抽象不变量:硬件共享单位不是 C++ 变量,而是 cache line

可以用对齐隔离热点计数器:

#include <atomic>
#include <new>

struct alignas(std::hardware_destructive_interference_size) PaddedCounter {
    std::atomic<std::uint64_t> value{0};
};

C++17 提供:

常量 含义
std::hardware_destructive_interference_size 避免不同热点对象互相干扰的建议间距
std::hardware_constructive_interference_size 适合一起读取对象的建议聚合大小

不同平台实现可能不同。 工程上仍应结合性能测量。

规则推导:局部累积再归并

并行编程框架 会讲数据并行。 并行点云处理里,最常见的错误是每个点都写全局 atomic:

global_count.fetch_add(1, std::memory_order_relaxed);

如果百万点循环里每个线程都写同一个 atomic,这个 atomic 成为串行瓶颈。 更好的方式是:

  1. 每个线程用局部普通变量计数。
  2. 分块结束后写一次 atomic。
  3. 或在主线程归并局部结果。

工程边界:atomic 统计值通常不该进入实时热路径

实时控制线程里,统计日志应尽量低频。 每次循环都写共享 atomic 可能引入抖动。 可以采用:

场景 建议
调试构建 atomic 统计详细指标
发布构建 降低统计频率
实时线程 写本地缓冲,低频汇总
离线分析 tracepoint 或二进制日志

代码验证:padding 计数器

#include <atomic>
#include <cstddef>
#include <iostream>
#include <new>

struct alignas(std::hardware_destructive_interference_size) Counter {
    std::atomic<std::uint64_t> value{0};
};

struct Metrics {
    Counter tracking;
    Counter mapping;
};

int main() {
    Metrics metrics;
    std::cout << alignof(Counter) << "\n";
    metrics.tracking.value.fetch_add(1, std::memory_order_relaxed);
    metrics.mapping.value.fetch_add(1, std::memory_order_relaxed);
}

这个例子只展示布局意图。 是否真的更快,要用目标硬件测。

⚠️ 编程陷阱:多线程每个点都写全局 atomic 计数器 错误做法:在百万点的并行循环里,每处理一个点就 global_count.fetch_add(1, relaxed)现象:这个 atomic 成为串行瓶颈,性能反而不如单线程。 根本原因:多个核心频繁写同一个 atomic 会触发 cache line 在核心之间来回迁移(cache line bouncing)。即使是 relaxed,缓存一致性协议的开销也很大。 正确做法:每个线程用局部普通变量计数,处理完一批后再写一次 atomic 或在主线程归并。

💡 概念误区:认为 false sharing 只在"写同一个变量"时发生 新手想法:"两个线程写的是不同的 atomic 变量,不会互相影响。" 实际上:CPU 缓存以 cache line(通常 64 字节)为单位。如果两个不同的 atomic 恰好落在同一条 cache line 上,对任一变量的写入都会导致整条 cache line 在核心之间迁移。逻辑上不共享,硬件上共享——这就是 false sharing。 正确理解:热点 atomic 变量应该用 alignas(hardware_destructive_interference_size) 隔离到不同 cache line。

练习

  1. [分析题]:一个 Metrics 结构体有 4 个 atomic<uint64_t> 字段,分别被 4 个线程高频写入。解释为什么即使逻辑上不共享,性能也会下降。如何用 padding 解决?
  2. [代码题]:编写 benchmark:在紧密循环中 4 个线程分别写入同一结构体的 4 个 atomic 字段。对比有 padding 和无 padding 版本的吞吐量。
  3. [跨章综合题]:C++语言核心/预处理器与宏 的结构体布局和 padding 保证字段对齐;本节的 padding 隔离 cache line。对比两种 padding 的目的、方法和验证方式。

18.8 volatile 不是线程同步 ⭐⭐⭐⭐⭐

工程问题:跨语言经验会误导 C++ 程序员

Java、C# 里的 volatile 有线程可见性语义。 C++ 的 volatile 没有这种语义。 C++ volatile 主要告诉编译器:对这个对象的访问有特殊副作用,不要随意优化掉。 典型用途是内存映射 I/O。

反面失败:volatile bool stop

volatile bool stop = false;

void trackingLoop() {
    while (!stop) {
        processOneFrame();
    }
}

void requestStop() {
    stop = true;
}

这仍然不是线程同步。 volatile 不保证原子性。 不建立 happens-before。 不保证其他线程看到写入。

正确写法:

std::atomic<bool> stop{false};

抽象不变量:volatile 面向特殊内存,atomic 面向线程通信

理解 volatileatomic 的区别,需要追溯到它们各自解决的根本问题。

volatile 解决的问题:编译器的 as-if 规则允许它优化掉"看起来没有用"的内存访问。例如,如果编译器发现一个变量被写入但在当前控制流中从未被读取,它可能把写入优化掉;如果一个变量被连续读取两次且中间没有修改,编译器可能把第二次读取替换为第一次的结果。这些优化在普通内存中完全正确,但在内存映射 I/O(MMIO)中是致命的——硬件寄存器的每次读写都有特殊副作用(读取可能清除中断标志,写入可能启动 DMA 传输),编译器不能优化掉任何一次访问。volatile 就是告诉编译器:"这个对象的每次读写都有你看不到的副作用,不要优化掉任何一次。"

volatile 不能解决的问题volatile 不保证原子性(读写可能被撕裂)、不建立 happens-before 关系(不阻止编译器或 CPU 的指令重排,只阻止优化掉访问本身)、不保证跨线程可见性。C++ 标准明确说:volatile 不是同步机制。

为什么 Java/C# 的 volatile 和 C++ 的不同? 这是跨语言经验最容易误导的地方。Java 的 volatile 在 JMM(Java Memory Model)中有明确的 happens-before 语义——对 volatile 变量的写 happens-before 后续对同一变量的读。C# 的 volatile 类似,有 acquire/release 语义。但 C++ 的 volatile 完全不保证这些——它的语义仅限于"不优化掉访问"。这个差异的根本原因是:Java 和 C# 运行在虚拟机之上,语言可以为 volatile 注入同步指令;C++ 直接编译到硬件,volatile 的语义被设计为最小化——只解决编译器优化问题,把同步的责任完全交给 atomicmutex

工具 面向问题 保证
volatile 编译器不要优化特殊内存访问 每次读写都执行,不保证原子性和可见性
std::atomic 跨线程原子访问和同步 原子性 + 可选的 happens-before
sig_atomic_t 信号处理中的有限安全通信 信号安全的读写,但不保证跨线程同步
mutex 保护临界区和复杂不变量 互斥 + happens-before

FAST-LIO2 这类项目里可能会看到 volatile sig_atomic_t。 如果它用于 signal handler 与主循环之间的通信,属于信号处理语境。 如果用于普通线程之间通信,应改用 atomic。

规则推导:硬件寄存器与线程标志不是同一类问题

硬件寄存器:

volatile std::uint32_t* uart_status =
    reinterpret_cast<volatile std::uint32_t*>(0x40000000);

std::uint32_t status = *uart_status;

线程标志:

std::atomic<bool> stop_requested{false};

前者是“每次访问都必须真的触发内存读写”。 后者是“多个线程之间的读写必须合法并可同步”。

工程边界:不要用 volatile 修复优化级别导致的并发问题

如果 -O0 正常、-O3 出错,很多人会想到加 volatile。 这通常是在掩盖 data race。 正确路径是:

  1. 用 ThreadSanitizer 检查 data race。
  2. 用 atomic 或 mutex 修复共享访问。
  3. 再按性能需要调整内存序。

代码验证:编译能过不代表同步正确

下面代码可能长期看似正常:

volatile bool ready = false;
int data = 0;

void producer() {
    data = 42;
    ready = true;
}

void consumer() {
    while (!ready) {
    }
    use(data);
}

它的问题不是语法。 问题是 ready 没有和 data 建立 happens-before。 把 ready 改成 atomic,并使用 release-acquire 才能表达发布关系。

⚠️ 编程陷阱:用 volatile 修复优化级别导致的并发问题 错误做法-O0 正常、-O3 出错时,给共享变量加 volatile现象:问题可能暂时"消失",但这是在掩盖 data race,不是修复。换编译器版本或平台后可能再次暴露。 根本原因:优化级别改变了指令重排和寄存器缓存策略。volatile 阻止了某些优化,但不建立同步关系。表面"修好"是因为偶然让内存访问变得"足够频繁",不是因为建立了正确的 happens-before。 正确做法:用 ThreadSanitizer 检查 data race → 用 atomic 或 mutex 修复 → 再按需调整内存序。

练习

  1. [分析题]:解释 C++ volatile 和 Java volatile 的语义差异。为什么 C++ 选择不给 volatile 加线程同步语义?
  2. [代码题]:编写一个用 volatile bool stop 的程序(反例),用 ThreadSanitizer 检测。改为 atomic<bool> 后验证 data race 消失。
  3. [跨章综合题]:线程管理与互斥同步 也有 volatile 小节。总结本课程中出现过的"看起来能工作但标准层面错误"的模式:volatile 线程标志、无锁 deque push/pop、C 风格转换。它们的共同特征是什么?

18.9 SPSC Ring Buffer:限定拓扑带来的简单无锁结构 ⭐⭐⭐⭐

SPSC Ring Buffer 的理论基础:拓扑约束如何简化同步

SPSC(Single Producer Single Consumer)环形缓冲区是无锁数据结构中最简单、最可靠的一种。它的简单性不是偶然的——它来自一个关键的拓扑约束:只有一个线程写 tail 索引,只有一个线程写 head 索引。这个约束从根本上消除了 CAS 竞争、ABA 问题和复杂的内存回收需求。

为什么单生产者单消费者如此特殊? 在 MPMC(多生产者多消费者)场景中,多个生产者可能同时尝试写入同一个槽位——它们需要用 CAS 竞争 tail 索引的递增权,失败者必须重试。多个消费者可能同时尝试读取同一个元素——它们需要用 CAS 竞争 head 索引的递增权。CAS 竞争引入了重试循环,重试循环引入了活锁风险(所有线程都在重试,没有线程成功推进)。

在 SPSC 中,tail 只有一个写者——不需要 CAS,普通的 atomic store 就够了。head 只有一个写者——同理。生产者写 tail 后用 release 发布,消费者读 tail 时用 acquire 获取——这建立了生产者写入的数据对消费者可见的 happens-before 链。反方向同理:消费者写 head 后用 release,生产者读 head 时用 acquire——知道哪些槽位已经被消费、可以重用。

环形缓冲区的数学模型。 环形缓冲区可以看作一个固定大小的数组加上两个"滑动窗口"指针:head(消费者的读取位置)和 tail(生产者的写入位置)。有效数据位于 [head, tail) 区间(模数组大小取模)。空间大小是 capacity - (tail - head)。当 tail == head 时队列为空;当 tail - head == capacity - 1 时队列满(故意留一个空槽来区分"满"和"空"——如果不留空槽,tail == head 既可以表示空也可以表示满,产生二义性)。

为什么需要留一个空槽? 这是环形缓冲区设计中一个容易被忽视但至关重要的细节。如果允许 tail 追上 head(即填满所有 capacity 个槽位),那么"满"和"空"的判断条件都变成 tail == head——无法区分。替代方案是使用一个额外的 size 计数器,但这个计数器需要两个线程同时修改(生产者递增、消费者递减),又引入了 CAS 竞争——正是 SPSC 设计要避免的。留一个空槽是最简洁的解决方案:浪费一个元素的空间,换取完全无竞争的索引判断。

SPSC 在机器人系统中的典型拓扑。 机器人系统中很多数据流天然是单生产者单消费者的:

数据流 生产者 消费者
IMU 原始数据 IMU 驱动回调 预积分线程
去畸变点云 LiDAR 驱动回调 Tracking 线程
控制命令 规划器线程 执行器驱动线程
最新位姿 Tracking 线程 可视化线程

只要确认拓扑是 1:1 的,SPSC 就比 mutex + queue 更轻量。但如果拓扑可能变化(如多个传感器回调写入同一队列),必须切换到 mutex 保护或 MPSC 队列。错误地把 SPSC 用在多生产者场景中,是最危险的无锁 bug 之一——因为它可能在低负载下完全正常,只在高并发时偶发数据损坏。

工程问题:IMU 高频数据不适合每条都阻塞

IMU 可能 200 Hz、400 Hz 甚至更高。 LiDAR 线程或融合线程只需要按顺序取走数据。 如果拓扑明确是单生产者、单消费者,可以用 SPSC ring buffer。

IMU callback thread  --->  ring buffer  --->  fusion thread
     producer                                  consumer

SPSC 的前提很强:

  1. 只有一个生产者写 tail。
  2. 只有一个消费者写 head。
  3. 两者都可以读对方的索引。
  4. 缓冲区元素生命周期清楚。

反面失败:把 SPSC 用成 MPSC

如果两个 ROS 回调线程都写同一个 SPSC 队列,tail 就有两个写者。 原来的无锁证明失效。 即使 tail 是 atomic,也不能自动修好所有元素写入顺序和槽位所有权。

多生产者应使用:

  1. mutex + queue。
  2. MPSC 专用队列。
  3. 每个生产者一个 SPSC,再由消费者合并。

抽象不变量:head 只由消费者写,tail 只由生产者写

SPSC 的核心不变量:

head: 下一个要读的位置,只由 consumer 写
tail: 下一个要写的位置,只由 producer 写
buffer[i]: 只有生产者写入后,消费者才能读取

发布数据时,生产者先写元素,再 release store 更新 tail。 消费数据时,消费者 acquire load tail,确认有元素后读取。

规则推导:容量必须留一个空槽

环形缓冲区常用一个空槽区分满和空:

empty: head == tail
full : next(tail) == head

如果容量是 N,实际可存 N - 1 个元素。 这比额外维护一个 shared size 更简单。

教学版实现

#include <array>
#include <atomic>
#include <cstddef>
#include <optional>
#include <utility>

template <typename T, std::size_t Capacity>
class SpscRingBuffer {
public:
    static_assert(Capacity >= 2);

    bool push(const T& value) {
        const std::size_t tail = tail_.load(std::memory_order_relaxed);
        const std::size_t next = increment(tail);

        if (next == head_.load(std::memory_order_acquire)) {
            return false;
        }

        buffer_[tail] = value;
        tail_.store(next, std::memory_order_release);
        return true;
    }

    std::optional<T> pop() {
        const std::size_t head = head_.load(std::memory_order_relaxed);

        if (head == tail_.load(std::memory_order_acquire)) {
            return std::nullopt;
        }

        T value = buffer_[head];
        head_.store(increment(head), std::memory_order_release);
        return value;
    }

private:
    static constexpr std::size_t increment(std::size_t index) {
        return (index + 1) % Capacity;
    }

    std::array<T, Capacity> buffer_{};
    std::atomic<std::size_t> head_{0};
    std::atomic<std::size_t> tail_{0};
};

这个实现适合可默认构造、可拷贝的教学类型。 工业实现要处理非平凡对象生命周期、缓存对齐、批量读写和等待策略。

工程边界:满了怎么办比无锁本身更重要

IMU 队列满了,有几种策略:

策略 行为 适合场景
丢弃新数据 保留旧历史 需要完整时间窗
覆盖旧数据 保留最新状态 控制命令、可视化
阻塞生产者 不丢数据 非实时线程
扩容 暂时缓冲峰值 非实时、内存充足
报警并降级 系统自检 实时系统

不要只说“队列无锁”。 要说清楚数据丢弃策略。 (「实时约束与高性能数据传递」的 20.4 节从数据通道的角度给出了另一张表——把 IMU、最新状态、控制命令、安全事件等具体通道分别映射到满队列策略,可与这里的按策略分类对照阅读。)

代码验证:单生产者单消费者

#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

int main() {
    SpscRingBuffer<int, 1024> queue;
    std::atomic<bool> done{false};
    std::vector<int> received;
    received.reserve(1000);

    std::thread producer([&] {
        for (int i = 1; i <= 1000; ++i) {
            while (!queue.push(i)) {
            }
        }
        done.store(true, std::memory_order_release);
    });

    std::thread consumer([&] {
        while (!done.load(std::memory_order_acquire)) {
            while (auto value = queue.pop()) {
                received.push_back(*value);
            }
        }
        while (auto value = queue.pop()) {
            received.push_back(*value);
        }
    });

    producer.join();
    consumer.join();
    assert(received.size() == 1000);
    for (int i = 0; i < 1000; ++i) {
        assert(received[static_cast<std::size_t>(i)] == i + 1);
    }
}

这个测试逐项验证顺序数据没有丢、没有重复、没有重排。 只验证总和不够,因为 1,2,31,1,4 的总和相同,却代表完全不同的队列行为。 但它不验证多生产者。 SPSC 的名字就是接口契约。

⚠️ 编程陷阱:把 SPSC 队列用在多生产者场景 错误做法:两个 ROS 回调线程同时向同一个 SpscRingBuffer 写入 IMU 数据。 现象:偶发数据丢失、乱序或崩溃。问题在高负载下更频繁。 根本原因:SPSC 的无锁证明建立在"tail 只有一个写者"的前提上。两个生产者同时 push 会并发修改 tail 和缓冲区元素,破坏所有不变量。 正确做法:多生产者用 mutex + queue、MPSC 队列、或每个生产者一个 SPSC。

💡 概念误区:认为环形缓冲区"满时应该扩容" 新手想法:"队列满了就像 vector 满了一样,自动扩容就行。" 实际上:SPSC ring buffer 的容量是编译期固定的(模板参数),运行时不能扩容。而且实时系统中,动态内存分配是延迟和抖动的来源。满队列的正确处理取决于业务语义——丢弃新数据、覆盖旧数据、阻塞生产者或报警降级。 正确理解:SPSC ring buffer 的容量选择是设计决策,不是运行时自适应。容量应根据生产者和消费者的速率差异和可接受的数据延迟来确定。

练习

  1. [分析题]:SPSC ring buffer 用"空一个槽"区分满和空。解释为什么不用一个 shared size 变量——这会引入什么同步问题?
  2. [代码题]:为 SpscRingBuffer 编写压力测试:生产者连续写入 10000 个递增整数,消费者逐个读取并验证顺序。用 ThreadSanitizer 验证无 data race。
  3. [跨章综合题]:线程管理与互斥同步 的 deque + mutex 传感器缓冲和本节的 SPSC ring buffer 都用于 IMU 数据传递。对比两者在拓扑假设、性能特征、满队列处理和时间窗裁剪上的差异。

18.10 最新状态发布:双缓冲直觉与可验证实现 ⭐⭐⭐⭐

工程问题:可视化、控制、重定位常常只需要最新状态

有些模块不需要每一帧历史。 它们只需要“当前最新位姿”:

模块 需要完整历史吗 常见需求
RViz 可视化 不需要 最新位姿即可
控制器 通常不需要全部历史 最新状态与时间戳
回环检测 需要关键帧历史 不能只用最新
后端优化 需要图结构 不能只用最新
健康监控 需要统计窗口 最近一段历史

最新状态发布很容易让人想到双缓冲。 双缓冲的直觉是对的:读者读当前快照,写者准备下一份快照,写完后一次性切换。 但在 C++ 线程模型里,仅靠一个原子索引还不够。 如果写者连续发布太快,就可能复用读者刚刚加载但尚未复制完成的缓冲区。 完整历史仍应使用队列、数据库或图结构。

反面失败 1:用一个 atomic 索引发布正在写的对象

错误流程:

writer 选择 buffer[1]
writer 发布 active_index = 1
writer 继续写 buffer[1].yaw
reader 读 active_index = 1
reader 读到半写状态

发布索引必须发生在完整写入之后。 如果写者可能很快复用读者正在读的缓冲区,还要考虑读者生命周期。

反面失败 2:写者追上读者

更隐蔽的问题发生在连续发布时:

active_index = 0
reader 读取 active_index,准备复制 buffer[0]
writer 写完 buffer[1],发布 active_index = 1
writer 下一次准备写 buffer[0]
reader 仍在复制 buffer[0]

这时 release/acquire 只能保证“读者看到索引之后,能看到索引发布前的写入”。 它不能自动告诉写者:某个读者还在使用旧缓冲区。 如果两个线程同时访问同一份非 atomic State,一个读、一个写,仍然是 data race。

抽象不变量:状态发布必须同时解决两个问题

最新状态发布有两个不变量:

  1. 读者看到的状态必须来自同一次发布。
  2. 写者不能修改读者正在读取的对象。

第一个不变量是可见性问题。 第二个不变量是生命周期问题。 很多错误实现只处理了可见性,却没有处理生命周期。

教学版实现:不可变快照 + atomic shared_ptr

C++20 提供 std::atomic<std::shared_ptr<T>>。 它适合教学和中低频状态发布,因为读者拿到的是不可变对象的引用计数快照。 只要读者还持有 shared_ptr,写者就不会修改这份对象。

#include <atomic>
#include <cstdint>
#include <memory>

struct State {
    double x = 0.0;
    double y = 0.0;
    double yaw = 0.0;
    std::uint64_t frame_id = 0;
};

class StatePublisher {
public:
    void publish(const State& state) {
        auto snapshot = std::make_shared<const State>(state);
        current_.store(snapshot, std::memory_order_release);
    }

    State load() const {
        auto snapshot = current_.load(std::memory_order_acquire);
        if (!snapshot) {
            return {};
        }
        return *snapshot;
    }

private:
    std::atomic<std::shared_ptr<const State>> current_{};
};

这个实现假设:

  1. 单写者。
  2. State 拷贝成本可接受。
  3. 发布频率不高到让堆分配成为瓶颈。
  4. 不要求读者读到每一次发布。

工程边界:硬实时线程不应在主循环中频繁分配

atomic shared_ptr 的优势是语义清楚。 它的代价也很明确:

  1. make_shared 会分配内存。
  2. 引用计数更新通常需要原子操作。
  3. 析构时机由最后一个读者决定。

因此它不适合 1 kHz 控制循环里的每周期发布。 在硬实时路径里,常见替代方案是:

  1. mutex 保护的小对象拷贝,并验证最坏持锁时间。
  2. 预分配 SPSC 队列。
  3. 三缓冲或更多缓冲,并配套读者占用协议。
  4. 所有字段都用 atomic,并用版本号验证一致性。
  5. 把状态发布移出硬实时线程。

seqlock 直觉

seqlock 用版本号验证读者是否读到一致快照:

writer: version += 1  // 奇数,写入中
writer: 写数据
writer: version += 1  // 偶数,写入完成

reader: v1 = version
reader: 读数据
reader: v2 = version
若 v1 == v2 且 v1 为偶数,则快照一致

seqlock 适合单写者、多读者、读者可重试的场景。 如果写者太频繁,读者可能一直重试。 实时读者也要谨慎。 在标准 C++ 里,如果 seqlock 保护的是普通非 atomic 字段,还要小心 data race。 教学上更稳妥的方式,是把它理解为“版本验证思想”,不要直接把内核风格 seqlock 照搬到普通 C++ 对象上。

代码验证:最新状态发布

#include <atomic>
#include <cassert>
#include <cstdint>
#include <memory>
#include <thread>

int main() {
    StatePublisher publisher;
    std::atomic<bool> done{false};

    std::thread writer([&] {
        for (std::uint64_t i = 1; i <= 1000; ++i) {
            publisher.publish(State{
                static_cast<double>(i),
                static_cast<double>(i + 1),
                0.1,
                i});
        }
        done.store(true, std::memory_order_release);
    });

    std::thread reader([&] {
        std::uint64_t last = 0;
        while (!done.load(std::memory_order_acquire)) {
            State state = publisher.load();
            assert(state.frame_id >= last);
            last = state.frame_id;
        }
    });

    writer.join();
    reader.join();
}

这个测试只适合单写者。 如果多个线程发布状态,需要额外同步。

本质洞察:最新状态发布的核心困难不是"让读者看到新值"(可见性),而是"不让写者修改读者正在读的对象"(生命周期)。很多实现只处理了可见性(用 release-acquire 发布索引),却忽略了生命周期(写者可能追上读者复用同一缓冲区)。不可变快照 + atomic<shared_ptr> 同时解决了两个问题——读者拿到 shared_ptr 后,即使写者发布新快照,旧快照也不会被销毁。

⚠️ 编程陷阱:双缓冲中写者追上读者导致 data race 错误做法:写者写完 buffer[1] 后发布 active_index = 1,然后立刻开始写 buffer[0]。如果读者刚读到旧的 active_index = 0 正在复制 buffer[0],写者和读者同时访问 buffer[0]。 现象:读者偶尔读到半写状态。在低发布频率下正常,高频率发布时出问题。 根本原因:release-acquire 只保证"读者看到索引后能看到索引发布前的写入"。它不告诉写者"有没有读者还在用旧缓冲区"。 正确做法:使用不可变快照 + atomic<shared_ptr>(读者引用计数保护);或使用三缓冲 + 读者占用协议;或接受 mutex 的简单语义。

练习

  1. [分析题]atomic<shared_ptr<const State>>publish() 每次调用 make_shared。这在 1kHz 控制循环中可行吗?讨论堆分配对实时性的影响以及替代方案。
  2. [代码题]:实现 StatePublisher 类,编写一个写者线程发布 1000 个递增状态、一个读者线程持续读取最新状态的测试。验证读者看到的 frame_id 单调递增。
  3. [跨章综合题]:线程管理与互斥同步 的 shared_mutex + 快照和本节的 atomic<shared_ptr> 都能发布最新状态。对比两种方案的线程安全性、性能特征和代码复杂度。什么场景下选哪个?

18.11 机器人场景中的原子模式 ⭐⭐⭐

工程问题:同一种 atomic 在不同模块里语义不同

机器人项目常见 atomic 用法:

模式 示例 内存序
停止标志 stop_requested relaxed 或 release-acquire
统计计数 processed_frames relaxed
模式切换 Mode::Tracking -> Shutdown acq_rel
发布配置 ready 标志 release-acquire
最新索引 快照指针或缓冲索引 release-acquire
版本号 seqlock version acquire/release
自旋锁 atomic_flag acquire/release

同样是 atomic<bool>,语义可以完全不同。 变量名和注释必须写出它同步什么。

停止标志

如果停止标志不发布其他数据:

std::atomic<bool> stop{false};

bool shouldStop() {
    return stop.load(std::memory_order_relaxed);
}

void requestStop() {
    stop.store(true, std::memory_order_relaxed);
}

如果停止请求前还写了某个 shutdown reason,希望工作线程读取:

ShutdownReason reason;
std::atomic<bool> stop{false};

void requestStop(ShutdownReason r) {
    reason = r;
    stop.store(true, std::memory_order_release);
}

void shutdownAwareLoop() {
    while (!stop.load(std::memory_order_acquire)) {
        runOnce();
    }
    handle(reason);
}

帧计数器

std::atomic<std::uint64_t> frames{0};

void onFrame() {
    frames.fetch_add(1, std::memory_order_relaxed);
}

帧计数器通常用于监控。 不要用它暗示“第 N 帧数据已完整发布”。

状态模式

enum class SlamMode : int {
    Initializing,
    Tracking,
    Lost,
    Shutdown
};

std::atomic<SlamMode> mode{SlamMode::Initializing};

模式切换涉及状态机规则。 用 CAS 可以避免非法覆盖:

bool tryEnterTracking() {
    SlamMode expected = SlamMode::Initializing;
    return mode.compare_exchange_strong(
        expected,
        SlamMode::Tracking,
        std::memory_order_acq_rel);
}

自旋锁

atomic_flag 可以实现最小自旋锁:

class SpinLock {
public:
    void lock() {
        while (flag_.test_and_set(std::memory_order_acquire)) {
        }
    }

    void unlock() {
        flag_.clear(std::memory_order_release);
    }

private:
    std::atomic_flag flag_ = ATOMIC_FLAG_INIT;
};

这能兼容 std::lock_guard<SpinLock>。 但自旋锁会占用 CPU。 只适合极短临界区和明确不阻塞的环境。 不要用它包 I/O、内存分配、日志或可能等待的操作。

工程边界:实时控制线程里更要谨慎

实时线程常见目标是低抖动。 atomic 比 mutex 更可控,但不等于零成本。 高频写同一个 atomic 会产生缓存一致性流量。 自旋锁在竞争时会烧 CPU。 CAS 循环在竞争时可能重试。

实时路径优先考虑:

  1. 单写者数据流。
  2. 预分配缓冲区。
  3. SPSC 队列。
  4. 双缓冲。
  5. 批量发布。
  6. 低频诊断统计。

代码验证:模式切换与停止标志组合

#include <atomic>
#include <cassert>

enum class Mode : int {
    Initializing,
    Tracking,
    Shutdown
};

class RuntimeState {
public:
    bool tryStart() {
        Mode expected = Mode::Initializing;
        return mode_.compare_exchange_strong(
            expected,
            Mode::Tracking,
            std::memory_order_acq_rel);
    }

    void requestShutdown() {
        stop_.store(true, std::memory_order_relaxed);
        mode_.store(Mode::Shutdown, std::memory_order_release);
    }

    bool shouldStop() const {
        return stop_.load(std::memory_order_relaxed);
    }

    Mode mode() const {
        return mode_.load(std::memory_order_acquire);
    }

private:
    std::atomic<bool> stop_{false};
    std::atomic<Mode> mode_{Mode::Initializing};
};

int main() {
    RuntimeState state;
    assert(state.tryStart());
    state.requestShutdown();
    assert(state.shouldStop());
    assert(state.mode() == Mode::Shutdown);
}

这里 stop_ 是快速退出标志。 mode_ 用 release-acquire 表达状态可见性。

⚠️ 编程陷阱:自旋锁内做 I/O、内存分配或长计算 错误做法:用 atomic_flag 实现自旋锁,然后在自旋锁保护下写日志、分配内存或执行优化。 现象:持锁线程被操作系统调度走后,等待线程空转 CPU 直到持锁线程被调度回来。系统整体吞吐下降,功耗上升。 根本原因:自旋锁期间等待线程忙等,消耗 CPU 资源。只有在"持锁时间极短且确定"的场景(如修改几个字段)下自旋锁才合理。 正确做法:临界区内容超过几十纳秒时,使用 std::mutex(它会让等待线程睡眠而非空转)。

💡 概念误区:认为同一个 atomic<bool> 总该用同一种内存序 新手想法:"stop 标志全部用 relaxed 就好。" 实际上:如果 requestStop() 之前还写了 shutdown reason,并希望工作线程在看到 stop == true 后能读到 reason,stop 的 store 就需要 release,load 需要 acquire。同一个变量在不同使用场景下语义不同,内存序也不同。 正确理解:每次使用 atomic 操作时,问"这个操作是否承担发布其他数据的职责?"

练习

  1. [分析题]:对照本节的表格,为 SLAM 系统中的以下需求选择 atomic 模式和内存序:(a) 帧计数器;(b) 停止标志(不发布数据);(c) 停止标志(发布 shutdown reason);(d) SLAM 模式切换。
  2. [代码题]:实现 RuntimeState 类,包含停止标志、模式状态机(CAS 切换)和帧计数器。编写单元测试验证状态转换的互斥性。
  3. [跨章综合题]:综合 线程管理与互斥同步(mutex/条件变量/队列)和 原子操作与内存模型(atomic/CAS/SPSC),为一个完整的 Mini SLAM 流水线设计同步架构:哪些用 mutex,哪些用 atomic,哪些用条件变量。画出数据流图和同步点标注。

18.12 ThreadSanitizer 与最小并发测试 ⭐⭐⭐⭐

工程问题:并发 bug 很难靠肉眼覆盖

并发 bug 通常具有这些特征:

特征 结果
依赖时序 本地很难复现
受优化级别影响 Debug 正常,Release 出错
受硬件影响 x86 正常,ARM 出错
受负载影响 小数据正常,大数据出错
日志改变时序 加日志后 bug 消失

因此要用工具。

ThreadSanitizer

编译:

g++ -std=c++20 -fsanitize=thread -O1 -g test.cpp -o test_tsan

运行:

./test_tsan

TSan 能发现很多 data race。 它不能证明程序没有所有并发逻辑错误。 它也可能因为第三方库或自定义同步原语产生误报。

反面失败:只测最终结果

下面测试不够:

assert(queue.size() == expected);

并发测试应同时看:

  1. 是否有 data race。
  2. 是否可能死锁。
  3. 是否能正常停止。
  4. 是否在压力下保持顺序。
  5. 是否在队列满时执行预期策略。
  6. 是否在多次重复运行中稳定。

抽象不变量:测试应覆盖同步协议,而不只覆盖业务输出

对 SPSC 队列,应该验证:

不变量 测试
单调顺序 生产 1..N,消费也应递增
不丢数据 求和或计数匹配
满队列策略 填满后 push 返回 false
空队列策略 空时 pop 返回 nullopt
停止协议 done 后能排空剩余数据
工具检查 TSan 无 data race

代码验证:压力测试骨架

#include <atomic>
#include <cassert>
#include <thread>

void testSpscOrder() {
    SpscRingBuffer<int, 256> queue;
    std::atomic<bool> done{false};
    constexpr int N = 10000;

    std::thread producer([&] {
        for (int i = 1; i <= N; ++i) {
            while (!queue.push(i)) {
            }
        }
        done.store(true, std::memory_order_release);
    });

    std::thread consumer([&] {
        int expected = 1;
        while (!done.load(std::memory_order_acquire) || expected <= N) {
            if (auto value = queue.pop()) {
                assert(*value == expected);
                ++expected;
            }
        }
    });

    producer.join();
    consumer.join();
}

这个测试仍然不是完整证明。 但它覆盖了顺序、停止和排空。 配合 TSan 能发现很多实现错误。

⚠️ 编程陷阱:并发测试只检查最终结果不检查过程 错误做法assert(queue.size() == expected) 只看最终计数。 现象:测试通过但实际有数据乱序、重复消费或中间状态不一致。1+2+3 和 1+1+4 的总和相同,但代表完全不同的队列行为。 根本原因:并发 bug 往往体现在过程而非结果。只检查最终状态的测试覆盖率极低。 正确做法:逐项验证顺序(received[i] == i+1)、检查无重复无遗漏、配合 ThreadSanitizer 检查 data race。

🧠 思维陷阱:认为"本地跑了一万次没问题就是正确的" 新手想法:"我在本地跑了一万次测试都通过了,代码应该没有并发问题。" 实际上:并发 bug 依赖时序、优化级别、硬件平台和系统负载。某些 bug 只在 ARM 上出现,某些只在 -O2 以上出现,某些只在高负载下出现。用 ThreadSanitizer 能在编译时插桩,大幅提高检测覆盖率。 正确思维:使用 ThreadSanitizer (-fsanitize=thread) 作为常规 CI 步骤。它能发现大多数 data race,但不能发现所有逻辑竞态。

练习

  1. [分析题]:列出并发测试应覆盖的 6 个维度(data race、deadlock、正常停止、压力下顺序、满队列策略、多次运行稳定性)。对每个维度给出具体的测试方法。
  2. [代码题]:为 SpscRingBuffer 编写 ThreadSanitizer 测试。故意引入一个 bug(如去掉 release-acquire),验证 TSan 能检测到。
  3. [跨章综合题]:C++语言核心/预处理器与宏 用 -Enm -C 验证宏和链接层面的正确性;线程管理与互斥同步-18 用 ThreadSanitizer 验证并发正确性。总结本课程中出现过的所有"验证工具→问题域"映射,形成工程师的调试工具箱。

18.13 C++20/23/26 并发新特性展望 ⭐⭐⭐

为什么要关注标准演进

C++ 并发标准从 C++11 引入线程和 atomic 以来,每个版本都在解决上一版遗留的工程痛点。C++20 的 jthread/stop_token 解决了线程停止协议的样板代码问题(线程管理与互斥同步 已详细讨论)。C++20 还引入了 std::atomic<std::shared_ptr<T>>,解决了并发环境下智能指针的安全发布。这些特性不是"锦上添花"——它们直接改善了机器人系统中高频出现的并发模式的安全性和可读性。

C++23 和 C++26 的并发提案正在解决更底层的问题:无锁数据结构中的安全内存回收。这些特性对于编写高性能传感器通道、状态发布器和无锁数据结构具有实际工程价值。

std::atomic<std::shared_ptr<T>>:安全的并发智能指针(C++20) ⭐⭐⭐

本章 18.1 节讨论过:要发布一个完整的位姿快照,不能把每个字段分别做成 atomic。一个干净的方案是用不可变快照加原子指针交换。C++20 提供了 std::atomic<std::shared_ptr<T>> 让这个模式有了标准支持:

// C++20:原子地发布不可变快照
std::atomic<std::shared_ptr<const RobotState>> latest_state;

// 发布者线程
void publish(double x, double y, double yaw) {
    auto snapshot = std::make_shared<const RobotState>(RobotState{x, y, yaw});
    latest_state.store(snapshot, std::memory_order_release);
}

// 消费者线程
RobotState read() {
    auto snapshot = latest_state.load(std::memory_order_acquire);
    return snapshot ? *snapshot : RobotState{};
}

这个模式的关键是:快照对象创建后不可变(const),发布者创建新快照并原子替换指针,消费者读到的始终是一个完整的、不会被修改的状态。shared_ptr 的引用计数确保旧快照在所有消费者读完后才被释放。

如果不使用 atomic<shared_ptr>,直接在多线程中复制或修改 shared_ptr 是 data race——因为 shared_ptr 的拷贝需要原子递增引用计数和复制指针两步,这两步在普通 shared_ptr 上不是原子的。C++20 之前的替代方案是 std::atomic_load(&sp)std::atomic_store(&sp, new_sp)(自由函数版本),但语法不如成员函数直观。

Hazard Pointer:安全的无锁内存回收(C++26 提案) ⭐⭐⭐⭐

18.7 节讨论了 ABA 问题:当一个无锁数据结构使用 CAS 操作时,另一个线程可能在 CAS 检查之间删除并重新分配同一地址的节点,导致 CAS 成功但语义错误。ABA 问题的根源是**安全内存回收**——什么时候可以安全地释放一个从无锁结构中移除的节点?

Hazard pointer(风险指针)是 Maged Michael 2004 年提出的解决方案。其核心思想是:每个读取无锁结构的线程在读取节点之前,先把该节点的地址登记到一个全局可见的"风险指针"列表中。当写者想释放一个节点时,它先检查所有线程的风险指针——如果有任何线程正在使用这个节点,就推迟释放。

读者视角:
  1. 把目标节点地址写入自己的 hazard pointer slot
  2. 验证节点仍在结构中(防止在注册前被移除)
  3. 安全读取节点内容
  4. 清除 hazard pointer slot

写者视角:
  1. 从结构中原子移除节点
  2. 把节点加入"待回收列表"
  3. 定期扫描所有线程的 hazard pointer
  4. 释放不在任何 hazard pointer 中的节点

C++26 的 std::hazard_pointer 提案(P2530)将这个模式标准化。对于机器人系统中的无锁传感器通道、无锁状态发布器,hazard pointer 提供了一种不依赖 GC、延迟确定的安全回收机制。不过,绝大多数机器人工程代码不需要手写无锁结构——使用经过验证的库(如 Folly、Concurrency Kit)或坚持 mutex + 条件变量是更务实的选择。

RCU(Read-Copy-Update):读优化的并发更新(C++26 提案) ⭐⭐⭐⭐

RCU 是 Linux 内核中广泛使用的并发原语,由 Paul McKenney 系统化。它的核心思想是**读操作零开销,写操作承担全部同步成本**。这与读写锁(shared_mutex)的区别在于:读写锁的读操作仍需要获取共享锁(涉及原子操作),而 RCU 的读操作完全无同步开销。

RCU 的工作方式:

  1. 读者:进入"RCU 读侧临界区"(几乎零成本),直接读取共享数据。
  2. 写者:创建数据的新副本,修改副本,然后原子地用新副本替换旧副本的指针。
  3. 回收:写者等待所有已经进入读侧临界区的读者退出(称为"宽限期"),然后释放旧副本。

RCU 特别适合"读极多、写极少"的场景。在 SLAM 系统中,配置参数、校准矩阵、词袋数据库这类几乎不变的共享数据非常适合 RCU 模式——几十个线程可以无开销地读取,偶尔的更新由写者独自承担复制和同步成本。

C++26 的 RCU 提案(P2545)将引入 std::rcu_readerstd::synchronize_rcu()。在标准化之前,可以用 atomic<shared_ptr<const T>> + release-acquire 模拟类似效果——本章 18.1 节的 LatestStatePublisher 本质上就是一个简化版的 RCU 模式。

技术 读开销 写开销 适用场景 C++ 标准状态
shared_mutex 获取共享锁(原子操作) 获取独占锁 读多写少,需精确一致性 C++17 已有
atomic<shared_ptr> load + 引用计数 store + 旧快照回收 发布不可变快照 C++20 已有
Hazard Pointer 注册/注销指针 扫描风险指针列表 无锁结构节点回收 C++26 提案
RCU 几乎零 复制+等待宽限期 读极多写极少 C++26 提案

本质洞察:从 shared_mutexatomic<shared_ptr> 到 Hazard Pointer 到 RCU,是一条"把同步成本从读者转移到写者"的技术演进路线。每一步都让读操作更轻量,代价是写操作更复杂、回收延迟更大。选择哪个点取决于你的读写比例和延迟预算。

练习

  1. [分析题]atomic<shared_ptr<const T>> 的发布模式和 RCU 的读-复制-更新模式有什么相似之处和关键区别?在什么场景下 atomic<shared_ptr> 已经足够,不需要真正的 RCU?
  2. [代码题]:使用 std::atomic<std::shared_ptr<const Config>> 实现一个配置热加载器:主线程读取新配置文件并原子发布,工作线程无锁读取当前配置。验证在持续读取的同时更新配置不会导致 data race。
  3. [跨章综合题]:线程管理与互斥同步 用 shared_mutex 实现读写分离;本节介绍了 atomic<shared_ptr> 和 RCU 两种替代方案。对于 SLAM 地图的"Tracking 读 + Mapping 写"场景,画出三种方案的适用条件决策图。

18.14 累积项目:Mini SLAM Atomic State Layer ⭐⭐⭐⭐

项目目标

在 线程管理与互斥同步 的 Mini SLAM Concurrent Pipeline 上增加一层原子状态发布。 目标不是写最复杂的无锁库,而是让读者把 atomic 用在恰当的位置。

新增模块:

模块 作用
AtomicRuntimeState 停止标志、模式状态、帧计数
LatestStatePublisher 不可变快照发布最新位姿
SpscRingBuffer<T,N> 单生产者单消费者 IMU 通道
AtomicMetrics relaxed 统计计数
tsan_tests.cpp data race 和协议测试

AtomicRuntimeState

#pragma once

#include <atomic>
#include <cstdint>

enum class PipelineMode : int {
    Starting,
    Tracking,
    Mapping,
    Shutdown
};

class AtomicRuntimeState {
public:
    bool tryEnterTracking() {
        PipelineMode expected = PipelineMode::Starting;
        return mode_.compare_exchange_strong(
            expected,
            PipelineMode::Tracking,
            std::memory_order_acq_rel);
    }

    void requestStop() {
        stop_.store(true, std::memory_order_relaxed);
        mode_.store(PipelineMode::Shutdown, std::memory_order_release);
    }

    bool shouldStop() const {
        return stop_.load(std::memory_order_relaxed);
    }

    PipelineMode mode() const {
        return mode_.load(std::memory_order_acquire);
    }

    void countFrame() {
        frames_.fetch_add(1, std::memory_order_relaxed);
    }

    std::uint64_t frames() const {
        return frames_.load(std::memory_order_relaxed);
    }

private:
    std::atomic<bool> stop_{false};
    std::atomic<PipelineMode> mode_{PipelineMode::Starting};
    std::atomic<std::uint64_t> frames_{0};
};

LatestStatePublisher

#pragma once

#include <atomic>
#include <cstdint>
#include <memory>

struct LatestState {
    double x = 0.0;
    double y = 0.0;
    double yaw = 0.0;
    std::uint64_t frame_id = 0;
};

class LatestStatePublisher {
public:
    void publish(const LatestState& state) {
        auto snapshot = std::make_shared<const LatestState>(state);
        current_.store(snapshot, std::memory_order_release);
    }

    LatestState load() const {
        auto snapshot = current_.load(std::memory_order_acquire);
        if (!snapshot) {
            return {};
        }
        return *snapshot;
    }

private:
    std::atomic<std::shared_ptr<const LatestState>> current_{};
};

AtomicMetrics

#pragma once

#include <atomic>
#include <cstdint>
#include <new>

struct alignas(std::hardware_destructive_interference_size) AtomicCounter {
    std::atomic<std::uint64_t> value{0};
};

class AtomicMetrics {
public:
    void countImu() {
        imu_.value.fetch_add(1, std::memory_order_relaxed);
    }

    void countLidar() {
        lidar_.value.fetch_add(1, std::memory_order_relaxed);
    }

    std::uint64_t imu() const {
        return imu_.value.load(std::memory_order_relaxed);
    }

    std::uint64_t lidar() const {
        return lidar_.value.load(std::memory_order_relaxed);
    }

private:
    AtomicCounter imu_;
    AtomicCounter lidar_;
};

集成示例

#include <cassert>
#include <thread>

int main() {
    AtomicRuntimeState runtime;
    LatestStatePublisher latest;
    AtomicMetrics metrics;

    assert(runtime.tryEnterTracking());

    std::thread tracking([&] {
        for (std::uint64_t i = 1; i <= 1000; ++i) {
            runtime.countFrame();
            metrics.countLidar();
            latest.publish(LatestState{
                static_cast<double>(i),
                static_cast<double>(i + 1),
                0.01 * static_cast<double>(i),
                i});
        }
        runtime.requestStop();
    });

    std::thread monitor([&] {
        while (!runtime.shouldStop()) {
            LatestState state = latest.load();
            (void)state;
        }
    });

    tracking.join();
    monitor.join();

    assert(runtime.frames() == 1000);
    assert(metrics.lidar() == 1000);
    assert(runtime.mode() == PipelineMode::Shutdown);
}

验收标准

验收项 标准
停止协议 所有线程能退出并 join
帧计数 frames == produced frames
最新状态 load() 返回完整快照,不返回引用
内存序 停止计数 relaxed,发布状态 release-acquire
SPSC 明确单生产者单消费者假设
测试 TSan 运行不报告 data race
文档 每个 atomic 变量说明同步语义

进阶练习

  1. LatestStatePublisher 改成预分配三缓冲,比较它与不可变快照的生命周期差异。
  2. SpscRingBuffer 增加“满时覆盖旧数据”的模式,并说明适合哪些模块。
  3. std::atomic<std::shared_ptr<const Config>> 发布不可变配置,比较配置发布与状态发布的相同点。
  4. 用 ThreadSanitizer 故意验证一个普通 bool stop 的 data race。
  5. 在 ARM 或 Jetson 上运行 release-acquire 示例,观察不同优化级别下的表现。

本章小结

本章主线是:atomic 不是更短的 mutex,而是 C++ 内存模型中的同步原语。

主题 核心判断
data race 普通变量跨线程无同步读写会进入未定义行为
race condition 没有 data race 仍可能有业务竞态
happens-before 跨线程可见性的证明链
relaxed 只保证单个原子对象的原子性
release-acquire 生产者发布数据,消费者获取数据
seq_cst 最强直觉,常作为初始安全选择
consume 数据依赖序,实践中退化为 acquire
CAS 适合状态机和无锁结构的条件更新
ABA CAS 不记录历史,指针结构要处理回收问题
false sharing 不同变量可能共享 cache line
volatile 不是线程同步
SPSC 拓扑假设是接口契约
双缓冲 发布最新完整快照,不保存完整历史
atomic<shared_ptr> C++20 安全并发智能指针,适合不可变快照发布
Hazard Pointer / RCU C++26 方向:把同步成本从读者转移到写者

本质洞察:并发正确性不是”某个操作看起来原子”,而是”整个同步协议能证明对象生命周期、数据可见性和业务不变量”。atomic 只解决其中一部分;越底层的工具,越需要把假设写清楚。


延伸阅读

  1. Anthony Williams, C++ Concurrency in Action, 2nd Edition:C++ 内存模型与 atomic 的系统讲解。⭐⭐
  2. cppreference:std::atomicstd::memory_orderstd::hardware_destructive_interference_sizestd::atomic<std::shared_ptr>。⭐⭐
  3. Herb Sutter, atomic<> Weapons(CppCon 2012):理解 acquire/release 和硬件内存模型的经典视频。⭐⭐⭐
  4. Jeff Preshing 的内存序系列文章(preshing.com):用图解释 happens-before 和 release-acquire,是理解内存模型最直观的材料。⭐⭐⭐
  5. cameron314/readerwriterqueue:工业级 SPSC 队列实现,适合作为参考阅读。⭐⭐⭐
  6. LLVM ThreadSanitizer 文档:data race 检测工具的使用和局限性。⭐⭐
  7. ROS2 real-time working group 文档:实时路径中的同步与内存分配约束。⭐⭐⭐
  8. Maged Michael, "Hazard Pointers: Safe Memory Reclamation for Lock-Free Objects"(IEEE TPDS 2004):hazard pointer 的原始论文。⭐⭐⭐⭐
  9. Paul McKenney, Is Parallel Programming Hard, And, If So, What Can You Do About It?:RCU 的系统讲解,Linux 内核并发编程的权威参考。⭐⭐⭐⭐
  10. P2530R3(C++26 Hazard Pointer 提案)和 P2545R4(C++26 RCU 提案):了解标准化方向。⭐⭐⭐⭐

🔧 故障排查手册

现象 常见原因 检查方式 修复方式
程序在 Release 偶发卡死 普通变量做停止标志 TSan、搜索共享 bool std::atomic<bool> 或 mutex
x86 正常 ARM 出错 依赖强内存模型偶然正确 检查发布标志是否 relaxed 使用 release-acquire
atomic 后仍读到不一致状态 多字段不变量未保护 检查是否逐字段 atomic 用 mutex、双缓冲或 seqlock
队列偶发乱序 SPSC 被多个生产者使用 检查写者数量 改 MPSC 队列或每生产者一个 SPSC
CPU 占用高 自旋等待或 CAS 重试 perf/top/trace 加等待策略或退回 mutex
atomic 计数拖慢 多线程写同一 cache line perf、对齐实验 局部累积再归并,或 padding
TSan 报 data race 普通共享变量缺同步 看报告栈 用 mutex/atomic 建立同步
volatile 后仍不稳定 误把 volatile 当同步 搜索 volatile 标志 改 atomic
compare_exchange 循环不收敛 expected 更新逻辑错误 打印失败路径 正确使用失败时写回的 expected
最新状态偶尔跳变 发布半写快照或复用缓冲 检查 active index store 时机 写完后 release 发布,读者复制快照

下一章进入并行编程框架。回顾本章:atomic 解决的是低层同步协议;并行编程框架 会讨论如何把大量点云、残差和体素任务分给多个核心。两者的边界很重要:并行循环里不要让每个元素都争同一个 atomic,优先让线程局部工作,最后再归并。