跳转至

线程管理与互斥同步

难度:⭐⭐⭐~⭐⭐⭐⭐ | 建议用时:2 周 | 前置要求:C++语言核心/类型系统与值类别推导-C++语言核心/Lambda与STL算法 的 C++ 对象模型、RAII、异常、STL 容器,C++语言核心/Eigen基础与SLAM数学预备-C++语言核心/Concepts与Policy 的机器人数据结构和泛型接口基础


前置自测

答不出两题以上,建议先复习对象生命周期、引用传参、异常传播、容器迭代器失效和 RAII。 并发编程的难点不在 API 名字,而在多个控制流同时观察和修改同一批对象时,原来顺序程序里隐含成立的假设会消失。

  1. std::thread t(f); 创建的是线程对象还是操作系统线程?t 析构时如果仍然 joinable 会发生什么?
  2. 为什么 std::thread t(process, msg); 默认会拷贝参数?如果要传引用,为什么必须显式使用 std::ref
  3. 两个线程同时读写同一个普通 int,即使机器上看起来只是“偶尔错一次”,在 C++ 语义里为什么已经是未定义行为?
  4. std::lock_guardstd::unique_lock 都能管理 mutex,它们在所有权、延迟加锁、条件变量配合上有什么差异?
  5. condition_variable::wait(lock, pred) 为什么要写谓词?虚假唤醒和丢失通知分别是什么问题?
  6. 如果 Tracking 线程持有关键帧队列锁后再申请地图锁,而 Mapping 线程持有地图锁后再申请关键帧队列锁,会触发哪类并发故障?
  7. std::shared_mutex 允许多个读者并发,为什么它并不等于“地图访问可以随便读”?
  8. volatile bool running 能不能作为线程退出标志?如果不能,应该考虑 std::atomic<bool>std::stop_token 还是 condition_variable

本章目标

学完本章,你将能够:

  • 解释 SLAM 和机器人系统为什么常采用多线程流水线,而不是把所有计算塞进单线程循环或粗暴拆成多个进程。
  • 从对象模型角度理解 std::thread:线程入口、参数传递、join/detach、移动语义、析构时 std::terminate、异常不能逃出入口函数。
  • 用 RAII 管理线程生命周期:线程守卫、std::jthreadstd::stop_token、协作式停止协议。
  • 识别 data race,划定 critical section,并用 std::lock_guardstd::unique_lockstd::scoped_lock 选择合适的锁管理方式。
  • 正确使用 std::condition_variable:等待-通知模型、谓词、虚假唤醒、丢失通知、生产者消费者队列。
  • 解释 deadlock 的四个必要条件,并用锁排序、std::lockstd::scoped_lock 避免多锁循环等待。
  • 在读多写少地图查询中使用 std::shared_mutex,同时理解读写锁的饥饿、公平性和临界区边界。
  • 构造 deque + mutex 的传感器缓冲范式,处理 IMU/LiDAR/Odom 时间同步和时间窗剪裁。
  • 使用 std::chrono 表达时间点、持续时间、性能计时和 ROS2 timer 的基本语义。
  • 区分线程级并发和任务级异步,掌握 std::asyncstd::futurestd::promise 的返回值、阻塞和异常传播边界。
  • 说明 C++ volatile 为什么不是线程同步工具,并区分它和 Java/C# volatile 的语义差异。
  • 完成 Mini SLAM Concurrent Pipeline:MessageQueue<T>、Tracking/Mapping/LoopClosing、停止协议、计时和测试目标。

知识树

线程管理与互斥同步
├── 为什么机器人系统选择多线程 ⭐⭐⭐⭐
│   ├── 延迟隔离与所有权边界
│   ├── 单线程 / 多线程 / 多进程的边界
│   └── 共享可变状态是根本困难
├── std::thread 对象模型 ⭐⭐⭐⭐
│   ├── 线程对象 vs 执行线程
│   ├── joinable 不变量
│   ├── 参数传递与 std::ref
│   └── 异常不能逃出入口
├── RAII 线程管理 ⭐⭐⭐⭐⭐
│   ├── ThreadGuard(C++17)
│   ├── std::jthread 与 stop_token(C++20)
│   ├── 协作式停止三范式
│   └── 队列关闭协议
├── mutex 基础 ⭐⭐⭐⭐⭐
│   ├── data race = 未定义行为
│   ├── 不变量保护而非数据保护
│   ├── lock_guard / unique_lock / scoped_lock
│   └── 锁粒度权衡
├── condition_variable ⭐⭐⭐⭐⭐
│   ├── 等待-通知模型
│   ├── 谓词 / 虚假唤醒 / 丢失通知
│   └── MessageQueue<T> 实现
├── Deadlock ⭐⭐⭐⭐⭐
│   ├── 四个必要条件(Coffman 1971)
│   ├── 锁排序与 scoped_lock
│   └── 持锁不调用外部回调
├── shared_mutex ⭐⭐⭐⭐
│   ├── 读写锁语义与公平性
│   └── 读锁内返回引用的危险
├── 传感器缓冲 ⭐⭐⭐⭐⭐
│   ├── deque + mutex 范式
│   └── 时间窗与旧数据剪裁
├── 时间管理 ⭐⭐⭐⭐
│   └── std::chrono 与 ROS2 time
├── 异步任务 ⭐⭐⭐
│   └── async / future / promise
├── volatile 不是同步 ⭐⭐⭐
└── 并发调试工具 ⭐⭐⭐⭐
    ├── ThreadSanitizer
    ├── Helgrind / DRD
    └── perf 与火焰图

本章在课程中的位置:前面的章节把单个对象、单条控制流、单个算法内核讲清楚。

回顾 C++语言核心/RAII与智能指针:RAII 保证"获取即初始化,析构即释放"——这在单线程中管理内存、文件和锁非常自然。本章的挑战是:当多个线程同时触碰同一批对象时,RAII 的"析构即释放"必须配合停止协议和队列关闭才能安全工作。析构函数不能在其他线程还在访问对象时就释放资源。回顾 C++语言核心/移动语义与完美转发:移动语义让 std::thread 只能移动不能复制,这与 std::unique_ptr 的设计一脉相承——线程句柄是系统资源,复制没有清晰语义。

从本章开始,同一批机器人状态会被多个控制流同时触碰。 Tracking 想要低延迟,Mapping 想要持续优化局部地图,LoopClosing 想要在后台做昂贵的回环检测,传感器回调还要把 IMU、LiDAR、轮速计数据按时间塞进缓冲区。 这时 C++ 对象模型没有消失,反而更重要:对象由谁拥有、谁能修改、锁保护什么不变量、线程如何停止,都会直接影响 SLAM 系统是否稳定。


17.1 为什么机器人系统选择多线程 ⭐⭐⭐⭐

工程问题:SLAM 不是一条均匀流水线

一个在线 SLAM 系统看起来像“传感器输入一帧,算法输出一个位姿”。 如果只看论文公式,这个过程可以被写成顺序伪代码:

while sensor_has_data:
    read_frame()
    track_pose()
    update_local_map()
    detect_loop()
    optimize_graph()
    publish_result()

工程系统不会这样简单。 Tracking 需要尽快给控制、导航或可视化输出当前位姿。 Local Mapping 可能要三角化新地图点、筛选关键帧、做局部 Bundle Adjustment。 Loop Closing 可能要检索词袋、做几何验证、触发 pose graph 优化。 IMU 预积分和 LiDAR 去畸变还要求高频数据按时间顺序进入缓冲区。

这些工作有三个典型差异:

  1. 实时性不同:Tracking 延迟敏感,后端优化吞吐敏感。
  2. 计算粒度不同:单帧跟踪通常短,局部优化和回环检测可能长。
  3. 数据依赖不同:Tracking 依赖最新传感器,Mapping 依赖关键帧队列,LoopClosing 依赖关键帧数据库和全局图。

如果把它们全部塞进一个循环,慢任务会直接拖住快任务。 如果把每个模块拆成进程,又要承担序列化、共享地图同步、跨进程生命周期管理和调试成本。 因此,许多 SLAM 系统选择“一个进程内多线程”:同一地址空间共享地图对象,用 mutex 和条件变量管理并发访问。

ORB-SLAM 系列的经典结构就是 Tracking、LocalMapping、LoopClosing。 Tracking 通常在主线程中消费图像或传感器帧。 LocalMapping 和 LoopClosing 分别运行在独立线程里,通过关键帧队列、地图对象和状态标志协作。 LIO-SAM、FAST-LIO、FAST-LIVO 等 LiDAR/IMU 系统也大量依赖传感器回调和处理线程之间的缓冲同步。

反面失败:单线程让实时路径被慢任务拖住

考虑一个相机 30Hz、IMU 200Hz、局部优化偶尔耗时 80ms 的系统。 单线程循环里如果每插入一个关键帧都同步执行局部优化,那么接下来两到三帧图像会排队。 Tracking 得到的数据已经过时,控制层看到的位姿延迟变大。 更糟糕的是,传感器驱动还在继续推数据,缓冲区越来越长,系统开始出现周期性卡顿。

反面结构常见于教学代码:

// 片段:能表达顺序流程,但不适合作为在线系统结构。
void runSequentialPipeline() {
    while (true) {
        Frame frame = readFrame();
        Pose pose = track(frame);
        if (shouldInsertKeyframe(frame)) {
            updateLocalMap(frame);
            runLocalBundleAdjustment();
            detectLoopAndOptimizeGraph();
        }
        publish(pose);
    }
}

这段代码的问题不在语法,而在系统不变量没有分层。 Tracking 的不变量是“尽快处理最新帧并输出位姿”。 Mapping 的不变量是“最终消费关键帧并维护局部地图一致性”。 LoopClosing 的不变量是“在可接受延迟内发现全局约束并修正图”。 把三者合在一个循环,会让慢任务破坏快任务的实时性。

为什么共享可变状态是并发编程的根本困难

在进入具体的线程管理之前,需要理解一个更基本的问题:为什么并发编程比顺序编程困难得多?答案不是”API 更复杂”,而是**顺序程序中隐含成立的一个根本假设——在并发中崩塌了**。

顺序一致性假设。 在单线程程序中,程序员可以安全地假设:(1) 语句按源代码顺序执行;(2) 对一个变量的写入立刻对后续读取可见;(3) 一个操作的中间状态不会被其他代码观察到。这三条假设合在一起构成了程序员的”顺序一致性直觉”——代码做什么就是它看起来做什么。

在多线程环境中,这三条假设全部失效:

  1. 指令重排:编译器和 CPU 都会重排指令。单线程中这不影响结果(as-if 规则保证单线程行为不变),但多线程中另一个线程可能观察到重排后的执行顺序。
  2. 可见性延迟:一个线程写入变量后,另一个线程可能看不到这次写入——因为写入可能停留在 CPU 的 store buffer 或 cache 中,尚未传播到其他核心。
  3. 操作的非原子性:即使是 ++i 这样简单的操作,在机器层面也是”读取-修改-写回”三步。另一个线程可能在三步之间观察或修改同一个变量。

这些失效不是”偶发的 bug”,而是 C++ 语言规范明确允许的行为——标准只保证没有 data race 时程序行为有定义。当存在 data race(两个线程无同步地访问同一内存位置且至少一个是写)时,程序行为**完全未定义**——不是”偶尔读到旧值”,而是编译器可以基于”没有 data race”的假设做出任何优化,程序可能以任何方式失败。

共享可变状态是这些问题的交汇点。 如果状态不共享(每个线程有自己的数据),没有可见性问题;如果状态不可变(只读数据),没有 data race;如果程序是单线程的,没有并发访问。问题只在”共享 + 可变 + 多线程”三者同时存在时出现。因此并发编程的核心策略都是在这三者中消除至少一个:不共享(消息传递、线程本地存储)、不可变(不可变快照、函数式风格)、或保护访问(mutex、atomic)。

抽象不变量:并发拆分服务于延迟隔离和所有权边界

多线程不是为了”看起来高级”,而是为了维护几个系统级不变量:

  1. 实时路径不被后台优化长期阻塞
  2. 模块间通过队列传递事件,而不是互相调用长函数
  3. 共享地图只在受保护的临界区内被修改
  4. 线程停止时,队列、地图和状态标志有明确的关闭顺序
  5. 异常、超时和传感器断流不会把整个系统拖进半关闭状态

在这个视角下,线程不是算法的本体。 线程只是把不同延迟需求的任务放到不同执行上下文里。 真正需要设计的是数据所有权和同步协议:

  • Tracking 产生关键帧,写入 Mapping 队列。
  • Mapping 消费关键帧,修改局部地图,可能产生回环检测请求。
  • LoopClosing 消费候选关键帧,读取地图并偶尔触发全局修正。
  • 传感器回调只做短临界区入队,不在回调里执行重优化。
  • 停止协议先通知队列关闭,再让消费线程退出,最后 join。

规则推导:单线程、多线程、多进程各有边界

单线程适合离线工具、确定性实验和小规模脚本。 它的优势是调试简单、顺序可复现、没有 data race。 如果系统目标是“读一个 rosbag,离线跑完并输出轨迹”,单线程可能更好。

多线程适合同一进程内共享大量状态、模块延迟差异明显、数据交换频繁的系统。 它的优势是共享内存成本低,线程间传递对象可以移动或引用,地图结构不必频繁序列化。 代价是需要严格设计锁、队列、停止和异常处理。

多进程适合故障隔离、语言边界、权限隔离或部署边界明显的系统。 例如相机驱动、深度学习检测、SLAM 后端、导航栈可以分进程,用 ROS2 topic/service/action 连接。 代价是消息序列化、调度延迟、跨进程状态一致性和部署复杂度。

机器人系统常混合三者:

  • 进程之间用 ROS2 通信隔离大模块。
  • 进程内部用多线程处理回调、队列和后端优化。
  • 算法内核内部仍尽量保持顺序、可测试、可复现。

工程边界:多线程不能修复算法结构混乱

一个常见误解是:只要系统变慢,就加线程。 如果算法本身没有明确数据边界,加线程只会让问题更难定位。 例如地图对象到处暴露可变引用,任何模块都能随时改关键帧、地图点和位姿图。 此时即使加了 mutex,也很难知道锁保护的是哪个不变量。

多线程设计前应先问五个问题:

  1. 哪条路径是实时路径,最大可接受延迟是多少?
  2. 哪些任务可以滞后处理,滞后上限是多少?
  3. 哪些对象跨线程共享,哪些对象只在线程内部拥有?
  4. 每把锁保护哪个对象集合和哪个不变量?
  5. 停止、异常、传感器断流时,队列和线程如何退出?

如果这些问题答不清楚,线程数越多,系统越不可控。

代码验证:三阶段流水线的最小可运行模型

下面的代码不是完整 SLAM。 它只验证一个工程事实:Tracking 可以保持固定节奏产生关键帧,Mapping 可以以自己的速度消费,LoopClosing 可以在后台处理低频事件。 后面项目会把这个模型扩展成带停止协议和泛型队列的版本。

#include <chrono>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

struct Keyframe {
    int id = 0;
};

int main() {
    using namespace std::chrono_literals;

    std::mutex m;
    std::queue<Keyframe> queue;
    bool finished = false;

    std::thread mapping_thread([&] {
        for (;;) {
            Keyframe kf;
            {
                std::lock_guard<std::mutex> lock(m);
                if (!queue.empty()) {
                    kf = queue.front();
                    queue.pop();
                } else if (finished) {
                    break;
                } else {
                    continue;
                }
            }

            std::this_thread::sleep_for(30ms);
            std::cout << "mapping keyframe " << kf.id << '\n';
        }
    });

    for (int i = 0; i < 5; ++i) {
        std::this_thread::sleep_for(10ms);
        {
            std::lock_guard<std::mutex> lock(m);
            queue.push(Keyframe{i});
        }
        std::cout << "tracking keyframe " << i << '\n';
    }

    {
        std::lock_guard<std::mutex> lock(m);
        finished = true;
    }

    mapping_thread.join();
}

这个模型故意还没有使用 condition_variable。 Mapping 线程在队列空时会忙等,浪费 CPU。 它适合说明”线程拆分隔离了节奏”,不适合作为最终队列实现。 17.6 节会修正等待模型。

本质洞察:多线程的本质不是”让代码跑得更快”,而是”让不同延迟需求的任务不互相拖累”。Tracking 需要低延迟,Mapping 需要高吞吐,LoopClosing 可以后台慢慢做。线程只是把这些不同节奏的任务放进不同执行上下文的手段。真正需要设计的不是”开几个线程”,而是”数据由谁拥有、通过什么协议传递、在什么边界同步”。

多线程系统像一个工厂流水线:Tracking 是高速检测工位(每件产品停留时间短),Mapping 是精加工工位(每件产品需要更长时间),LoopClosing 是质检工位(低频抽检)。如果把三个工位串行排列,精加工慢了就会堵住检测工位。流水线设计让每个工位按自己的节奏工作,工位之间用传送带(队列)缓冲。

