线程管理与互斥同步¶
难度:⭐⭐⭐~⭐⭐⭐⭐ | 建议用时:2 周 | 前置要求:C++语言核心/类型系统与值类别推导-C++语言核心/Lambda与STL算法 的 C++ 对象模型、RAII、异常、STL 容器,C++语言核心/Eigen基础与SLAM数学预备-C++语言核心/Concepts与Policy 的机器人数据结构和泛型接口基础
前置自测¶
答不出两题以上,建议先复习对象生命周期、引用传参、异常传播、容器迭代器失效和 RAII。 并发编程的难点不在 API 名字,而在多个控制流同时观察和修改同一批对象时,原来顺序程序里隐含成立的假设会消失。
std::thread t(f);创建的是线程对象还是操作系统线程?t析构时如果仍然 joinable 会发生什么?- 为什么
std::thread t(process, msg);默认会拷贝参数?如果要传引用,为什么必须显式使用std::ref? - 两个线程同时读写同一个普通
int,即使机器上看起来只是“偶尔错一次”,在 C++ 语义里为什么已经是未定义行为? std::lock_guard和std::unique_lock都能管理 mutex,它们在所有权、延迟加锁、条件变量配合上有什么差异?condition_variable::wait(lock, pred)为什么要写谓词?虚假唤醒和丢失通知分别是什么问题?- 如果 Tracking 线程持有关键帧队列锁后再申请地图锁,而 Mapping 线程持有地图锁后再申请关键帧队列锁,会触发哪类并发故障?
std::shared_mutex允许多个读者并发,为什么它并不等于“地图访问可以随便读”?volatile bool running能不能作为线程退出标志?如果不能,应该考虑std::atomic<bool>、std::stop_token还是condition_variable?
本章目标¶
学完本章,你将能够:
- 解释 SLAM 和机器人系统为什么常采用多线程流水线,而不是把所有计算塞进单线程循环或粗暴拆成多个进程。
- 从对象模型角度理解
std::thread:线程入口、参数传递、join/detach、移动语义、析构时std::terminate、异常不能逃出入口函数。 - 用 RAII 管理线程生命周期:线程守卫、
std::jthread、std::stop_token、协作式停止协议。 - 识别 data race,划定 critical section,并用
std::lock_guard、std::unique_lock、std::scoped_lock选择合适的锁管理方式。 - 正确使用
std::condition_variable:等待-通知模型、谓词、虚假唤醒、丢失通知、生产者消费者队列。 - 解释 deadlock 的四个必要条件,并用锁排序、
std::lock、std::scoped_lock避免多锁循环等待。 - 在读多写少地图查询中使用
std::shared_mutex,同时理解读写锁的饥饿、公平性和临界区边界。 - 构造
deque + mutex的传感器缓冲范式,处理 IMU/LiDAR/Odom 时间同步和时间窗剪裁。 - 使用
std::chrono表达时间点、持续时间、性能计时和 ROS2 timer 的基本语义。 - 区分线程级并发和任务级异步,掌握
std::async、std::future、std::promise的返回值、阻塞和异常传播边界。 - 说明 C++
volatile为什么不是线程同步工具,并区分它和 Java/C#volatile的语义差异。 - 完成 Mini SLAM Concurrent Pipeline:
MessageQueue<T>、Tracking/Mapping/LoopClosing、停止协议、计时和测试目标。
知识树:
线程管理与互斥同步
├── 为什么机器人系统选择多线程 ⭐⭐⭐⭐
│ ├── 延迟隔离与所有权边界
│ ├── 单线程 / 多线程 / 多进程的边界
│ └── 共享可变状态是根本困难
├── std::thread 对象模型 ⭐⭐⭐⭐
│ ├── 线程对象 vs 执行线程
│ ├── joinable 不变量
│ ├── 参数传递与 std::ref
│ └── 异常不能逃出入口
├── RAII 线程管理 ⭐⭐⭐⭐⭐
│ ├── ThreadGuard(C++17)
│ ├── std::jthread 与 stop_token(C++20)
│ ├── 协作式停止三范式
│ └── 队列关闭协议
├── mutex 基础 ⭐⭐⭐⭐⭐
│ ├── data race = 未定义行为
│ ├── 不变量保护而非数据保护
│ ├── lock_guard / unique_lock / scoped_lock
│ └── 锁粒度权衡
├── condition_variable ⭐⭐⭐⭐⭐
│ ├── 等待-通知模型
│ ├── 谓词 / 虚假唤醒 / 丢失通知
│ └── MessageQueue<T> 实现
├── Deadlock ⭐⭐⭐⭐⭐
│ ├── 四个必要条件(Coffman 1971)
│ ├── 锁排序与 scoped_lock
│ └── 持锁不调用外部回调
├── shared_mutex ⭐⭐⭐⭐
│ ├── 读写锁语义与公平性
│ └── 读锁内返回引用的危险
├── 传感器缓冲 ⭐⭐⭐⭐⭐
│ ├── deque + mutex 范式
│ └── 时间窗与旧数据剪裁
├── 时间管理 ⭐⭐⭐⭐
│ └── std::chrono 与 ROS2 time
├── 异步任务 ⭐⭐⭐
│ └── async / future / promise
├── volatile 不是同步 ⭐⭐⭐
└── 并发调试工具 ⭐⭐⭐⭐
├── ThreadSanitizer
├── Helgrind / DRD
└── perf 与火焰图
本章在课程中的位置:前面的章节把单个对象、单条控制流、单个算法内核讲清楚。
回顾 C++语言核心/RAII与智能指针:RAII 保证"获取即初始化,析构即释放"——这在单线程中管理内存、文件和锁非常自然。本章的挑战是:当多个线程同时触碰同一批对象时,RAII 的"析构即释放"必须配合停止协议和队列关闭才能安全工作。析构函数不能在其他线程还在访问对象时就释放资源。回顾 C++语言核心/移动语义与完美转发:移动语义让 std::thread 只能移动不能复制,这与 std::unique_ptr 的设计一脉相承——线程句柄是系统资源,复制没有清晰语义。
从本章开始,同一批机器人状态会被多个控制流同时触碰。 Tracking 想要低延迟,Mapping 想要持续优化局部地图,LoopClosing 想要在后台做昂贵的回环检测,传感器回调还要把 IMU、LiDAR、轮速计数据按时间塞进缓冲区。 这时 C++ 对象模型没有消失,反而更重要:对象由谁拥有、谁能修改、锁保护什么不变量、线程如何停止,都会直接影响 SLAM 系统是否稳定。
17.1 为什么机器人系统选择多线程 ⭐⭐⭐⭐¶
工程问题:SLAM 不是一条均匀流水线¶
一个在线 SLAM 系统看起来像“传感器输入一帧,算法输出一个位姿”。 如果只看论文公式,这个过程可以被写成顺序伪代码:
while sensor_has_data:
read_frame()
track_pose()
update_local_map()
detect_loop()
optimize_graph()
publish_result()
工程系统不会这样简单。 Tracking 需要尽快给控制、导航或可视化输出当前位姿。 Local Mapping 可能要三角化新地图点、筛选关键帧、做局部 Bundle Adjustment。 Loop Closing 可能要检索词袋、做几何验证、触发 pose graph 优化。 IMU 预积分和 LiDAR 去畸变还要求高频数据按时间顺序进入缓冲区。
这些工作有三个典型差异:
- 实时性不同:Tracking 延迟敏感,后端优化吞吐敏感。
- 计算粒度不同:单帧跟踪通常短,局部优化和回环检测可能长。
- 数据依赖不同:Tracking 依赖最新传感器,Mapping 依赖关键帧队列,LoopClosing 依赖关键帧数据库和全局图。
如果把它们全部塞进一个循环,慢任务会直接拖住快任务。 如果把每个模块拆成进程,又要承担序列化、共享地图同步、跨进程生命周期管理和调试成本。 因此,许多 SLAM 系统选择“一个进程内多线程”:同一地址空间共享地图对象,用 mutex 和条件变量管理并发访问。
ORB-SLAM 系列的经典结构就是 Tracking、LocalMapping、LoopClosing。 Tracking 通常在主线程中消费图像或传感器帧。 LocalMapping 和 LoopClosing 分别运行在独立线程里,通过关键帧队列、地图对象和状态标志协作。 LIO-SAM、FAST-LIO、FAST-LIVO 等 LiDAR/IMU 系统也大量依赖传感器回调和处理线程之间的缓冲同步。
反面失败:单线程让实时路径被慢任务拖住¶
考虑一个相机 30Hz、IMU 200Hz、局部优化偶尔耗时 80ms 的系统。 单线程循环里如果每插入一个关键帧都同步执行局部优化,那么接下来两到三帧图像会排队。 Tracking 得到的数据已经过时,控制层看到的位姿延迟变大。 更糟糕的是,传感器驱动还在继续推数据,缓冲区越来越长,系统开始出现周期性卡顿。
反面结构常见于教学代码:
// 片段:能表达顺序流程,但不适合作为在线系统结构。
void runSequentialPipeline() {
while (true) {
Frame frame = readFrame();
Pose pose = track(frame);
if (shouldInsertKeyframe(frame)) {
updateLocalMap(frame);
runLocalBundleAdjustment();
detectLoopAndOptimizeGraph();
}
publish(pose);
}
}
这段代码的问题不在语法,而在系统不变量没有分层。 Tracking 的不变量是“尽快处理最新帧并输出位姿”。 Mapping 的不变量是“最终消费关键帧并维护局部地图一致性”。 LoopClosing 的不变量是“在可接受延迟内发现全局约束并修正图”。 把三者合在一个循环,会让慢任务破坏快任务的实时性。
为什么共享可变状态是并发编程的根本困难¶
在进入具体的线程管理之前,需要理解一个更基本的问题:为什么并发编程比顺序编程困难得多?答案不是”API 更复杂”,而是**顺序程序中隐含成立的一个根本假设——在并发中崩塌了**。
顺序一致性假设。 在单线程程序中,程序员可以安全地假设:(1) 语句按源代码顺序执行;(2) 对一个变量的写入立刻对后续读取可见;(3) 一个操作的中间状态不会被其他代码观察到。这三条假设合在一起构成了程序员的”顺序一致性直觉”——代码做什么就是它看起来做什么。
在多线程环境中,这三条假设全部失效:
- 指令重排:编译器和 CPU 都会重排指令。单线程中这不影响结果(as-if 规则保证单线程行为不变),但多线程中另一个线程可能观察到重排后的执行顺序。
- 可见性延迟:一个线程写入变量后,另一个线程可能看不到这次写入——因为写入可能停留在 CPU 的 store buffer 或 cache 中,尚未传播到其他核心。
- 操作的非原子性:即使是
++i这样简单的操作,在机器层面也是”读取-修改-写回”三步。另一个线程可能在三步之间观察或修改同一个变量。
这些失效不是”偶发的 bug”,而是 C++ 语言规范明确允许的行为——标准只保证没有 data race 时程序行为有定义。当存在 data race(两个线程无同步地访问同一内存位置且至少一个是写)时,程序行为**完全未定义**——不是”偶尔读到旧值”,而是编译器可以基于”没有 data race”的假设做出任何优化,程序可能以任何方式失败。
共享可变状态是这些问题的交汇点。 如果状态不共享(每个线程有自己的数据),没有可见性问题;如果状态不可变(只读数据),没有 data race;如果程序是单线程的,没有并发访问。问题只在”共享 + 可变 + 多线程”三者同时存在时出现。因此并发编程的核心策略都是在这三者中消除至少一个:不共享(消息传递、线程本地存储)、不可变(不可变快照、函数式风格)、或保护访问(mutex、atomic)。
抽象不变量:并发拆分服务于延迟隔离和所有权边界¶
多线程不是为了”看起来高级”,而是为了维护几个系统级不变量:
- 实时路径不被后台优化长期阻塞。
- 模块间通过队列传递事件,而不是互相调用长函数。
- 共享地图只在受保护的临界区内被修改。
- 线程停止时,队列、地图和状态标志有明确的关闭顺序。
- 异常、超时和传感器断流不会把整个系统拖进半关闭状态。
在这个视角下,线程不是算法的本体。 线程只是把不同延迟需求的任务放到不同执行上下文里。 真正需要设计的是数据所有权和同步协议:
- Tracking 产生关键帧,写入 Mapping 队列。
- Mapping 消费关键帧,修改局部地图,可能产生回环检测请求。
- LoopClosing 消费候选关键帧,读取地图并偶尔触发全局修正。
- 传感器回调只做短临界区入队,不在回调里执行重优化。
- 停止协议先通知队列关闭,再让消费线程退出,最后 join。
规则推导:单线程、多线程、多进程各有边界¶
单线程适合离线工具、确定性实验和小规模脚本。 它的优势是调试简单、顺序可复现、没有 data race。 如果系统目标是“读一个 rosbag,离线跑完并输出轨迹”,单线程可能更好。
多线程适合同一进程内共享大量状态、模块延迟差异明显、数据交换频繁的系统。 它的优势是共享内存成本低,线程间传递对象可以移动或引用,地图结构不必频繁序列化。 代价是需要严格设计锁、队列、停止和异常处理。
多进程适合故障隔离、语言边界、权限隔离或部署边界明显的系统。 例如相机驱动、深度学习检测、SLAM 后端、导航栈可以分进程,用 ROS2 topic/service/action 连接。 代价是消息序列化、调度延迟、跨进程状态一致性和部署复杂度。
机器人系统常混合三者:
- 进程之间用 ROS2 通信隔离大模块。
- 进程内部用多线程处理回调、队列和后端优化。
- 算法内核内部仍尽量保持顺序、可测试、可复现。
工程边界:多线程不能修复算法结构混乱¶
一个常见误解是:只要系统变慢,就加线程。 如果算法本身没有明确数据边界,加线程只会让问题更难定位。 例如地图对象到处暴露可变引用,任何模块都能随时改关键帧、地图点和位姿图。 此时即使加了 mutex,也很难知道锁保护的是哪个不变量。
多线程设计前应先问五个问题:
- 哪条路径是实时路径,最大可接受延迟是多少?
- 哪些任务可以滞后处理,滞后上限是多少?
- 哪些对象跨线程共享,哪些对象只在线程内部拥有?
- 每把锁保护哪个对象集合和哪个不变量?
- 停止、异常、传感器断流时,队列和线程如何退出?
如果这些问题答不清楚,线程数越多,系统越不可控。
代码验证:三阶段流水线的最小可运行模型¶
下面的代码不是完整 SLAM。 它只验证一个工程事实:Tracking 可以保持固定节奏产生关键帧,Mapping 可以以自己的速度消费,LoopClosing 可以在后台处理低频事件。 后面项目会把这个模型扩展成带停止协议和泛型队列的版本。
#include <chrono>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
struct Keyframe {
int id = 0;
};
int main() {
using namespace std::chrono_literals;
std::mutex m;
std::queue<Keyframe> queue;
bool finished = false;
std::thread mapping_thread([&] {
for (;;) {
Keyframe kf;
{
std::lock_guard<std::mutex> lock(m);
if (!queue.empty()) {
kf = queue.front();
queue.pop();
} else if (finished) {
break;
} else {
continue;
}
}
std::this_thread::sleep_for(30ms);
std::cout << "mapping keyframe " << kf.id << '\n';
}
});
for (int i = 0; i < 5; ++i) {
std::this_thread::sleep_for(10ms);
{
std::lock_guard<std::mutex> lock(m);
queue.push(Keyframe{i});
}
std::cout << "tracking keyframe " << i << '\n';
}
{
std::lock_guard<std::mutex> lock(m);
finished = true;
}
mapping_thread.join();
}
这个模型故意还没有使用 condition_variable。
Mapping 线程在队列空时会忙等,浪费 CPU。
它适合说明”线程拆分隔离了节奏”,不适合作为最终队列实现。
17.6 节会修正等待模型。
本质洞察:多线程的本质不是”让代码跑得更快”,而是”让不同延迟需求的任务不互相拖累”。Tracking 需要低延迟,Mapping 需要高吞吐,LoopClosing 可以后台慢慢做。线程只是把这些不同节奏的任务放进不同执行上下文的手段。真正需要设计的不是”开几个线程”,而是”数据由谁拥有、通过什么协议传递、在什么边界同步”。
多线程系统像一个工厂流水线:Tracking 是高速检测工位(每件产品停留时间短),Mapping 是精加工工位(每件产品需要更长时间),LoopClosing 是质检工位(低频抽检)。如果把三个工位串行排列,精加工慢了就会堵住检测工位。流水线设计让每个工位按自己的节奏工作,工位之间用传送带(队列)缓冲。
⚠️ 编程陷阱:为了”加速”而盲目添加线程 错误做法:系统变慢时,第一反应是”加线程”。 现象:加了线程后出现 data race、死锁、退出时崩溃等新问题,调试成本远大于性能收益。 根本原因:多线程不是性能优化的默认手段。如果算法本身没有明确的数据边界和所有权划分,加线程只会让问题更难定位。 正确做法:先回答五个问题(实时路径是什么?哪些任务可以滞后?哪些对象跨线程共享?每把锁保护什么?停止时如何退出?),再决定是否需要多线程。
💡 概念误区:认为单线程永远低效 新手想法:”单线程不能利用多核,所以实时系统一定要多线程。” 实际上:如果所有计算加起来不超过帧时间预算(如 33ms@30Hz),单线程顺序执行更简单、更可调试、更可复现。离线实验和算法验证阶段,单线程通常更合适。 正确理解:多线程解决的是”延迟隔离”和”吞吐瓶颈”,不是”让 CPU 看起来很忙”。
练习¶
- [分析题]:对比单线程顺序流水线和多线程流水线。列出至少 3 个场景适合单线程,3 个场景必须多线程。
- [代码题]:修改上面的最小流水线代码,添加帧延迟统计:记录每帧从 Tracking 产生到 Mapping 消费的时间差。观察忙等模式下 Mapping 线程的 CPU 占用。
- [跨章综合题]:C++语言核心/RAII与智能指针 讲了 RAII 管理单线程资源生命周期。在多线程环境下,RAII 的”析构即释放”原则需要配合什么额外机制(停止协议、队列关闭)才能安全工作?
17.2 std::thread 对象模型 ⭐⭐⭐⭐¶
工程问题:线程对象不是线程函数本身¶
std::thread 把一个可调用对象交给系统创建新的执行流。
创建后,C++ 程序里出现两个概念:
- 执行线程:操作系统调度的控制流,正在运行线程入口函数。
std::thread对象:C++ 对象,保存线程句柄,负责 join/detach 管理。
这两个概念必须分开。
线程函数可能已经运行结束,但 std::thread 对象仍然 joinable,调用者仍必须 join() 或 detach()。
反过来,std::thread 对象可以被移动到另一个变量里,底层执行线程并不会因此重启。
线程入口可以是普通函数、函数对象、lambda、成员函数加对象指针。 ORB-SLAM 风格的写法本质上就是把某个对象的成员函数作为线程入口:
class LocalMapping {
public:
void run();
};
LocalMapping mapping;
std::thread mapping_thread(&LocalMapping::run, &mapping);
这段代码的关键边界是:mapping 对象必须活得比线程入口更久。
如果对象先析构,线程还在调用它的成员函数,就是悬空指针。
反面失败:忘记 join 导致析构时终止程序¶
下面是一个反例片段。
它能编译,但离开作用域时会调用 std::terminate,因为 t 仍然 joinable。
C++ 标准库选择终止程序,而不是自动 detach。
原因很实际:自动 detach 会让后台线程继续访问已经析构的局部对象,故障更隐蔽。
自动 join 也不适合 std::thread,因为析构可能发生在异常展开路径上,隐式阻塞会让程序在意外位置卡住。
所以 std::thread 把责任交给调用者:你必须明确选择 join() 或 detach()。
另一个反面失败是参数引用。
#include <iostream>
#include <thread>
void addOne(int& x) {
++x;
}
void badReferencePassing() {
int value = 41;
// std::thread t(addOne, value); // 无法按 int& 调用:thread 会先复制参数。
std::thread t(addOne, std::ref(value));
t.join();
std::cout << value << '\n';
}
std::thread 默认会把参数 decay-copy 到内部存储。
这能避免很多悬空引用,但也意味着引用传递必须显式写 std::ref。
显式性是安全边界:读代码的人能立刻看到跨线程共享了外部对象。
抽象不变量:joinable 线程必须被收束¶
std::thread 的核心不变量可以写成一句话:
任何 joinable 的
std::thread对象在析构前必须被join()、detach()或移动走。
围绕这个不变量,可以推导出几条规则:
- 默认构造的
std::thread不代表执行线程,joinable() == false。 - 成功创建线程后,
joinable() == true。 join()等待执行线程结束,并让对象变为 not joinable。detach()让执行线程脱离对象管理,并让对象变为 not joinable。- 移动构造或移动赋值会转移线程句柄。
- joinable 对象析构会
std::terminate。
线程入口还有一个异常不变量:
异常不能逃出线程入口函数。
如果线程入口函数抛出异常并离开入口,程序同样会调用 std::terminate。
线程外部不能直接 catch 另一个线程里逃出的异常。
如果需要把错误传回创建者,要在入口内部捕获,再通过 std::promise、错误队列、状态对象或日志系统传递。
规则推导:参数传递、移动和成员函数入口¶
线程参数传递遵循“先保存,再在线程中调用”的模型。
std::thread 构造函数会把可调用对象和参数复制或移动到内部存储,然后在新线程中用这些存储值调用入口。
这解释了几个常见现象:
- 传普通对象会复制,除非使用
std::move。 - 传引用需要
std::ref或std::cref。 - 传指针只是复制指针值,不延长目标对象生命周期。
- 传成员函数时,第一个额外参数是对象指针、引用包装器或智能指针。
std::thread本身不可复制,只能移动。
成员函数入口示例:
#include <iostream>
#include <thread>
class Mapper {
public:
void run(int start_id) {
std::cout << "mapping starts at " << start_id << '\n';
}
};
int main() {
Mapper mapper;
std::thread t(&Mapper::run, &mapper, 10);
t.join();
}
如果 Mapper 由 std::shared_ptr 管理,也可以把 shared_ptr 复制进线程,显式延长对象生命周期。
这适合后台任务需要拥有对象的场景,但不应滥用。
共享所有权会让关闭顺序变得不直观。
工程边界:detach 很少是机器人核心线程的正确选择¶
detach() 的含义是:我不再关心这个线程何时结束,也不会再 join 它。
这在短命日志刷新、一次性后台统计等弱依赖任务中可能可接受。
但对 SLAM 的 Tracking、Mapping、LoopClosing、传感器处理线程来说,detach() 通常是错误信号。
核心线程需要明确关闭顺序:
- 停止接收新输入。
- 关闭队列或发送停止请求。
- 唤醒等待中的线程。
- 等待线程退出。
- 析构地图、词袋、优化器和传感器缓冲。
如果 detach 了 Mapping 线程,主系统可能先析构地图对象,后台线程随后访问地图,形成悬空引用。 这种故障往往只在退出时偶发,调试成本很高。
因此,机器人核心线程的默认选择是 RAII 管理和显式 join。
C++20 的 std::jthread 进一步把这个规则做成类型行为。
代码验证:安全入口、引用传递和异常转移¶
下面代码可单文件编译。
它展示三件事:引用要用 std::ref,线程入口内部捕获异常,错误通过 std::promise 传回主线程。
#include <exception>
#include <future>
#include <iostream>
#include <stdexcept>
#include <thread>
void increment(int& value) {
++value;
}
int main() {
int counter = 0;
std::thread t1(increment, std::ref(counter));
t1.join();
std::promise<int> result_promise;
std::future<int> result = result_promise.get_future();
std::thread t2([p = std::move(result_promise)]() mutable {
try {
throw std::runtime_error("mapping failed");
p.set_value(1);
} catch (...) {
p.set_exception(std::current_exception());
}
});
try {
std::cout << "counter = " << counter << '\n';
std::cout << "result = " << result.get() << '\n';
} catch (const std::exception& e) {
std::cout << "caught from thread: " << e.what() << '\n';
}
t2.join();
}
编译命令示例:
这个例子里的 std::promise 只负责传递一次结果或一次异常。
如果要长期传递多条消息,应使用线程安全队列。
⚠️ 编程陷阱:忘记
join()导致析构时std::terminate错误做法:创建std::thread后在正常路径join(),但异常路径跳过了join()。 现象:正常退出时一切正常;但当初始化函数抛异常时,程序在std::thread析构处直接终止。 根本原因:C++ 标准要求 joinable 的std::thread在析构前必须join()或detach()。异常路径上的栈展开会触发析构,此时线程仍然 joinable。 正确做法:用 RAII 守卫(ThreadGuard 或std::jthread)确保所有路径都能 join。🧠 思维陷阱:认为
std::ref是多余的语法 新手想法:"addOne(int& x)参数是引用,传给std::thread时引用自然就传过去了。" 实际上:std::thread构造函数会 decay-copy 所有参数到内部存储。不用std::ref,参数会被拷贝——函数修改的是拷贝件,不是原件。std::ref的显式性是安全边界:读代码的人立刻知道"这里跨线程共享了外部对象"。 正确思维:std::ref不是语法负担,而是"我知道自己在做什么"的声明。
练习¶
- [分析题]:解释
std::thread析构时为什么选择std::terminate而不是自动join()或自动detach()。两种自动策略各有什么隐患? - [代码题]:编写一个程序,故意让
std::thread在 joinable 状态下析构。用 try-catch 观察是否能捕获终止。然后用 ThreadGuard 修复。 - [跨章综合题]:C++语言核心/移动语义与完美转发 讲了移动语义。
std::thread不可复制只能移动——解释这个设计与"线程句柄是系统资源"之间的关系。对比std::unique_ptr的设计(同样不可复制只能移动)。
17.3 RAII 线程管理:守卫、jthread 与停止请求 ⭐⭐⭐⭐⭐¶
工程问题:异常路径也必须 join¶
手动 join() 在直线代码里不难。
困难在异常路径。
void runMapping() {
std::thread t([] {
runLocalMappingLoop();
});
loadVocabulary(); // 如果这里抛异常,t 的析构会 terminate。
initializeMap();
t.join();
}
只要 loadVocabulary() 或 initializeMap() 抛异常,函数开始栈展开,t 析构时仍然 joinable,程序终止。
这和内存泄漏问题相似:资源获取和释放不能散落在控制流里。
线程句柄也是资源,应该用 RAII 收束。
线程生命周期管理可以类比公司的入职和离职流程。std::thread 的创建就像员工入职——系统开始运行新的工作流。但如果员工离职(线程需要结束)时没有交接流程(join),而是直接走人(对象析构),公司(程序)会陷入混乱。更糟的是,如果员工还在使用公司资源(访问对象成员)时公司就关门了(对象析构),就会出现"人还在用但工位已经拆了"的情况(use-after-free)。RAII 线程管理就是确保"先完成交接,再关闭工位"。
如果 std::thread 的析构函数自动调用 join() 而不是 terminate() 会怎样?异常展开路径上的隐式 join 可能让程序在意外位置无限期卡住——想象一个后台优化线程需要 30 秒才能完成,异常处理路径却在等它结束。自动 detach 也不好——后台线程会继续访问可能已经销毁的局部对象。C++ 标准库选择 terminate() 是"fail-fast"策略:强迫程序员显式做出选择,而不是在两个都不好的默认行为之间猜测。C++20 的 std::jthread 选择了"先 request_stop 再 join",这是一个更合理的默认行为,因为它同时提供了协作式停止机制。
线程停止的三种范式:为什么协作式停止是最佳选择¶
线程停止看似简单——"让线程退出循环就好了"——但在有共享资源的系统中,停止顺序和停止时机直接影响系统的正确性。理解三种停止范式的差异,有助于选择正确的停止策略。
范式一:强制终止。 POSIX 的 pthread_cancel 和 Windows 的 TerminateThread 可以从外部强制杀死一个线程。这看似最简单,但在 C++ 中几乎不可用。原因是:强制终止不会执行栈展开——局部对象的析构函数不会被调用,RAII 资源不会被释放,mutex 不会被解锁。如果被杀死的线程持有 map_mutex_,这把锁就永远处于锁定状态,其他线程永久卡住。强制终止是"最暴力也最危险"的停止方式,在 SLAM 系统中绝不应使用。
范式二:标志轮询。 设置一个 atomic<bool> stop_requested 标志,工作线程在每次循环迭代中检查这个标志。这是最常用的轻量级停止方式,适合"循环频率高、每次迭代时间短"的线程。但如果线程正在 condition_variable::wait() 或阻塞式 I/O 中等待,它不会主动检查标志——需要额外的 notify_all() 唤醒或 wait_for 超时来打破等待。
范式三:C++20 协作式停止(stop_token/stop_source)。 std::jthread 和 std::stop_token 是 C++20 引入的完整停止协议。stop_source 像一个"停止按钮",stop_token 像一个"停止信号接收器"。工作线程可以通过 token.stop_requested() 轮询,也可以通过 stop_callback 注册回调——当停止按钮被按下时,回调自动执行。这解决了"线程在 condition_variable::wait() 中无法感知停止"的问题:可以在 stop_callback 中 notify_all(),唤醒等待中的线程。
SLAM 系统的停止顺序设计。 在多线程 SLAM 系统中,停止顺序通常是反向的——先创建的线程后停止:
- 停止接收新传感器数据(关闭驱动回调)。
- 关闭传感器缓冲队列(使消费者退出等待)。
- 停止 Tracking 线程并 join。
- 关闭关键帧队列。
- 停止 Mapping 线程并 join。
- 关闭回环检测队列。
- 停止 LoopClosing 线程并 join。
- 释放地图、词袋数据库和优化器。
每一步都要确保"先让线程退出,再释放它访问的资源"。颠倒顺序就会出现 use-after-free。这种严格的停止顺序很像操作系统的关机流程——先停止用户进程,再停止系统服务,最后卸载文件系统。任何一步跳过都可能导致数据损坏。
反面失败:析构顺序和停止协议分离¶
一个常见反面写法是把停止标志、线程对象和 join 放在不同位置:
class BadPipeline {
public:
void start();
void requestStop();
void wait();
private:
bool stop_requested_ = false;
std::thread mapping_thread_;
};
这个设计缺少三个约束:
stop_requested_如果被多个线程读写,普通bool会 data race。- 调用者可能只调用
requestStop(),忘记wait()。 - 析构函数如果没有处理 joinable 线程,退出路径仍然不安全。
真正的生命周期设计应把“请求停止”和“等待退出”放在同一个所有者里。 所有者析构时必须让线程收束。
抽象不变量:线程所有者析构后不留下活动入口¶
RAII 线程管理的不变量是:
拥有线程句柄的 C++ 对象析构完成后,不再有入口函数访问该对象的成员。
这句话比“析构时 join”更完整。
如果线程入口捕获了 this,析构函数必须先请求线程退出,再 join。
否则 join 只是等待,线程可能永远不退出。
停止协议通常包含三部分:
- 一个线程安全的停止请求。
- 等待点能被唤醒。
- 线程循环在合适边界检查停止请求并退出。
C++20 的 std::jthread 同时提供两个改进:
- 析构时如果 joinable,会先 request_stop,再 join。
- 线程入口可以接收
std::stop_token,用它查询停止请求。
std::jthread 不会强制杀死线程。
它仍然是协作式停止。
线程入口必须检查 token,等待点也要能被唤醒。
规则推导:线程守卫适合 C++17,jthread 适合 C++20¶
C++17 可以写一个不可复制的线程守卫:
#include <thread>
class ThreadGuard {
public:
explicit ThreadGuard(std::thread t) noexcept
: thread_(std::move(t)) {}
ThreadGuard(const ThreadGuard&) = delete;
ThreadGuard& operator=(const ThreadGuard&) = delete;
ThreadGuard(ThreadGuard&&) noexcept = default;
ThreadGuard& operator=(ThreadGuard&&) noexcept = default;
~ThreadGuard() {
if (thread_.joinable()) {
thread_.join();
}
}
private:
std::thread thread_;
};
这个守卫解决“忘记 join”的问题,但不解决“线程如何退出”的问题。
如果线程函数是无限循环,析构会永久等待。
因此 C++17 常配合 std::atomic<bool> 或条件变量关闭队列。
C++20 可以使用 std::jthread:
#include <chrono>
#include <iostream>
#include <thread>
int main() {
using namespace std::chrono_literals;
std::jthread mapping_thread([](std::stop_token stop) {
while (!stop.stop_requested()) {
std::cout << "mapping tick\n";
std::this_thread::sleep_for(20ms);
}
std::cout << "mapping exits\n";
});
std::this_thread::sleep_for(60ms);
}
mapping_thread 离开作用域时会请求停止并等待退出。
但注意:如果线程卡在不响应停止请求的阻塞调用里,析构仍会等待。
工程边界:协作式停止不能替代队列关闭¶
机器人系统里线程经常阻塞在队列等待。
如果只检查 stop_token,但线程正在 condition_variable::wait(),它不会自动醒来。
C++20 有 condition_variable_any 可以配合 stop_token,但常见工程做法仍是给队列设计 close():
close()在锁内设置关闭标志。close()调用notify_all()唤醒所有等待者。pop()在谓词里同时检查“队列非空”和“已关闭”。- 队列空且已关闭时,
pop()返回空结果,消费循环退出。
这种队列关闭协议比单独的停止标志更完整。 它把“没有数据”和“以后也不会有数据”区分开。
jthread 适合拥有后台循环的对象。
MessageQueue::close() 适合阻塞等待数据的消费端。
实际系统常同时使用两者:析构时 request_stop,队列 close 唤醒等待线程。
代码验证:C++20 jthread 与可停止等待¶
下面代码展示 std::jthread 的协作停止。
为了避免等待点睡太久,循环使用短周期检查 token。
在高性能系统中,应把这个模式替换为条件变量唤醒。
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
class PeriodicTask {
public:
void start() {
thread_ = std::jthread([this](std::stop_token stop) {
run(stop);
});
}
private:
void run(std::stop_token stop) {
using namespace std::chrono_literals;
while (!stop.stop_requested()) {
++ticks_;
std::cout << "tick " << ticks_.load() << '\n';
std::this_thread::sleep_for(10ms);
}
}
// 用 atomic:本例中 ticks_ 只在工作线程内修改,但一旦将来需要从外部
// 线程读取计数(如监控、日志),普通 int 的读写就会与这里的 ++ticks_
// 构成 data race。默认用 atomic 计数器是更安全的教学习惯。
std::atomic<int> ticks_{0};
std::jthread thread_;
};
int main() {
using namespace std::chrono_literals;
PeriodicTask task;
task.start();
std::this_thread::sleep_for(35ms);
}
编译命令示例:
这段代码的边界很清楚:PeriodicTask 析构时,std::jthread 先请求停止再 join。
如果 run() 内部访问共享对象,仍然需要 mutex 或其他同步机制。
⚠️ 编程陷阱:
jthread析构时线程卡在不响应停止请求的阻塞调用 错误做法:线程入口在cv.wait()上阻塞,但wait的谓词没有检查停止请求。 现象:jthread析构时调用request_stop()并等待 join,但线程永远不醒来——程序卡死在析构函数里。 根本原因:request_stop()只设置停止标志,不唤醒阻塞调用。如果线程在cv.wait()上睡眠,谓词中没有检查停止标志,notify_all()也没有被调用,线程就不会醒来。 正确做法:队列的close()方法应同时设置关闭标志和调用notify_all()。或使用condition_variable_any配合stop_token。💡 概念误区:认为
detach()是合理的线程管理方式 新手想法:"detach()让线程自己跑,不用管它,比join()更灵活。" 实际上:detach()意味着放弃对线程生命周期的控制。主系统析构地图、传感器缓冲和配置对象后,detached 线程可能仍在访问这些已销毁的对象——这是经典的 use-after-free。更糟的是,这种崩溃只在退出时偶发,极难复现和调试。 正确理解:机器人核心线程的默认选择是 RAII 管理 + 显式 join。只有真正"发出去就不管"的弱依赖任务才考虑 detach。
练习¶
- [分析题]:
std::jthread的析构顺序是request_stop()→join()。如果线程入口函数捕获了this指针,析构函数中的 join 和成员变量析构的顺序为什么很关键? - [代码题]:编写一个
PeriodicTask类,使用std::jthread和std::stop_token实现可停止的周期任务。验证离开作用域后线程能干净退出。 - [跨章综合题]:C++语言核心/RAII与智能指针 的 RAII 保证"获取即初始化,析构即释放"。
std::jthread是 RAII 在线程管理上的应用。对比std::jthread和std::unique_ptr:两者都是 RAII 包装器,分别管理什么资源?析构时分别做什么?
17.4 mutex 基础:data race、临界区与锁粒度 ⭐⭐⭐⭐⭐¶
这段代码模拟的是什么场景?——Tracking 和 Mapping 在争抢什么¶
在进入 mutex API 之前,先想清楚我们到底在保护什么。考虑 ORB-SLAM 的核心数据流:Tracking 线程每帧都要读取局部地图中的地图点来做特征匹配;Mapping 线程则在不断向地图中插入新的关键帧和地图点。如果你把地图想象成一个共享的白板——Tracking 在读白板上的内容做计算,Mapping 在白板上写新内容——那么问题就来了:Mapping 正在擦掉旧内容写新内容的那一瞬间,Tracking 读到的是什么?是旧内容?新内容?还是擦了一半的乱码?
这不是理论问题。ORB-SLAM2/3 的 Map 类内部有 mMutexMapUpdate,正是因为 Tracking 和 Mapping 同时访问 mspMapPoints 和 mspKeyFrames 这两个 std::set。如果没有这把锁,std::set 在一个线程插入(可能触发红黑树旋转)的同时被另一个线程遍历,迭代器可能指向已经移动的节点——程序不是"偶尔读到旧数据",而是直接段错误。
理解了这个场景,mutex 的动机就自然了:它不是"因为教科书说并发要加锁",而是因为 Tracking 线程需要在**地图处于一致状态**时观察它,Mapping 线程需要在**没有其他线程观察**时修改它。mutex 划定的就是这个"观察窗口"和"修改窗口"的边界。
工程问题:共享地图不是普通全局变量¶
SLAM 系统里最诱人的写法是让多个模块直接共享地图对象:
Tracking 读地图做位姿估计,Mapping 插入关键帧和地图点,LoopClosing 修正位姿图。
如果这些操作没有同步,std::vector 可能一边扩容,一边被另一个线程遍历。
这不是“读到旧数据”的小问题,而是 C++ data race 和容器不变量破坏。
data race 的定义可以简化理解为:
两个线程并发访问同一内存位置,其中至少一个是写操作,并且没有 happens-before 同步关系。
一旦发生 data race,程序行为未定义。 编译器可以基于“没有 data race”这个语言前提优化代码。 所以不要把 data race 当成“偶发数值错误”,它是整个 C++ 语义层的断裂。
反面失败:只保护写,不保护读¶
很多初学者会写成“写的时候加锁,读的时候不加锁”。 他们的直觉是读操作不会修改对象。 但如果另一个线程正在写,读也会和写并发访问同一内存位置。
#include <mutex>
#include <vector>
class UnsafeMap {
public:
void addKeyframe(int id) {
std::lock_guard<std::mutex> lock(m_);
keyframes_.push_back(id);
}
int latestKeyframe() const {
return keyframes_.back(); // 反例:读没有同步。
}
private:
mutable std::mutex m_;
std::vector<int> keyframes_;
};
如果 push_back 触发扩容,back() 可能在旧缓冲区或半更新状态上读取。
即使没有扩容,std::vector 的 size 更新也不是线程同步操作。
抽象不变量:mutex 不是保护数据,而是保护不变量¶
这个区分至关重要,值得展开讲清楚。很多初学者的心智模型是”mutex 保护数据”——好像给数据加了一把锁,只要加了锁就安全了。更准确的心智模型是:mutex 保护不变量(invariant)——保证在临界区外部观察到的数据始终满足某组一致性条件。
什么是不变量? 以银行转账为例。假设 Alice 有 100 元,Bob 有 200 元,系统的不变量是”Alice 和 Bob 的余额之和 = 300 元”。转账操作需要两步:从 Alice 扣款、给 Bob 加款。在两步之间,不变量被暂时破坏——Alice 已经被扣款但 Bob 还没有收到。如果此时另一个线程观察余额,会看到总和不等于 300 的中间状态。
mutex 的临界区就是为了覆盖”不变量暂时被破坏”的这段时间。进入临界区时,不变量成立;在临界区内部,不变量可以被暂时破坏(因为修改正在进行中);退出临界区前,不变量必须被恢复。其他线程只能在临界区外部观察数据——此时不变量一定成立。
为什么”mutex 保护数据”是不够精确的心智模型? 因为同一个 mutex 可能保护多个相关的数据结构。SLAM 地图的不变量可能是”关键帧数组、关键帧 id 到索引的映射、共视图边、最新关键帧 id 四者一致”。如果只锁住 keyframes_.push_back() 一行,而不锁住 id 映射和共视图边的更新,那么另一个线程在锁释放后可能看到”关键帧数组已经有新帧,但 id 映射还没更新”的不一致状态。锁住了数据,但没有保护不变量。
mutex 的名字不应表达”谁在用”,而应表达”保护什么”。 例如:
map_mutex_保护地图容器和关键帧位姿的一致性。queue_mutex_保护队列内容和关闭标志。state_mutex_保护系统状态枚举和错误消息。
临界区不是”觉得危险就加锁”的代码块。 临界区应覆盖一次不变量观察或修改的完整过程。
例如地图插入关键帧时,可能要同时更新:
- 关键帧数组。
- 关键帧 id 到索引的映射。
- 共视图边。
- 最新关键帧 id。
这些更新如果代表同一个地图不变量,就应该在同一把锁保护下完成,或者用更高层事务机制封装。
只锁 push_back 一行,不能保护跨容器一致性。
规则推导:lock_guard、unique_lock、scoped_lock 的分工¶
std::lock_guard<std::mutex> 是最简单的 RAII 锁。
构造时加锁,析构时解锁,不可手动解锁。
它适合短临界区。
std::unique_lock<std::mutex> 拥有更丰富的状态。
它可以延迟加锁、手动解锁、移动,并且是 condition_variable 的标准搭档。
std::unique_lock<std::mutex> lock(mutex_);
prepareSharedState();
lock.unlock();
doExpensiveWorkWithoutHoldingMutex();
std::scoped_lock 是 C++17 引入的通用 RAII 锁。
它可以一次锁多把 mutex,并使用避免死锁的锁定算法。
锁粒度需要权衡。 锁太粗,系统容易串行化,Tracking 被 Mapping 长时间阻塞。 锁太细,不变量被拆碎,死锁和竞态更容易出现。 机器人系统常用策略是:
- 共享数据结构用清晰的少数几把锁保护。
- 重计算不在锁内执行。
- 在锁内复制或移动出所需快照,然后释放锁计算。
- 写回共享状态时再次短暂加锁。
工程边界:锁不能保护锁外引用¶
一个隐蔽错误是从锁内返回引用或迭代器:
class BadMapView {
public:
const std::vector<int>& keyframes() const {
std::lock_guard<std::mutex> lock(m_);
return keyframes_;
}
private:
mutable std::mutex m_;
std::vector<int> keyframes_;
};
函数返回后锁已经释放。
调用者拿到的引用没有同步保护。
另一个线程可以同时修改 keyframes_。
正确做法通常是:
- 返回快照副本。
- 让调用者提供回调,在锁内访问受控视图。
- 返回不可变、生命周期稳定的数据结构。
- 用更高层消息传递替代共享容器。
对 SLAM 地图而言,直接返回内部容器引用尤其危险。 地图点和关键帧往往互相引用,单个容器快照也可能不代表一致地图。
锁粒度的选择是一个持续的工程权衡。锁太粗(整个地图一把锁),Tracking 读地图时会被 Mapping 的长时间写操作阻塞,实时性受损。锁太细(每个关键帧一把锁、每个地图点一把锁),跨对象不变量(如"关键帧 A 和地图点 B 的共视关系")没有统一保护,容易出现不一致状态。机器人系统的常见平衡点是:用少数几把锁保护清晰定义的数据聚合体,在锁内只做数据传递(复制快照或移动所有权),在锁外做重计算。这样既不会长时间持锁阻塞实时路径,又能保持不变量的完整性。
理解锁粒度还需要一个反事实:如果每个函数内部各自加锁,不考虑函数组合怎么样?empty() 返回 false、pop() 返回元素——两个函数各自正确,但组合时中间的间隙可能被其他线程利用。这就是 TOCTOU(Time-Of-Check-To-Time-Of-Use)竞态。正确的并发接口应把"检查+操作"合并为一个受锁保护的原子操作,如 tryPop() 返回 optional<T>。这个原则贯穿整个并发编程:不要让调用者在两个安全操作之间暴露不安全的间隙。
代码验证:锁内修改、锁外计算¶
下面代码可单文件编译。 它展示一个线程安全计数器和一个“复制快照后锁外计算”的模式。
#include <iostream>
#include <mutex>
#include <numeric>
#include <thread>
#include <vector>
class KeyframeStore {
public:
void add(int id) {
std::lock_guard<std::mutex> lock(m_);
ids_.push_back(id);
}
std::vector<int> snapshot() const {
std::lock_guard<std::mutex> lock(m_);
return ids_;
}
private:
mutable std::mutex m_;
std::vector<int> ids_;
};
int main() {
KeyframeStore store;
std::thread a([&] {
for (int i = 0; i < 1000; ++i) {
store.add(i);
}
});
std::thread b([&] {
for (int i = 1000; i < 2000; ++i) {
store.add(i);
}
});
a.join();
b.join();
const auto ids = store.snapshot();
const long sum = std::accumulate(ids.begin(), ids.end(), 0L);
std::cout << ids.size() << " ids, sum = " << sum << '\n';
}
快照副本有成本。 如果地图很大,不能每次都复制全部对象。 但这个例子强调的是边界:锁保护共享容器,计算在锁外做。 真实系统会根据数据量把快照缩小到关键帧 id、位姿数组或局部子图。
本质洞察:mutex 不是保护代码的,而是保护不变量的。当你说"这个 mutex 保护 map",真正的意思是"这个 mutex 保证在临界区外观察到的 map 始终满足其不变量(如关键帧数组和 id 映射一致、共视图边完整)"。临界区应覆盖一次不变量修改的完整过程,而不是"觉得危险就加锁"的任意代码块。
mutex 就像图书馆的借还书流程。借书时,图书馆员要同时更新书架上的库存和借阅记录——这两个操作必须在同一次事务中完成(类似同一个临界区)。如果只更新了库存但没更新记录,下一个读者查记录时会以为书还在,实际上书已经被拿走了。mutex 保护的就是这种"多个数据结构之间的一致性"。
如果只保护写操作不保护读操作会怎样?一个线程正在执行 push_back(可能触发 vector 扩容:分配新内存→复制元素→释放旧内存),另一个线程同时调用 back() 读取。读者可能读到旧缓冲区(已释放)、半复制状态、或者扩容前的 size 配合扩容后的数据指针。这不是"偶尔读到旧值"的问题,而是未定义行为——程序可能崩溃、返回垃圾数据、或者"看起来正常"但实际损坏了内存。
⚠️ 编程陷阱:从锁内返回容器的引用或迭代器 错误做法:
const std::vector<int>& keyframes() const { lock_guard lock(m_); return keyframes_; }现象:调用者拿到引用后锁已释放,另一个线程随时可能修改 vector。调用者遍历时可能遇到迭代器失效或数据损坏。 根本原因:锁只在持有期间保护数据。返回引用让调用者在无锁状态下访问受保护对象,等于把保险箱的门打开后交出钥匙。 正确做法:返回副本(return keyframes_;),或让调用者在回调中在锁内访问受控视图。🧠 思维陷阱:认为"只要每个函数都加锁就线程安全了" 新手想法:"
empty()加锁、pop()加锁,所以if (!q.empty()) q.pop()线程安全。" 实际上:每个函数单独无 data race,但empty()返回 false 后、进入pop()前,另一个线程可能已经清空了队列。这是 race condition——两个安全操作之间的间隙仍然不安全。 正确思维:并发接口设计要考虑"一个业务动作需要的完整不变量在哪里被保护"。把检查和操作合并为原子接口(如tryPop() -> optional<T>)。
练习¶
- [分析题]:解释为什么
lock_guard适合短临界区,unique_lock适合条件变量,scoped_lock适合多锁场景。各给一个 SLAM 中的真实用例。 - [代码题]:实现一个线程安全的
KeyframeStore,支持add()、snapshot()(返回副本)和size()。用两个线程同时写入 2000 个 id,验证最终 snapshot 包含全部 id。 - [跨章综合题]:C++语言核心/类型系统与值类别推导 讲了 C++ 的值类别和对象模型。在单线程中,对象的"未初始化"是常见陷阱;在多线程中,对象的"并发访问"是常见陷阱。对比两种陷阱的检测工具(单线程用 sanitizer,多线程用 ThreadSanitizer)。
17.5 condition_variable:等待-通知模型 ⭐⭐⭐⭐⭐¶
这段代码模拟的是什么场景?——Mapping 线程在等什么¶
想象 ORB-SLAM 的 LocalMapping::Run() 函数。Mapping 线程的主循环是:"从关键帧队列里取一个新关键帧,做局部 BA,然后再取下一个。"但关键帧不是匀速产生的——Tracking 线程只有在当前帧被判定为关键帧时才会入队。大部分时间关键帧队列是空的。
如果 Mapping 线程用忙等(不断检查队列是否为空),它会一直占着一个 CPU 核心做无用功。在 Jetson 这种 4-8 核平台上,一个核心被空转占用意味着 Tracking 或 LoopClosing 可能被调度延迟。更实际地说,如果你在笔记本上开发,忙等的 Mapping 线程会让风扇全速运转——虽然什么有用的计算都没做。
条件变量解决的正是这个问题:Mapping 线程在队列为空时**睡眠**,不占 CPU;Tracking 线程插入关键帧后**唤醒** Mapping 线程。这不仅是 API 层面的优化,而是让整个系统的 CPU 资源分配合理化——睡眠的线程不参与调度,空出来的核心可以服务其他计算密集型任务。
ORB-SLAM3 的 LocalMapping 中使用了类似机制:mbAbortBA 和关键帧队列检查配合 usleep 实现了一种简化的等待-通知模式。而更规范的 C++ 做法是使用 std::condition_variable——它同时解决了"何时睡眠""何时唤醒""如何处理虚假唤醒"三个问题。
工程问题:队列为空时不应该忙等¶
17.1 节的最小流水线有一个明显缺陷:Mapping 线程在队列为空时不断循环检查。 这种忙等会浪费 CPU,尤其当系统里还有图像解码、点云处理、优化器和 ROS2 executor 时,空转线程会抢占真正需要计算的任务。
生产者消费者队列需要表达的是:
- 生产者入队后通知消费者。
- 消费者在队列为空时睡眠。
- 队列关闭时,消费者即使没有新数据也要醒来退出。
- 等待和检查状态必须在同一个 mutex 保护的不变量下完成。
std::condition_variable 正是为这种等待-通知关系设计的。
它不是消息队列,也不保存通知次数。
它只负责让线程在某个条件不满足时睡眠,并在其他线程通知后重新检查条件。
反面失败:先检查再等待导致丢失通知¶
下面是一个常见错误模式。 代码片段能表达问题,但不应作为队列实现使用。
#include <condition_variable>
#include <mutex>
#include <queue>
std::mutex m;
std::condition_variable cv;
std::queue<int> q;
int badPop() {
if (q.empty()) {
std::unique_lock<std::mutex> lock(m);
cv.wait(lock);
}
int value = q.front();
q.pop();
return value;
}
问题至少有四个:
q.empty()在锁外检查,本身就是并发访问风险。- 检查到空以后、进入
wait()以前,生产者可能已经入队并notify_one(),通知丢失。 wait()返回后没有重新检查队列是否真的非空。- 当
q.empty()为假而走"快路径"时,q.front()和q.pop()都在锁外执行,与生产者的push并发访问同一个std::queue(至少一个是写),同样是 data race——这条快路径根本没有任何同步保护。
condition_variable 的通知不是事件计数器。
如果没有线程正在等待,notify_one() 不会为未来保存一次唤醒。
因此等待方必须把“状态检查”和“进入等待”放进同一个原子化等待操作中。
抽象不变量:condition_variable 的完整语义模型¶
条件变量的核心不变量是:
通知只是提示状态可能改变,谓词才是状态是否满足的真相。
理解条件变量需要把握三个层次:接口语义(为什么需要谓词)、原子操作序列(wait 内部做了什么)、以及**虚假唤醒的底层原因**。
为什么需要谓词? 这不是"最佳实践"级别的建议,而是正确性的必要条件。条件变量的 notify_one() / notify_all() 不是事件计数器——它不会"保存"通知供未来使用。如果调用 notify_one() 时没有线程在等待,这次通知直接丢失。因此,等待方不能只依赖"收到通知"来判断条件是否满足,而必须自己检查实际状态。谓词就是这个"实际状态检查"。
标准写法:
这行代码等价于一个循环:
wait() 的原子操作序列是理解条件变量的关键。 wait(lock) 内部做了一个精心设计的原子操作序列:
- 调用方已经持有 mutex(这是前置条件,违反会导致未定义行为)。
wait()**原子地**释放 mutex 并让线程进入等待状态。"原子地"是关键——释放锁和进入等待必须是不可分割的一步。如果先释放锁再进入等待,生产者可能在释放锁之后、进入等待之前执行notify_one(),这次通知就丢失了。- 被通知(
notify_one/notify_all)或虚假唤醒后,线程被唤醒。 wait()重新获取 mutex。只有获取成功后,wait()才返回。- 如果使用谓词版本,返回前再次检查谓词。谓词不满足就继续回到步骤 2。
虚假唤醒(spurious wakeup)的底层原因。 很多教材把虚假唤醒说成"实现的遗憾",但它实际上有深层的技术原因:
- POSIX pthread 实现原因:
pthread_cond_wait在底层使用 futex 系统调用(Linux)或类似机制。futex 的等待可以被信号中断(EINTR),此时线程被唤醒但条件并未改变。与其让每一层都处理EINTR重试逻辑,POSIX 标准选择把"可能的虚假唤醒"作为接口契约的一部分,让调用者用循环+谓词统一处理。 - 多处理器优化原因:在某些多处理器架构上,允许虚假唤醒可以避免复杂的跨核心同步,让
notify_one的实现更高效。如果标准要求"绝不虚假唤醒",实现需要在唤醒路径上做额外的确认步骤,增加延迟。 - 多等待者竞争原因:当多个线程等待同一个条件变量,
notify_one唤醒了一个线程。但在被唤醒的线程重新获取 mutex 之前,另一个线程可能先获取了 mutex 并消费了数据。被唤醒的线程最终获取 mutex 后发现数据已被消费——这在效果上等同于虚假唤醒。
因此不写谓词的 wait(lock) 只适合非常底层的封装;业务代码应优先使用 wait(lock, pred)。
丢失通知(lost wakeup)问题。 与虚假唤醒相对的是丢失通知。如果生产者在消费者进入 wait() 之前就调用了 notify_one(),这次通知不会被保存——消费者随后进入 wait() 就可能永远睡下去。谓词版本 wait(lock, pred) 同时解决了这两个问题:虚假唤醒后重新检查谓词发现条件不满足就继续等待;丢失通知时因为状态已经满足谓词(生产者已经入队了数据)而直接跳过等待、不进入 wait()。
规则推导:生产者消费者队列必须同时处理数据和关闭¶
一个可用的阻塞队列至少需要:
push():在锁内检查是否关闭,入队,然后通知一个消费者。pop():等待“队列非空或已关闭”。close():在锁内设置关闭标志,然后通知所有等待者。- 返回值能表达“拿到数据”和“队列已空且关闭”两种结果。
C++17 可以用 std::optional<T> 表达 pop() 的结果。
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <stdexcept>
#include <utility>
template <typename T>
class MessageQueue {
public:
void push(T value) {
{
std::lock_guard<std::mutex> lock(m_);
if (closed_) {
throw std::runtime_error("push to closed queue");
}
q_.push(std::move(value));
}
cv_.notify_one();
}
std::optional<T> pop() {
std::unique_lock<std::mutex> lock(m_);
cv_.wait(lock, [this] {
return closed_ || !q_.empty();
});
if (q_.empty()) {
return std::nullopt;
}
T value = std::move(q_.front());
q_.pop();
return value;
}
void close() {
{
std::lock_guard<std::mutex> lock(m_);
closed_ = true;
}
cv_.notify_all();
}
private:
std::mutex m_;
std::condition_variable cv_;
std::queue<T> q_;
bool closed_ = false;
};
notify_one() 放在锁外通常更好。
因为被唤醒的线程需要重新获取同一把 mutex。
如果通知时还持有锁,被唤醒线程可能立刻阻塞在锁上。
不过“锁内通知”也不是语义错误;关键是状态修改必须在通知之前完成。
工程边界:条件变量不替代背压和容量控制¶
上面的队列是无界队列。 如果 Tracking 生产关键帧速度持续高于 Mapping 消费速度,队列会不断增长。 真实系统需要背压策略:
- 阻塞生产者,限制队列最大长度。
- 丢弃旧数据,例如视觉显示帧或低价值中间结果。
- 合并任务,例如只保留最新状态。
- 触发降级,例如暂停插入关键帧或降低优化频率。
不同消息语义需要不同策略。
关键帧通常不能随便丢。
实时可视化图像可以丢旧帧。
停止信号不能丢。
这就是为什么工程队列不应只看作 std::queue + mutex,还要把消息语义写进接口。
条件变量还有一个边界:谓词必须只检查受同一把锁保护的状态。 如果谓词同时读一个未同步的外部标志,就会重新引入 data race。
代码验证:阻塞队列驱动 Mapping 线程退出¶
下面是完整可编译示例。
它验证队列关闭后,消费线程会从 pop() 得到 std::nullopt 并自然退出。
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
template <typename T>
class MessageQueue {
public:
void push(T value) {
{
std::lock_guard<std::mutex> lock(m_);
if (closed_) {
throw std::runtime_error("push to closed queue");
}
q_.push(std::move(value));
}
cv_.notify_one();
}
std::optional<T> pop() {
std::unique_lock<std::mutex> lock(m_);
cv_.wait(lock, [this] {
return closed_ || !q_.empty();
});
if (q_.empty()) {
return std::nullopt;
}
T value = std::move(q_.front());
q_.pop();
return value;
}
void close() {
{
std::lock_guard<std::mutex> lock(m_);
closed_ = true;
}
cv_.notify_all();
}
private:
std::mutex m_;
std::condition_variable cv_;
std::queue<T> q_;
bool closed_ = false;
};
int main() {
using namespace std::chrono_literals;
MessageQueue<int> keyframes;
std::thread mapping([&] {
while (auto id = keyframes.pop()) {
std::cout << "mapping consumes " << *id << '\n';
std::this_thread::sleep_for(20ms);
}
std::cout << "mapping exits\n";
});
for (int i = 0; i < 4; ++i) {
keyframes.push(i);
}
keyframes.close();
mapping.join();
}
编译命令示例:
这个队列已经足够支撑本章累积项目。 后续可以加入容量、超时、统计计数和丢弃策略。
条件变量的工作方式可以类比餐厅的叫号系统。顾客(消费线程)到店后如果没有空桌(队列为空),不应该站在门口每秒问一次"有桌了吗"(忙等),而是拿号坐在等候区(wait)。当有桌子空出来时,服务员喊号(notify_one)。顾客被叫醒后还要检查"是不是真的轮到我了"(谓词),因为可能是误叫(虚假唤醒)。如果餐厅关门了(队列关闭),所有等候的顾客都要被通知离开(notify_all)。
如果不使用谓词版本的 wait 会怎样?虚假唤醒时,线程从 wait() 返回后队列仍然为空,如果直接调用 q_.front() 就会触发未定义行为。丢失通知时,生产者在消费者进入 wait() 之前就 notify_one() 了,这次通知不会被保存——消费者随后进入 wait() 就永远睡下去。谓词版本 wait(lock, pred) 同时解决了这两个问题:虚假唤醒后重新检查谓词,丢失通知时因为状态已经满足谓词而直接跳过等待。
⚠️ 编程陷阱:
condition_variable::wait()不写谓词 错误做法:cv.wait(lock);不检查共享状态。 现象:虚假唤醒时消费者在空队列上调用front(),崩溃或读到垃圾数据。 根本原因:条件变量允许虚假唤醒——即使没有对应通知,wait()也可能返回。不写谓词就没有防线。 正确做法:始终使用cv.wait(lock, [&]{ return !q_.empty() || closed_; });。💡 概念误区:认为
notify_one()会"保存"一次通知 新手想法:"生产者先notify_one(),消费者稍后wait()时就能收到这次通知。" 实际上:条件变量的通知不是事件计数器。如果没有线程正在等待,notify_one()什么都不做——通知直接丢失。这也是为什么必须把状态检查和进入等待放在同一把锁保护下:先检查状态,状态不满足才进入wait()。 正确理解:通知只是"提示状态可能改变",谓词才是"状态是否满足"的真相。
练习¶
- [分析题]:解释
cv.wait(lock, pred)等价于while (!pred()) cv.wait(lock);。为什么这个循环能同时解决虚假唤醒和丢失通知? - [代码题]:给
MessageQueue添加tryPopFor(duration)方法,超时返回std::nullopt。使用cv.wait_for(lock, timeout, pred)实现。 - [跨章综合题]:原子操作与内存模型 将介绍 atomic 操作。
condition_variable使用 mutex + 谓词实现等待-通知;atomic 可以实现无等待的状态检查。讨论:什么场景适合条件变量(如队列非空),什么场景适合 atomic(如停止标志)?
17.6 Deadlock:四个必要条件与多锁策略 ⭐⭐⭐⭐⭐¶
从一个真实场景出发:两个线程各持一把锁,想要对方的锁¶
死锁不是一个需要死记四条理论条件的抽象概念。让我们从一个具体的、在 SLAM 系统中真实会发生的场景开始,然后从这个场景中**推导出**四条条件——而不是反过来。
场景:ORB-SLAM 风格的系统中,Mapping 线程需要从关键帧队列取出新关键帧,然后插入地图。这个操作涉及两把锁——queue_mutex(保护关键帧队列)和 map_mutex(保护地图数据)。Mapping 线程的逻辑是"先锁队列取关键帧,然后锁地图插入关键帧"。
与此同时,Tracking 线程在决定是否插入新关键帧时,需要查看局部地图的状态和队列的长度。Tracking 线程的逻辑是"先锁地图查状态,然后锁队列检查长度"。
注意两个线程的锁获取**顺序相反**:
时刻 T1:
Mapping 线程:获取 queue_mutex ✓ → 准备获取 map_mutex...
Tracking 线程:获取 map_mutex ✓ → 准备获取 queue_mutex...
时刻 T2:
Mapping 线程:持有 queue_mutex,等待 map_mutex(被 Tracking 持有)
Tracking 线程:持有 map_mutex,等待 queue_mutex(被 Mapping 持有)
结果:两个线程永久等待,系统冻结。
这就是死锁。不是理论概念,而是"程序突然不动了,top 显示两个线程都在 futex_wait,但没有任何输出"的真实故障。
从这个场景推导四条必要条件:为什么死锁能发生?让我们逐一检查这个场景中满足了哪些前提——如果其中任何一条不满足,死锁就不可能出现:
- 互斥:两把锁都是互斥的——同一时刻只能被一个线程持有。如果锁可以被多个线程同时持有(比如只读操作用 shared_lock),就不需要等待,死锁不可能发生。
- 持有并等待:每个线程在已经持有一把锁的情况下,去等待另一把锁。如果线程在获取第二把锁之前必须先释放第一把锁,就不会出现"双方各持一把"的对峙局面。
- 不可剥夺:一个线程持有的锁不能被另一个线程强行抢走。如果系统可以强行中断 Tracking 线程并释放
map_mutex,Mapping 线程就能继续——但这会让被保护的地图数据处于不一致状态,比死锁更危险。 - 循环等待:Mapping 等 Tracking 释放
map_mutex,Tracking 等 Mapping 释放queue_mutex——形成了等待环。如果两个线程都按同一顺序获取锁(比如都先锁queue_mutex再锁map_mutex),等待关系就是一条链而非环,链的末端线程能完成工作并释放锁,连锁反应让所有线程最终都能继续。
这四条就是 Coffman、Elphick 和 Shoshani 1971 年在"System Deadlocks"论文中证明的死锁四个必要条件——它们不是人为规定的,而是从"两个线程各持一把锁想要对方的锁"这个物理现象中提取出的结构性条件。只要破坏其中任何一条,死锁就不可能发生。
工程问题:地图锁和队列锁经常一起出现¶
SLAM 系统中,多锁访问很常见。 上面的场景不是特例。LoopClosing 线程读取地图后,可能向优化队列提交任务。 Tracking 线程可能在决定是否插入关键帧时同时查看局部地图和队列长度。 传感器回调可能在持有 IMU 缓冲锁时试图获取状态锁更新时间戳。
每一对锁的获取,如果不同路径使用不同顺序,就可能形成等待环。
反面失败:不同路径使用不同锁顺序¶
下面代码可编译,但可能永久卡住。 它是一个反例,用来观察锁顺序错误。
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
int main() {
using namespace std::chrono_literals;
std::mutex map_mutex;
std::mutex queue_mutex;
std::thread tracking([&] {
std::lock_guard<std::mutex> q_lock(queue_mutex);
std::this_thread::sleep_for(10ms);
std::lock_guard<std::mutex> m_lock(map_mutex);
std::cout << "tracking updated queue and map\n";
});
std::thread mapping([&] {
std::lock_guard<std::mutex> m_lock(map_mutex);
std::this_thread::sleep_for(10ms);
std::lock_guard<std::mutex> q_lock(queue_mutex);
std::cout << "mapping updated map and queue\n";
});
tracking.join();
mapping.join();
}
这段程序的问题不是 sleep。 sleep 只是放大时序窗口。 真实系统中,图优化、内存分配、日志输出、回调调度都可能制造同样窗口。
抽象不变量:如何预防死锁——逐条破坏必要条件¶
前面我们从具体场景推导出了死锁的四个必要条件。工程上最常破坏的是**条件四(循环等待)**——通过建立全局锁顺序消除等待环。为什么优先破坏这一条?因为条件一(互斥)通常不能取消——我们用 mutex 就是为了互斥;条件三(不可剥夺)也不能轻易取消——强行释放锁会让被保护的数据处于半修改状态。条件二(持有并等待)可以通过 std::scoped_lock 一次性获取所有锁来破坏,但这要求调用点知道所有需要的锁。条件四通过锁排序来破坏是最通用的策略——只需要一个全局约定,所有代码路径遵守即可。
具体手段:
- 锁排序(Lock Ordering):为所有 mutex 定义一个全局偏序关系,所有代码路径必须按这个顺序获取锁。回顾上面的场景:如果规定"必须先锁
queue_mutex再锁map_mutex",那么 Tracking 线程就不能"先锁 map 再锁 queue",循环等待在数学上不可能形成——等待图(wait-for graph)变成了 DAG。 std::scoped_lock:一次获取多把锁,内部使用 try-and-back-off 算法保证不形成环。这破坏了条件二——线程不会"持有一把锁再等另一把",而是要么同时获取所有锁,要么一个都不持有。- 缩短临界区:持锁时不调用外部代码(回调、ROS publish、优化器),避免在锁内引入不可控的新锁需求。
- 消息队列替代共享状态:把"多线程访问同一对象"转化为"通过队列传递消息",从根本上减少多锁需求。
规则推导:锁排序和 std::scoped_lock¶
锁排序要求为所有 mutex 定义一个固定顺序。 例如:
任何线程只要需要多把锁,都必须按这个顺序获取。 这样就不可能形成循环等待。
C++17 的 std::scoped_lock 可以一次接收多把 mutex:
它内部使用避免死锁的锁定算法。 这不意味着“自动解决所有 deadlock”。 它只解决这一行里多把 mutex 的获取问题。 如果你先手动持有了一把锁,再在深层函数里获取另一把锁,仍然可能和其他路径形成环。
C++11/14 可以用 std::lock 加 std::adopt_lock:
std::lock(queue_mutex, map_mutex);
std::lock_guard<std::mutex> q_lock(queue_mutex, std::adopt_lock);
std::lock_guard<std::mutex> m_lock(map_mutex, std::adopt_lock);
std::scoped_lock 更简洁,优先使用。
工程边界:持锁时不要调用不受控回调¶
另一个死锁来源是锁内调用外部代码。
例如地图对象在持有 map_mutex_ 时调用用户注册的回调:
// 反例片段:回调可能再次访问地图或队列。
void notifyMapChanged() {
std::lock_guard<std::mutex> lock(map_mutex_);
callback_();
}
如果 callback_() 内部又调用需要 map_mutex_ 的函数,可能自锁。
如果它访问队列,也可能形成跨锁环。
更稳妥的做法是锁内准备事件数据,锁外调用回调:
void notifyMapChangedSafely() {
MapEvent event;
{
std::lock_guard<std::mutex> lock(map_mutex_);
event = buildEventLocked();
}
callback_(event);
}
机器人系统里尤其要避免锁内做这些事:
- 调用 ROS publish、service、action。
- 调用优化器长时间求解。
- 写磁盘或保存地图。
- 调用未知回调。
- 等待另一个线程结束。
这些操作的耗时和内部锁不可控。
代码验证:用 std::scoped_lock 安全移动关键帧¶
下面示例展示同时锁定队列和地图。 它刻意让临界区只做移动和插入,昂贵计算放到锁外。
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <vector>
struct Keyframe {
int id = 0;
};
class SharedState {
public:
void pushKeyframe(Keyframe kf) {
std::lock_guard<std::mutex> lock(queue_mutex_);
queue_.push(std::move(kf));
}
std::optional<Keyframe> moveOneKeyframeToMap() {
std::scoped_lock lock(queue_mutex_, map_mutex_);
if (queue_.empty()) {
return std::nullopt;
}
Keyframe kf = std::move(queue_.front());
queue_.pop();
map_keyframes_.push_back(kf.id);
return kf;
}
std::vector<int> mapSnapshot() const {
std::lock_guard<std::mutex> lock(map_mutex_);
return map_keyframes_;
}
private:
mutable std::mutex map_mutex_;
std::mutex queue_mutex_;
std::queue<Keyframe> queue_;
std::vector<int> map_keyframes_;
};
int main() {
SharedState state;
state.pushKeyframe(Keyframe{1});
state.pushKeyframe(Keyframe{2});
std::thread mapping([&] {
while (auto kf = state.moveOneKeyframeToMap()) {
std::cout << "inserted " << kf->id << '\n';
}
});
mapping.join();
for (int id : state.mapSnapshot()) {
std::cout << "map has " << id << '\n';
}
}
多锁不是常态目标。 如果某个设计需要频繁同时持有三四把锁,通常说明数据边界需要重新切分。
⚠️ 编程陷阱:不同代码路径使用不同的锁顺序 错误做法:Tracking 线程先锁
queue_mutex再锁map_mutex;Mapping 线程先锁map_mutex再锁queue_mutex。 现象:两个线程各自持有一把锁等待另一把,系统永久卡死。只在特定时序下触发,极难复现。 根本原因:循环等待是死锁的必要条件之一。不同路径使用不同锁顺序就可能形成循环。 正确做法:定义全局锁顺序(如state < queue < map < optimizer),所有路径遵守;或用std::scoped_lock一次获取多把锁。💡 概念误区:认为
std::scoped_lock自动解决所有死锁 新手想法:"std::scoped_lock(a, b)能避免死锁,所以以后都用它就行了。" 实际上:scoped_lock只解决"同时获取多把锁"的顺序问题。如果你先手动持有了一把锁,然后在深层函数里获取另一把锁,scoped_lock看不到先前持有的锁,仍然可能和其他路径形成环。 正确理解:scoped_lock是工具,锁设计是策略。工具只能辅助策略,不能替代策略。
练习¶
- [分析题]:列出死锁的四个必要条件(互斥、持有并等待、不可剥夺、循环等待),对每个条件说明在 SLAM 系统中是否可以被破坏,以及如何破坏。
- [代码题]:编写一个故意死锁的程序(两个线程使用相反的锁顺序)。然后用
std::scoped_lock修复。使用 GDB 或gdb -batch -ex "thread apply all bt"观察死锁时的线程状态。 - [跨章综合题]:C++语言核心/错误处理与异常安全 讲了异常安全。在持有锁时如果函数抛异常,
lock_guard会确保锁被释放(RAII)。讨论:如果在双锁临界区中,第一把锁获取成功但第二把锁获取前抛异常,系统状态是否一致?scoped_lock如何处理这种情况?
17.7 shared_mutex:读多写少地图查询 ⭐⭐⭐⭐¶
这段代码模拟的是什么场景?——一张地图被多个模块同时查询¶
考虑 SLAM 系统中地图的访问模式。在一个典型的运行周期内:Tracking 线程以 30Hz 查询局部地图做特征匹配(每秒 30 次读);可视化线程以 10Hz 读取地图点和关键帧用于渲染(每秒 10 次读);LoopClosing 线程在检索候选回环时读取关键帧数据库(每秒 1-3 次读)。而 Mapping 线程插入新关键帧和地图点的频率大约只有 2-5Hz(每秒 2-5 次写)。
这意味着**读操作的频率是写操作的 10-20 倍**。如果用普通 std::mutex,Tracking 在读地图时持有互斥锁,可视化线程想同时读地图就必须等待——但两个纯读操作之间根本不需要互斥!它们各自独立地遍历相同的数据结构,不会互相干扰。把纯读操作也互斥排队,就像图书馆规定"同一时间只能有一个人看书"——完全不必要的限制。
std::shared_mutex 解决的就是这个问题:允许多个读者同时持有共享锁(共同看书),但写者需要独占锁(修改书架时所有人都暂停阅读)。
工程问题:地图查询远多于地图修改¶
SLAM 地图通常是读多写少。 Tracking 高频读取局部地图点用于匹配。 可视化线程读取关键帧和地图点发布。 LoopClosing 读取关键帧数据库做检索。 Mapping 相对低频地插入关键帧、剔除地图点、更新局部图。
如果所有读写都用一把 std::mutex,多个纯读操作也会互相排队。
std::shared_mutex 提供读写锁语义:
- 多个读者可以同时持有共享锁。
- 写者需要独占锁。
- 写者持锁时,读者不能进入。
C++17 提供 std::shared_mutex 和 std::shared_lock。
写锁仍用 std::unique_lock<std::shared_mutex>。
反面失败:读锁内返回内部引用¶
读写锁不改变生命周期规则。 下面仍然是错误边界:
#include <shared_mutex>
#include <vector>
class BadSharedMap {
public:
const std::vector<int>& keyframes() const {
std::shared_lock<std::shared_mutex> lock(m_);
return keyframes_;
}
private:
mutable std::shared_mutex m_;
std::vector<int> keyframes_;
};
函数返回时读锁释放。
调用者拿到的引用不再受保护。
写者随后可以修改 vector。
shared_mutex 只允许读并发,不允许锁外引用并发。
抽象不变量:共享锁保护的是只读观察窗口¶
共享锁的不变量是:
持有共享锁期间,受保护对象不会被写者修改;释放共享锁后,观察窗口结束。
因此读操作有两种安全模式:
- 在读锁内完成全部读取和轻量计算。
- 在读锁内复制必要快照,释放锁后做重计算。
第一种适合很短的查询,例如按 id 找位姿。 第二种适合较重的算法,例如复制局部地图点坐标后做匹配。
写操作必须用独占锁。 写锁临界区应尽量短。 如果写者需要先做昂贵计算,应先在锁外准备数据,再进入写锁提交修改。
读写锁的理论基础:公平性、饥饿与性能权衡¶
读写锁看起来是一个"严格更好"的同步原语——它允许多个读者并发,同时保证写者独占。但这种直觉忽略了一个关键的设计维度:公平性(fairness)。不同的公平策略会导致截然不同的性能和正确性特征。
读者优先(Reader-preference)。 在这种策略下,只要有读者持锁或等待读锁,新的读者都可以立即获取共享锁,即使有写者在等待。好处是读者永远不会被写者阻塞;坏处是写者可能被无限延迟——如果读者源源不断到来,写者永远等不到空窗期。这叫**写者饥饿(writer starvation)**。在 SLAM 系统中,如果 Tracking 和可视化线程不断读取地图,Mapping 线程可能永远无法获得写锁来插入新关键帧——系统看起来"运行正常"但地图不再更新。
写者优先(Writer-preference)。 在这种策略下,一旦有写者等待独占锁,新的读者会被阻塞,直到所有等待中的写者完成。好处是写者不会被饥饿;坏处是读者可能被连续的写操作延迟。在实时系统中,如果 Mapping 和 LoopClosing 频繁写入地图,Tracking 的读取可能出现不可接受的延迟。
公平锁(Fair lock)。 按请求到达的顺序排队,不区分读写。公平但吞吐可能更低,因为连续的读请求之间被写请求打断,无法充分利用读并发的优势。
C++ 标准的选择。 C++ 标准没有规定 std::shared_mutex 的公平策略——这是实现定义的。Linux 上的 pthread_rwlock 默认是写者优先(可通过属性修改)。因此,在没有测量之前不应假设读写锁一定比普通 mutex 更好。工程上的决策应基于实际负载的读写比例:当读写比高于 10:1 且读临界区足够短时,读写锁通常有收益;当读写比接近 1:1 或写临界区很长时,普通 mutex 可能更简单也更可预测。
读写锁的隐藏成本。 除了公平性问题,读写锁还有一个容易被忽视的性能陷阱:共享锁的获取不是免费的。每个读者获取共享锁时,都需要原子地修改一个共享的读者计数器——这意味着在多核 CPU 上,频繁获取共享锁会导致计数器所在的 cache line 在多个核心之间弹跳(cache line bouncing)。如果读操作非常短(如读取一个原子标志),cache line 弹跳的开销可能超过读操作本身的计算量,使得读写锁比 std::mutex 更慢。对于这类场景,std::atomic 或无锁设计可能是更好的选择。
规则推导:读写锁适合稳定容器,不适合高频写入热点¶
读写锁能提高吞吐的前提是读操作多、读临界区短、写操作少。 如果写操作频繁,读写锁可能比普通 mutex 更慢,因为内部状态更复杂。
读写锁还涉及公平性。 标准没有保证所有实现都采用同样策略。 某些实现可能读者优先,写者在持续读负载下等待很久。 某些实现可能写者优先,新读者会被阻塞以避免写饥饿。 这会影响实时性:
- 如果 Tracking 读锁长时间占用,Mapping 写地图会延迟。
- 如果写者优先,Tracking 可能被地图更新阻塞。
因此读锁内不要做点云配准、图优化、文件写入。 读锁应只取快照或完成小查询。
工程边界:地图一致性可能跨越多个容器¶
读写锁不是数据库事务。 如果地图状态分布在多个对象中,例如:
keyframes_landmarks_observations_covisibility_graph_
只给其中一个容器加读写锁不能保证整体一致。 要么使用同一把锁保护同一个地图不变量,要么把数据拆成可独立一致的子结构,并清楚定义跨结构同步点。
例如局部地图查询可以先复制关键帧 id 列表,再按 id 查询稳定的只读关键帧对象。 但如果关键帧对象本身也会被回环修正修改,就需要版本号、快照或更高层锁。
代码验证:读快照与写提交¶
下面示例展示 shared_mutex 的基本使用。
读者复制快照,写者独占插入。
#include <iostream>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <vector>
class MapIndex {
public:
void addKeyframe(int id) {
std::unique_lock<std::shared_mutex> lock(m_);
keyframe_ids_.push_back(id);
}
std::vector<int> snapshotIds() const {
std::shared_lock<std::shared_mutex> lock(m_);
return keyframe_ids_;
}
std::size_t size() const {
std::shared_lock<std::shared_mutex> lock(m_);
return keyframe_ids_.size();
}
private:
mutable std::shared_mutex m_;
std::vector<int> keyframe_ids_;
};
int main() {
MapIndex map;
std::thread writer([&] {
for (int i = 0; i < 10; ++i) {
map.addKeyframe(i);
}
});
std::thread reader([&] {
for (int i = 0; i < 10; ++i) {
auto ids = map.snapshotIds();
std::cout << "snapshot size = " << ids.size() << '\n';
}
});
writer.join();
reader.join();
std::cout << "final size = " << map.size() << '\n';
}
编译命令示例:
如果读快照很大,应考虑只复制局部子图、只复制 id、或者使用不可变数据块加原子替换。
⚠️ 编程陷阱:读写锁内返回内部引用 错误做法:
const vector<int>& keyframes() const { shared_lock lock(m_); return keyframes_; }现象:函数返回时读锁释放,调用者拿到的引用不再受保护。写者随时可能修改 vector。 根本原因:shared_mutex只保护持锁期间的观察窗口。释放锁后,引用指向的数据重新暴露给写者。 正确做法:在读锁内复制快照返回,或在读锁内完成全部读取操作。💡 概念误区:认为读写锁总比 mutex 快 新手想法:"读写锁允许并发读,所以替换 mutex 一定能提升性能。" 实际上:
shared_mutex的内部状态比mutex更复杂(要维护读者计数和写者等待队列)。如果写操作频繁或读临界区很短,shared_mutex可能比mutex更慢。只有在"读多写少且读操作确实有重叠"的场景下,读写锁才有优势。 正确理解:先用mutex写正确,再用 profiler 证明 mutex 是瓶颈,最后才考虑换shared_mutex。
练习¶
- [分析题]:SLAM 地图查询是典型的"读多写少"场景。讨论在什么条件下
shared_mutex会比mutex更好,在什么条件下更差。 - [代码题]:用
shared_mutex实现一个MapIndex类,支持addKeyframe()(独占锁)和snapshotIds()(共享锁)。用 benchmark 对比一个写者 + 多个读者场景下shared_mutex和mutex的吞吐差异。 - [跨章综合题]:原子操作与内存模型 将介绍 atomic 操作和双缓冲。对于"最新状态发布"场景(可视化只需要最新位姿),
shared_mutex+ 快照和 atomic 双缓冲各有什么优劣?
17.8 deque + mutex 传感器缓冲范式 ⭐⭐⭐⭐⭐¶
传感器缓冲的理论模型:生产者消费者的多频率变体¶
传感器缓冲是 线程管理与互斥同步.5 生产者消费者队列在机器人系统中的特化应用。但与通用的 FIFO 队列不同,传感器缓冲有几个独特的约束,使得它的设计需要额外考量。
多频率异步生产者。 通用生产者消费者队列通常假设一个或少数几个生产者。机器人传感器缓冲面对的是多个不同频率的生产者——IMU 200-400 Hz、LiDAR 10-20 Hz、轮式里程计 50-100 Hz、GPS 1-10 Hz。这些生产者的时钟可能不同步,数据到达的时间可能抖动(jitter),而消费者(如融合线程)需要按时间顺序取出多个传感器的数据并对齐。
时间有序窗口而非简单 FIFO。 通用队列的消费操作是"取走最早的元素"。传感器缓冲的消费操作通常是"取走时间区间 \([t_1, t_2]\) 内的所有元素"——这意味着消费者需要按时间戳查找和截取,而不是简单的 pop_front()。std::deque 适合这种模式:它支持两端高效插入删除(新数据 push_back,过期数据 pop_front),也支持随机访问以进行时间戳二分查找。
"短临界区 + 复制小片段"的设计模式。 传感器回调通常运行在驱动线程或 ROS2 executor 线程中,这些线程有严格的实时约束——回调不能长时间持锁。因此传感器缓冲的设计原则是:回调只做"锁 -> push_back -> 解锁"三步,持锁时间控制在微秒级。消费者取数据时,也应在锁内完成时间戳查找和数据复制(复制到局部变量),然后在锁外进行插值、积分等重计算。绝不能在持锁期间执行积分或矩阵运算——这会让其他传感器回调排队等待,造成数据丢失。
旧数据剪裁(trimming)的必要性。 传感器数据源源不断到来,如果不剪裁旧数据,缓冲区会无限增长。剪裁策略通常基于两个条件:(1) 时间窗口——只保留最近 N 秒的数据;(2) 已消费标记——已经被融合线程取走的数据可以安全删除。剪裁应在持锁时进行,但由于 std::deque 的 pop_front 是 \(O(1)\) 操作,剪裁的持锁时间很短。
工程问题:IMU、LiDAR、Odom 频率不同但必须按时间对齐¶
机器人传感器不是同步到达的。 典型频率可能是:
- IMU:200Hz 到 1000Hz。
- LiDAR:10Hz 到 20Hz。
- Camera:30Hz 到 60Hz。
- Wheel Odom:50Hz 到 100Hz。
LiDAR 去畸变需要某一帧点云时间段内的 IMU。 紧耦合里程计需要点云时间戳附近的 odom 或预积分状态。 ROS 回调线程负责接收消息,处理线程负责取一段时间窗的数据。
通用结构是:
std::deque 很适合这个范式:
- 尾部
push_back高效。 - 头部
pop_front高效。 - 不要求像
vector那样整体连续搬移元素。 - 适合滑动时间窗。
std::vector 也能保存传感器数据,但频繁从头部删除会搬移大量元素。
std::queue 又不能方便地遍历和按时间裁剪。
因此 deque 是传感器缓冲的常见折中。
反面失败:回调里做重处理¶
反面写法是在 LiDAR 回调里直接等待 IMU、去畸变、配准、优化并发布。 这会把传感器接收路径变成重计算路径。 当某次配准耗时变长,回调积压,DDS/ROS 缓冲开始堆积或丢消息。
另一个反面写法是多个回调无锁写同一个 deque:
#include <deque>
struct ImuMsg {
double stamp = 0.0;
};
std::deque<ImuMsg> imu_buffer;
void imuCallback(const ImuMsg& msg) {
imu_buffer.push_back(msg); // 反例:如果处理线程也访问,会 data race。
}
容器本身不会因为只在一端 push、另一端 pop 就自动线程安全。 只要并发访问同一容器,仍然需要同步。
抽象不变量:缓冲区维护时间有序窗口¶
传感器缓冲区应维护几个不变量:
- 每个 deque 内部按时间戳非递减排列。
- 回调只做入队和轻量统计。
- 处理线程按目标时间窗取数据。
- 已经早于保留窗口的数据会被裁剪。
- 所有访问同一 deque 的操作都持有同一把 mutex。
LiDAR 去畸变常需要 [scan_start, scan_end] 内的 IMU。
为了插值,还可能需要 scan_start 之前最近的一条 IMU 和 scan_end 之后第一条 IMU。
因此裁剪时不能简单删除所有 < scan_start 的数据。
常见做法是保留一个 margin:
具体 margin 取决于传感器频率、插值方法和时间同步误差。
规则推导:取时间窗时优先复制小片段¶
处理线程有两种读取方式:
- 锁内遍历并复制目标时间窗,锁外处理。
- 锁内移动走一段数据,所有权转交处理线程。
对于 IMU 这种消息小、频率高的数据,复制一段窗口通常可以接受。 对于点云这种消息大、移动成本低的数据,常用智能指针或移动语义传递。
关键规则是:不要持有缓冲 mutex 做点云配准或优化。 锁只保护 deque 的结构和时间窗选择。 算法计算使用局部副本或移动出来的数据。
工程边界:时间同步失败要显式表达¶
如果 LiDAR 帧到达时 IMU 数据还没覆盖到 scan_end,处理线程不能假装成功。
可选策略包括:
- 等待更多 IMU 数据,直到超时。
- 暂时缓存 LiDAR 帧,下一轮再处理。
- 丢弃该 LiDAR 帧并记录原因。
- 使用外推,但必须标记精度风险。
这些策略的工程含义不同。 在线定位通常宁可短暂等待或丢帧,也不要无声使用错误时间窗。 建图离线处理可以等待完整数据。
代码验证:IMU 时间窗提取和旧数据剪裁¶
下面示例可单文件编译。 它模拟 IMU 回调入队,处理线程提取某个 LiDAR 时间窗所需的 IMU 数据。
#include <deque>
#include <iostream>
#include <mutex>
#include <optional>
#include <vector>
struct ImuMsg {
double stamp = 0.0;
double ax = 0.0;
};
class ImuBuffer {
public:
void push(ImuMsg msg) {
std::lock_guard<std::mutex> lock(m_);
if (!imu_.empty() && msg.stamp < imu_.back().stamp) {
out_of_order_count_++;
return;
}
imu_.push_back(msg);
}
std::optional<std::vector<ImuMsg>> extractWindow(double start, double end) {
std::lock_guard<std::mutex> lock(m_);
if (imu_.empty() || imu_.front().stamp > start || imu_.back().stamp < end) {
return std::nullopt;
}
std::vector<ImuMsg> window;
for (const auto& msg : imu_) {
if (msg.stamp >= start && msg.stamp <= end) {
window.push_back(msg);
}
}
const double keep_from = start - 0.1;
while (imu_.size() > 1 && imu_[1].stamp < keep_from) {
imu_.pop_front();
}
return window;
}
std::size_t size() const {
std::lock_guard<std::mutex> lock(m_);
return imu_.size();
}
int outOfOrderCount() const {
std::lock_guard<std::mutex> lock(m_);
return out_of_order_count_;
}
private:
mutable std::mutex m_;
std::deque<ImuMsg> imu_;
int out_of_order_count_ = 0;
};
int main() {
ImuBuffer buffer;
for (int i = 0; i < 50; ++i) {
buffer.push(ImuMsg{0.01 * i, 1.0});
}
auto window = buffer.extractWindow(0.12, 0.20);
if (!window) {
std::cout << "not enough imu data\n";
return 0;
}
std::cout << "imu samples in window = " << window->size() << '\n';
std::cout << "buffer size after trim = " << buffer.size() << '\n';
std::cout << "out of order = " << buffer.outOfOrderCount() << '\n';
}
这个例子没有用条件变量。
如果处理线程需要等待 IMU 覆盖 LiDAR 结束时间,可以给缓冲区增加 condition_variable,在 push() 后通知等待者。
等待谓词应检查 !imu_.empty() && imu_.back().stamp >= end 或关闭标志。
⚠️ 编程陷阱:在传感器回调中执行重计算 错误做法:在 LiDAR 回调函数里直接做去畸变、配准、优化并发布结果。 现象:当某次配准耗时变长,回调积压,DDS/ROS 缓冲堆积或丢消息。系统出现周期性卡顿。 根本原因:回调函数应该尽快返回。把重计算塞进回调等于让传感器驱动等待算法完成。 正确做法:回调只做短临界区入队(
push_back+ unlock),重计算由独立处理线程消费队列完成。💡 概念误区:认为
std::deque是线程安全的 新手想法:"deque一端 push 一端 pop,两端不互相影响,应该不需要锁。" 实际上:std::deque的内部结构(分块数组指针和 size)是跨所有操作共享的。即使一个线程只push_back、另一个只pop_front,两者都会修改size和可能触发内部指针重新分配。标准容器不是线程安全的。 正确理解:只要并发访问同一容器(无论是同一端还是不同端),都需要同步。无锁 SPSC 队列是专门设计的数据结构(原子操作与内存模型),不是deque加了"天然分离"就能实现的。
练习¶
- [分析题]:解释为什么 IMU 缓冲区的裁剪不能简单地删除所有
stamp < scan_start的数据。需要保留什么 margin?margin 的大小取决于什么? - [代码题]:给
ImuBuffer添加condition_variable,让处理线程在 IMU 数据未覆盖到scan_end时阻塞等待。编写测试:先推入不够的 IMU 数据,验证处理线程阻塞;再推入足够的数据,验证处理线程醒来。 - [跨章综合题]:原子操作与内存模型 将介绍 SPSC ring buffer。对比
deque + mutex和 SPSC ring buffer 在 IMU 缓冲场景中的适用条件。什么情况下 SPSC 更合适?什么情况下deque + mutex更灵活?
17.9 std::chrono:时间点、持续时间与计时器 ⭐⭐⭐⭐¶
工程问题:机器人代码里有两种时间¶
机器人系统至少同时使用两类时间:
- 传感器时间戳:来自硬件、驱动或 ROS message,用于数据同步和状态估计。
- 程序计时:用于测量函数耗时、超时、周期任务和性能统计。
这两类时间不能混用。 传感器时间可能跟随仿真时间、ROS time 或设备时钟。 性能计时应该使用单调时钟,避免系统时间被 NTP 或用户调整导致耗时为负。
std::chrono 提供类型安全的时间表达:
duration表示一段时间,例如std::chrono::milliseconds。time_point表示某个时钟上的时刻。steady_clock单调递增,适合性能计时和超时。system_clock表示系统实时时钟,适合转换日历时间或日志时间。high_resolution_clock不保证语义,可能是前两者之一的别名。
SLAM 性能计时优先用 steady_clock。
反面失败:用 double 表示所有时间¶
很多代码把时间都写成 double seconds。
这简单,但会丢失语义。
问题是读者不知道 now() 来自系统时间、ROS time 还是单调时钟。
单位也可能混乱:秒、毫秒、纳秒都可能被塞进 double。
当函数参数写成 double timeout,调用者也很难知道应该传 0.1 还是 100。
std::chrono 用类型把单位写清楚:
这比裸 100 更难误用。
抽象不变量:同一段耗时必须来自同一时钟¶
计时不变量是:
两个
time_point只有来自同一个 clock,差值才有明确含义。
steady_clock::now() 和 system_clock::now() 不能混减。
编译器会阻止这种错误。
这就是 chrono 类型系统的价值。
性能计时器的 RAII 模式也很自然:
- 构造时记录开始时刻。
- 析构时记录结束时刻。
- 输出或累计耗时。
这适合函数级耗时统计,尤其在异常路径上也能记录耗时。
规则推导:ROS2 timer 和 chrono literals¶
C++14 引入 chrono literals:
ROS2 C++ API 常接受 chrono duration 创建 wall timer:
create_wall_timer 表达的是墙上时间周期,不等于传感器消息时间戳。
仿真时间、ROS time、message header stamp 是另一个时间体系。
写机器人代码时要在命名中区分:
stamp:传感器或消息时间戳。received_time:本机收到消息的时间。elapsed:程序耗时。period:周期任务间隔。timeout:等待上限。
工程边界:计时器本身也会扰动系统¶
性能计时不是免费操作。
高频热路径里每个点、每个残差都调用 now() 会明显增加开销。
合理粒度通常是:
- 每帧 Tracking 耗时。
- 每次局部地图更新耗时。
- 每次回环检测耗时。
- 每次优化求解耗时。
- 队列等待时间和队列长度统计。
计时输出也不要在锁内频繁写 std::cout。
日志 I/O 可能持有内部锁并阻塞。
更好的方式是把耗时写入统计对象或低频日志。
代码验证:ScopedTimer 和耗时统计¶
下面代码可单文件编译。
ScopedTimer 使用 steady_clock,析构时输出耗时。
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
class ScopedTimer {
public:
explicit ScopedTimer(std::string name)
: name_(std::move(name)),
start_(Clock::now()) {}
~ScopedTimer() {
const auto end = Clock::now();
const auto us = std::chrono::duration_cast<std::chrono::microseconds>(
end - start_);
std::cout << name_ << " took " << us.count() << " us\n";
}
private:
using Clock = std::chrono::steady_clock;
std::string name_;
Clock::time_point start_;
};
int main() {
using namespace std::chrono_literals;
{
ScopedTimer timer("local mapping");
std::this_thread::sleep_for(12ms);
}
}
编译命令示例:
真实项目里可以把输出替换成统计累加器。 例如记录最近 100 帧 Tracking 平均耗时、最大耗时和超时次数。
⚠️ 编程陷阱:用
double seconds表示所有时间并混用不同时钟 错误做法:传感器时间戳和性能计时都用double seconds,函数参数写double timeout不注明单位。 现象:调用者不知道timeout是秒、毫秒还是微秒。传感器时间来自 ROS time,性能计时来自steady_clock,两者混减导致负数或巨大时间差。 根本原因:double不携带单位和时钟来源信息。std::chrono的类型系统能在编译期阻止不同时钟的time_point混合运算。 正确做法:性能计时用steady_clock,持续时间用chrono::milliseconds等类型,传感器时间戳在命名中标注来源(如sensor_stamp、received_time)。
练习¶
- [分析题]:解释为什么性能计时应使用
steady_clock而不是system_clock。如果系统时间被 NTP 向前调整 1 秒,两种时钟测出的耗时分别是什么? - [代码题]:编写一个
ScopedTimer类,支持在析构时将耗时累加到一个统计对象中(而不是打印到cout)。统计对象记录调用次数、总耗时、最大耗时。 - [跨章综合题]:原子操作与内存模型 将介绍
std::atomic计数器。如果统计对象的total_us和call_count被多个线程同时写入,需要什么同步机制?用 mutex 还是 atomic?
17.10 async、future、promise:任务级异步 ⭐⭐⭐⭐¶
线程级并发 vs 任务级异步:两种并发抽象的本质差异¶
在进入 std::async 和 std::future 的具体 API 之前,需要理解一个更高层次的概念区分:**线程级并发**和**任务级异步**是两种根本不同的并发抽象,适用于不同的问题结构。
线程级并发的心智模型。 使用 std::thread 时,程序员直接管理操作系统线程——创建线程、决定线程执行什么代码、管理线程的生命周期(join/detach)、处理线程之间的同步(mutex、condition_variable)。这种抽象层次很低——程序员需要自己处理所有细节,包括线程数量、线程与 CPU 核心的映射、线程的停止协议等。线程级并发的优势是完全的控制权;代价是大量的样板代码和容易出错的同步逻辑。
任务级异步的心智模型。 使用 std::async 和 std::future 时,程序员不直接创建和管理线程——而是提交一个”任务”(可调用对象),由运行时决定如何执行它(可能创建新线程、可能在线程池中执行、可能延迟到 get() 时同步执行)。程序员只关心”任务何时有结果”和”结果是什么”,不关心”哪个线程在执行这个任务”。这种抽象层次更高——减少了样板代码,但也减少了控制权。
选择框架。 两种抽象适合不同的任务结构:
| 特征 | 线程级(std::thread) |
任务级(std::async/future) |
|---|---|---|
| 生命周期 | 长期运行的循环 | 一次性计算并返回结果 |
| 状态管理 | 线程维护自己的状态 | 无状态,输入 -> 计算 -> 输出 |
| 通信方式 | 共享内存 + mutex/atomic | future/promise 传递结果 |
| 数量 | 通常少量(3-5 个) | 可以大量(每个任务一个 future) |
| 停止机制 | 需要显式停止协议 | 取消支持有限(标准不保证) |
| 异常传播 | 手动通过 promise 或队列 | future::get() 自动重新抛出 |
SLAM 系统中,Tracking/Mapping/LoopClosing 是长期循环,适合线程级管理。后台保存地图、一次性优化、批量描述子计算是一次性任务,适合任务级异步。不要把所有并发都用线程管理——任务级异步能显著简化代码。
工程问题:不是所有并发都需要手动管理线程¶
std::thread 适合长期存在的后台循环,例如 Mapping、LoopClosing、传感器处理线程。
但有些任务只是”一次性计算并返回结果”:
- 后台保存地图。
- 计算一批描述子。
- 对一个子图运行全局优化。
- 加载词袋或配置。
- 执行一次耗时统计汇总。
这类任务不一定需要手动创建线程对象、设计循环和停止协议。
std::async、std::future、std::promise 提供任务级异步抽象。
future 表示未来某个结果。
get() 阻塞等待结果并取得值。
如果任务中抛出异常,异常会在 get() 时重新抛出。
这比 std::thread 入口异常直接终止程序更适合一次性任务。
反面失败:忽略 future 让异步变同步¶
std::async 有一个容易误踩的点:如果使用 std::launch::async 启动任务,但返回的临时 future 立即析构,析构可能等待任务完成。
这样看似异步,实际在语句结束时阻塞。
#include <chrono>
#include <future>
#include <iostream>
#include <thread>
void saveMap() {
using namespace std::chrono_literals;
std::this_thread::sleep_for(100ms);
std::cout << "map saved\n";
}
int main() {
std::async(std::launch::async, saveMap);
std::cout << "this may print after saveMap\n";
}
很多编译器会警告忽略了 std::async 的返回值。
正确做法是保存 future,在明确的边界 wait() 或 get()。
auto save_future = std::async(std::launch::async, saveMap);
std::cout << "tracking can continue\n";
save_future.wait();
另一个边界是 launch policy。
如果不写 std::launch::async,实现可以选择延迟执行。
延迟任务会在 get() 或 wait() 时才运行,不一定创建新线程。
需要真正后台执行时,应显式写 std::launch::async。
抽象不变量:future 是一次性结果通道¶
std::future<T> 的不变量是:
它连接一个异步共享状态,最多成功
get()一次。
get() 会取走结果。
之后这个 future 不再持有有效结果。
如果多个消费者都需要观察同一个结果,可以使用 std::shared_future<T>。
std::promise<T> 是写入端。
它常用于手动创建线程时把结果或异常交给外部:
- 创建
promise。 - 从
promise取得future。 - 把
promise移动进线程入口。 - 线程调用
set_value()或set_exception()。 - 外部通过
future.get()取得结果或异常。
规则推导:选择 thread、async、promise 的方式¶
可以用下面的规则做第一判断:
| 需求 | 首选工具 |
|---|---|
| 长期循环、需要停止协议 | std::jthread 或 RAII 管理的 std::thread |
| 一次性计算并返回值 | std::async + std::future |
| 手动线程要传回结果或异常 | std::promise + std::future |
| 多生产者多消费者消息流 | mutex + condition_variable 队列 |
| 高频共享状态标志 | std::atomic 或锁保护状态 |
std::async 不等于线程池。
标准库不保证它复用线程。
如果每帧都 std::async(std::launch::async, ...) 创建大量小任务,开销可能高于收益。
对于高频小任务,应考虑固定线程池、任务队列或算法内部并行库。
工程边界:future 析构阻塞和共享状态生命周期¶
来自 std::async(std::launch::async, ...) 的最后一个 future 析构时可能阻塞等待任务结束。
这会在两个地方制造隐蔽延迟:
- 局部变量离开作用域。
- 容器清空或析构一组 future。
因此任务异步不是“发出去就不管”。
如果任务可能长时间运行,要明确设计等待边界和取消边界。
C++ 标准库的 future 本身没有取消接口。
需要取消时,应把 std::stop_token、原子标志或关闭队列作为任务参数。
代码验证:异步优化结果和异常传播¶
下面代码展示后台任务返回结果,以及异常在 get() 时传播。
#include <chrono>
#include <future>
#include <iostream>
#include <stdexcept>
#include <thread>
struct OptimizeResult {
int iterations = 0;
double final_error = 0.0;
};
OptimizeResult runGraphOptimization(bool fail) {
using namespace std::chrono_literals;
std::this_thread::sleep_for(30ms);
if (fail) {
throw std::runtime_error("linear solver failed");
}
return OptimizeResult{5, 0.012};
}
int main() {
auto ok_future = std::async(
std::launch::async,
runGraphOptimization,
false);
auto bad_future = std::async(
std::launch::async,
runGraphOptimization,
true);
const OptimizeResult ok = ok_future.get();
std::cout << "iterations = " << ok.iterations
<< ", error = " << ok.final_error << '\n';
try {
const OptimizeResult bad = bad_future.get();
std::cout << bad.final_error << '\n';
} catch (const std::exception& e) {
std::cout << "optimization error: " << e.what() << '\n';
}
}
编译命令示例:
在 SLAM 后端中,future 适合表达”这次优化最终会给出一个结果”。
它不适合表达持续不断的关键帧流。
⚠️ 编程陷阱:忽略
std::async的返回值导致同步执行 错误做法:std::async(std::launch::async, saveMap);不保存返回的future。 现象:临时future在语句结束时析构,析构可能等待任务完成。看似异步,实际阻塞。 根本原因:来自std::async(std::launch::async, ...)的最后一个future析构时会阻塞等待任务结束。 正确做法:保存future到变量,在明确的边界wait()或get()。💡 概念误区:认为
std::async是线程池 新手想法:”std::async每次调用都高效地复用线程。” 实际上:标准库不保证std::async复用线程。每次调用可能创建新线程。如果高频调用(每帧、每点),线程创建销毁的开销可能超过计算本身。 正确理解:std::async适合低频、一次性、返回结果的任务。高频小任务应使用固定线程池或算法内部并行库。
练习¶
- [分析题]:对比
std::thread+std::promise和std::async+std::future两种方式传回结果。各自的优劣是什么? - [代码题]:用
std::async实现后台保存地图功能。主线程在 Tracking 继续运行的同时等待保存完成。处理保存成功和保存异常两种情况。 - [跨章综合题]:C++语言核心/错误处理与异常安全 讲了异常安全和
std::exception_ptr。std::future::get()会重新抛出任务中的异常——解释这个机制如何与 C++语言核心/错误处理与异常安全 的异常传播模型配合,以及为什么线程入口函数不能直接抛异常给主线程。
17.11 volatile 不是线程同步 ⭐⭐⭐⭐⭐¶
核心结论与定位¶
很多开发者从 Java、C# 或嵌入式 C 经验出发,会把线程退出标志写成 volatile bool running = true;,再由一个线程写、另一个线程读。在 C++ 中这不是线程同步——volatile 不提供原子性,不建立 happens-before,不保证跨线程可见性,因此仍然是 data race。它只表达“每次访问都应作为可观察副作用”,典型用途是内存映射 I/O,而不是 mutex 或 atomic。
// 反例:volatile 退出标志在 C++ 中仍然 data race
volatile bool stop = false;
void loop() { while (!stop) { /* work */ } } // 另一线程写 stop = true
线程退出标志应改用 std::atomic<bool>、mutex 保护状态、条件变量或 std::stop_token:
📌 本节只做边界提醒,完整论证见 18.8。
volatile与atomic的语义对比、为什么在 x86 上"似乎能工作"而在 ARM/RISC-V 上失效、Java/C# 的volatile为什么不同、以及"不要用 volatile 修复优化级别导致的并发问题"等内容,统一在「原子操作与内存模型」的 18.8volatile不是线程同步 中展开。本节只补充一个 17 章语境下的独有边界:信号处理。
工程边界:信号处理中的 volatile sig_atomic_t¶
C++ 和 C 的信号处理有特殊边界。
异步信号处理函数能安全做的事很少。
常见写法是设置一个 volatile std::sig_atomic_t 标志,让主循环稍后观察。
#include <csignal>
#include <iostream>
volatile std::sig_atomic_t interrupted = 0;
extern "C" void handleSignal(int) {
interrupted = 1;
}
int main() {
std::signal(SIGINT, handleSignal);
while (!interrupted) {
// main loop
}
std::cout << "interrupted\n";
}
这不是普通线程同步模式。
它服务于信号处理的极窄边界。
如果是两个 C++ 线程之间通信,应换成 atomic、锁或停止 token。
(用 std::atomic<bool> 表达退出标志的完整可运行示例见 18.8。)
🧠 思维陷阱:把 Java/C# 的
volatile语义搬到 C++ 新手想法:"Java 里volatile能保证线程可见性,C++ 里应该也一样。" 实际上:Java 和 C# 的volatile有内存屏障和可见性语义;C++ 的volatile没有。跨语言迁移时,不能把写法平移。C++ 线程通信应使用 atomic、mutex 或条件变量。 正确思维:看到volatile用在线程通信场景中,应视为潜在 bug,用 ThreadSanitizer 验证。
练习¶
- [分析题]:解释 C++
volatile和 Javavolatile的语义差异。为什么 C++ 选择不给volatile加线程同步语义? - [代码题]:编写一个使用
volatile bool stop的程序(反例),用 ThreadSanitizer 检测 data race。然后改为std::atomic<bool>,验证 data race 消失。 - [跨章综合题]:C++语言核心/预处理器与宏 的条件编译宏和本节的
volatile都是容易被误用的 C/C++ 特性。总结"看起来能用但语义错误"的共同特征:编译能过、在某些平台看似正常、但标准层面是错误的。
17.12 并发调试与性能分析工具 ⭐⭐⭐⭐¶
为什么需要专用调试工具¶
并发 bug 和单线程 bug 有本质区别。单线程 bug 通常是确定性的——给定相同输入,程序每次都以相同方式失败,可以用 printf 或断点逐步定位。并发 bug 依赖线程调度时序、CPU 缓存状态和系统负载,同一段代码在开发机上运行一万次都正常,在 Jetson ARM 板上或 CI 的高负载环境中却偶发崩溃。更糟的是,加入调试用的 printf 或断点可能改变线程调度时序,让 bug 消失——这就是 Heisenbug。
这意味着"跑了很多次没出问题"不能作为正确性论据。需要用专门的编译时插桩工具来系统性地检测并发问题。
本质洞察:并发调试工具的价值不是"找到你已经知道的 bug",而是"发现你不知道存在的 bug"。ThreadSanitizer 能检测到大多数 data race,即使它们在你的测试中从未表现出可观察的错误行为。一个在 x86 上"碰巧正确"的程序,可能在 ARM 上立刻崩溃——TSan 能在 x86 上就发现这个问题。
ThreadSanitizer(TSan):data race 检测的标准工具 ⭐⭐⭐⭐¶
ThreadSanitizer 是 Clang 和 GCC 内置的动态分析工具。它在编译时对每一次内存访问插桩,运行时跟踪所有线程的访问历史,如果发现两个线程无同步地访问同一内存位置(至少一个是写),就报告 data race。
使用方式:
# 编译时加 -fsanitize=thread
g++ -std=c++20 -O1 -g -fsanitize=thread my_code.cpp -o my_code
# 直接运行,TSan 会在终端输出 data race 报告
./my_code
TSan 的报告包含两个访问的调用栈——一个写操作和一个冲突的读/写操作。报告还会指出创建这两个线程的位置。工程上应把 TSan 测试纳入 CI 流水线,作为常规检查项。
TSan 有几个工程边界需要注意。首先,它会使程序变慢 5-15 倍,内存占用增加 5-10 倍——这意味着测试用例需要调整输入规模和超时时间。其次,TSan 只能检测到实际执行路径上的 data race——如果某个分支没有被测试覆盖,TSan 不会分析它。第三,TSan 不检测逻辑竞态(race condition)——消除了 data race 不代表程序逻辑正确。
如果 TSan 在你的并发代码中报告了 0 个 data race,这是一个好消息。但如果你的测试覆盖率低,这只说明"被测试的路径"没有 data race。增加测试覆盖率和压力测试(多次运行、不同线程数、不同调度时序)能显著提高 TSan 的检测效果。
Valgrind Helgrind/DRD:备用 data race 检测器 ⭐⭐⭐¶
Helgrind 和 DRD 是 Valgrind 套件中的两个并发错误检测工具。与 TSan 的编译时插桩不同,它们在运行时通过二进制翻译(binary translation)拦截内存访问。优势是不需要重新编译(直接 valgrind --tool=helgrind ./my_code),劣势是速度更慢(20-100 倍)且误报率更高。
在 TSan 可用的平台上,优先使用 TSan。Helgrind/DRD 适合无法重新编译的场景(如分析第三方库)或交叉验证 TSan 的结果。
perf 与火焰图:定位锁竞争和调度延迟 ⭐⭐⭐⭐¶
perf 是 Linux 的系统性能分析工具,能直接从硬件性能计数器读取数据,几乎零开销。在并发场景中,perf 特别适合定位三类问题:
- 锁竞争:
perf lock record && perf lock report能显示每把锁的等待时间和争用次数。如果map_mutex的等待时间占比超过 10%,说明锁粒度需要优化。 - CPU 利用率:
perf stat -e task-clock,context-switches,cpu-migrations ./my_code能显示上下文切换和 CPU 迁移次数。高频上下文切换通常说明线程过多或锁竞争激烈。 - 热点函数:
perf record -g ./my_code && perf report或生成火焰图(FlameGraph),能直观看到 CPU 时间花在哪些函数和调用路径上。
火焰图在分析并发系统时尤其有价值——它能同时展示所有线程的 CPU 时间分布,让你一眼看出哪个线程在空转、哪个线程被锁阻塞。
# 录制 CPU 采样数据(采样频率 999Hz,记录调用栈)
perf record -F 999 -g ./mini_slam_pipeline
# 生成火焰图(需要 FlameGraph 工具)
perf script | stackcollapse-perf.pl | flamegraph.pl > flamegraph.svg
GDB 并发调试技巧 ⭐⭐⭐¶
GDB 对多线程程序有基本支持,但调试并发问题时需要特殊技巧。
# 查看所有线程的回溯(死锁排查的第一步)
gdb -batch -ex "thread apply all bt" ./my_code core
# 在 GDB 中手动切换线程
(gdb) info threads
(gdb) thread 3
(gdb) bt
死锁排查时,thread apply all bt 是最有价值的命令——它能显示每个线程卡在哪个系统调用上。如果两个线程都卡在 __lll_lock_wait(Linux 的 futex 等待),并且各自在等对方持有的 mutex,就确认了死锁。
⚠️ 编程陷阱:在高优化级别下调试并发问题 错误做法:用
-O2或-O3编译后直接用 GDB 调试并发 bug。 现象:变量被优化掉,断点命中位置与源码不符,单步执行跳来跳去。 根本原因:编译器优化会内联函数、消除变量、重排代码。这些优化在单线程中不改变行为,但在调试器中观察时会产生困惑。 正确做法:用-O0 -g或-Og -g编译调试版本。TSan 建议使用-O1 -g(平衡检测能力和优化干扰)。
练习¶
- [分析题]:列出 ThreadSanitizer、Helgrind、perf 和 GDB 各自适合检测的并发问题类型。画一个"问题→工具"的映射表。
- [代码题]:编写一个包含 data race 的简单程序,分别用 TSan 和 Helgrind 检测。对比两个工具的报告格式和信息详细程度。
- [跨章综合题]:C++语言核心/预处理器与宏 用
-E和nm -C验证宏和链接层面的正确性。本节用 TSan 验证并发正确性,用 perf 定位性能瓶颈。整理本课程中所有出现过的"编译/链接/运行时验证工具",形成一个按问题域分类的工具箱。
17.13 累积项目:Mini SLAM Concurrent Pipeline ⭐⭐⭐⭐⭐¶
项目目标¶
本章项目实现一个简化的并发 SLAM 流水线。 它不做真实图像处理、点云配准或图优化,而是把并发结构做完整:
MessageQueue<T>:线程安全阻塞队列,支持push()、pop()、close()。- Tracking:按固定周期产生关键帧,推给 Mapping。
- Mapping:消费关键帧,模拟局部建图,向 LoopClosing 提交候选关键帧。
- LoopClosing:消费候选关键帧,模拟回环检测。
- 停止协议:Tracking 结束后关闭 Mapping 队列,Mapping 结束后关闭 LoopClosing 队列。
- 计时:使用
steady_clock统计模块耗时。 - 测试目标:验证消息数量、关闭顺序、无忙等、无 joinable 线程泄漏。
项目结构建议:
mini_slam_concurrent/
message_queue.hpp
scoped_timer.hpp
pipeline.hpp
main.cpp
tests/
message_queue_test.cpp
pipeline_test.cpp
本章只给出一个单文件参考实现,方便直接编译运行。 真正放入课程项目时,可以按上面的结构拆分。
MessageQueue<T>:队列关闭是协议的一部分¶
队列接口选择 std::optional<T> pop()。
返回 std::nullopt 表示队列已经关闭且没有剩余数据。
这样消费循环可以自然写成:
这比使用特殊哨兵消息更通用。
如果 T 本身也可能表示“停止”,协议会混乱。
关闭状态属于队列,不属于业务消息。
Tracking、Mapping、LoopClosing 的数据流¶
数据流如下:
Tracking
产生 Keyframe
push 到 mapping_queue
Mapping
pop Keyframe
更新局部地图计数
每隔若干关键帧 push LoopCandidate
mapping_queue 关闭且耗尽后,关闭 loop_queue
LoopClosing
pop LoopCandidate
模拟检测
loop_queue 关闭且耗尽后退出
这里 Tracking 没有直接调用 Mapping。 Mapping 也没有直接调用 LoopClosing。 模块之间只通过队列交换消息。 这使得每个线程的生命周期和背压点更清楚。
停止协议¶
停止协议按数据流方向传播:
- Tracking 生产完关键帧,调用
mapping_queue.close()。 - Mapping 消费完所有关键帧,调用
loop_queue.close()。 - LoopClosing 消费完所有候选,退出。
- 主线程 join 所有线程。
如果要支持外部提前停止,可以让队列也支持 close() 后拒绝新消息。
生产者捕获异常或检查返回值后退出。
C++20 版本还可以把 std::stop_token 传给各模块,在循环中同时检查停止请求。
单文件参考实现¶
下面代码可用 C++17 编译。 它包含队列、计时器、三阶段流水线和简单统计。
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
template <typename T>
class MessageQueue {
public:
void push(T value) {
{
std::lock_guard<std::mutex> lock(m_);
if (closed_) {
throw std::runtime_error("push to closed queue");
}
q_.push(std::move(value));
}
cv_.notify_one();
}
std::optional<T> pop() {
std::unique_lock<std::mutex> lock(m_);
cv_.wait(lock, [this] {
return closed_ || !q_.empty();
});
if (q_.empty()) {
return std::nullopt;
}
T value = std::move(q_.front());
q_.pop();
return value;
}
void close() {
{
std::lock_guard<std::mutex> lock(m_);
closed_ = true;
}
cv_.notify_all();
}
std::size_t size() const {
std::lock_guard<std::mutex> lock(m_);
return q_.size();
}
private:
mutable std::mutex m_;
std::condition_variable cv_;
std::queue<T> q_;
bool closed_ = false;
};
class ScopedTimer {
public:
explicit ScopedTimer(std::string name)
: name_(std::move(name)),
start_(Clock::now()) {}
~ScopedTimer() {
const auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(
Clock::now() - start_);
std::lock_guard<std::mutex> lock(outputMutex());
std::cout << name_ << " took " << elapsed.count() << " us\n";
}
private:
using Clock = std::chrono::steady_clock;
static std::mutex& outputMutex() {
static std::mutex m;
return m;
}
std::string name_;
Clock::time_point start_;
};
struct Keyframe {
int id = 0;
};
struct LoopCandidate {
int keyframe_id = 0;
};
struct PipelineStats {
int tracked = 0;
int mapped = 0;
int loop_checked = 0;
};
class JoinOnExit {
public:
explicit JoinOnExit(std::thread& thread) : thread_(thread) {}
~JoinOnExit() {
if (thread_.joinable()) {
thread_.join();
}
}
JoinOnExit(const JoinOnExit&) = delete;
JoinOnExit& operator=(const JoinOnExit&) = delete;
private:
std::thread& thread_;
};
class MiniSlamPipeline {
public:
explicit MiniSlamPipeline(int keyframe_count)
: keyframe_count_(keyframe_count) {}
void run() {
std::thread tracking_thread(&MiniSlamPipeline::trackingLoop, this);
JoinOnExit join_tracking(tracking_thread);
std::thread mapping_thread(&MiniSlamPipeline::mappingLoop, this);
JoinOnExit join_mapping(mapping_thread);
std::thread loop_thread(&MiniSlamPipeline::loopClosingLoop, this);
JoinOnExit join_loop(loop_thread);
}
PipelineStats stats() const {
std::lock_guard<std::mutex> lock(stats_mutex_);
return stats_;
}
private:
void trackingLoop() {
using namespace std::chrono_literals;
ScopedTimer timer("Tracking");
for (int i = 0; i < keyframe_count_; ++i) {
std::this_thread::sleep_for(5ms);
mapping_queue_.push(Keyframe{i});
{
std::lock_guard<std::mutex> lock(stats_mutex_);
stats_.tracked++;
}
}
mapping_queue_.close();
}
void mappingLoop() {
using namespace std::chrono_literals;
ScopedTimer timer("Mapping");
while (auto keyframe = mapping_queue_.pop()) {
std::this_thread::sleep_for(8ms);
{
std::lock_guard<std::mutex> lock(stats_mutex_);
stats_.mapped++;
}
if (keyframe->id % 3 == 0) {
loop_queue_.push(LoopCandidate{keyframe->id});
}
}
loop_queue_.close();
}
void loopClosingLoop() {
using namespace std::chrono_literals;
ScopedTimer timer("LoopClosing");
while (auto candidate = loop_queue_.pop()) {
std::this_thread::sleep_for(12ms);
{
std::lock_guard<std::mutex> lock(stats_mutex_);
stats_.loop_checked++;
}
std::lock_guard<std::mutex> lock(output_mutex_);
std::cout << "checked loop candidate "
<< candidate->keyframe_id << '\n';
}
}
int keyframe_count_ = 0;
MessageQueue<Keyframe> mapping_queue_;
MessageQueue<LoopCandidate> loop_queue_;
mutable std::mutex stats_mutex_;
PipelineStats stats_;
std::mutex output_mutex_;
};
int main() {
MiniSlamPipeline pipeline(10);
pipeline.run();
const PipelineStats stats = pipeline.stats();
std::cout << "tracked = " << stats.tracked << '\n';
std::cout << "mapped = " << stats.mapped << '\n';
std::cout << "loop checked = " << stats.loop_checked << '\n';
if (stats.tracked != 10 || stats.mapped != 10 || stats.loop_checked != 4) {
return 1;
}
return 0;
}
编译命令示例:
测试目标¶
项目测试不应只看“能打印日志”。 至少要覆盖这些行为:
- 空队列
pop()会阻塞,close()后返回std::nullopt。 push()后消费者能收到消息。close()后继续push()会失败。- Tracking 产生的关键帧数量等于 Mapping 消费数量。
- Mapping 关闭后 LoopClosing 能自然退出。
- 所有线程都被 join,没有 joinable 线程对象进入析构。
- 统计值在锁保护下读写。
- 计时使用
steady_clock。
如果测试框架支持超时,应给并发测试设置短超时。 并发测试一旦卡住,超时比无限等待更容易定位。
工程边界¶
这个 Mini Pipeline 仍然简化了真实系统:
- 没有容量限制,因此没有背压。
- 没有外部提前停止请求。
- 没有异常聚合和错误上报。
- 没有地图对象和读写锁。
- 没有传感器时间同步。
这些不是缺陷,而是分层。 本项目先验证线程生命周期、队列关闭和模块解耦。 下一步再加入容量、地图快照、传感器缓冲和错误传播。
本章小结¶
本章的主线不是“记住几个并发 API”,而是建立机器人系统里的并发不变量。
std::thread 让一个入口函数在新的执行流里运行,但线程对象本身仍是 C++ 对象。
joinable 对象析构会终止程序,线程入口异常不能逃出。
因此线程生命周期必须由 RAII 管理。
C++17 可以写线程守卫,C++20 优先考虑 std::jthread 和 std::stop_token。
mutex 保护的是共享数据不变量。
data race 是 C++ 语义层的未定义行为,不是偶发小错误。
lock_guard 适合短临界区,unique_lock 适合条件变量和灵活锁管理,scoped_lock 适合一次获取多把锁。
锁粒度要服务不变量:锁太粗会拖慢实时路径,锁太细会拆碎一致性。
条件变量表达等待-通知。
通知只是提示状态可能变化,谓词才是真相。
wait(lock, pred) 同时处理虚假唤醒和丢失通知边界。
生产者消费者队列必须把关闭状态纳入协议。
deadlock 的四个必要条件是互斥、持有并等待、不可剥夺、循环等待。
工程上主要通过锁排序、std::scoped_lock、缩短临界区和减少多锁访问来破坏循环等待。
shared_mutex 适合读多写少地图查询,但它只保护读锁持有期间的观察窗口。
读锁内返回内部引用仍然危险。
读写锁也有饥饿和公平性边界,不能把重计算放进读锁。
传感器缓冲常用 deque + mutex。
回调短临界区入队,处理线程按时间窗复制或移动数据,旧数据按 margin 剪裁。
时间同步失败必须显式表达,不能无声外推。
std::chrono 让时间单位和时钟类型进入类型系统。
性能计时使用 steady_clock。
系统时间、传感器时间、ROS time 和程序耗时应在命名和类型上区分。
std::async、future、promise 适合一次性任务结果和异常传播。
它们不是线程池,也不适合高频消息流。
忽略 future 可能让异步任务在临时对象析构时阻塞。
C++ volatile 不是线程同步工具。
线程通信应使用 atomic、mutex、条件变量、停止 token 或 future/promise。
跨语言经验不能直接迁移到 C++。
回顾本章的核心脉络:从 17.1 的"为什么机器人系统选择多线程"出发,建立了"延迟隔离"和"所有权边界"的设计动机。17.2-17.3 把线程对象和 RAII 生命周期管理讲清楚。17.4-17.5 从 data race 和条件变量建立了互斥同步的基础——mutex 保护不变量,条件变量表达等待-通知。17.6 的死锁和 17.7 的读写锁处理了多锁和读多写少的工程场景。17.8-17.9 把传感器缓冲和时间管理纳入并发设计。17.10-17.11 补充了任务级异步和 volatile 的边界。每一节不是孤立的 API 教学,而是围绕"如何在多线程环境下维护机器人系统的数据不变量"这一核心问题展开的不同侧面。
如果只能从本章带走一个判断框架,应该是这样的:面对一个并发设计问题时,先问五个问题——实时路径是什么?哪些任务可以滞后?哪些对象跨线程共享?每把锁保护什么不变量?停止时如何退出?这五个问题回答清楚后,线程数量、锁粒度、队列设计和停止协议自然就确定了。不要从"用哪个 API"出发设计,而要从"保护什么不变量"出发。
延伸阅读¶
- Anthony Williams, C++ Concurrency in Action, 2nd Edition:C++ 并发编程的系统教材,覆盖线程、锁、条件变量、atomic 和内存模型。⭐⭐
- cppreference:
std::thread、std::jthread、std::mutex、std::condition_variable、std::shared_mutex、std::future、std::chrono页面。⭐⭐ - Scott Meyers, Effective Modern C++:并发 API、
std::async、std::thread、std::future相关条款(Item 35-40)。⭐⭐ - Herb Sutter, atomic<> Weapons(CppCon 2012):理解 atomic 和内存序的经典材料,是 原子操作与内存模型 的预备读物。⭐⭐⭐
- ROS2 文档:Executors、Callback Groups、Timers、Parameters 和 message timestamp 的使用边界。⭐⭐⭐
- ORB-SLAM3 源码(UZ-SLAMLab/ORB_SLAM3):Tracking、LocalMapping、LoopClosing 的线程协作和多 mutex 结构。⭐⭐⭐
- LIO-SAM 源码(TixiaoShan/LIO-SAM):IMU、LiDAR、Odom 回调中的 deque 缓冲和时间窗处理。⭐⭐⭐
- chenshuo muduo:工业 C++ 网络库中的线程、锁、条件变量和事件循环封装,适合学习生产级并发代码风格。⭐⭐⭐
- ThreadSanitizer 文档(clang.llvm.org/docs/ThreadSanitizer.html):data race 检测工具的使用指南和局限性。⭐⭐
- Brendan Gregg, Systems Performance, 2nd Edition:性能分析方法论,含 perf、火焰图和 eBPF 工具链的系统讲解。⭐⭐⭐
🔧 故障排查手册¶
| 症状 | 可能原因 | 排查步骤 | 相关章节 |
|---|---|---|---|
| 程序退出时直接终止 | joinable 的 std::thread 进入析构 |
1. 检查异常路径是否跳过 join() 2. 检查类析构函数是否收束线程 3. 用 ThreadGuard 或 jthread 修复 |
17.2-17.3 |
| 程序偶尔卡死 | 死锁或条件变量谓词错误 | 1. gdb -batch -ex “thread apply all bt” 查看线程状态 2. 检查多锁路径是否有统一顺序 3. 检查是否持锁调用外部回调 |
17.6 |
| CPU 占用高但没处理数据 | 队列为空时忙等 | 1. 检查消费循环是否用 condition_variable 2. 确认不是 continue 自旋 3. 用 perf top 确认热点函数 |
17.5 |
| 消费线程无法退出 | 队列没有”关闭”状态 | 1. 检查 close() 是否调用 notify_all() 2. 检查谓词是否包含 closed_ |
17.5 |
| 偶发崩溃在容器内部 | 容器并发访问或迭代器失效 | 1. 用 TSan 检测 data race 2. 检查锁内是否返回引用/迭代器给锁外使用 | 17.4 |
| Tracking 延迟周期性升高 | 实时路径被后台任务持锁阻塞 | 1. 统计每阶段耗时和队列长度 2. 检查锁内是否做重计算 3. 用 perf 火焰图定位持锁热点 | 17.4, 17.7 |
| 条件变量偶尔漏消息 | 通知被当作消息本身 | 1. 确认使用谓词版本 wait(lock, pred) 2. 确认谓词检查 closed_ \|\| !q_.empty() |
17.5 |
std::async 没有并行效果 |
未保存 future 或未指定 launch::async |
1. 确认 future 被保存到变量 2. 显式指定 std::launch::async |
17.10 |
volatile 后仍行为异常 |
volatile 不是同步工具 | 1. 搜索线程通信中的 volatile 2. 替换为 atomic 或 mutex |
17.11 |
| 读写锁没有提升性能 | 读临界区太长或写操作频繁 | 1. 用 perf 测量读写锁开销 2. 比较与普通 mutex 的吞吐 | 17.7 |
| 传感器同步不稳定 | 时间窗不完整或传感器/接收时间混用 | 1. 检查缓冲区是否按时间戳有序 2. 检查 LiDAR 时间窗是否被 IMU 覆盖 3. 检查剪裁是否保留插值 margin | 17.8 |