⚠️ 编程陷阱:为了”加速”而盲目添加线程 错误做法:系统变慢时,第一反应是”加线程”。 现象:加了线程后出现 data race、死锁、退出时崩溃等新问题,调试成本远大于性能收益。 根本原因:多线程不是性能优化的默认手段。如果算法本身没有明确的数据边界和所有权划分,加线程只会让问题更难定位。 正确做法:先回答五个问题(实时路径是什么?哪些任务可以滞后?哪些对象跨线程共享?每把锁保护什么?停止时如何退出?),再决定是否需要多线程。

💡 概念误区:认为单线程永远低效 新手想法:”单线程不能利用多核,所以实时系统一定要多线程。” 实际上:如果所有计算加起来不超过帧时间预算(如 33ms@30Hz),单线程顺序执行更简单、更可调试、更可复现。离线实验和算法验证阶段,单线程通常更合适。 正确理解:多线程解决的是”延迟隔离”和”吞吐瓶颈”,不是”让 CPU 看起来很忙”。

练习

  1. [分析题]:对比单线程顺序流水线和多线程流水线。列出至少 3 个场景适合单线程,3 个场景必须多线程。
  2. [代码题]:修改上面的最小流水线代码,添加帧延迟统计:记录每帧从 Tracking 产生到 Mapping 消费的时间差。观察忙等模式下 Mapping 线程的 CPU 占用。
  3. [跨章综合题]:C++语言核心/RAII与智能指针 讲了 RAII 管理单线程资源生命周期。在多线程环境下,RAII 的”析构即释放”原则需要配合什么额外机制(停止协议、队列关闭)才能安全工作?

17.2 std::thread 对象模型 ⭐⭐⭐⭐

工程问题:线程对象不是线程函数本身

std::thread 把一个可调用对象交给系统创建新的执行流。 创建后,C++ 程序里出现两个概念:

  1. 执行线程:操作系统调度的控制流,正在运行线程入口函数。
  2. std::thread 对象:C++ 对象,保存线程句柄,负责 join/detach 管理。

这两个概念必须分开。 线程函数可能已经运行结束,但 std::thread 对象仍然 joinable,调用者仍必须 join()detach()。 反过来,std::thread 对象可以被移动到另一个变量里,底层执行线程并不会因此重启。

线程入口可以是普通函数、函数对象、lambda、成员函数加对象指针。 ORB-SLAM 风格的写法本质上就是把某个对象的成员函数作为线程入口:

class LocalMapping {
public:
    void run();
};

LocalMapping mapping;
std::thread mapping_thread(&LocalMapping::run, &mapping);

这段代码的关键边界是:mapping 对象必须活得比线程入口更久。 如果对象先析构,线程还在调用它的成员函数,就是悬空指针。

反面失败:忘记 join 导致析构时终止程序

下面是一个反例片段。 它能编译,但离开作用域时会调用 std::terminate,因为 t 仍然 joinable。

#include <thread>

void badThreadLifetime() {
    std::thread t([] {});
}

C++ 标准库选择终止程序,而不是自动 detach。 原因很实际:自动 detach 会让后台线程继续访问已经析构的局部对象,故障更隐蔽。 自动 join 也不适合 std::thread,因为析构可能发生在异常展开路径上,隐式阻塞会让程序在意外位置卡住。 所以 std::thread 把责任交给调用者:你必须明确选择 join()detach()

另一个反面失败是参数引用。

#include <iostream>
#include <thread>

void addOne(int& x) {
    ++x;
}

void badReferencePassing() {
    int value = 41;
    // std::thread t(addOne, value);  // 无法按 int& 调用:thread 会先复制参数。
    std::thread t(addOne, std::ref(value));
    t.join();
    std::cout << value << '\n';
}

std::thread 默认会把参数 decay-copy 到内部存储。 这能避免很多悬空引用,但也意味着引用传递必须显式写 std::ref。 显式性是安全边界:读代码的人能立刻看到跨线程共享了外部对象。

抽象不变量:joinable 线程必须被收束

std::thread 的核心不变量可以写成一句话:

任何 joinable 的 std::thread 对象在析构前必须被 join()detach() 或移动走。

围绕这个不变量,可以推导出几条规则:

  1. 默认构造的 std::thread 不代表执行线程,joinable() == false
  2. 成功创建线程后,joinable() == true
  3. join() 等待执行线程结束,并让对象变为 not joinable。
  4. detach() 让执行线程脱离对象管理,并让对象变为 not joinable。
  5. 移动构造或移动赋值会转移线程句柄。
  6. joinable 对象析构会 std::terminate

线程入口还有一个异常不变量:

异常不能逃出线程入口函数。

如果线程入口函数抛出异常并离开入口,程序同样会调用 std::terminate。 线程外部不能直接 catch 另一个线程里逃出的异常。 如果需要把错误传回创建者,要在入口内部捕获,再通过 std::promise、错误队列、状态对象或日志系统传递。

规则推导:参数传递、移动和成员函数入口

线程参数传递遵循“先保存,再在线程中调用”的模型。 std::thread 构造函数会把可调用对象和参数复制或移动到内部存储,然后在新线程中用这些存储值调用入口。 这解释了几个常见现象:

  • 传普通对象会复制,除非使用 std::move
  • 传引用需要 std::refstd::cref
  • 传指针只是复制指针值,不延长目标对象生命周期。
  • 传成员函数时,第一个额外参数是对象指针、引用包装器或智能指针。
  • std::thread 本身不可复制,只能移动。

成员函数入口示例:

#include <iostream>
#include <thread>

class Mapper {
public:
    void run(int start_id) {
        std::cout << "mapping starts at " << start_id << '\n';
    }
};

int main() {
    Mapper mapper;
    std::thread t(&Mapper::run, &mapper, 10);
    t.join();
}

如果 Mapperstd::shared_ptr 管理,也可以把 shared_ptr 复制进线程,显式延长对象生命周期。 这适合后台任务需要拥有对象的场景,但不应滥用。 共享所有权会让关闭顺序变得不直观。

工程边界:detach 很少是机器人核心线程的正确选择

detach() 的含义是:我不再关心这个线程何时结束,也不会再 join 它。 这在短命日志刷新、一次性后台统计等弱依赖任务中可能可接受。 但对 SLAM 的 Tracking、Mapping、LoopClosing、传感器处理线程来说,detach() 通常是错误信号。

核心线程需要明确关闭顺序:

  1. 停止接收新输入。
  2. 关闭队列或发送停止请求。
  3. 唤醒等待中的线程。
  4. 等待线程退出。
  5. 析构地图、词袋、优化器和传感器缓冲。

如果 detach 了 Mapping 线程,主系统可能先析构地图对象,后台线程随后访问地图,形成悬空引用。 这种故障往往只在退出时偶发,调试成本很高。

因此,机器人核心线程的默认选择是 RAII 管理和显式 join。 C++20 的 std::jthread 进一步把这个规则做成类型行为。

代码验证:安全入口、引用传递和异常转移

下面代码可单文件编译。 它展示三件事:引用要用 std::ref,线程入口内部捕获异常,错误通过 std::promise 传回主线程。

#include <exception>
#include <future>
#include <iostream>
#include <stdexcept>
#include <thread>

void increment(int& value) {
    ++value;
}

int main() {
    int counter = 0;
    std::thread t1(increment, std::ref(counter));
    t1.join();

    std::promise<int> result_promise;
    std::future<int> result = result_promise.get_future();

    std::thread t2([p = std::move(result_promise)]() mutable {
        try {
            throw std::runtime_error("mapping failed");
            p.set_value(1);
        } catch (...) {
            p.set_exception(std::current_exception());
        }
    });

    try {
        std::cout << "counter = " << counter << '\n';
        std::cout << "result = " << result.get() << '\n';
    } catch (const std::exception& e) {
        std::cout << "caught from thread: " << e.what() << '\n';
    }

    t2.join();
}

编译命令示例:

g++ -std=c++17 -O2 -pthread thread_object_model.cpp && ./a.out

这个例子里的 std::promise 只负责传递一次结果或一次异常。 如果要长期传递多条消息,应使用线程安全队列。

⚠️ 编程陷阱:忘记 join() 导致析构时 std::terminate 错误做法:创建 std::thread 后在正常路径 join(),但异常路径跳过了 join()现象:正常退出时一切正常;但当初始化函数抛异常时,程序在 std::thread 析构处直接终止。 根本原因:C++ 标准要求 joinable 的 std::thread 在析构前必须 join()detach()。异常路径上的栈展开会触发析构,此时线程仍然 joinable。 正确做法:用 RAII 守卫(ThreadGuard 或 std::jthread)确保所有路径都能 join。

🧠 思维陷阱:认为 std::ref 是多余的语法 新手想法:"addOne(int& x) 参数是引用,传给 std::thread 时引用自然就传过去了。" 实际上std::thread 构造函数会 decay-copy 所有参数到内部存储。不用 std::ref,参数会被拷贝——函数修改的是拷贝件,不是原件。std::ref 的显式性是安全边界:读代码的人立刻知道"这里跨线程共享了外部对象"。 正确思维std::ref 不是语法负担,而是"我知道自己在做什么"的声明。

练习

  1. [分析题]:解释 std::thread 析构时为什么选择 std::terminate 而不是自动 join() 或自动 detach()。两种自动策略各有什么隐患?
  2. [代码题]:编写一个程序,故意让 std::thread 在 joinable 状态下析构。用 try-catch 观察是否能捕获终止。然后用 ThreadGuard 修复。
  3. [跨章综合题]:C++语言核心/移动语义与完美转发 讲了移动语义。std::thread 不可复制只能移动——解释这个设计与"线程句柄是系统资源"之间的关系。对比 std::unique_ptr 的设计(同样不可复制只能移动)。

17.3 RAII 线程管理:守卫、jthread 与停止请求 ⭐⭐⭐⭐⭐

工程问题:异常路径也必须 join

手动 join() 在直线代码里不难。 困难在异常路径。

void runMapping() {
    std::thread t([] {
        runLocalMappingLoop();
    });

    loadVocabulary();     // 如果这里抛异常,t 的析构会 terminate。
    initializeMap();

    t.join();
}

只要 loadVocabulary()initializeMap() 抛异常,函数开始栈展开,t 析构时仍然 joinable,程序终止。 这和内存泄漏问题相似:资源获取和释放不能散落在控制流里。 线程句柄也是资源,应该用 RAII 收束。

线程生命周期管理可以类比公司的入职和离职流程。std::thread 的创建就像员工入职——系统开始运行新的工作流。但如果员工离职(线程需要结束)时没有交接流程(join),而是直接走人(对象析构),公司(程序)会陷入混乱。更糟的是,如果员工还在使用公司资源(访问对象成员)时公司就关门了(对象析构),就会出现"人还在用但工位已经拆了"的情况(use-after-free)。RAII 线程管理就是确保"先完成交接,再关闭工位"。

如果 std::thread 的析构函数自动调用 join() 而不是 terminate() 会怎样?异常展开路径上的隐式 join 可能让程序在意外位置无限期卡住——想象一个后台优化线程需要 30 秒才能完成,异常处理路径却在等它结束。自动 detach 也不好——后台线程会继续访问可能已经销毁的局部对象。C++ 标准库选择 terminate() 是"fail-fast"策略:强迫程序员显式做出选择,而不是在两个都不好的默认行为之间猜测。C++20 的 std::jthread 选择了"先 request_stop 再 join",这是一个更合理的默认行为,因为它同时提供了协作式停止机制。

线程停止的三种范式:为什么协作式停止是最佳选择

线程停止看似简单——"让线程退出循环就好了"——但在有共享资源的系统中,停止顺序和停止时机直接影响系统的正确性。理解三种停止范式的差异,有助于选择正确的停止策略。

范式一:强制终止。 POSIX 的 pthread_cancel 和 Windows 的 TerminateThread 可以从外部强制杀死一个线程。这看似最简单,但在 C++ 中几乎不可用。原因是:强制终止不会执行栈展开——局部对象的析构函数不会被调用,RAII 资源不会被释放,mutex 不会被解锁。如果被杀死的线程持有 map_mutex_,这把锁就永远处于锁定状态,其他线程永久卡住。强制终止是"最暴力也最危险"的停止方式,在 SLAM 系统中绝不应使用。

范式二:标志轮询。 设置一个 atomic<bool> stop_requested 标志,工作线程在每次循环迭代中检查这个标志。这是最常用的轻量级停止方式,适合"循环频率高、每次迭代时间短"的线程。但如果线程正在 condition_variable::wait() 或阻塞式 I/O 中等待,它不会主动检查标志——需要额外的 notify_all() 唤醒或 wait_for 超时来打破等待。

范式三:C++20 协作式停止(stop_token/stop_source)。 std::jthreadstd::stop_token 是 C++20 引入的完整停止协议。stop_source 像一个"停止按钮",stop_token 像一个"停止信号接收器"。工作线程可以通过 token.stop_requested() 轮询,也可以通过 stop_callback 注册回调——当停止按钮被按下时,回调自动执行。这解决了"线程在 condition_variable::wait() 中无法感知停止"的问题:可以在 stop_callbacknotify_all(),唤醒等待中的线程。

SLAM 系统的停止顺序设计。 在多线程 SLAM 系统中,停止顺序通常是反向的——先创建的线程后停止:

  1. 停止接收新传感器数据(关闭驱动回调)。
  2. 关闭传感器缓冲队列(使消费者退出等待)。
  3. 停止 Tracking 线程并 join。
  4. 关闭关键帧队列。
  5. 停止 Mapping 线程并 join。
  6. 关闭回环检测队列。
  7. 停止 LoopClosing 线程并 join。
  8. 释放地图、词袋数据库和优化器。

每一步都要确保"先让线程退出,再释放它访问的资源"。颠倒顺序就会出现 use-after-free。这种严格的停止顺序很像操作系统的关机流程——先停止用户进程,再停止系统服务,最后卸载文件系统。任何一步跳过都可能导致数据损坏。

反面失败:析构顺序和停止协议分离

一个常见反面写法是把停止标志、线程对象和 join 放在不同位置:

class BadPipeline {
public:
    void start();
    void requestStop();
    void wait();

private:
    bool stop_requested_ = false;
    std::thread mapping_thread_;
};

这个设计缺少三个约束:

  1. stop_requested_ 如果被多个线程读写,普通 bool 会 data race。
  2. 调用者可能只调用 requestStop(),忘记 wait()
  3. 析构函数如果没有处理 joinable 线程,退出路径仍然不安全。

真正的生命周期设计应把“请求停止”和“等待退出”放在同一个所有者里。 所有者析构时必须让线程收束。

抽象不变量:线程所有者析构后不留下活动入口

RAII 线程管理的不变量是:

拥有线程句柄的 C++ 对象析构完成后,不再有入口函数访问该对象的成员。

这句话比“析构时 join”更完整。 如果线程入口捕获了 this,析构函数必须先请求线程退出,再 join。 否则 join 只是等待,线程可能永远不退出。

停止协议通常包含三部分:

  1. 一个线程安全的停止请求。
  2. 等待点能被唤醒。
  3. 线程循环在合适边界检查停止请求并退出。

C++20 的 std::jthread 同时提供两个改进:

  • 析构时如果 joinable,会先 request_stop,再 join。
  • 线程入口可以接收 std::stop_token,用它查询停止请求。

std::jthread 不会强制杀死线程。 它仍然是协作式停止。 线程入口必须检查 token,等待点也要能被唤醒。

规则推导:线程守卫适合 C++17,jthread 适合 C++20

C++17 可以写一个不可复制的线程守卫:

#include <thread>

class ThreadGuard {
public:
    explicit ThreadGuard(std::thread t) noexcept
        : thread_(std::move(t)) {}

    ThreadGuard(const ThreadGuard&) = delete;
    ThreadGuard& operator=(const ThreadGuard&) = delete;

    ThreadGuard(ThreadGuard&&) noexcept = default;
    ThreadGuard& operator=(ThreadGuard&&) noexcept = default;

    ~ThreadGuard() {
        if (thread_.joinable()) {
            thread_.join();
        }
    }

private:
    std::thread thread_;
};

这个守卫解决“忘记 join”的问题,但不解决“线程如何退出”的问题。 如果线程函数是无限循环,析构会永久等待。 因此 C++17 常配合 std::atomic<bool> 或条件变量关闭队列。

C++20 可以使用 std::jthread

#include <chrono>
#include <iostream>
#include <thread>

int main() {
    using namespace std::chrono_literals;

    std::jthread mapping_thread([](std::stop_token stop) {
        while (!stop.stop_requested()) {
            std::cout << "mapping tick\n";
            std::this_thread::sleep_for(20ms);
        }
        std::cout << "mapping exits\n";
    });

    std::this_thread::sleep_for(60ms);
}

mapping_thread 离开作用域时会请求停止并等待退出。 但注意:如果线程卡在不响应停止请求的阻塞调用里,析构仍会等待。

工程边界:协作式停止不能替代队列关闭

机器人系统里线程经常阻塞在队列等待。 如果只检查 stop_token,但线程正在 condition_variable::wait(),它不会自动醒来。 C++20 有 condition_variable_any 可以配合 stop_token,但常见工程做法仍是给队列设计 close()

  1. close() 在锁内设置关闭标志。
  2. close() 调用 notify_all() 唤醒所有等待者。
  3. pop() 在谓词里同时检查“队列非空”和“已关闭”。
  4. 队列空且已关闭时,pop() 返回空结果,消费循环退出。

这种队列关闭协议比单独的停止标志更完整。 它把“没有数据”和“以后也不会有数据”区分开。

jthread 适合拥有后台循环的对象。 MessageQueue::close() 适合阻塞等待数据的消费端。 实际系统常同时使用两者:析构时 request_stop,队列 close 唤醒等待线程。

代码验证:C++20 jthread 与可停止等待

下面代码展示 std::jthread 的协作停止。 为了避免等待点睡太久,循环使用短周期检查 token。 在高性能系统中,应把这个模式替换为条件变量唤醒。

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

class PeriodicTask {
public:
    void start() {
        thread_ = std::jthread([this](std::stop_token stop) {
            run(stop);
        });
    }

private:
    void run(std::stop_token stop) {
        using namespace std::chrono_literals;
        while (!stop.stop_requested()) {
            ++ticks_;
            std::cout << "tick " << ticks_.load() << '\n';
            std::this_thread::sleep_for(10ms);
        }
    }

    // 用 atomic:本例中 ticks_ 只在工作线程内修改,但一旦将来需要从外部
    // 线程读取计数(如监控、日志),普通 int 的读写就会与这里的 ++ticks_
    // 构成 data race。默认用 atomic 计数器是更安全的教学习惯。
    std::atomic<int> ticks_{0};
    std::jthread thread_;
};

int main() {
    using namespace std::chrono_literals;
    PeriodicTask task;
    task.start();
    std::this_thread::sleep_for(35ms);
}

编译命令示例:

g++ -std=c++20 -O2 -pthread jthread_stop.cpp && ./a.out

这段代码的边界很清楚:PeriodicTask 析构时,std::jthread 先请求停止再 join。 如果 run() 内部访问共享对象,仍然需要 mutex 或其他同步机制。

⚠️ 编程陷阱:jthread 析构时线程卡在不响应停止请求的阻塞调用 错误做法:线程入口在 cv.wait() 上阻塞,但 wait 的谓词没有检查停止请求。 现象jthread 析构时调用 request_stop() 并等待 join,但线程永远不醒来——程序卡死在析构函数里。 根本原因request_stop() 只设置停止标志,不唤醒阻塞调用。如果线程在 cv.wait() 上睡眠,谓词中没有检查停止标志,notify_all() 也没有被调用,线程就不会醒来。 正确做法:队列的 close() 方法应同时设置关闭标志和调用 notify_all()。或使用 condition_variable_any 配合 stop_token

💡 概念误区:认为 detach() 是合理的线程管理方式 新手想法:"detach() 让线程自己跑,不用管它,比 join() 更灵活。" 实际上detach() 意味着放弃对线程生命周期的控制。主系统析构地图、传感器缓冲和配置对象后,detached 线程可能仍在访问这些已销毁的对象——这是经典的 use-after-free。更糟的是,这种崩溃只在退出时偶发,极难复现和调试。 正确理解:机器人核心线程的默认选择是 RAII 管理 + 显式 join。只有真正"发出去就不管"的弱依赖任务才考虑 detach。

练习

  1. [分析题]std::jthread 的析构顺序是 request_stop()join()。如果线程入口函数捕获了 this 指针,析构函数中的 join 和成员变量析构的顺序为什么很关键?
  2. [代码题]:编写一个 PeriodicTask 类,使用 std::jthreadstd::stop_token 实现可停止的周期任务。验证离开作用域后线程能干净退出。
  3. [跨章综合题]:C++语言核心/RAII与智能指针 的 RAII 保证"获取即初始化,析构即释放"。std::jthread 是 RAII 在线程管理上的应用。对比 std::jthreadstd::unique_ptr:两者都是 RAII 包装器,分别管理什么资源?析构时分别做什么?

17.4 mutex 基础:data race、临界区与锁粒度 ⭐⭐⭐⭐⭐

这段代码模拟的是什么场景?——Tracking 和 Mapping 在争抢什么

在进入 mutex API 之前,先想清楚我们到底在保护什么。考虑 ORB-SLAM 的核心数据流:Tracking 线程每帧都要读取局部地图中的地图点来做特征匹配;Mapping 线程则在不断向地图中插入新的关键帧和地图点。如果你把地图想象成一个共享的白板——Tracking 在读白板上的内容做计算,Mapping 在白板上写新内容——那么问题就来了:Mapping 正在擦掉旧内容写新内容的那一瞬间,Tracking 读到的是什么?是旧内容?新内容?还是擦了一半的乱码?

这不是理论问题。ORB-SLAM2/3 的 Map 类内部有 mMutexMapUpdate,正是因为 Tracking 和 Mapping 同时访问 mspMapPointsmspKeyFrames 这两个 std::set。如果没有这把锁,std::set 在一个线程插入(可能触发红黑树旋转)的同时被另一个线程遍历,迭代器可能指向已经移动的节点——程序不是"偶尔读到旧数据",而是直接段错误。

理解了这个场景,mutex 的动机就自然了:它不是"因为教科书说并发要加锁",而是因为 Tracking 线程需要在**地图处于一致状态**时观察它,Mapping 线程需要在**没有其他线程观察**时修改它。mutex 划定的就是这个"观察窗口"和"修改窗口"的边界。

工程问题:共享地图不是普通全局变量

SLAM 系统里最诱人的写法是让多个模块直接共享地图对象:

struct Map {
    std::vector<int> keyframes;
    std::vector<int> landmarks;
};

Tracking 读地图做位姿估计,Mapping 插入关键帧和地图点,LoopClosing 修正位姿图。 如果这些操作没有同步,std::vector 可能一边扩容,一边被另一个线程遍历。 这不是“读到旧数据”的小问题,而是 C++ data race 和容器不变量破坏。

data race 的定义可以简化理解为:

两个线程并发访问同一内存位置,其中至少一个是写操作,并且没有 happens-before 同步关系。

一旦发生 data race,程序行为未定义。 编译器可以基于“没有 data race”这个语言前提优化代码。 所以不要把 data race 当成“偶发数值错误”,它是整个 C++ 语义层的断裂。

反面失败:只保护写,不保护读

很多初学者会写成“写的时候加锁,读的时候不加锁”。 他们的直觉是读操作不会修改对象。 但如果另一个线程正在写,读也会和写并发访问同一内存位置。

#include <mutex>
#include <vector>

class UnsafeMap {
public:
    void addKeyframe(int id) {
        std::lock_guard<std::mutex> lock(m_);
        keyframes_.push_back(id);
    }

    int latestKeyframe() const {
        return keyframes_.back();  // 反例:读没有同步。
    }

private:
    mutable std::mutex m_;
    std::vector<int> keyframes_;
};

如果 push_back 触发扩容,back() 可能在旧缓冲区或半更新状态上读取。 即使没有扩容,std::vector 的 size 更新也不是线程同步操作。

抽象不变量:mutex 不是保护数据,而是保护不变量

这个区分至关重要,值得展开讲清楚。很多初学者的心智模型是”mutex 保护数据”——好像给数据加了一把锁,只要加了锁就安全了。更准确的心智模型是:mutex 保护不变量(invariant)——保证在临界区外部观察到的数据始终满足某组一致性条件。

什么是不变量? 以银行转账为例。假设 Alice 有 100 元,Bob 有 200 元,系统的不变量是”Alice 和 Bob 的余额之和 = 300 元”。转账操作需要两步:从 Alice 扣款、给 Bob 加款。在两步之间,不变量被暂时破坏——Alice 已经被扣款但 Bob 还没有收到。如果此时另一个线程观察余额,会看到总和不等于 300 的中间状态。

mutex 的临界区就是为了覆盖”不变量暂时被破坏”的这段时间。进入临界区时,不变量成立;在临界区内部,不变量可以被暂时破坏(因为修改正在进行中);退出临界区前,不变量必须被恢复。其他线程只能在临界区外部观察数据——此时不变量一定成立。

临界区入口(加锁):不变量成立
修改步骤1:Alice -= 50    ← 此时不变量暂时被破坏
修改步骤2:Bob += 50      ← 不变量恢复
临界区出口(解锁):不变量成立

为什么”mutex 保护数据”是不够精确的心智模型? 因为同一个 mutex 可能保护多个相关的数据结构。SLAM 地图的不变量可能是”关键帧数组、关键帧 id 到索引的映射、共视图边、最新关键帧 id 四者一致”。如果只锁住 keyframes_.push_back() 一行,而不锁住 id 映射和共视图边的更新,那么另一个线程在锁释放后可能看到”关键帧数组已经有新帧,但 id 映射还没更新”的不一致状态。锁住了数据,但没有保护不变量。

mutex 的名字不应表达”谁在用”,而应表达”保护什么”。 例如:

  • map_mutex_ 保护地图容器和关键帧位姿的一致性。
  • queue_mutex_ 保护队列内容和关闭标志。
  • state_mutex_ 保护系统状态枚举和错误消息。

临界区不是”觉得危险就加锁”的代码块。 临界区应覆盖一次不变量观察或修改的完整过程。

例如地图插入关键帧时,可能要同时更新:

  1. 关键帧数组。
  2. 关键帧 id 到索引的映射。
  3. 共视图边。
  4. 最新关键帧 id。

这些更新如果代表同一个地图不变量,就应该在同一把锁保护下完成,或者用更高层事务机制封装。 只锁 push_back 一行,不能保护跨容器一致性。

规则推导:lock_guardunique_lockscoped_lock 的分工

std::lock_guard<std::mutex> 是最简单的 RAII 锁。 构造时加锁,析构时解锁,不可手动解锁。 它适合短临界区。

{
    std::lock_guard<std::mutex> lock(mutex_);
    shared_state_ = 42;
}

std::unique_lock<std::mutex> 拥有更丰富的状态。 它可以延迟加锁、手动解锁、移动,并且是 condition_variable 的标准搭档。

std::unique_lock<std::mutex> lock(mutex_);
prepareSharedState();
lock.unlock();
doExpensiveWorkWithoutHoldingMutex();

std::scoped_lock 是 C++17 引入的通用 RAII 锁。 它可以一次锁多把 mutex,并使用避免死锁的锁定算法。

std::scoped_lock lock(map_mutex_, queue_mutex_);
moveKeyframeFromQueueToMap();

锁粒度需要权衡。 锁太粗,系统容易串行化,Tracking 被 Mapping 长时间阻塞。 锁太细,不变量被拆碎,死锁和竞态更容易出现。 机器人系统常用策略是:

  1. 共享数据结构用清晰的少数几把锁保护。
  2. 重计算不在锁内执行。
  3. 在锁内复制或移动出所需快照,然后释放锁计算。
  4. 写回共享状态时再次短暂加锁。

工程边界:锁不能保护锁外引用

一个隐蔽错误是从锁内返回引用或迭代器:

class BadMapView {
public:
    const std::vector<int>& keyframes() const {
        std::lock_guard<std::mutex> lock(m_);
        return keyframes_;
    }

private:
    mutable std::mutex m_;
    std::vector<int> keyframes_;
};

函数返回后锁已经释放。 调用者拿到的引用没有同步保护。 另一个线程可以同时修改 keyframes_

正确做法通常是:

  • 返回快照副本。
  • 让调用者提供回调,在锁内访问受控视图。
  • 返回不可变、生命周期稳定的数据结构。
  • 用更高层消息传递替代共享容器。

对 SLAM 地图而言,直接返回内部容器引用尤其危险。 地图点和关键帧往往互相引用,单个容器快照也可能不代表一致地图。

锁粒度的选择是一个持续的工程权衡。锁太粗(整个地图一把锁),Tracking 读地图时会被 Mapping 的长时间写操作阻塞,实时性受损。锁太细(每个关键帧一把锁、每个地图点一把锁),跨对象不变量(如"关键帧 A 和地图点 B 的共视关系")没有统一保护,容易出现不一致状态。机器人系统的常见平衡点是:用少数几把锁保护清晰定义的数据聚合体,在锁内只做数据传递(复制快照或移动所有权),在锁外做重计算。这样既不会长时间持锁阻塞实时路径,又能保持不变量的完整性。

理解锁粒度还需要一个反事实:如果每个函数内部各自加锁,不考虑函数组合怎么样?empty() 返回 false、pop() 返回元素——两个函数各自正确,但组合时中间的间隙可能被其他线程利用。这就是 TOCTOU(Time-Of-Check-To-Time-Of-Use)竞态。正确的并发接口应把"检查+操作"合并为一个受锁保护的原子操作,如 tryPop() 返回 optional<T>。这个原则贯穿整个并发编程:不要让调用者在两个安全操作之间暴露不安全的间隙。

代码验证:锁内修改、锁外计算

下面代码可单文件编译。 它展示一个线程安全计数器和一个“复制快照后锁外计算”的模式。

#include <iostream>
#include <mutex>
#include <numeric>
#include <thread>
#include <vector>

class KeyframeStore {
public:
    void add(int id) {
        std::lock_guard<std::mutex> lock(m_);
        ids_.push_back(id);
    }

    std::vector<int> snapshot() const {
        std::lock_guard<std::mutex> lock(m_);
        return ids_;
    }

private:
    mutable std::mutex m_;
    std::vector<int> ids_;
};

int main() {
    KeyframeStore store;

    std::thread a([&] {
        for (int i = 0; i < 1000; ++i) {
            store.add(i);
        }
    });

    std::thread b([&] {
        for (int i = 1000; i < 2000; ++i) {
            store.add(i);
        }
    });

    a.join();
    b.join();

    const auto ids = store.snapshot();
    const long sum = std::accumulate(ids.begin(), ids.end(), 0L);
    std::cout << ids.size() << " ids, sum = " << sum << '\n';
}

快照副本有成本。 如果地图很大,不能每次都复制全部对象。 但这个例子强调的是边界:锁保护共享容器,计算在锁外做。 真实系统会根据数据量把快照缩小到关键帧 id、位姿数组或局部子图。

本质洞察:mutex 不是保护代码的,而是保护不变量的。当你说"这个 mutex 保护 map",真正的意思是"这个 mutex 保证在临界区外观察到的 map 始终满足其不变量(如关键帧数组和 id 映射一致、共视图边完整)"。临界区应覆盖一次不变量修改的完整过程,而不是"觉得危险就加锁"的任意代码块。

mutex 就像图书馆的借还书流程。借书时,图书馆员要同时更新书架上的库存和借阅记录——这两个操作必须在同一次事务中完成(类似同一个临界区)。如果只更新了库存但没更新记录,下一个读者查记录时会以为书还在,实际上书已经被拿走了。mutex 保护的就是这种"多个数据结构之间的一致性"。

如果只保护写操作不保护读操作会怎样?一个线程正在执行 push_back(可能触发 vector 扩容:分配新内存→复制元素→释放旧内存),另一个线程同时调用 back() 读取。读者可能读到旧缓冲区(已释放)、半复制状态、或者扩容前的 size 配合扩容后的数据指针。这不是"偶尔读到旧值"的问题,而是未定义行为——程序可能崩溃、返回垃圾数据、或者"看起来正常"但实际损坏了内存。

⚠️ 编程陷阱:从锁内返回容器的引用或迭代器 错误做法const std::vector<int>& keyframes() const { lock_guard lock(m_); return keyframes_; } 现象:调用者拿到引用后锁已释放,另一个线程随时可能修改 vector。调用者遍历时可能遇到迭代器失效或数据损坏。 根本原因:锁只在持有期间保护数据。返回引用让调用者在无锁状态下访问受保护对象,等于把保险箱的门打开后交出钥匙。 正确做法:返回副本(return keyframes_;),或让调用者在回调中在锁内访问受控视图。

🧠 思维陷阱:认为"只要每个函数都加锁就线程安全了" 新手想法:"empty() 加锁、pop() 加锁,所以 if (!q.empty()) q.pop() 线程安全。" 实际上:每个函数单独无 data race,但 empty() 返回 false 后、进入 pop() 前,另一个线程可能已经清空了队列。这是 race condition——两个安全操作之间的间隙仍然不安全。 正确思维:并发接口设计要考虑"一个业务动作需要的完整不变量在哪里被保护"。把检查和操作合并为原子接口(如 tryPop() -> optional<T>)。

练习

  1. [分析题]:解释为什么 lock_guard 适合短临界区,unique_lock 适合条件变量,scoped_lock 适合多锁场景。各给一个 SLAM 中的真实用例。
  2. [代码题]:实现一个线程安全的 KeyframeStore,支持 add()snapshot()(返回副本)和 size()。用两个线程同时写入 2000 个 id,验证最终 snapshot 包含全部 id。
  3. [跨章综合题]:C++语言核心/类型系统与值类别推导 讲了 C++ 的值类别和对象模型。在单线程中,对象的"未初始化"是常见陷阱;在多线程中,对象的"并发访问"是常见陷阱。对比两种陷阱的检测工具(单线程用 sanitizer,多线程用 ThreadSanitizer)。

17.5 condition_variable:等待-通知模型 ⭐⭐⭐⭐⭐

这段代码模拟的是什么场景?——Mapping 线程在等什么

想象 ORB-SLAM 的 LocalMapping::Run() 函数。Mapping 线程的主循环是:"从关键帧队列里取一个新关键帧,做局部 BA,然后再取下一个。"但关键帧不是匀速产生的——Tracking 线程只有在当前帧被判定为关键帧时才会入队。大部分时间关键帧队列是空的。

如果 Mapping 线程用忙等(不断检查队列是否为空),它会一直占着一个 CPU 核心做无用功。在 Jetson 这种 4-8 核平台上,一个核心被空转占用意味着 Tracking 或 LoopClosing 可能被调度延迟。更实际地说,如果你在笔记本上开发,忙等的 Mapping 线程会让风扇全速运转——虽然什么有用的计算都没做。

条件变量解决的正是这个问题:Mapping 线程在队列为空时**睡眠**,不占 CPU;Tracking 线程插入关键帧后**唤醒** Mapping 线程。这不仅是 API 层面的优化,而是让整个系统的 CPU 资源分配合理化——睡眠的线程不参与调度,空出来的核心可以服务其他计算密集型任务。

ORB-SLAM3 的 LocalMapping 中使用了类似机制:mbAbortBA 和关键帧队列检查配合 usleep 实现了一种简化的等待-通知模式。而更规范的 C++ 做法是使用 std::condition_variable——它同时解决了"何时睡眠""何时唤醒""如何处理虚假唤醒"三个问题。

工程问题:队列为空时不应该忙等

17.1 节的最小流水线有一个明显缺陷:Mapping 线程在队列为空时不断循环检查。 这种忙等会浪费 CPU,尤其当系统里还有图像解码、点云处理、优化器和 ROS2 executor 时,空转线程会抢占真正需要计算的任务。

生产者消费者队列需要表达的是:

  1. 生产者入队后通知消费者。
  2. 消费者在队列为空时睡眠。
  3. 队列关闭时,消费者即使没有新数据也要醒来退出。
  4. 等待和检查状态必须在同一个 mutex 保护的不变量下完成。

std::condition_variable 正是为这种等待-通知关系设计的。 它不是消息队列,也不保存通知次数。 它只负责让线程在某个条件不满足时睡眠,并在其他线程通知后重新检查条件。

反面失败:先检查再等待导致丢失通知

下面是一个常见错误模式。 代码片段能表达问题,但不应作为队列实现使用。

#include <condition_variable>
#include <mutex>
#include <queue>

std::mutex m;
std::condition_variable cv;
std::queue<int> q;

int badPop() {
    if (q.empty()) {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock);
    }

    int value = q.front();
    q.pop();
    return value;
}

问题至少有四个:

  1. q.empty() 在锁外检查,本身就是并发访问风险。
  2. 检查到空以后、进入 wait() 以前,生产者可能已经入队并 notify_one(),通知丢失。
  3. wait() 返回后没有重新检查队列是否真的非空。
  4. q.empty() 为假而走"快路径"时,q.front()q.pop() 都在锁外执行,与生产者的 push 并发访问同一个 std::queue(至少一个是写),同样是 data race——这条快路径根本没有任何同步保护。

condition_variable 的通知不是事件计数器。 如果没有线程正在等待,notify_one() 不会为未来保存一次唤醒。 因此等待方必须把“状态检查”和“进入等待”放进同一个原子化等待操作中。

抽象不变量:condition_variable 的完整语义模型

条件变量的核心不变量是:

通知只是提示状态可能改变,谓词才是状态是否满足的真相。

理解条件变量需要把握三个层次:接口语义(为什么需要谓词)、原子操作序列(wait 内部做了什么)、以及**虚假唤醒的底层原因**。

为什么需要谓词? 这不是"最佳实践"级别的建议,而是正确性的必要条件。条件变量的 notify_one() / notify_all() 不是事件计数器——它不会"保存"通知供未来使用。如果调用 notify_one() 时没有线程在等待,这次通知直接丢失。因此,等待方不能只依赖"收到通知"来判断条件是否满足,而必须自己检查实际状态。谓词就是这个"实际状态检查"。

标准写法:

cv.wait(lock, [&] {
    return !queue.empty() || closed;
});

这行代码等价于一个循环:

while (!predicate()) {
    cv.wait(lock);
}

wait() 的原子操作序列是理解条件变量的关键。 wait(lock) 内部做了一个精心设计的原子操作序列:

  1. 调用方已经持有 mutex(这是前置条件,违反会导致未定义行为)。
  2. wait() **原子地**释放 mutex 并让线程进入等待状态。"原子地"是关键——释放锁和进入等待必须是不可分割的一步。如果先释放锁再进入等待,生产者可能在释放锁之后、进入等待之前执行 notify_one(),这次通知就丢失了。
  3. 被通知(notify_one / notify_all)或虚假唤醒后,线程被唤醒。
  4. wait() 重新获取 mutex。只有获取成功后,wait() 才返回。
  5. 如果使用谓词版本,返回前再次检查谓词。谓词不满足就继续回到步骤 2。

虚假唤醒(spurious wakeup)的底层原因。 很多教材把虚假唤醒说成"实现的遗憾",但它实际上有深层的技术原因:

  1. POSIX pthread 实现原因pthread_cond_wait 在底层使用 futex 系统调用(Linux)或类似机制。futex 的等待可以被信号中断(EINTR),此时线程被唤醒但条件并未改变。与其让每一层都处理 EINTR 重试逻辑,POSIX 标准选择把"可能的虚假唤醒"作为接口契约的一部分,让调用者用循环+谓词统一处理。
  2. 多处理器优化原因:在某些多处理器架构上,允许虚假唤醒可以避免复杂的跨核心同步,让 notify_one 的实现更高效。如果标准要求"绝不虚假唤醒",实现需要在唤醒路径上做额外的确认步骤,增加延迟。
  3. 多等待者竞争原因:当多个线程等待同一个条件变量,notify_one 唤醒了一个线程。但在被唤醒的线程重新获取 mutex 之前,另一个线程可能先获取了 mutex 并消费了数据。被唤醒的线程最终获取 mutex 后发现数据已被消费——这在效果上等同于虚假唤醒。

因此不写谓词的 wait(lock) 只适合非常底层的封装;业务代码应优先使用 wait(lock, pred)

丢失通知(lost wakeup)问题。 与虚假唤醒相对的是丢失通知。如果生产者在消费者进入 wait() 之前就调用了 notify_one(),这次通知不会被保存——消费者随后进入 wait() 就可能永远睡下去。谓词版本 wait(lock, pred) 同时解决了这两个问题:虚假唤醒后重新检查谓词发现条件不满足就继续等待;丢失通知时因为状态已经满足谓词(生产者已经入队了数据)而直接跳过等待、不进入 wait()

规则推导:生产者消费者队列必须同时处理数据和关闭

一个可用的阻塞队列至少需要:

  1. push():在锁内检查是否关闭,入队,然后通知一个消费者。
  2. pop():等待“队列非空或已关闭”。
  3. close():在锁内设置关闭标志,然后通知所有等待者。
  4. 返回值能表达“拿到数据”和“队列已空且关闭”两种结果。

C++17 可以用 std::optional<T> 表达 pop() 的结果。

#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <stdexcept>
#include <utility>

template <typename T>
class MessageQueue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(m_);
            if (closed_) {
                throw std::runtime_error("push to closed queue");
            }
            q_.push(std::move(value));
        }
        cv_.notify_one();
    }

    std::optional<T> pop() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] {
            return closed_ || !q_.empty();
        });

        if (q_.empty()) {
            return std::nullopt;
        }

        T value = std::move(q_.front());
        q_.pop();
        return value;
    }

    void close() {
        {
            std::lock_guard<std::mutex> lock(m_);
            closed_ = true;
        }
        cv_.notify_all();
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<T> q_;
    bool closed_ = false;
};

notify_one() 放在锁外通常更好。 因为被唤醒的线程需要重新获取同一把 mutex。 如果通知时还持有锁,被唤醒线程可能立刻阻塞在锁上。 不过“锁内通知”也不是语义错误;关键是状态修改必须在通知之前完成。

工程边界:条件变量不替代背压和容量控制

上面的队列是无界队列。 如果 Tracking 生产关键帧速度持续高于 Mapping 消费速度,队列会不断增长。 真实系统需要背压策略:

  • 阻塞生产者,限制队列最大长度。
  • 丢弃旧数据,例如视觉显示帧或低价值中间结果。
  • 合并任务,例如只保留最新状态。
  • 触发降级,例如暂停插入关键帧或降低优化频率。

不同消息语义需要不同策略。 关键帧通常不能随便丢。 实时可视化图像可以丢旧帧。 停止信号不能丢。 这就是为什么工程队列不应只看作 std::queue + mutex,还要把消息语义写进接口。

条件变量还有一个边界:谓词必须只检查受同一把锁保护的状态。 如果谓词同时读一个未同步的外部标志,就会重新引入 data race。

代码验证:阻塞队列驱动 Mapping 线程退出

下面是完整可编译示例。 它验证队列关闭后,消费线程会从 pop() 得到 std::nullopt 并自然退出。

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>

template <typename T>
class MessageQueue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(m_);
            if (closed_) {
                throw std::runtime_error("push to closed queue");
            }
            q_.push(std::move(value));
        }
        cv_.notify_one();
    }

    std::optional<T> pop() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] {
            return closed_ || !q_.empty();
        });

        if (q_.empty()) {
            return std::nullopt;
        }

        T value = std::move(q_.front());
        q_.pop();
        return value;
    }

    void close() {
        {
            std::lock_guard<std::mutex> lock(m_);
            closed_ = true;
        }
        cv_.notify_all();
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<T> q_;
    bool closed_ = false;
};

int main() {
    using namespace std::chrono_literals;

    MessageQueue<int> keyframes;

    std::thread mapping([&] {
        while (auto id = keyframes.pop()) {
            std::cout << "mapping consumes " << *id << '\n';
            std::this_thread::sleep_for(20ms);
        }
        std::cout << "mapping exits\n";
    });

    for (int i = 0; i < 4; ++i) {
        keyframes.push(i);
    }

    keyframes.close();
    mapping.join();
}

编译命令示例:

g++ -std=c++17 -O2 -pthread message_queue.cpp && ./a.out

这个队列已经足够支撑本章累积项目。 后续可以加入容量、超时、统计计数和丢弃策略。

条件变量的工作方式可以类比餐厅的叫号系统。顾客(消费线程)到店后如果没有空桌(队列为空),不应该站在门口每秒问一次"有桌了吗"(忙等),而是拿号坐在等候区(wait)。当有桌子空出来时,服务员喊号(notify_one)。顾客被叫醒后还要检查"是不是真的轮到我了"(谓词),因为可能是误叫(虚假唤醒)。如果餐厅关门了(队列关闭),所有等候的顾客都要被通知离开(notify_all)。

如果不使用谓词版本的 wait 会怎样?虚假唤醒时,线程从 wait() 返回后队列仍然为空,如果直接调用 q_.front() 就会触发未定义行为。丢失通知时,生产者在消费者进入 wait() 之前就 notify_one() 了,这次通知不会被保存——消费者随后进入 wait() 就永远睡下去。谓词版本 wait(lock, pred) 同时解决了这两个问题:虚假唤醒后重新检查谓词,丢失通知时因为状态已经满足谓词而直接跳过等待。

⚠️ 编程陷阱:condition_variable::wait() 不写谓词 错误做法cv.wait(lock); 不检查共享状态。 现象:虚假唤醒时消费者在空队列上调用 front(),崩溃或读到垃圾数据。 根本原因:条件变量允许虚假唤醒——即使没有对应通知,wait() 也可能返回。不写谓词就没有防线。 正确做法:始终使用 cv.wait(lock, [&]{ return !q_.empty() || closed_; });

💡 概念误区:认为 notify_one() 会"保存"一次通知 新手想法:"生产者先 notify_one(),消费者稍后 wait() 时就能收到这次通知。" 实际上:条件变量的通知不是事件计数器。如果没有线程正在等待,notify_one() 什么都不做——通知直接丢失。这也是为什么必须把状态检查和进入等待放在同一把锁保护下:先检查状态,状态不满足才进入 wait()正确理解:通知只是"提示状态可能改变",谓词才是"状态是否满足"的真相。

练习

  1. [分析题]:解释 cv.wait(lock, pred) 等价于 while (!pred()) cv.wait(lock);。为什么这个循环能同时解决虚假唤醒和丢失通知?
  2. [代码题]:给 MessageQueue 添加 tryPopFor(duration) 方法,超时返回 std::nullopt。使用 cv.wait_for(lock, timeout, pred) 实现。
  3. [跨章综合题]:原子操作与内存模型 将介绍 atomic 操作。condition_variable 使用 mutex + 谓词实现等待-通知;atomic 可以实现无等待的状态检查。讨论:什么场景适合条件变量(如队列非空),什么场景适合 atomic(如停止标志)?

17.6 Deadlock:四个必要条件与多锁策略 ⭐⭐⭐⭐⭐

从一个真实场景出发:两个线程各持一把锁,想要对方的锁

死锁不是一个需要死记四条理论条件的抽象概念。让我们从一个具体的、在 SLAM 系统中真实会发生的场景开始,然后从这个场景中**推导出**四条条件——而不是反过来。

场景:ORB-SLAM 风格的系统中,Mapping 线程需要从关键帧队列取出新关键帧,然后插入地图。这个操作涉及两把锁——queue_mutex(保护关键帧队列)和 map_mutex(保护地图数据)。Mapping 线程的逻辑是"先锁队列取关键帧,然后锁地图插入关键帧"。

与此同时,Tracking 线程在决定是否插入新关键帧时,需要查看局部地图的状态和队列的长度。Tracking 线程的逻辑是"先锁地图查状态,然后锁队列检查长度"。

注意两个线程的锁获取**顺序相反**:

时刻 T1:
  Mapping  线程:获取 queue_mutex ✓  →  准备获取 map_mutex...
  Tracking 线程:获取 map_mutex  ✓  →  准备获取 queue_mutex...

时刻 T2:
  Mapping  线程:持有 queue_mutex,等待 map_mutex(被 Tracking 持有)
  Tracking 线程:持有 map_mutex,等待 queue_mutex(被 Mapping 持有)

结果:两个线程永久等待,系统冻结。

这就是死锁。不是理论概念,而是"程序突然不动了,top 显示两个线程都在 futex_wait,但没有任何输出"的真实故障。

从这个场景推导四条必要条件:为什么死锁能发生?让我们逐一检查这个场景中满足了哪些前提——如果其中任何一条不满足,死锁就不可能出现:

  1. 互斥:两把锁都是互斥的——同一时刻只能被一个线程持有。如果锁可以被多个线程同时持有(比如只读操作用 shared_lock),就不需要等待,死锁不可能发生。
  2. 持有并等待:每个线程在已经持有一把锁的情况下,去等待另一把锁。如果线程在获取第二把锁之前必须先释放第一把锁,就不会出现"双方各持一把"的对峙局面。
  3. 不可剥夺:一个线程持有的锁不能被另一个线程强行抢走。如果系统可以强行中断 Tracking 线程并释放 map_mutex,Mapping 线程就能继续——但这会让被保护的地图数据处于不一致状态,比死锁更危险。
  4. 循环等待:Mapping 等 Tracking 释放 map_mutex,Tracking 等 Mapping 释放 queue_mutex——形成了等待环。如果两个线程都按同一顺序获取锁(比如都先锁 queue_mutex 再锁 map_mutex),等待关系就是一条链而非环,链的末端线程能完成工作并释放锁,连锁反应让所有线程最终都能继续。

这四条就是 Coffman、Elphick 和 Shoshani 1971 年在"System Deadlocks"论文中证明的死锁四个必要条件——它们不是人为规定的,而是从"两个线程各持一把锁想要对方的锁"这个物理现象中提取出的结构性条件。只要破坏其中任何一条,死锁就不可能发生。

工程问题:地图锁和队列锁经常一起出现

SLAM 系统中,多锁访问很常见。 上面的场景不是特例。LoopClosing 线程读取地图后,可能向优化队列提交任务。 Tracking 线程可能在决定是否插入关键帧时同时查看局部地图和队列长度。 传感器回调可能在持有 IMU 缓冲锁时试图获取状态锁更新时间戳。

每一对锁的获取,如果不同路径使用不同顺序,就可能形成等待环。

反面失败:不同路径使用不同锁顺序

下面代码可编译,但可能永久卡住。 它是一个反例,用来观察锁顺序错误。

#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

int main() {
    using namespace std::chrono_literals;

    std::mutex map_mutex;
    std::mutex queue_mutex;

    std::thread tracking([&] {
        std::lock_guard<std::mutex> q_lock(queue_mutex);
        std::this_thread::sleep_for(10ms);
        std::lock_guard<std::mutex> m_lock(map_mutex);
        std::cout << "tracking updated queue and map\n";
    });

    std::thread mapping([&] {
        std::lock_guard<std::mutex> m_lock(map_mutex);
        std::this_thread::sleep_for(10ms);
        std::lock_guard<std::mutex> q_lock(queue_mutex);
        std::cout << "mapping updated map and queue\n";
    });

    tracking.join();
    mapping.join();
}

这段程序的问题不是 sleep。 sleep 只是放大时序窗口。 真实系统中,图优化、内存分配、日志输出、回调调度都可能制造同样窗口。

抽象不变量:如何预防死锁——逐条破坏必要条件

前面我们从具体场景推导出了死锁的四个必要条件。工程上最常破坏的是**条件四(循环等待)**——通过建立全局锁顺序消除等待环。为什么优先破坏这一条?因为条件一(互斥)通常不能取消——我们用 mutex 就是为了互斥;条件三(不可剥夺)也不能轻易取消——强行释放锁会让被保护的数据处于半修改状态。条件二(持有并等待)可以通过 std::scoped_lock 一次性获取所有锁来破坏,但这要求调用点知道所有需要的锁。条件四通过锁排序来破坏是最通用的策略——只需要一个全局约定,所有代码路径遵守即可。

具体手段:

  • 锁排序(Lock Ordering):为所有 mutex 定义一个全局偏序关系,所有代码路径必须按这个顺序获取锁。回顾上面的场景:如果规定"必须先锁 queue_mutex 再锁 map_mutex",那么 Tracking 线程就不能"先锁 map 再锁 queue",循环等待在数学上不可能形成——等待图(wait-for graph)变成了 DAG。
  • std::scoped_lock:一次获取多把锁,内部使用 try-and-back-off 算法保证不形成环。这破坏了条件二——线程不会"持有一把锁再等另一把",而是要么同时获取所有锁,要么一个都不持有。
  • 缩短临界区:持锁时不调用外部代码(回调、ROS publish、优化器),避免在锁内引入不可控的新锁需求。
  • 消息队列替代共享状态:把"多线程访问同一对象"转化为"通过队列传递消息",从根本上减少多锁需求。

规则推导:锁排序和 std::scoped_lock

锁排序要求为所有 mutex 定义一个固定顺序。 例如:

system_state_mutex < queue_mutex < map_mutex < optimizer_mutex

任何线程只要需要多把锁,都必须按这个顺序获取。 这样就不可能形成循环等待。

C++17 的 std::scoped_lock 可以一次接收多把 mutex:

std::scoped_lock lock(queue_mutex, map_mutex);

它内部使用避免死锁的锁定算法。 这不意味着“自动解决所有 deadlock”。 它只解决这一行里多把 mutex 的获取问题。 如果你先手动持有了一把锁,再在深层函数里获取另一把锁,仍然可能和其他路径形成环。

C++11/14 可以用 std::lockstd::adopt_lock

std::lock(queue_mutex, map_mutex);
std::lock_guard<std::mutex> q_lock(queue_mutex, std::adopt_lock);
std::lock_guard<std::mutex> m_lock(map_mutex, std::adopt_lock);

std::scoped_lock 更简洁,优先使用。

工程边界:持锁时不要调用不受控回调

另一个死锁来源是锁内调用外部代码。 例如地图对象在持有 map_mutex_ 时调用用户注册的回调:

// 反例片段:回调可能再次访问地图或队列。
void notifyMapChanged() {
    std::lock_guard<std::mutex> lock(map_mutex_);
    callback_();
}

如果 callback_() 内部又调用需要 map_mutex_ 的函数,可能自锁。 如果它访问队列,也可能形成跨锁环。

更稳妥的做法是锁内准备事件数据,锁外调用回调:

void notifyMapChangedSafely() {
    MapEvent event;
    {
        std::lock_guard<std::mutex> lock(map_mutex_);
        event = buildEventLocked();
    }

    callback_(event);
}

机器人系统里尤其要避免锁内做这些事:

  • 调用 ROS publish、service、action。
  • 调用优化器长时间求解。
  • 写磁盘或保存地图。
  • 调用未知回调。
  • 等待另一个线程结束。

这些操作的耗时和内部锁不可控。

代码验证:用 std::scoped_lock 安全移动关键帧

下面示例展示同时锁定队列和地图。 它刻意让临界区只做移动和插入,昂贵计算放到锁外。

#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <vector>

struct Keyframe {
    int id = 0;
};

class SharedState {
public:
    void pushKeyframe(Keyframe kf) {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        queue_.push(std::move(kf));
    }

    std::optional<Keyframe> moveOneKeyframeToMap() {
        std::scoped_lock lock(queue_mutex_, map_mutex_);

        if (queue_.empty()) {
            return std::nullopt;
        }

        Keyframe kf = std::move(queue_.front());
        queue_.pop();
        map_keyframes_.push_back(kf.id);
        return kf;
    }

    std::vector<int> mapSnapshot() const {
        std::lock_guard<std::mutex> lock(map_mutex_);
        return map_keyframes_;
    }

private:
    mutable std::mutex map_mutex_;
    std::mutex queue_mutex_;
    std::queue<Keyframe> queue_;
    std::vector<int> map_keyframes_;
};

int main() {
    SharedState state;
    state.pushKeyframe(Keyframe{1});
    state.pushKeyframe(Keyframe{2});

    std::thread mapping([&] {
        while (auto kf = state.moveOneKeyframeToMap()) {
            std::cout << "inserted " << kf->id << '\n';
        }
    });

    mapping.join();

    for (int id : state.mapSnapshot()) {
        std::cout << "map has " << id << '\n';
    }
}

多锁不是常态目标。 如果某个设计需要频繁同时持有三四把锁,通常说明数据边界需要重新切分。

⚠️ 编程陷阱:不同代码路径使用不同的锁顺序 错误做法:Tracking 线程先锁 queue_mutex 再锁 map_mutex;Mapping 线程先锁 map_mutex 再锁 queue_mutex现象:两个线程各自持有一把锁等待另一把,系统永久卡死。只在特定时序下触发,极难复现。 根本原因:循环等待是死锁的必要条件之一。不同路径使用不同锁顺序就可能形成循环。 正确做法:定义全局锁顺序(如 state < queue < map < optimizer),所有路径遵守;或用 std::scoped_lock 一次获取多把锁。

💡 概念误区:认为 std::scoped_lock 自动解决所有死锁 新手想法:"std::scoped_lock(a, b) 能避免死锁,所以以后都用它就行了。" 实际上scoped_lock 只解决"同时获取多把锁"的顺序问题。如果你先手动持有了一把锁,然后在深层函数里获取另一把锁,scoped_lock 看不到先前持有的锁,仍然可能和其他路径形成环。 正确理解scoped_lock 是工具,锁设计是策略。工具只能辅助策略,不能替代策略。

练习

  1. [分析题]:列出死锁的四个必要条件(互斥、持有并等待、不可剥夺、循环等待),对每个条件说明在 SLAM 系统中是否可以被破坏,以及如何破坏。
  2. [代码题]:编写一个故意死锁的程序(两个线程使用相反的锁顺序)。然后用 std::scoped_lock 修复。使用 GDB 或 gdb -batch -ex "thread apply all bt" 观察死锁时的线程状态。
  3. [跨章综合题]:C++语言核心/错误处理与异常安全 讲了异常安全。在持有锁时如果函数抛异常,lock_guard 会确保锁被释放(RAII)。讨论:如果在双锁临界区中,第一把锁获取成功但第二把锁获取前抛异常,系统状态是否一致?scoped_lock 如何处理这种情况?

17.7 shared_mutex:读多写少地图查询 ⭐⭐⭐⭐

这段代码模拟的是什么场景?——一张地图被多个模块同时查询

考虑 SLAM 系统中地图的访问模式。在一个典型的运行周期内:Tracking 线程以 30Hz 查询局部地图做特征匹配(每秒 30 次读);可视化线程以 10Hz 读取地图点和关键帧用于渲染(每秒 10 次读);LoopClosing 线程在检索候选回环时读取关键帧数据库(每秒 1-3 次读)。而 Mapping 线程插入新关键帧和地图点的频率大约只有 2-5Hz(每秒 2-5 次写)。

这意味着**读操作的频率是写操作的 10-20 倍**。如果用普通 std::mutex,Tracking 在读地图时持有互斥锁,可视化线程想同时读地图就必须等待——但两个纯读操作之间根本不需要互斥!它们各自独立地遍历相同的数据结构,不会互相干扰。把纯读操作也互斥排队,就像图书馆规定"同一时间只能有一个人看书"——完全不必要的限制。

std::shared_mutex 解决的就是这个问题:允许多个读者同时持有共享锁(共同看书),但写者需要独占锁(修改书架时所有人都暂停阅读)。

工程问题:地图查询远多于地图修改

SLAM 地图通常是读多写少。 Tracking 高频读取局部地图点用于匹配。 可视化线程读取关键帧和地图点发布。 LoopClosing 读取关键帧数据库做检索。 Mapping 相对低频地插入关键帧、剔除地图点、更新局部图。

如果所有读写都用一把 std::mutex,多个纯读操作也会互相排队。 std::shared_mutex 提供读写锁语义:

  • 多个读者可以同时持有共享锁。
  • 写者需要独占锁。
  • 写者持锁时,读者不能进入。

C++17 提供 std::shared_mutexstd::shared_lock。 写锁仍用 std::unique_lock<std::shared_mutex>

反面失败:读锁内返回内部引用

读写锁不改变生命周期规则。 下面仍然是错误边界:

#include <shared_mutex>
#include <vector>

class BadSharedMap {
public:
    const std::vector<int>& keyframes() const {
        std::shared_lock<std::shared_mutex> lock(m_);
        return keyframes_;
    }

private:
    mutable std::shared_mutex m_;
    std::vector<int> keyframes_;
};

函数返回时读锁释放。 调用者拿到的引用不再受保护。 写者随后可以修改 vector。 shared_mutex 只允许读并发,不允许锁外引用并发。

抽象不变量:共享锁保护的是只读观察窗口

共享锁的不变量是:

持有共享锁期间,受保护对象不会被写者修改;释放共享锁后,观察窗口结束。

因此读操作有两种安全模式:

  1. 在读锁内完成全部读取和轻量计算。
  2. 在读锁内复制必要快照,释放锁后做重计算。

第一种适合很短的查询,例如按 id 找位姿。 第二种适合较重的算法,例如复制局部地图点坐标后做匹配。

写操作必须用独占锁。 写锁临界区应尽量短。 如果写者需要先做昂贵计算,应先在锁外准备数据,再进入写锁提交修改。

读写锁的理论基础:公平性、饥饿与性能权衡

读写锁看起来是一个"严格更好"的同步原语——它允许多个读者并发,同时保证写者独占。但这种直觉忽略了一个关键的设计维度:公平性(fairness)。不同的公平策略会导致截然不同的性能和正确性特征。

读者优先(Reader-preference)。 在这种策略下,只要有读者持锁或等待读锁,新的读者都可以立即获取共享锁,即使有写者在等待。好处是读者永远不会被写者阻塞;坏处是写者可能被无限延迟——如果读者源源不断到来,写者永远等不到空窗期。这叫**写者饥饿(writer starvation)**。在 SLAM 系统中,如果 Tracking 和可视化线程不断读取地图,Mapping 线程可能永远无法获得写锁来插入新关键帧——系统看起来"运行正常"但地图不再更新。

写者优先(Writer-preference)。 在这种策略下,一旦有写者等待独占锁,新的读者会被阻塞,直到所有等待中的写者完成。好处是写者不会被饥饿;坏处是读者可能被连续的写操作延迟。在实时系统中,如果 Mapping 和 LoopClosing 频繁写入地图,Tracking 的读取可能出现不可接受的延迟。

公平锁(Fair lock)。 按请求到达的顺序排队,不区分读写。公平但吞吐可能更低,因为连续的读请求之间被写请求打断,无法充分利用读并发的优势。

C++ 标准的选择。 C++ 标准没有规定 std::shared_mutex 的公平策略——这是实现定义的。Linux 上的 pthread_rwlock 默认是写者优先(可通过属性修改)。因此,在没有测量之前不应假设读写锁一定比普通 mutex 更好。工程上的决策应基于实际负载的读写比例:当读写比高于 10:1 且读临界区足够短时,读写锁通常有收益;当读写比接近 1:1 或写临界区很长时,普通 mutex 可能更简单也更可预测。

读写锁的隐藏成本。 除了公平性问题,读写锁还有一个容易被忽视的性能陷阱:共享锁的获取不是免费的。每个读者获取共享锁时,都需要原子地修改一个共享的读者计数器——这意味着在多核 CPU 上,频繁获取共享锁会导致计数器所在的 cache line 在多个核心之间弹跳(cache line bouncing)。如果读操作非常短(如读取一个原子标志),cache line 弹跳的开销可能超过读操作本身的计算量,使得读写锁比 std::mutex 更慢。对于这类场景,std::atomic 或无锁设计可能是更好的选择。

规则推导:读写锁适合稳定容器,不适合高频写入热点

读写锁能提高吞吐的前提是读操作多、读临界区短、写操作少。 如果写操作频繁,读写锁可能比普通 mutex 更慢,因为内部状态更复杂。

读写锁还涉及公平性。 标准没有保证所有实现都采用同样策略。 某些实现可能读者优先,写者在持续读负载下等待很久。 某些实现可能写者优先,新读者会被阻塞以避免写饥饿。 这会影响实时性:

  • 如果 Tracking 读锁长时间占用,Mapping 写地图会延迟。
  • 如果写者优先,Tracking 可能被地图更新阻塞。

因此读锁内不要做点云配准、图优化、文件写入。 读锁应只取快照或完成小查询。

工程边界:地图一致性可能跨越多个容器

读写锁不是数据库事务。 如果地图状态分布在多个对象中,例如:

  • keyframes_
  • landmarks_
  • observations_
  • covisibility_graph_

只给其中一个容器加读写锁不能保证整体一致。 要么使用同一把锁保护同一个地图不变量,要么把数据拆成可独立一致的子结构,并清楚定义跨结构同步点。

例如局部地图查询可以先复制关键帧 id 列表,再按 id 查询稳定的只读关键帧对象。 但如果关键帧对象本身也会被回环修正修改,就需要版本号、快照或更高层锁。

代码验证:读快照与写提交

下面示例展示 shared_mutex 的基本使用。 读者复制快照,写者独占插入。

#include <iostream>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <vector>

class MapIndex {
public:
    void addKeyframe(int id) {
        std::unique_lock<std::shared_mutex> lock(m_);
        keyframe_ids_.push_back(id);
    }

    std::vector<int> snapshotIds() const {
        std::shared_lock<std::shared_mutex> lock(m_);
        return keyframe_ids_;
    }

    std::size_t size() const {
        std::shared_lock<std::shared_mutex> lock(m_);
        return keyframe_ids_.size();
    }

private:
    mutable std::shared_mutex m_;
    std::vector<int> keyframe_ids_;
};

int main() {
    MapIndex map;

    std::thread writer([&] {
        for (int i = 0; i < 10; ++i) {
            map.addKeyframe(i);
        }
    });

    std::thread reader([&] {
        for (int i = 0; i < 10; ++i) {
            auto ids = map.snapshotIds();
            std::cout << "snapshot size = " << ids.size() << '\n';
        }
    });

    writer.join();
    reader.join();

    std::cout << "final size = " << map.size() << '\n';
}

编译命令示例:

g++ -std=c++17 -O2 -pthread shared_map.cpp && ./a.out

如果读快照很大,应考虑只复制局部子图、只复制 id、或者使用不可变数据块加原子替换。

⚠️ 编程陷阱:读写锁内返回内部引用 错误做法const vector<int>& keyframes() const { shared_lock lock(m_); return keyframes_; } 现象:函数返回时读锁释放,调用者拿到的引用不再受保护。写者随时可能修改 vector。 根本原因shared_mutex 只保护持锁期间的观察窗口。释放锁后,引用指向的数据重新暴露给写者。 正确做法:在读锁内复制快照返回,或在读锁内完成全部读取操作。

💡 概念误区:认为读写锁总比 mutex 快 新手想法:"读写锁允许并发读,所以替换 mutex 一定能提升性能。" 实际上shared_mutex 的内部状态比 mutex 更复杂(要维护读者计数和写者等待队列)。如果写操作频繁或读临界区很短,shared_mutex 可能比 mutex 更慢。只有在"读多写少且读操作确实有重叠"的场景下,读写锁才有优势。 正确理解:先用 mutex 写正确,再用 profiler 证明 mutex 是瓶颈,最后才考虑换 shared_mutex

练习

  1. [分析题]:SLAM 地图查询是典型的"读多写少"场景。讨论在什么条件下 shared_mutex 会比 mutex 更好,在什么条件下更差。
  2. [代码题]:用 shared_mutex 实现一个 MapIndex 类,支持 addKeyframe()(独占锁)和 snapshotIds()(共享锁)。用 benchmark 对比一个写者 + 多个读者场景下 shared_mutexmutex 的吞吐差异。
  3. [跨章综合题]:原子操作与内存模型 将介绍 atomic 操作和双缓冲。对于"最新状态发布"场景(可视化只需要最新位姿),shared_mutex + 快照和 atomic 双缓冲各有什么优劣?

17.8 deque + mutex 传感器缓冲范式 ⭐⭐⭐⭐⭐

传感器缓冲的理论模型:生产者消费者的多频率变体

传感器缓冲是 线程管理与互斥同步.5 生产者消费者队列在机器人系统中的特化应用。但与通用的 FIFO 队列不同,传感器缓冲有几个独特的约束,使得它的设计需要额外考量。

多频率异步生产者。 通用生产者消费者队列通常假设一个或少数几个生产者。机器人传感器缓冲面对的是多个不同频率的生产者——IMU 200-400 Hz、LiDAR 10-20 Hz、轮式里程计 50-100 Hz、GPS 1-10 Hz。这些生产者的时钟可能不同步,数据到达的时间可能抖动(jitter),而消费者(如融合线程)需要按时间顺序取出多个传感器的数据并对齐。

时间有序窗口而非简单 FIFO。 通用队列的消费操作是"取走最早的元素"。传感器缓冲的消费操作通常是"取走时间区间 \([t_1, t_2]\) 内的所有元素"——这意味着消费者需要按时间戳查找和截取,而不是简单的 pop_front()std::deque 适合这种模式:它支持两端高效插入删除(新数据 push_back,过期数据 pop_front),也支持随机访问以进行时间戳二分查找。

"短临界区 + 复制小片段"的设计模式。 传感器回调通常运行在驱动线程或 ROS2 executor 线程中,这些线程有严格的实时约束——回调不能长时间持锁。因此传感器缓冲的设计原则是:回调只做"锁 -> push_back -> 解锁"三步,持锁时间控制在微秒级。消费者取数据时,也应在锁内完成时间戳查找和数据复制(复制到局部变量),然后在锁外进行插值、积分等重计算。绝不能在持锁期间执行积分或矩阵运算——这会让其他传感器回调排队等待,造成数据丢失。

旧数据剪裁(trimming)的必要性。 传感器数据源源不断到来,如果不剪裁旧数据,缓冲区会无限增长。剪裁策略通常基于两个条件:(1) 时间窗口——只保留最近 N 秒的数据;(2) 已消费标记——已经被融合线程取走的数据可以安全删除。剪裁应在持锁时进行,但由于 std::dequepop_front\(O(1)\) 操作,剪裁的持锁时间很短。

工程问题:IMU、LiDAR、Odom 频率不同但必须按时间对齐

机器人传感器不是同步到达的。 典型频率可能是:

  • IMU:200Hz 到 1000Hz。
  • LiDAR:10Hz 到 20Hz。
  • Camera:30Hz 到 60Hz。
  • Wheel Odom:50Hz 到 100Hz。

LiDAR 去畸变需要某一帧点云时间段内的 IMU。 紧耦合里程计需要点云时间戳附近的 odom 或预积分状态。 ROS 回调线程负责接收消息,处理线程负责取一段时间窗的数据。

通用结构是:

传感器回调:短临界区 push_back 到 deque
处理线程:短临界区读取和裁剪时间窗
同步规则:按 timestamp 单调维护缓冲区

std::deque 很适合这个范式:

  1. 尾部 push_back 高效。
  2. 头部 pop_front 高效。
  3. 不要求像 vector 那样整体连续搬移元素。
  4. 适合滑动时间窗。

std::vector 也能保存传感器数据,但频繁从头部删除会搬移大量元素。 std::queue 又不能方便地遍历和按时间裁剪。 因此 deque 是传感器缓冲的常见折中。

反面失败:回调里做重处理

反面写法是在 LiDAR 回调里直接等待 IMU、去畸变、配准、优化并发布。 这会把传感器接收路径变成重计算路径。 当某次配准耗时变长,回调积压,DDS/ROS 缓冲开始堆积或丢消息。

另一个反面写法是多个回调无锁写同一个 deque:

#include <deque>

struct ImuMsg {
    double stamp = 0.0;
};

std::deque<ImuMsg> imu_buffer;

void imuCallback(const ImuMsg& msg) {
    imu_buffer.push_back(msg);  // 反例:如果处理线程也访问,会 data race。
}

容器本身不会因为只在一端 push、另一端 pop 就自动线程安全。 只要并发访问同一容器,仍然需要同步。

抽象不变量:缓冲区维护时间有序窗口

传感器缓冲区应维护几个不变量:

  1. 每个 deque 内部按时间戳非递减排列。
  2. 回调只做入队和轻量统计。
  3. 处理线程按目标时间窗取数据。
  4. 已经早于保留窗口的数据会被裁剪。
  5. 所有访问同一 deque 的操作都持有同一把 mutex。

LiDAR 去畸变常需要 [scan_start, scan_end] 内的 IMU。 为了插值,还可能需要 scan_start 之前最近的一条 IMU 和 scan_end 之后第一条 IMU。 因此裁剪时不能简单删除所有 < scan_start 的数据。 常见做法是保留一个 margin:

keep_from = scan_start - 0.1s
删除 stamp < keep_from 的旧 IMU

具体 margin 取决于传感器频率、插值方法和时间同步误差。

规则推导:取时间窗时优先复制小片段

处理线程有两种读取方式:

  1. 锁内遍历并复制目标时间窗,锁外处理。
  2. 锁内移动走一段数据,所有权转交处理线程。

对于 IMU 这种消息小、频率高的数据,复制一段窗口通常可以接受。 对于点云这种消息大、移动成本低的数据,常用智能指针或移动语义传递。

关键规则是:不要持有缓冲 mutex 做点云配准或优化。 锁只保护 deque 的结构和时间窗选择。 算法计算使用局部副本或移动出来的数据。

工程边界:时间同步失败要显式表达

如果 LiDAR 帧到达时 IMU 数据还没覆盖到 scan_end,处理线程不能假装成功。 可选策略包括:

  • 等待更多 IMU 数据,直到超时。
  • 暂时缓存 LiDAR 帧,下一轮再处理。
  • 丢弃该 LiDAR 帧并记录原因。
  • 使用外推,但必须标记精度风险。

这些策略的工程含义不同。 在线定位通常宁可短暂等待或丢帧,也不要无声使用错误时间窗。 建图离线处理可以等待完整数据。

代码验证:IMU 时间窗提取和旧数据剪裁

下面示例可单文件编译。 它模拟 IMU 回调入队,处理线程提取某个 LiDAR 时间窗所需的 IMU 数据。

#include <deque>
#include <iostream>
#include <mutex>
#include <optional>
#include <vector>

struct ImuMsg {
    double stamp = 0.0;
    double ax = 0.0;
};

class ImuBuffer {
public:
    void push(ImuMsg msg) {
        std::lock_guard<std::mutex> lock(m_);
        if (!imu_.empty() && msg.stamp < imu_.back().stamp) {
            out_of_order_count_++;
            return;
        }
        imu_.push_back(msg);
    }

    std::optional<std::vector<ImuMsg>> extractWindow(double start, double end) {
        std::lock_guard<std::mutex> lock(m_);

        if (imu_.empty() || imu_.front().stamp > start || imu_.back().stamp < end) {
            return std::nullopt;
        }

        std::vector<ImuMsg> window;
        for (const auto& msg : imu_) {
            if (msg.stamp >= start && msg.stamp <= end) {
                window.push_back(msg);
            }
        }

        const double keep_from = start - 0.1;
        while (imu_.size() > 1 && imu_[1].stamp < keep_from) {
            imu_.pop_front();
        }

        return window;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(m_);
        return imu_.size();
    }

    int outOfOrderCount() const {
        std::lock_guard<std::mutex> lock(m_);
        return out_of_order_count_;
    }

private:
    mutable std::mutex m_;
    std::deque<ImuMsg> imu_;
    int out_of_order_count_ = 0;
};

int main() {
    ImuBuffer buffer;

    for (int i = 0; i < 50; ++i) {
        buffer.push(ImuMsg{0.01 * i, 1.0});
    }

    auto window = buffer.extractWindow(0.12, 0.20);
    if (!window) {
        std::cout << "not enough imu data\n";
        return 0;
    }

    std::cout << "imu samples in window = " << window->size() << '\n';
    std::cout << "buffer size after trim = " << buffer.size() << '\n';
    std::cout << "out of order = " << buffer.outOfOrderCount() << '\n';
}

这个例子没有用条件变量。 如果处理线程需要等待 IMU 覆盖 LiDAR 结束时间,可以给缓冲区增加 condition_variable,在 push() 后通知等待者。 等待谓词应检查 !imu_.empty() && imu_.back().stamp >= end 或关闭标志。

⚠️ 编程陷阱:在传感器回调中执行重计算 错误做法:在 LiDAR 回调函数里直接做去畸变、配准、优化并发布结果。 现象:当某次配准耗时变长,回调积压,DDS/ROS 缓冲堆积或丢消息。系统出现周期性卡顿。 根本原因:回调函数应该尽快返回。把重计算塞进回调等于让传感器驱动等待算法完成。 正确做法:回调只做短临界区入队(push_back + unlock),重计算由独立处理线程消费队列完成。

💡 概念误区:认为 std::deque 是线程安全的 新手想法:"deque 一端 push 一端 pop,两端不互相影响,应该不需要锁。" 实际上std::deque 的内部结构(分块数组指针和 size)是跨所有操作共享的。即使一个线程只 push_back、另一个只 pop_front,两者都会修改 size 和可能触发内部指针重新分配。标准容器不是线程安全的。 正确理解:只要并发访问同一容器(无论是同一端还是不同端),都需要同步。无锁 SPSC 队列是专门设计的数据结构(原子操作与内存模型),不是 deque 加了"天然分离"就能实现的。

练习

  1. [分析题]:解释为什么 IMU 缓冲区的裁剪不能简单地删除所有 stamp < scan_start 的数据。需要保留什么 margin?margin 的大小取决于什么?
  2. [代码题]:给 ImuBuffer 添加 condition_variable,让处理线程在 IMU 数据未覆盖到 scan_end 时阻塞等待。编写测试:先推入不够的 IMU 数据,验证处理线程阻塞;再推入足够的数据,验证处理线程醒来。
  3. [跨章综合题]:原子操作与内存模型 将介绍 SPSC ring buffer。对比 deque + mutex 和 SPSC ring buffer 在 IMU 缓冲场景中的适用条件。什么情况下 SPSC 更合适?什么情况下 deque + mutex 更灵活?

17.9 std::chrono:时间点、持续时间与计时器 ⭐⭐⭐⭐

工程问题:机器人代码里有两种时间

机器人系统至少同时使用两类时间:

  1. 传感器时间戳:来自硬件、驱动或 ROS message,用于数据同步和状态估计。
  2. 程序计时:用于测量函数耗时、超时、周期任务和性能统计。

这两类时间不能混用。 传感器时间可能跟随仿真时间、ROS time 或设备时钟。 性能计时应该使用单调时钟,避免系统时间被 NTP 或用户调整导致耗时为负。

std::chrono 提供类型安全的时间表达:

  • duration 表示一段时间,例如 std::chrono::milliseconds
  • time_point 表示某个时钟上的时刻。
  • steady_clock 单调递增,适合性能计时和超时。
  • system_clock 表示系统实时时钟,适合转换日历时间或日志时间。
  • high_resolution_clock 不保证语义,可能是前两者之一的别名。

SLAM 性能计时优先用 steady_clock

反面失败:用 double 表示所有时间

很多代码把时间都写成 double seconds。 这简单,但会丢失语义。

double start = now();
runOptimization();
double elapsed = now() - start;

问题是读者不知道 now() 来自系统时间、ROS time 还是单调时钟。 单位也可能混乱:秒、毫秒、纳秒都可能被塞进 double。 当函数参数写成 double timeout,调用者也很难知道应该传 0.1 还是 100

std::chrono 用类型把单位写清楚:

using namespace std::chrono_literals;
auto timeout = 100ms;

这比裸 100 更难误用。

抽象不变量:同一段耗时必须来自同一时钟

计时不变量是:

两个 time_point 只有来自同一个 clock,差值才有明确含义。

steady_clock::now()system_clock::now() 不能混减。 编译器会阻止这种错误。 这就是 chrono 类型系统的价值。

性能计时器的 RAII 模式也很自然:

  1. 构造时记录开始时刻。
  2. 析构时记录结束时刻。
  3. 输出或累计耗时。

这适合函数级耗时统计,尤其在异常路径上也能记录耗时。

规则推导:ROS2 timer 和 chrono literals

C++14 引入 chrono literals:

using namespace std::chrono_literals;
auto period = 20ms;
auto timeout = 1s;

ROS2 C++ API 常接受 chrono duration 创建 wall timer:

// 片段:需要在 rclcpp Node 成员函数中使用。
timer_ = create_wall_timer(20ms, [this] {
    publishOdometry();
});

create_wall_timer 表达的是墙上时间周期,不等于传感器消息时间戳。 仿真时间、ROS time、message header stamp 是另一个时间体系。 写机器人代码时要在命名中区分:

  • stamp:传感器或消息时间戳。
  • received_time:本机收到消息的时间。
  • elapsed:程序耗时。
  • period:周期任务间隔。
  • timeout:等待上限。

工程边界:计时器本身也会扰动系统

性能计时不是免费操作。 高频热路径里每个点、每个残差都调用 now() 会明显增加开销。 合理粒度通常是:

  • 每帧 Tracking 耗时。
  • 每次局部地图更新耗时。
  • 每次回环检测耗时。
  • 每次优化求解耗时。
  • 队列等待时间和队列长度统计。

计时输出也不要在锁内频繁写 std::cout。 日志 I/O 可能持有内部锁并阻塞。 更好的方式是把耗时写入统计对象或低频日志。

代码验证:ScopedTimer 和耗时统计

下面代码可单文件编译。 ScopedTimer 使用 steady_clock,析构时输出耗时。

#include <chrono>
#include <iostream>
#include <string>
#include <thread>

class ScopedTimer {
public:
    explicit ScopedTimer(std::string name)
        : name_(std::move(name)),
          start_(Clock::now()) {}

    ~ScopedTimer() {
        const auto end = Clock::now();
        const auto us = std::chrono::duration_cast<std::chrono::microseconds>(
            end - start_);
        std::cout << name_ << " took " << us.count() << " us\n";
    }

private:
    using Clock = std::chrono::steady_clock;

    std::string name_;
    Clock::time_point start_;
};

int main() {
    using namespace std::chrono_literals;

    {
        ScopedTimer timer("local mapping");
        std::this_thread::sleep_for(12ms);
    }
}

编译命令示例:

g++ -std=c++17 -O2 -pthread scoped_timer.cpp && ./a.out

真实项目里可以把输出替换成统计累加器。 例如记录最近 100 帧 Tracking 平均耗时、最大耗时和超时次数。

⚠️ 编程陷阱:用 double seconds 表示所有时间并混用不同时钟 错误做法:传感器时间戳和性能计时都用 double seconds,函数参数写 double timeout 不注明单位。 现象:调用者不知道 timeout 是秒、毫秒还是微秒。传感器时间来自 ROS time,性能计时来自 steady_clock,两者混减导致负数或巨大时间差。 根本原因double 不携带单位和时钟来源信息。std::chrono 的类型系统能在编译期阻止不同时钟的 time_point 混合运算。 正确做法:性能计时用 steady_clock,持续时间用 chrono::milliseconds 等类型,传感器时间戳在命名中标注来源(如 sensor_stampreceived_time)。

练习

  1. [分析题]:解释为什么性能计时应使用 steady_clock 而不是 system_clock。如果系统时间被 NTP 向前调整 1 秒,两种时钟测出的耗时分别是什么?
  2. [代码题]:编写一个 ScopedTimer 类,支持在析构时将耗时累加到一个统计对象中(而不是打印到 cout)。统计对象记录调用次数、总耗时、最大耗时。
  3. [跨章综合题]:原子操作与内存模型 将介绍 std::atomic 计数器。如果统计对象的 total_uscall_count 被多个线程同时写入,需要什么同步机制?用 mutex 还是 atomic?

17.10 asyncfuturepromise:任务级异步 ⭐⭐⭐⭐

线程级并发 vs 任务级异步:两种并发抽象的本质差异

在进入 std::asyncstd::future 的具体 API 之前,需要理解一个更高层次的概念区分:**线程级并发**和**任务级异步**是两种根本不同的并发抽象,适用于不同的问题结构。

线程级并发的心智模型。 使用 std::thread 时,程序员直接管理操作系统线程——创建线程、决定线程执行什么代码、管理线程的生命周期(join/detach)、处理线程之间的同步(mutex、condition_variable)。这种抽象层次很低——程序员需要自己处理所有细节,包括线程数量、线程与 CPU 核心的映射、线程的停止协议等。线程级并发的优势是完全的控制权;代价是大量的样板代码和容易出错的同步逻辑。

任务级异步的心智模型。 使用 std::asyncstd::future 时,程序员不直接创建和管理线程——而是提交一个”任务”(可调用对象),由运行时决定如何执行它(可能创建新线程、可能在线程池中执行、可能延迟到 get() 时同步执行)。程序员只关心”任务何时有结果”和”结果是什么”,不关心”哪个线程在执行这个任务”。这种抽象层次更高——减少了样板代码,但也减少了控制权。

选择框架。 两种抽象适合不同的任务结构:

特征 线程级(std::thread 任务级(std::async/future
生命周期 长期运行的循环 一次性计算并返回结果
状态管理 线程维护自己的状态 无状态,输入 -> 计算 -> 输出
通信方式 共享内存 + mutex/atomic future/promise 传递结果
数量 通常少量(3-5 个) 可以大量(每个任务一个 future)
停止机制 需要显式停止协议 取消支持有限(标准不保证)
异常传播 手动通过 promise 或队列 future::get() 自动重新抛出

SLAM 系统中,Tracking/Mapping/LoopClosing 是长期循环,适合线程级管理。后台保存地图、一次性优化、批量描述子计算是一次性任务,适合任务级异步。不要把所有并发都用线程管理——任务级异步能显著简化代码。

工程问题:不是所有并发都需要手动管理线程

std::thread 适合长期存在的后台循环,例如 Mapping、LoopClosing、传感器处理线程。 但有些任务只是”一次性计算并返回结果”:

  • 后台保存地图。
  • 计算一批描述子。
  • 对一个子图运行全局优化。
  • 加载词袋或配置。
  • 执行一次耗时统计汇总。

这类任务不一定需要手动创建线程对象、设计循环和停止协议。 std::asyncstd::futurestd::promise 提供任务级异步抽象。

future 表示未来某个结果。 get() 阻塞等待结果并取得值。 如果任务中抛出异常,异常会在 get() 时重新抛出。 这比 std::thread 入口异常直接终止程序更适合一次性任务。

反面失败:忽略 future 让异步变同步

std::async 有一个容易误踩的点:如果使用 std::launch::async 启动任务,但返回的临时 future 立即析构,析构可能等待任务完成。 这样看似异步,实际在语句结束时阻塞。

#include <chrono>
#include <future>
#include <iostream>
#include <thread>

void saveMap() {
    using namespace std::chrono_literals;
    std::this_thread::sleep_for(100ms);
    std::cout << "map saved\n";
}

int main() {
    std::async(std::launch::async, saveMap);
    std::cout << "this may print after saveMap\n";
}

很多编译器会警告忽略了 std::async 的返回值。 正确做法是保存 future,在明确的边界 wait()get()

auto save_future = std::async(std::launch::async, saveMap);
std::cout << "tracking can continue\n";
save_future.wait();

另一个边界是 launch policy。 如果不写 std::launch::async,实现可以选择延迟执行。 延迟任务会在 get()wait() 时才运行,不一定创建新线程。 需要真正后台执行时,应显式写 std::launch::async

抽象不变量:future 是一次性结果通道

std::future<T> 的不变量是:

它连接一个异步共享状态,最多成功 get() 一次。

get() 会取走结果。 之后这个 future 不再持有有效结果。 如果多个消费者都需要观察同一个结果,可以使用 std::shared_future<T>

std::promise<T> 是写入端。 它常用于手动创建线程时把结果或异常交给外部:

  1. 创建 promise
  2. promise 取得 future
  3. promise 移动进线程入口。
  4. 线程调用 set_value()set_exception()
  5. 外部通过 future.get() 取得结果或异常。

规则推导:选择 thread、async、promise 的方式

可以用下面的规则做第一判断:

需求 首选工具
长期循环、需要停止协议 std::jthread 或 RAII 管理的 std::thread
一次性计算并返回值 std::async + std::future
手动线程要传回结果或异常 std::promise + std::future
多生产者多消费者消息流 mutex + condition_variable 队列
高频共享状态标志 std::atomic 或锁保护状态

std::async 不等于线程池。 标准库不保证它复用线程。 如果每帧都 std::async(std::launch::async, ...) 创建大量小任务,开销可能高于收益。 对于高频小任务,应考虑固定线程池、任务队列或算法内部并行库。

工程边界:future 析构阻塞和共享状态生命周期

来自 std::async(std::launch::async, ...) 的最后一个 future 析构时可能阻塞等待任务结束。 这会在两个地方制造隐蔽延迟:

  1. 局部变量离开作用域。
  2. 容器清空或析构一组 future。

因此任务异步不是“发出去就不管”。 如果任务可能长时间运行,要明确设计等待边界和取消边界。 C++ 标准库的 future 本身没有取消接口。 需要取消时,应把 std::stop_token、原子标志或关闭队列作为任务参数。

代码验证:异步优化结果和异常传播

下面代码展示后台任务返回结果,以及异常在 get() 时传播。

#include <chrono>
#include <future>
#include <iostream>
#include <stdexcept>
#include <thread>

struct OptimizeResult {
    int iterations = 0;
    double final_error = 0.0;
};

OptimizeResult runGraphOptimization(bool fail) {
    using namespace std::chrono_literals;
    std::this_thread::sleep_for(30ms);

    if (fail) {
        throw std::runtime_error("linear solver failed");
    }

    return OptimizeResult{5, 0.012};
}

int main() {
    auto ok_future = std::async(
        std::launch::async,
        runGraphOptimization,
        false);

    auto bad_future = std::async(
        std::launch::async,
        runGraphOptimization,
        true);

    const OptimizeResult ok = ok_future.get();
    std::cout << "iterations = " << ok.iterations
              << ", error = " << ok.final_error << '\n';

    try {
        const OptimizeResult bad = bad_future.get();
        std::cout << bad.final_error << '\n';
    } catch (const std::exception& e) {
        std::cout << "optimization error: " << e.what() << '\n';
    }
}

编译命令示例:

g++ -std=c++17 -O2 -pthread async_future.cpp && ./a.out

在 SLAM 后端中,future 适合表达”这次优化最终会给出一个结果”。 它不适合表达持续不断的关键帧流。

⚠️ 编程陷阱:忽略 std::async 的返回值导致同步执行 错误做法std::async(std::launch::async, saveMap); 不保存返回的 future现象:临时 future 在语句结束时析构,析构可能等待任务完成。看似异步,实际阻塞。 根本原因:来自 std::async(std::launch::async, ...) 的最后一个 future 析构时会阻塞等待任务结束。 正确做法:保存 future 到变量,在明确的边界 wait()get()

💡 概念误区:认为 std::async 是线程池 新手想法:”std::async 每次调用都高效地复用线程。” 实际上:标准库不保证 std::async 复用线程。每次调用可能创建新线程。如果高频调用(每帧、每点),线程创建销毁的开销可能超过计算本身。 正确理解std::async 适合低频、一次性、返回结果的任务。高频小任务应使用固定线程池或算法内部并行库。

练习

  1. [分析题]:对比 std::thread + std::promisestd::async + std::future 两种方式传回结果。各自的优劣是什么?
  2. [代码题]:用 std::async 实现后台保存地图功能。主线程在 Tracking 继续运行的同时等待保存完成。处理保存成功和保存异常两种情况。
  3. [跨章综合题]:C++语言核心/错误处理与异常安全 讲了异常安全和 std::exception_ptrstd::future::get() 会重新抛出任务中的异常——解释这个机制如何与 C++语言核心/错误处理与异常安全 的异常传播模型配合,以及为什么线程入口函数不能直接抛异常给主线程。

17.11 volatile 不是线程同步 ⭐⭐⭐⭐⭐

核心结论与定位

很多开发者从 Java、C# 或嵌入式 C 经验出发,会把线程退出标志写成 volatile bool running = true;,再由一个线程写、另一个线程读。在 C++ 中这不是线程同步——volatile 不提供原子性,不建立 happens-before,不保证跨线程可见性,因此仍然是 data race。它只表达“每次访问都应作为可观察副作用”,典型用途是内存映射 I/O,而不是 mutex 或 atomic。

// 反例:volatile 退出标志在 C++ 中仍然 data race
volatile bool stop = false;
void loop() { while (!stop) { /* work */ } }   // 另一线程写 stop = true

线程退出标志应改用 std::atomic<bool>、mutex 保护状态、条件变量或 std::stop_token

#include <atomic>
std::atomic<bool> stop_requested{false};   // 只表达布尔状态时 relaxed 通常足够

📌 本节只做边界提醒,完整论证见 18.8volatileatomic 的语义对比、为什么在 x86 上"似乎能工作"而在 ARM/RISC-V 上失效、Java/C# 的 volatile 为什么不同、以及"不要用 volatile 修复优化级别导致的并发问题"等内容,统一在「原子操作与内存模型」的 18.8 volatile 不是线程同步 中展开。本节只补充一个 17 章语境下的独有边界:信号处理。

工程边界:信号处理中的 volatile sig_atomic_t

C++ 和 C 的信号处理有特殊边界。 异步信号处理函数能安全做的事很少。 常见写法是设置一个 volatile std::sig_atomic_t 标志,让主循环稍后观察。

#include <csignal>
#include <iostream>

volatile std::sig_atomic_t interrupted = 0;

extern "C" void handleSignal(int) {
    interrupted = 1;
}

int main() {
    std::signal(SIGINT, handleSignal);

    while (!interrupted) {
        // main loop
    }

    std::cout << "interrupted\n";
}

这不是普通线程同步模式。 它服务于信号处理的极窄边界。 如果是两个 C++ 线程之间通信,应换成 atomic、锁或停止 token。 (用 std::atomic<bool> 表达退出标志的完整可运行示例见 18.8。)

🧠 思维陷阱:把 Java/C# 的 volatile 语义搬到 C++ 新手想法:"Java 里 volatile 能保证线程可见性,C++ 里应该也一样。" 实际上:Java 和 C# 的 volatile 有内存屏障和可见性语义;C++ 的 volatile 没有。跨语言迁移时,不能把写法平移。C++ 线程通信应使用 atomic、mutex 或条件变量。 正确思维:看到 volatile 用在线程通信场景中,应视为潜在 bug,用 ThreadSanitizer 验证。

练习

  1. [分析题]:解释 C++ volatile 和 Java volatile 的语义差异。为什么 C++ 选择不给 volatile 加线程同步语义?
  2. [代码题]:编写一个使用 volatile bool stop 的程序(反例),用 ThreadSanitizer 检测 data race。然后改为 std::atomic<bool>,验证 data race 消失。
  3. [跨章综合题]:C++语言核心/预处理器与宏 的条件编译宏和本节的 volatile 都是容易被误用的 C/C++ 特性。总结"看起来能用但语义错误"的共同特征:编译能过、在某些平台看似正常、但标准层面是错误的。

17.12 并发调试与性能分析工具 ⭐⭐⭐⭐

为什么需要专用调试工具

并发 bug 和单线程 bug 有本质区别。单线程 bug 通常是确定性的——给定相同输入,程序每次都以相同方式失败,可以用 printf 或断点逐步定位。并发 bug 依赖线程调度时序、CPU 缓存状态和系统负载,同一段代码在开发机上运行一万次都正常,在 Jetson ARM 板上或 CI 的高负载环境中却偶发崩溃。更糟的是,加入调试用的 printf 或断点可能改变线程调度时序,让 bug 消失——这就是 Heisenbug。

这意味着"跑了很多次没出问题"不能作为正确性论据。需要用专门的编译时插桩工具来系统性地检测并发问题。

本质洞察:并发调试工具的价值不是"找到你已经知道的 bug",而是"发现你不知道存在的 bug"。ThreadSanitizer 能检测到大多数 data race,即使它们在你的测试中从未表现出可观察的错误行为。一个在 x86 上"碰巧正确"的程序,可能在 ARM 上立刻崩溃——TSan 能在 x86 上就发现这个问题。

ThreadSanitizer(TSan):data race 检测的标准工具 ⭐⭐⭐⭐

ThreadSanitizer 是 Clang 和 GCC 内置的动态分析工具。它在编译时对每一次内存访问插桩,运行时跟踪所有线程的访问历史,如果发现两个线程无同步地访问同一内存位置(至少一个是写),就报告 data race。

使用方式:

# 编译时加 -fsanitize=thread
g++ -std=c++20 -O1 -g -fsanitize=thread my_code.cpp -o my_code

# 直接运行,TSan 会在终端输出 data race 报告
./my_code

TSan 的报告包含两个访问的调用栈——一个写操作和一个冲突的读/写操作。报告还会指出创建这两个线程的位置。工程上应把 TSan 测试纳入 CI 流水线,作为常规检查项。

TSan 有几个工程边界需要注意。首先,它会使程序变慢 5-15 倍,内存占用增加 5-10 倍——这意味着测试用例需要调整输入规模和超时时间。其次,TSan 只能检测到实际执行路径上的 data race——如果某个分支没有被测试覆盖,TSan 不会分析它。第三,TSan 不检测逻辑竞态(race condition)——消除了 data race 不代表程序逻辑正确。

如果 TSan 在你的并发代码中报告了 0 个 data race,这是一个好消息。但如果你的测试覆盖率低,这只说明"被测试的路径"没有 data race。增加测试覆盖率和压力测试(多次运行、不同线程数、不同调度时序)能显著提高 TSan 的检测效果。

Valgrind Helgrind/DRD:备用 data race 检测器 ⭐⭐⭐

Helgrind 和 DRD 是 Valgrind 套件中的两个并发错误检测工具。与 TSan 的编译时插桩不同,它们在运行时通过二进制翻译(binary translation)拦截内存访问。优势是不需要重新编译(直接 valgrind --tool=helgrind ./my_code),劣势是速度更慢(20-100 倍)且误报率更高。

在 TSan 可用的平台上,优先使用 TSan。Helgrind/DRD 适合无法重新编译的场景(如分析第三方库)或交叉验证 TSan 的结果。

perf 与火焰图:定位锁竞争和调度延迟 ⭐⭐⭐⭐

perf 是 Linux 的系统性能分析工具,能直接从硬件性能计数器读取数据,几乎零开销。在并发场景中,perf 特别适合定位三类问题:

  1. 锁竞争perf lock record && perf lock report 能显示每把锁的等待时间和争用次数。如果 map_mutex 的等待时间占比超过 10%,说明锁粒度需要优化。
  2. CPU 利用率perf stat -e task-clock,context-switches,cpu-migrations ./my_code 能显示上下文切换和 CPU 迁移次数。高频上下文切换通常说明线程过多或锁竞争激烈。
  3. 热点函数perf record -g ./my_code && perf report 或生成火焰图(FlameGraph),能直观看到 CPU 时间花在哪些函数和调用路径上。

火焰图在分析并发系统时尤其有价值——它能同时展示所有线程的 CPU 时间分布,让你一眼看出哪个线程在空转、哪个线程被锁阻塞。

# 录制 CPU 采样数据(采样频率 999Hz,记录调用栈)
perf record -F 999 -g ./mini_slam_pipeline

# 生成火焰图(需要 FlameGraph 工具)
perf script | stackcollapse-perf.pl | flamegraph.pl > flamegraph.svg

GDB 并发调试技巧 ⭐⭐⭐

GDB 对多线程程序有基本支持,但调试并发问题时需要特殊技巧。

# 查看所有线程的回溯(死锁排查的第一步)
gdb -batch -ex "thread apply all bt" ./my_code core

# 在 GDB 中手动切换线程
(gdb) info threads
(gdb) thread 3
(gdb) bt

死锁排查时,thread apply all bt 是最有价值的命令——它能显示每个线程卡在哪个系统调用上。如果两个线程都卡在 __lll_lock_wait(Linux 的 futex 等待),并且各自在等对方持有的 mutex,就确认了死锁。

⚠️ 编程陷阱:在高优化级别下调试并发问题 错误做法:用 -O2-O3 编译后直接用 GDB 调试并发 bug。 现象:变量被优化掉,断点命中位置与源码不符,单步执行跳来跳去。 根本原因:编译器优化会内联函数、消除变量、重排代码。这些优化在单线程中不改变行为,但在调试器中观察时会产生困惑。 正确做法:用 -O0 -g-Og -g 编译调试版本。TSan 建议使用 -O1 -g(平衡检测能力和优化干扰)。

练习

  1. [分析题]:列出 ThreadSanitizer、Helgrind、perf 和 GDB 各自适合检测的并发问题类型。画一个"问题→工具"的映射表。
  2. [代码题]:编写一个包含 data race 的简单程序,分别用 TSan 和 Helgrind 检测。对比两个工具的报告格式和信息详细程度。
  3. [跨章综合题]:C++语言核心/预处理器与宏 用 -Enm -C 验证宏和链接层面的正确性。本节用 TSan 验证并发正确性,用 perf 定位性能瓶颈。整理本课程中所有出现过的"编译/链接/运行时验证工具",形成一个按问题域分类的工具箱。

17.13 累积项目:Mini SLAM Concurrent Pipeline ⭐⭐⭐⭐⭐

项目目标

本章项目实现一个简化的并发 SLAM 流水线。 它不做真实图像处理、点云配准或图优化,而是把并发结构做完整:

  1. MessageQueue<T>:线程安全阻塞队列,支持 push()pop()close()
  2. Tracking:按固定周期产生关键帧,推给 Mapping。
  3. Mapping:消费关键帧,模拟局部建图,向 LoopClosing 提交候选关键帧。
  4. LoopClosing:消费候选关键帧,模拟回环检测。
  5. 停止协议:Tracking 结束后关闭 Mapping 队列,Mapping 结束后关闭 LoopClosing 队列。
  6. 计时:使用 steady_clock 统计模块耗时。
  7. 测试目标:验证消息数量、关闭顺序、无忙等、无 joinable 线程泄漏。

项目结构建议:

mini_slam_concurrent/
  message_queue.hpp
  scoped_timer.hpp
  pipeline.hpp
  main.cpp
  tests/
    message_queue_test.cpp
    pipeline_test.cpp

本章只给出一个单文件参考实现,方便直接编译运行。 真正放入课程项目时,可以按上面的结构拆分。

MessageQueue<T>:队列关闭是协议的一部分

队列接口选择 std::optional<T> pop()。 返回 std::nullopt 表示队列已经关闭且没有剩余数据。 这样消费循环可以自然写成:

while (auto item = queue.pop()) {
    consume(*item);
}

这比使用特殊哨兵消息更通用。 如果 T 本身也可能表示“停止”,协议会混乱。 关闭状态属于队列,不属于业务消息。

TrackingMappingLoopClosing 的数据流

数据流如下:

Tracking
  产生 Keyframe
  push 到 mapping_queue

Mapping
  pop Keyframe
  更新局部地图计数
  每隔若干关键帧 push LoopCandidate
  mapping_queue 关闭且耗尽后,关闭 loop_queue

LoopClosing
  pop LoopCandidate
  模拟检测
  loop_queue 关闭且耗尽后退出

这里 Tracking 没有直接调用 Mapping。 Mapping 也没有直接调用 LoopClosing。 模块之间只通过队列交换消息。 这使得每个线程的生命周期和背压点更清楚。

停止协议

停止协议按数据流方向传播:

  1. Tracking 生产完关键帧,调用 mapping_queue.close()
  2. Mapping 消费完所有关键帧,调用 loop_queue.close()
  3. LoopClosing 消费完所有候选,退出。
  4. 主线程 join 所有线程。

如果要支持外部提前停止,可以让队列也支持 close() 后拒绝新消息。 生产者捕获异常或检查返回值后退出。 C++20 版本还可以把 std::stop_token 传给各模块,在循环中同时检查停止请求。

单文件参考实现

下面代码可用 C++17 编译。 它包含队列、计时器、三阶段流水线和简单统计。

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>

template <typename T>
class MessageQueue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(m_);
            if (closed_) {
                throw std::runtime_error("push to closed queue");
            }
            q_.push(std::move(value));
        }
        cv_.notify_one();
    }

    std::optional<T> pop() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] {
            return closed_ || !q_.empty();
        });

        if (q_.empty()) {
            return std::nullopt;
        }

        T value = std::move(q_.front());
        q_.pop();
        return value;
    }

    void close() {
        {
            std::lock_guard<std::mutex> lock(m_);
            closed_ = true;
        }
        cv_.notify_all();
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(m_);
        return q_.size();
    }

private:
    mutable std::mutex m_;
    std::condition_variable cv_;
    std::queue<T> q_;
    bool closed_ = false;
};

class ScopedTimer {
public:
    explicit ScopedTimer(std::string name)
        : name_(std::move(name)),
          start_(Clock::now()) {}

    ~ScopedTimer() {
        const auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(
            Clock::now() - start_);
        std::lock_guard<std::mutex> lock(outputMutex());
        std::cout << name_ << " took " << elapsed.count() << " us\n";
    }

private:
    using Clock = std::chrono::steady_clock;

    static std::mutex& outputMutex() {
        static std::mutex m;
        return m;
    }

    std::string name_;
    Clock::time_point start_;
};

struct Keyframe {
    int id = 0;
};

struct LoopCandidate {
    int keyframe_id = 0;
};

struct PipelineStats {
    int tracked = 0;
    int mapped = 0;
    int loop_checked = 0;
};

class JoinOnExit {
public:
    explicit JoinOnExit(std::thread& thread) : thread_(thread) {}

    ~JoinOnExit() {
        if (thread_.joinable()) {
            thread_.join();
        }
    }

    JoinOnExit(const JoinOnExit&) = delete;
    JoinOnExit& operator=(const JoinOnExit&) = delete;

private:
    std::thread& thread_;
};

class MiniSlamPipeline {
public:
    explicit MiniSlamPipeline(int keyframe_count)
        : keyframe_count_(keyframe_count) {}

    void run() {
        std::thread tracking_thread(&MiniSlamPipeline::trackingLoop, this);
        JoinOnExit join_tracking(tracking_thread);
        std::thread mapping_thread(&MiniSlamPipeline::mappingLoop, this);
        JoinOnExit join_mapping(mapping_thread);
        std::thread loop_thread(&MiniSlamPipeline::loopClosingLoop, this);
        JoinOnExit join_loop(loop_thread);
    }

    PipelineStats stats() const {
        std::lock_guard<std::mutex> lock(stats_mutex_);
        return stats_;
    }

private:
    void trackingLoop() {
        using namespace std::chrono_literals;
        ScopedTimer timer("Tracking");

        for (int i = 0; i < keyframe_count_; ++i) {
            std::this_thread::sleep_for(5ms);
            mapping_queue_.push(Keyframe{i});

            {
                std::lock_guard<std::mutex> lock(stats_mutex_);
                stats_.tracked++;
            }
        }

        mapping_queue_.close();
    }

    void mappingLoop() {
        using namespace std::chrono_literals;
        ScopedTimer timer("Mapping");

        while (auto keyframe = mapping_queue_.pop()) {
            std::this_thread::sleep_for(8ms);

            {
                std::lock_guard<std::mutex> lock(stats_mutex_);
                stats_.mapped++;
            }

            if (keyframe->id % 3 == 0) {
                loop_queue_.push(LoopCandidate{keyframe->id});
            }
        }

        loop_queue_.close();
    }

    void loopClosingLoop() {
        using namespace std::chrono_literals;
        ScopedTimer timer("LoopClosing");

        while (auto candidate = loop_queue_.pop()) {
            std::this_thread::sleep_for(12ms);

            {
                std::lock_guard<std::mutex> lock(stats_mutex_);
                stats_.loop_checked++;
            }

            std::lock_guard<std::mutex> lock(output_mutex_);
            std::cout << "checked loop candidate "
                      << candidate->keyframe_id << '\n';
        }
    }

    int keyframe_count_ = 0;
    MessageQueue<Keyframe> mapping_queue_;
    MessageQueue<LoopCandidate> loop_queue_;

    mutable std::mutex stats_mutex_;
    PipelineStats stats_;

    std::mutex output_mutex_;
};

int main() {
    MiniSlamPipeline pipeline(10);
    pipeline.run();

    const PipelineStats stats = pipeline.stats();
    std::cout << "tracked = " << stats.tracked << '\n';
    std::cout << "mapped = " << stats.mapped << '\n';
    std::cout << "loop checked = " << stats.loop_checked << '\n';

    if (stats.tracked != 10 || stats.mapped != 10 || stats.loop_checked != 4) {
        return 1;
    }

    return 0;
}

编译命令示例:

g++ -std=c++17 -O2 -pthread mini_slam_pipeline.cpp && ./a.out

测试目标

项目测试不应只看“能打印日志”。 至少要覆盖这些行为:

  1. 空队列 pop() 会阻塞,close() 后返回 std::nullopt
  2. push() 后消费者能收到消息。
  3. close() 后继续 push() 会失败。
  4. Tracking 产生的关键帧数量等于 Mapping 消费数量。
  5. Mapping 关闭后 LoopClosing 能自然退出。
  6. 所有线程都被 join,没有 joinable 线程对象进入析构。
  7. 统计值在锁保护下读写。
  8. 计时使用 steady_clock

如果测试框架支持超时,应给并发测试设置短超时。 并发测试一旦卡住,超时比无限等待更容易定位。

工程边界

这个 Mini Pipeline 仍然简化了真实系统:

  • 没有容量限制,因此没有背压。
  • 没有外部提前停止请求。
  • 没有异常聚合和错误上报。
  • 没有地图对象和读写锁。
  • 没有传感器时间同步。

这些不是缺陷,而是分层。 本项目先验证线程生命周期、队列关闭和模块解耦。 下一步再加入容量、地图快照、传感器缓冲和错误传播。


本章小结

本章的主线不是“记住几个并发 API”,而是建立机器人系统里的并发不变量。

std::thread 让一个入口函数在新的执行流里运行,但线程对象本身仍是 C++ 对象。 joinable 对象析构会终止程序,线程入口异常不能逃出。 因此线程生命周期必须由 RAII 管理。 C++17 可以写线程守卫,C++20 优先考虑 std::jthreadstd::stop_token

mutex 保护的是共享数据不变量。 data race 是 C++ 语义层的未定义行为,不是偶发小错误。 lock_guard 适合短临界区,unique_lock 适合条件变量和灵活锁管理,scoped_lock 适合一次获取多把锁。 锁粒度要服务不变量:锁太粗会拖慢实时路径,锁太细会拆碎一致性。

条件变量表达等待-通知。 通知只是提示状态可能变化,谓词才是真相。 wait(lock, pred) 同时处理虚假唤醒和丢失通知边界。 生产者消费者队列必须把关闭状态纳入协议。

deadlock 的四个必要条件是互斥、持有并等待、不可剥夺、循环等待。 工程上主要通过锁排序、std::scoped_lock、缩短临界区和减少多锁访问来破坏循环等待。

shared_mutex 适合读多写少地图查询,但它只保护读锁持有期间的观察窗口。 读锁内返回内部引用仍然危险。 读写锁也有饥饿和公平性边界,不能把重计算放进读锁。

传感器缓冲常用 deque + mutex。 回调短临界区入队,处理线程按时间窗复制或移动数据,旧数据按 margin 剪裁。 时间同步失败必须显式表达,不能无声外推。

std::chrono 让时间单位和时钟类型进入类型系统。 性能计时使用 steady_clock。 系统时间、传感器时间、ROS time 和程序耗时应在命名和类型上区分。

std::asyncfuturepromise 适合一次性任务结果和异常传播。 它们不是线程池,也不适合高频消息流。 忽略 future 可能让异步任务在临时对象析构时阻塞。

C++ volatile 不是线程同步工具。 线程通信应使用 atomic、mutex、条件变量、停止 token 或 future/promise。 跨语言经验不能直接迁移到 C++。

回顾本章的核心脉络:从 17.1 的"为什么机器人系统选择多线程"出发,建立了"延迟隔离"和"所有权边界"的设计动机。17.2-17.3 把线程对象和 RAII 生命周期管理讲清楚。17.4-17.5 从 data race 和条件变量建立了互斥同步的基础——mutex 保护不变量,条件变量表达等待-通知。17.6 的死锁和 17.7 的读写锁处理了多锁和读多写少的工程场景。17.8-17.9 把传感器缓冲和时间管理纳入并发设计。17.10-17.11 补充了任务级异步和 volatile 的边界。每一节不是孤立的 API 教学,而是围绕"如何在多线程环境下维护机器人系统的数据不变量"这一核心问题展开的不同侧面。

如果只能从本章带走一个判断框架,应该是这样的:面对一个并发设计问题时,先问五个问题——实时路径是什么?哪些任务可以滞后?哪些对象跨线程共享?每把锁保护什么不变量?停止时如何退出?这五个问题回答清楚后,线程数量、锁粒度、队列设计和停止协议自然就确定了。不要从"用哪个 API"出发设计,而要从"保护什么不变量"出发。


延伸阅读

  • Anthony Williams, C++ Concurrency in Action, 2nd Edition:C++ 并发编程的系统教材,覆盖线程、锁、条件变量、atomic 和内存模型。⭐⭐
  • cppreference:std::threadstd::jthreadstd::mutexstd::condition_variablestd::shared_mutexstd::futurestd::chrono 页面。⭐⭐
  • Scott Meyers, Effective Modern C++:并发 API、std::asyncstd::threadstd::future 相关条款(Item 35-40)。⭐⭐
  • Herb Sutter, atomic<> Weapons(CppCon 2012):理解 atomic 和内存序的经典材料,是 原子操作与内存模型 的预备读物。⭐⭐⭐
  • ROS2 文档:Executors、Callback Groups、Timers、Parameters 和 message timestamp 的使用边界。⭐⭐⭐
  • ORB-SLAM3 源码(UZ-SLAMLab/ORB_SLAM3):Tracking、LocalMapping、LoopClosing 的线程协作和多 mutex 结构。⭐⭐⭐
  • LIO-SAM 源码(TixiaoShan/LIO-SAM):IMU、LiDAR、Odom 回调中的 deque 缓冲和时间窗处理。⭐⭐⭐
  • chenshuo muduo:工业 C++ 网络库中的线程、锁、条件变量和事件循环封装,适合学习生产级并发代码风格。⭐⭐⭐
  • ThreadSanitizer 文档(clang.llvm.org/docs/ThreadSanitizer.html):data race 检测工具的使用指南和局限性。⭐⭐
  • Brendan Gregg, Systems Performance, 2nd Edition:性能分析方法论,含 perf、火焰图和 eBPF 工具链的系统讲解。⭐⭐⭐

🔧 故障排查手册

症状 可能原因 排查步骤 相关章节
程序退出时直接终止 joinable 的 std::thread 进入析构 1. 检查异常路径是否跳过 join() 2. 检查类析构函数是否收束线程 3. 用 ThreadGuard 或 jthread 修复 17.2-17.3
程序偶尔卡死 死锁或条件变量谓词错误 1. gdb -batch -ex “thread apply all bt” 查看线程状态 2. 检查多锁路径是否有统一顺序 3. 检查是否持锁调用外部回调 17.6
CPU 占用高但没处理数据 队列为空时忙等 1. 检查消费循环是否用 condition_variable 2. 确认不是 continue 自旋 3. 用 perf top 确认热点函数 17.5
消费线程无法退出 队列没有”关闭”状态 1. 检查 close() 是否调用 notify_all() 2. 检查谓词是否包含 closed_ 17.5
偶发崩溃在容器内部 容器并发访问或迭代器失效 1. 用 TSan 检测 data race 2. 检查锁内是否返回引用/迭代器给锁外使用 17.4
Tracking 延迟周期性升高 实时路径被后台任务持锁阻塞 1. 统计每阶段耗时和队列长度 2. 检查锁内是否做重计算 3. 用 perf 火焰图定位持锁热点 17.4, 17.7
条件变量偶尔漏消息 通知被当作消息本身 1. 确认使用谓词版本 wait(lock, pred) 2. 确认谓词检查 closed_ \|\| !q_.empty() 17.5
std::async 没有并行效果 未保存 future 或未指定 launch::async 1. 确认 future 被保存到变量 2. 显式指定 std::launch::async 17.10
volatile 后仍行为异常 volatile 不是同步工具 1. 搜索线程通信中的 volatile 2. 替换为 atomic 或 mutex 17.11
读写锁没有提升性能 读临界区太长或写操作频繁 1. 用 perf 测量读写锁开销 2. 比较与普通 mutex 的吞吐 17.7
传感器同步不稳定 时间窗不完整或传感器/接收时间混用 1. 检查缓冲区是否按时间戳有序 2. 检查 LiDAR 时间窗是否被 IMU 覆盖 3. 检查剪裁是否保留插值 margin 17.8