3. 线程间的等待与通知

ZaynPei Lv6

这一章是继上一章“保护共享数据”之后的进阶,核心是解决线程间的“等待”与“通知”问题,即一个线程如何高效地等待另一个线程完成某个操作。

等待一个事件

假如你正在坐一趟夜间火车, 你需要在正确的站点下车。现在有三种策略可以选择:

策略1:忙等待 (Busy-Waiting), 也就是整晚不睡,每到一站都探出头去看。优点是绝对不会错过下车点, 但缺点是极度疲劳(浪费CPU)。

策略2:轮询休眠 (Polling with Sleep), 也就是看一眼时间表,估计一个大致时间,设个闹钟。优点是可以休息,比一直忙等待好很多;缺点是时间很难把握。火车晚点?你被早早吵醒,还是得等(引入延迟)。闹钟出问题?你睡过站了(bug)。

策略3:条件变量 (Ideal Solution), 这是最优的策略, 你可以安心睡觉,让乘务员在火车到站时(事件发生时)来唤醒你。它不浪费精力(CPU),也不会错过(无延迟)。

回到多线程编程中,常见的场景是一个线程需要等待另一个线程完成某个任务后才能继续执行。例如,主线程可能需要等待工作线程完成数据处理后才能使用处理结果。而只有互斥量是无法满足这种需求的,因为互斥量只能保护共享数据的访问,而不能实现线程间的等待与通知

对于这个任务, 与上述示例对应的策略1就是忙等待 (Busy-Waiting): 一个线程在一个 while 循环中不断地加锁检查标志、解锁

1
2
3
4
5
6
7
8
// 策略1:忙等待(非常糟糕)
while(true) {
std::lock_guard<std::mutex> lk(m);
if(flag) {
break; // 条件满足
}
// 锁被释放,但循环立即再次开始
}
缺点是灾难性的: 不仅浪费CPU, 等待线程在 while(true) 中空转,消耗宝贵的CPU时间片; 而且还会引发锁竞争:等待线程(消费者)为了检查 flag 而频繁加锁,这会阻塞那个唯一能设置 flag 的线程(生产者)!

对应的策略2是轮询休眠 (Polling with Sleep), 也就是在每次检查标志之前让等待线程休眠一段时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
bool flag;
std::mutex m;

void wait_for_flag()
{
std::unique_lock<std::mutex> lk(m);
while(!flag)
{
lk.unlock(); // 1. 关键:在休眠前必须解锁
// 2. 休眠,不占用CPU
std::this_thread::sleep_for(std::chrono::milliseconds(100));
lk.lock(); // 3. 重新加锁,准备在 while 循环中再次检查
}
}
优点: - 不空转:线程在 sleep_for 期间是休眠的,不消耗CPU。 - 释放锁:它很聪明地在休眠前解锁了互斥量,这使得生产者线程可以在此期间获取锁并设置 flag。

缺点是休眠时间无法确定: - 太短(如1毫秒):和“忙等待”几乎一样,依然有大量的加锁/解锁开销。 - 太长(如100毫秒):生产者可能在第1毫秒就设置了 flag,但消费者线程要睡到第100毫秒才醒来。这引入了99毫秒的延迟 (Latency)。

而策略3:条件变量 (Condition Variable) 则是理想的解决方案。它允许一个线程等待某个条件,并在条件满足时被另一个线程通知。条件变量内部会自动处理线程的休眠唤醒,避免了忙等待和轮询休眠的问题。

条件变量(condition_variable)

条件变量是 C++11 标准库提供的一个同步原语,定义在 <condition_variable> 头文件中。它允许线程在等待某个条件时进入休眠状态,并在条件满足时被其他线程唤醒。目前有两种主要的条件变量类型: - std::condition_variable:首选, 它性能更高,但只能和 std::mutex 配合使用。 - std::condition_variable_any:更通用(因此叫 _any), 它可以和任何满足“可锁定”要求的锁(如 std::shared_mutex)配合使用。 - 因为更通用,所以它有额外的开销(体积、性能),应避免在 std::mutex 场景下使用。

条件变量必须和一个互斥量 (Mutex) 配合使用。互斥量保护的是共享数据, 但同时也是条件变量本身, 因为条件变量检查和修改条件时需要保证原子性。

下面是一个经典的生产者-消费者示例,展示了如何使用条件变量来实现线程间的等待与通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
std::mutex mut;
std::queue<data_chunk> data_queue; // 1. 被保护的“条件”
std::condition_variable data_cond;

// 生产者线程
void data_preparation_thread()
{
while(more_data_to_prepare())
{
data_chunk const data = prepare_data();
std::lock_guard<std::mutex> lk(mut); // 生产者加锁
data_queue.push(data); // 2. 修改“条件”
data_cond.notify_one(); // 3. 唤醒一个等待者(“嘿,有新数据了!”)
}
}

// 消费者线程
void data_processing_thread()
{
while(true)
{
// 4. 必须使用 unique_lock,不能用 lock_guard
std::unique_lock<std::mutex> lk(mut);

// 5. 核心:等待
data_cond.wait(lk, []{ return !data_queue.empty(); });

data_chunk data = data_queue.front();
data_queue.pop();

// 6. 尽早解锁
lk.unlock();

process(data);
// ...
}
}

data_cond.wait(lk, []{return !data_queue.empty();}); 这一行是理解本章的关键。它有两个参数:一个锁(lk)和一个谓词(predicate,即lambda函数)。

wait() 函数的内部逻辑如下: - 检查谓词:wait 首先调用 lambda 函数 []{ return !data_queue.empty(); }。 - 如果谓词为 true(队列不为空):wait 函数立即返回, 线程继续持有锁 lk,向下执行(去 pop 数据)。 - 如果谓词为 false(队列是空的):wait 函数释放互斥锁 lk, 并将当前线程(消费者)置于休眠/等待状态。 - 被唤醒后:当生产者线程调用 data_cond.notify_one() 时,等待的消费者线程会被唤醒。wait 函数会重新获取锁 lk,然后再次检查谓词。 - 如果谓词现在为 true:wait 函数返回,线程继续执行。(这是为了防止“虚假唤醒”, 即没有notify的唤醒却醒来的情况) - 如果谓词仍为 false:wait 函数会再次释放锁并进入休眠,直到被再次唤醒。

也就是 wait() 函数一开始是没有锁的, 因此必须和互斥量配合使用,先加锁再调用 wait()

还要注意, 与条件变量配合使用的锁必须是 std::unique_lock<std::mutex> 类型,不能使用 std::lock_guard<std::mutex>。这是因为 wait() 函数需要在等待时释放锁,而 lock_guard 不支持中途解锁操作。

使用条件变量构建线程安全队列

在上一节, 我们构造了一个使用条件变量的“生产者-消费者”代码。那个例子是有效的,但它是“一次性”的:mut、data_queue 和 data_cond 都是全局变量,紧密耦合在两个特定的函数中。

本节的目标是重构这些代码,将其封装成一个通用的、可复用的、线程安全的 threadsafe_queue 类。

当然, 这一切的灵感来源还是来自 std::queue 标准实现的固有缺陷. 它的主要接口有以下三类:

  • empty() / size() (检查状态)
  • front() / back() (获取元素)
  • push() / pop() / emplace() (修改队列)

问题是, 这些接口的依旧存在固有条件竞争, 这与我们在第3章中讨论的 std::stack 有完全相同的问题。

std::queue 的接口迫使你将“获取”和“删除”分成两个步骤:

  • T& value = my_queue.front(); (检查/获取)
  • my_queue.pop(); (修改/使用)

在多线程环境下,在这两个调用之间存在一个“间隙”,可能导致数据被重复处理或丢失。同样,if (!my_queue.empty()) { ... } 也是一个经典的 TOCTTOU 竞争。

为了解决这些竞争,我们必须将“检查”、“获取”和“删除”合并成原子的操作。

此外,因为这是一个用于并发环境的队列,我们必须考虑一个新问题:当队列为空时,消费者想 pop 怎么办?

在第3章的 stack 示例中,我们只是抛出了一个异常。但在第4章,我们有了条件变量,所以我们可以提供一个更好的选项:等待 (Wait)。这导致我们设计出两种 pop 操作:

  • try_pop()(非阻塞):“尝试 pop”。如果队列中有数据,就获取它并返回 true。如果队列是空的,不要等待,立即返回 false

  • wait_and_pop()(阻塞):“等待并 pop”。如果队列中有数据,就获取它。如果队列是空的,就使用条件变量进入休眠,直到生产者 push 了数据并通知它。

为了兼顾异常安全和灵活性,我们为 pop 操作提供了和 stack 一样的两种重载:

  • try_pop(T& value):通过引用返回,(如果成功)返回 bool。

  • try_pop():返回 std::shared_ptr。(如果失败)返回 nullptr。

  • wait_and_pop(T& value):通过引用返回,(阻塞直到成功)。

  • wait_and_pop():返回 std::shared_ptr,(阻塞直到成功)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>

template<typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut; // 1 互斥量必须是可变的
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue(){}
threadsafe_queue(threadsafe_queue const& other)
{
std::lock_guard<std::mutex> lk(other.mut); // 复制时加锁
data_queue=other.data_queue;
}

void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut); // push 只是一个快速的、非阻塞的操作,所以它使用最高效的 std::lock_guard 来保护队列。
data_queue.push(new_value);
data_cond.notify_one();
}

void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut); // wait 必须使用 unique_lock
data_cond.wait(lk, [this]{return !data_queue.empty();});
value = data_queue.front(); // 传出参数
data_queue.pop();
}

std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty();});
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); // 构造智能指针传出
data_queue.pop();
return res;
}

bool try_pop(T& value) // 非阻塞版本
{
std::lock_guard<std::mutex> lk(mut); // 不需要 unique_lock,因为不需要等待
if(data_queue.empty())
return false;
value = data_queue.front();
data_queue.pop();
return true;
}

std::shared_ptr<T> try_pop() // 非阻塞版本
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}

bool empty() const // 声明为 const 成员函数
{
std::lock_guard<std::mutex> lk(mut); // 读取时加锁, 因为加锁也是修改, 所以 mut 必须是 mutable
return data_queue.empty();
}
};

这里需要注意的是 mutable 关键字的使用。因为 empty() 函数是一个 const 成员函数,它不能修改类的任何成员变量。然而,为了保证线程安全,我们需要在 empty() 中加锁, 而锁定互斥量是一个修改(non-const)互斥量本身的操作。因此,我们将互斥量 mut 声明为 mutable,这样即使在 const 成员函数中也可以对其进行修改(加锁/解锁)。 > mutable 关键字的含义是允许在 const 成员函数中修改该成员变量。它通常用于那些逻辑上不影响对象状态的成员变量,例如缓存或互斥量。

除了notify_one(),条件变量还有另一个通知函数:notify_all()。它们的区别在于唤醒的线程数量: - notify_one():当 push 一个数据项时,我们只唤醒一个消费者线程。这是最高效的,因为只有一个线程能成功获取该数据项。如果唤醒所有线程,其他线程醒来后会发现队列又空了(因为被第一个线程拿走了),然后再次进入休眠,这被称为“惊群效应 (Thundering Herd)”。 - notify_all():适用于“广播”事件, 唤醒所有等待该事件的线程。例如,当一个共享配置被更新,或系统即将关闭时,你希望所有等待这个事件的线程都醒来并作出反应。

使用 std::future 等待一次性事件

在上一节, 我们学习了“条件变量”,它非常适合于“重复性”的等待(例如,一个消费者反复等待一个队列变为非空)。

而本节引入了一种完全不同的同步模型,它专为“一次性事件” (one-off event) 而设计。 > “一次性事件”是指在程序的某个时间点上,你预期会发生一次的事件。一旦它发生了,它就永远处于“已发生”状态,不能被重置。

C++标准库(在 <future> 头文件中)将这种模型称为“期望” (Future)。 - std::future<T>:代表一个未来的 T 类型的值。 - std::future<void>:如果事件没有关联的数据(只是一个“信号”),则使用 void 特化版。

关键特性:一个 future 一旦变为“就绪”状态,它就不能被重置

期望有两种类型 :std::future vs. std::shared_future - std::future<T> (唯一期望), 可以类比:std::unique_ptr<T>。 - 所有权:它代表了对异步结果唯一所有权。 - 不可拷贝 (Non-copyable), 只可移动 (Move-only)。 - 含义:在任何时刻,只有一个 std::future 对象可以关联到那个“一次性事件”的结果。你可以通过 std::move 将这个所有权从一个线程转移到另一个线程,但永远只有一张“登机牌”。 - 限制:get() 成员函数(用于获取结果)只能被调用一次。调用后,future 对象内部的状态就变了(不再持有值)。

  • std::shared_future<T> (共享期望), 类比 std::shared_ptr<T>
    • 所有权:它代表了对异步结果的共享所有权
    • 可以拷贝 (Copyable), 将一个 std::shared_future 拷贝多份,分发给多个不同的线程。当“事件”发生时,所有这些拷贝的 shared_future 对象会同时变为“就绪”
    • 限制:get() 成员函数可以被多次调用(每个拷贝对象都可以调用)。(注意:get() 在 shared_future 上返回的是 const T&,而不是 T)。

future 对象的主要成员函数如下: - .get():阻塞等待任务完成并获取返回值。只能调用一次,之后 future 变为无效状态。 - .wait():阻塞等待任务完成,但不获取返回值。适用于你只关心任务是否完成,而不需要结果的情况。 - .valid():检查 future 对象是否关联了一个有效的异步任务。如果 future 是默认构造的,或者 get() 已经被调用过,它将返回 false。 - .wait_for().wait_until():允许你以超时的方式等待任务完成。例如,你可以等待最多 100 毫秒,如果任务在这段时间内没有完成,你可以选择继续做其他事情。 - .share():将一个 std::future<T> 转换为 std::shared_future<T>。这允许你将同一个异步结果共享给多个线程。

带返回值的后台任务(std::async)

在只使用 std::thread 的情况下,如果你想让一个新线程计算一个值并返回给主线程,你必须手动实现一套复杂的同步机制:

  • 在主线程创建一个共享变量(如 int result)。
  • 创建一个 std::mutex 来保护这个变量。
  • (可能还需要一个 std::condition_variable 和一个 bool 标志)来通知主线程“计算已完成”。
  • 主线程需要加锁、等待、然后才能安全地读取 result。

这个过程非常繁琐且容易出错。

C++11在 <future> 头文件中提供了一个高级函数模板 std::async,它将启动任务返回结果这两个过程完美地封装了起来。

它是一个函数,你像 std::thread 一样传递给它一个“任务”(函数、lambda、可调用对象), 之后它(通常)会启动一个新线程来执行这个任务。

不同于 std::thread 的是, 它立即返回一个 std::future<T> 对象, T 就是你传递的那个任务的返回值类型。

你可以在主线程继续执行其他操作而不需要阻塞等待, 当你需要那个计算结果时,你对 future 对象调用 .get() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <future>
#include <iostream>

int find_the_answer_to_ltuae(); // 一个耗时的计算任务
void do_other_stuff(); // 主线程要做的其他工作

int main()
{
// 1. 启动异步任务, 构造一个 future 对象接收结果
std::future<int> the_answer = std::async(find_the_answer_to_ltuae);

// 2. 主线程并发执行其他工作
do_other_stuff();

// 3. 当需要结果时,调用 .get()
// 如果任务还没完成,get() 会阻塞等待直到结果可用
std::cout << "The answer is " << the_answer.get() << std::endl;
}

此外, 对于 std::async, 其实不一定会启动一个新线程。具体实现取决于它的启动策略 (Launch Policies)

你可以通过一个 std::launch 类型的可选第一参数来指定“启动策略”:

  • std::launch::async (异步执行): 强制 std::async 必须在一个新线程上异步执行任务,就像 std::thread 一样。例如auto f6 = std::async(std::launch::async, Y(), 1.2);

  • std::launch::deferred (延迟执行): 强制 std::async 不要启动新线程。例如auto f7 = std::async(std::launch::deferred, baz, std::ref(x));

    • 那任务何时执行? 任务会被“延迟”,直到你调用 .get().wait()时, 它会在调用 .get() 的那个线程上(即主线程)同步地执行。
    • 类似于Lazy - loading(惰性加载)
    • 某种程度上来说, 这并不是真正的“异步并发”操作, 因为任务实际上还是在主线程上执行的, 只不过被推迟了。
  • std::launch::async | std::launch::deferred (默认策略): 当不设定时, 意味着你把决定权交给了C++标准库。库的实现可以自由选择:它可能会启动一个新线程(async),也可能会将其标记为“延迟”(deferred)。

    • 这是为了“任务超额”时的自动负载均衡。如果库发现你已经启动了1000个线程,快耗尽资源了,它可能会自动将你的新任务切换为 deferred 模式,以避免系统崩溃。
    • 如果你确定你需要真正的并发执行,最好显式指定 std::launch::async。

下面是 std::async 传递参数的规则:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
struct X
{
void foo(int, std::string const&);
std::string bar(std::string const&);
};
X x;

// f1: 调用 x.foo(42, "hello")
auto f1 = std::async(&X::foo, &x, 42, "hello");

// f2: 调用 tmpx.bar("goodbye"),tmpx 是 x 的拷贝副本
auto f2 = std::async(&X::bar, x, "goodbye");

struct Y
{
double operator()(double);
};
Y y;

// f3: 调用 Y() 构造的临时对象的 operator()(3.141)
auto f3 = std::async(Y(), 3.141);

// f4: 调用 y.operator()(2.718),y 是通过引用传递的
auto f4 = std::async(std::ref(y), 2.718);

class move_only
{
// ... (一个只能移动,不能拷贝的类) ...
void operator()();
};

// f5: 调用一个通过移动构造的 move_only 临时对象的 operator()
auto f5 = std::async(move_only());

任务与期望 (std::packaged_task)

上一节, 我们学习了 std::async。它是一个“高级”工具,像一个黑盒:你给它一个任务,它自己决定如何运行(新线程或延迟),然后给你一个 future。

本节介绍了一个更“底层”、更灵活的工具:std::packaged_task

std::packaged_task 的名字就说明了它的作用:它是一个“被打包的任务”。

  • 它是一个类模板,它将一个可调用对象(函数、lambda等)与一个期望 (future) 绑定(打包)在一起。

  • 核心机制:

    • 你创建一个 packaged_task 对象时,它内部就包含了一个准备好的 future。
    • packaged_task 本身也是一个可调用对象(它有 operator())。
    • 当你调用这个 packaged_task 对象时,它会执行内部绑定的函数,然后自动将函数的返回值(或抛出的异常)存储到 future 中,使 future 变为“就绪”状态。这样,你就可以在另一个线程中等待这个 future,并获取结果。

std::async vs std::packaged_task 对比

它们最大的区别在于“谁来执行任务”:

std::async:执行与绑定合一。

  • 当你调用 std::async 时,你不仅是“打包”了任务,你还同时“命令”C++运行时去执行它(要么马上在新线程,要么延迟)。

std::packaged_task:执行与绑定分离。

  • 当你创建 packaged_task 时,你只是“打包”了任务,任务并不会被执行

  • 你得到了一个可以被传来传去的 task 对象。执行这个任务的时间和地点(即在哪个线程上)完全由你决定。

为什么这种分离很重要? 因为它允许你实现 std::async 无法做到的高级模式,例如:

  • 线程池:你可以创建100个 packaged_task,把它们全都塞进一个队列,然后让一个固定的线程池(比如8个线程)去队列里取任务并执行

  • 特定线程执行(清单 4.9):你可以把一个 packaged_task 发送到一个特定的线程(例如,GUI线程)去执行。

packaged_task 的模板与用法

模板参数:std::packaged_task<R(Args...)> , 它接受一个函数签名作为模板参数。例如: std::packaged_task<std::string(std::vector<char>*, int)>

  • R (即 std::string):这是返回值类型。它决定了 get_future() 返回的 future 类型,即 std::future<std::string>
  • Args… (即 std::vector<char>*, int):这是参数列表。它决定了 packaged_task 对象自己的 operator() 接受什么参数,即 task(my_vector, 42);

打包好的task对象有两个主要成员函数: - .get_future():返回与该任务关联的 std::future 对象。 - .operator() (Args…):调用该任务,传递参数并执行绑定的函数。 - 或者直接传递给 std::thread 构造函数。

标准用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 1. 一个要执行的函数
int my_task(std::string s) { return s.length(); }

// 2. 打包任务,指定函数签名
std::packaged_task<int(std::string)> task(my_task);

// 3. 关键:在任务被“移走”之前,获取 future
std::future<int> result = task.get_future();

// 4. 将任务交给新线程执行
// 注意:packaged_task 和 thread 一样,不可拷贝,只能移动 (std::move)
std::thread t(std::move(task), "hello");
t.detach();

// 5. 在主线程或其他地方等待结果
std::cout << "Result: " << result.get() << std::endl; // 会阻塞直到 t 运行了 task

下面是一个 packaged_task 最典型的应用场景:任务分发

假设有一个多线程程序,但只有 gui_thread 才能更新UI。如果一个后台线程(比如网络线程)想在UI上显示“下载完成”,它该怎么办?它不能直接调用UI函数。

解决方案是, 后台线程必须把“更新UI”这个任务(一个函数)发送给 gui_thread 去执行。并且,后台线程可能还想知道 gui_thread 何时完成了这个任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 全局的、受互斥量保护的任务队列
std::mutex m;
std::deque<std::packaged_task<void()>> tasks; // 只接受“无参、无返回”的任务

// 1. GUI 线程的执行函数
void gui_thread()
{
while(!gui_shutdown_message_received()) // 2. GUI 循环, 只要没收到关闭消息就继续
{
get_and_process_gui_message(); // 3. 处理自己的事 (如鼠标点击)

std::packaged_task<void()> task;
{
std::lock_guard<std::mutex> lk(m);
if(tasks.empty()) // 4. 检查任务队列
continue;

task = std::move(tasks.front()); // 5. 从队列中“窃取”一个任务
tasks.pop_front();
}
task(); // 6. 关键:在 GUI 线程上“执行”这个任务
}
}

// 7. 任何其他线程都可以调用的“发布”函数
template<typename Func>
std::future<void> post_task_for_gui_thread(Func f)
{
std::packaged_task<void()> task(f); // 8. 打包任务
std::future<void> res = task.get_future(); // 9. 获取 future

std::lock_guard<std::mutex> lk(m); // 10. 加锁队列
tasks.push_back(std::move(task)); // 11. 将“任务”移入队列

return res; // 12. 立即将“future”返回给调用者
}

总之, std::packaged_task 是一个强大的中间件。它将任务的定义(你想做什么)与任务的执行(何时、何地做)以及结果的获取(future)分离开来。这使它成为实现线程池任务队列特定线程调度(如GUI)等高级并发模式的基础构件。

使用 std::promise

这是创建 std::future 的第三种,也是最底层、最灵活的方式。

  • std::async:自动创建 future + 自动运行任务。

  • std::packaged_task:自动创建 future + 手动运行任务(task())。

  • std::promise:自动创建 future + 手动设置值(set_value())。

std::promise 实现了 “承诺/期望” (Promise/Future) 模型。它将设置值的“入口”(承诺)与获取值的“出口”(期望)完全分离开来。

  • std::promise<T>:一个“承诺”对象,它承诺在未来某个时刻会提供一个 T 类型的值。它就是那个“生产者”或“事件触发者”。
    • 在之前的std::async和std::packaged_task中, 结果的设置是自动完成的(任务执行完毕后自动设置结果), 且一般就是函数的返回值。
    • 而在 std::promise 中,生产者线程需要显式地手动调用 promise 对象的 set_value() 方法来提供结果, 可以是任何值, 也可以在任何时间点调用(不一定是函数返回时)。
  • std::future<T>:一个“期望”对象,它期望从对应的 promise 那里获取一个 T 类型的值。它就是那个“消费者”或“等待者”。

工作机制:

  1. 创建:你首先创建一个 std::promise<T> 对象, 例如std::promise<int> p;

  2. 获取 Future:你立即从 promise 中获取其唯一关联的 future 对象std::future<int> f = p.get_future();

  3. 分发:这是最关键的一步。你将 f 和 p 分发到不同的线程

    • std::future f -> 发送给等待线程(消费者)。
    • std::promise p -> 发送给(或保留在)工作线程(生产者)。
    • (注意:std::promise 和 std::future 一样,是“只移动”的,所以跨线程传递时需要 std::move)。
  4. 等待:消费者线程在需要结果时调用 f.get(),它会在这里阻塞

  5. 履约:生产者线程在计算出结果(例如 result = 42)或者决定发送某种事件/状态通知后,手动显式调用 p.set_value(result)

  6. 唤醒:set_value() 的调用是一个原子事件。它会使 f 的状态变为“就绪”,并立即唤醒正在 f.get() 上阻塞的消费者线程,消费者线程随后会收到值 42。

使用场景

std::promise 适用于异步事件,特别是那些不由函数返回值触发的事件。

“多网络连接”是一个完美的例子:一个高性能服务器需要同时处理上千个网络连接。

错误方案:为每个连接创建一个线程(如 std::async)。这会创建上千个线程,耗尽系统资源,导致上下文切换风暴,性能崩溃。

正确方案:使用 IO多路复用事件循环。一个(或少数几个)线程处理所有的网络读写事件。

promise 如何解决同步问题?

场景:“逻辑线程A”想通过网络发送一个数据包,并且需要确认数据包何时被真正发送。流程:

  • 逻辑线程A: std::promise<bool> p; std::future<bool> f = p.get_future();

  • 逻辑线程A:创建一个 outgoing_packet 对象,这个对象同时包含 datastd::move(p)

  • 逻辑线程A:将这个 packet 放入一个队列,交给“网络线程”。

  • 逻辑线程A:调用 f.get()。它在这里阻塞,等待网络线程的确认。

  • 网络线程:在其事件循环中,从队列取出 packet,将 data 发送到操作系统Socket。

  • 网络线程:当操作系统确认“发送完成”时,它调用 packet.promise.set_value(true);

  • 唤醒:set_value() 使 f 变为就绪,逻辑线程A 从 f.get() 唤醒,得知发送成功。

也就是说, 逻辑线程A 和 网络线程 之间通过 promise/future 实现了异步事件通知,而不是通过函数调用栈或任务完成来传递结果。

std::promise 提供了一种灵活的机制,允许你在任何时间点任何线程手动触发事件,并将结果传递给等待的线程,而不依赖于函数调用栈或任务执行的完成。这使得它非常适合处理复杂的异步工作流事件驱动的编程模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
std::unordered_map<int, std::promise<payload_type>> incoming_promises_;
// 网络/IO线程处理连接集合的事件循环
void process_connections(connection_set& connections)
{
while(!done(connections)) // 1. 如果还有连接未完成
{
for(connection_iterator ... ) // 2. 遍历所有连接
{
if(connection->has_incoming_data()) // 3. 检查【入站】事件
{
data_packet data = connection->incoming(); // 读取数据

// 4. 查找与此数据ID关联的 "承诺", 这个get_promise() 应该加锁访问保存的promise哈希表, promise和id应该在连接建立时由外部线程注册
std::promise<payload_type>& p =
connection->get_promise(data.id);

p.set_value(data.payload); // 履约:将数据发给等待者
}

if(connection->has_outgoing_data()) // 5. 检查【出站】事件
{
outgoing_packet data =
connection->top_of_outgoing_queue();

connection->send(data.payload); // 发送数据

// 6. 履约:告诉请求者“发送已完成”, 这里的data.promise 应该是 outgoing_packet 结构的一部分, 它在创建 outgoing_packet 时由外部线程传入
data.promise.set_value(true);
}
}
}
}
// 可能的其他部分代码
std::future<payload_type> Connection::send_request_async(...)
{
// ... 其他初始化代码 ...

{ // 互斥量作用域开始
std::lock_guard<std::mutex> lock(mutex_); // << 业务线程加锁 >>

// 业务线程安全地修改共享表:
// 1. 获取下一个 ID
// 2. 将 Promise 对象插入共享的 map 中
incoming_promises_.emplace(request_id, std::move(p));
} // 互斥量作用域结束,业务线程解锁

// ... 后续操作,如将发送请求推入出站队列 ...

return f;
}

std::promise<payload_type>& Connection::get_promise(int id)
{
std::lock_guard<std::mutex> lock(mutex_); // << I/O 线程加锁 >>

// I/O 线程安全地查找共享表
auto it = incoming_promises_.find(id);

// ... 查找和错误处理 ...

return it->second;
} // I/O 线程解锁

这个流程描述了一个典型的 业务线程 (T1) 如何通过 I/O 线程 (T2) 向数据中心完成一次异步的请求-响应循环。

  • 业务线程 (T1):发起请求,等待最终结果。
  • I/O 线程 (T2):运行 I/O 事件循环,负责网络读写和结果通知。
  • Connection 对象 (共享):包含 mutex、incoming_promises_ 映射表(用于入站匹配)和 outgoing_queue_(用于出站任务)。
步骤 角色 动作描述 核心机制
1 业务线程 (T1) 创建请求任务:创建两个 Promise/Future 对:a. 响应匹配:P_Resp(用于等待外部响应,需注册 ID);b. 发送确认:P_Sent(用于等待本地发送完成)。 std::promise
2 任务打包 (T1) 将请求数据、ID 和 P_Sent 一起打包成 outgoing_packet。 出站 Promise:P_Sent 随任务数据一起传递
3 注册响应 (T1) 加锁并将响应 Promise (P_Resp) 与 ID 注册到共享的 incoming_promises_ 映射表中。 入站 Promise 存储在共享 Map 中
4 提交任务 (T1) 将打包好的 outgoing_packet 放入 outgoing_queue_;T1 在 F_Resp.get() 上阻塞,等待外部响应。 outgoing_queue_
5 I/O 线程执行 (T2) 循环处理:T2 检查 outgoing_queue_ 并取出任务进行发送。 I/O Loop
6 兑现出站 (T2) 执行发送(connection->send());发送完成后,T2 调用 data.promise.set_value(true) 以兑现 P_Sent。 outgoing_packet 内嵌的 promise
7 确认完成 (T1) T1 的另一个 future (F_Sent) 被唤醒,确认请求已离站;随后 T1 可继续等待或处理最终响应(由 F_Resp 提供)。 本地同步完成
8 外部返回 外部系统处理完 T2 发出的请求,生成响应,并在数据包中原样带回 T1 提供的 ID。 网络协议:保证 ID 的回显
9 I/O 线程接收 (T2) T2 在事件循环中通过 if(connection->has_incoming_data()) 接收到响应包。 I/O Loop
10 查找匹配 (T2) 加锁:T2 从响应包中提取 ID,并用该 ID 到共享的 incoming_promises_ 映射表中查找等待响应的 P_Resp。 connection->get_promise(data.id)
11 兑现入站 (T2) 通知:T2 将响应的 Payload 存入找到的 P_Resp。 p.set_value(data.payload)
12 业务线程完成 (T1) 获取结果:T1 在 F_Resp.get() 上的阻塞被解除,立即获取到 Payload 数据。 std::future 自动唤醒
13 清理 (T2) T2 将该 ID 及其 P_Resp 从共享映射表中移除,释放资源。 Map 清理

之所以两个promise/future对分开,是因为它们代表了两个不同的异步事件: - 发送完成:这是一个本地事件,表示数据包已成功发送出去。直接将promise与outgoing_packet绑定即可。 - 响应到达:这是一个远程事件,表示外部系统已处理请求并返回了结果。需要通过ID在共享映射表中查找对应的promise, 因为响应可能会乱序到达, 必须通过ID进行匹配。 ### 为“future”存储“异常”

这一节讨论的是一个对于并发编程至关重要的“健壮性”问题:如果在后台异步执行的任务(无论是通过 std::async、std::packaged_task 还是 std::promise)没有成功返回一个值,而是抛出了一个异常,那么等待这个结果的线程会怎么样?

在普通的单线程(同步)代码中,异常会沿着调用栈向上传播,你可以用 try…catch 块来捕获它:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
double square_root(double x) {
if(x < 0) {
throw std::out_of_range("x<0");
}
return sqrt(x);
}

void sync_call() {
try {
double y = square_root(-1); // 异常在这里被抛出
} catch (std::out_of_range& e) {
// 异常在这里被捕获
}
}

但在异步(多线程)代码中,情况完全不同:

1
2
3
4
5
6
7
8
9
void async_call() {
std::future<double> f = std::async(square_root, -1);
// ... (主线程继续执行)

// 异常在“后台线程”中被抛出。
// 它无法“跳”到主线程的调用栈。

double y = f.get(); // 那么,这里会发生什么?
}

这个问题的解决方案是:future 会“运输”异常

C++的“期望”机制提供了一个优雅的解决方案:如果后台任务抛出了异常,这个异常会被捕获并存储在 future 的共享状态中。

当等待线程调用 .get() 时,它不会得到一个值,而是会重新抛出 (re-throw) 那个被存储的异常

因此, 你可以在等待线程中使用 try…catch 块来捕获这个异常,就像在同步代码中一样。

1
2
3
4
5
6
7
8
9
10
void async_call_correct() {
std::future<double> f = std::async(square_root, -1);
// ... (主线程继续执行)

try {
double y = f.get(); // 异常在这里被“重新抛出”
} catch (std::out_of_range& e) {
// 异常在这里被主线程成功捕获!
}
}

三种机制的异常处理方式

future 只是一个“容器”,异常是如何被“放”进去的,取决于你是如何创建这个 future 的。

std::asyncstd::packaged_task (自动处理)

std::async 和 std::packaged_task 会自动为你处理异常。

  • std::async:当你调用 std::async(my_func, …) 时,C++库的实现(在后台线程)实际上是在类似 try { my_func(...); } catch(...) { ... }代码块中执行你的函数。如果 my_func 抛出异常,async 会自动捕获它,并将其存储到返回的 future 中。

  • std::packaged_task:当你调用 task() 时,packaged_task 的 operator() 内部也会做同样的事情。它会 try…catch 你绑定的函数,如果捕获到异常,就自动将其存储到关联的 future 中。

std::promise (手动处理)

std::promise 是最底层的机制,它不会自动 try…catch 你的代码。你(“生产者”线程)必须手动捕获异常,并手动将其存入 promise

你有两种方法来做到这一点:

方法 1:set_exception(std::current_exception()) (最常用)

  • 你必须在 try…catch(…) 块中调用计算函数。
  • 在 catch(…) 块中,你调用 std::current_exception(),它会捕获当前正在处理的异常,并将其打包成一个 std::exception_ptr。
  • 你将这个 exception_ptr 传递给 promise.set_exception()
    1
    2
    3
    4
    5
    6
    7
    8
    extern std::promise<double> some_promise;
    try{
    some_promise.set_value(calculate_value()); // 尝试设置值
    }
    catch(...){ // 如果 calculate_value() 抛出任何异常
    // 捕获活动异常,并将其存入 promise
    some_promise.set_exception(std::current_exception());
    }

还有一个非常重要的隐式异常:如果 promise 或 packaged_task 被销毁了,但它承诺的值(或异常)却从未被设置,会发生什么?

假设生产者线程创建了 std::promise p 和 std::future f。消费者线程拿到了 f,并阻塞在 f.get() 上。

生产者线程因为某个错误(或者就是忘了)退出了,导致 p 被析构,而 p.set_value() 或 p.set_exception() 从未被调用。后果是, 如果没有任何机制,消费者线程将永久死锁,永远等待一个不会到来的值。

解决方案:

std::promise 和 std::packaged_task 的析构函数会检查:“我是否在还未‘履约’的情况下就被销毁了?”

如果是,析构函数会自动future 的共享状态中存储一个特殊的异常std::future_error。这个异常带有一个错误码:std::future_errc::broken_promise

正在 f.get() 上等待的消费者线程会被唤醒,并抛出 std::future_error 异常。这就避免了死锁,并明确地通知消费者:“你的值永远不会来了,因为生产者放弃了”。

多个线程的等待

这一节解决了 std::future 的一个核心局限性:如果有多个线程都需要等待同一个一次性事件的结果,该怎么办?

问题的根源在于 std::future (唯一期望) 存在两大局限:

  • 它是“只移动”的 (Move-only):就像 std::unique_ptr,std::future 代表了对结果的独占所有权
    • 你不能“拷贝”一个 std::future 分给两个线程。你只能将它从一个地方 std::move 到另一个地方。
  • get() 只能调用一次:当唯一的那个线程调用 get() 之后,future 内部的值就被“取走”了,future 对象变为空(valid() 返回 false)。
    • 这使得 std::future 无法被多个线程共享。

结论:std::future 只适用于一个线程(唯一的“所有者”)等待结果的场景。

为了解决这个问题,C++标准库提供了 std::shared_future (共享期望)。

类比:std::future 对应 std::unique_ptr,而 std::shared_future 对应 std::shared_ptr。

它代表了对异步结果的共享所有权, 是“可拷贝”的 (Copyable)。

  • 你可以创建 std::shared_future 的多个拷贝,并将这些拷贝分发给任意数量的线程。

  • 所有这些拷贝都指向同一个内部的“共享状态”(即那个“一次性事件”的结果)。

  • 当事件发生(例如 promise 被 set_value)时,所有的 shared_future 拷贝会同时变为“就绪”状态

  • 每个线程都可以在它自己的拷贝上调用 get() 来获取结果(get() 在 shared_future 上返回 const T&,并且可以被多次调用)。

如何创建 std::shared_future

你不能直接创建 std::shared_future。它必须从一个 std::future 转换而来,因为 std::future 是那个“唯一所有者”。

方法 1:构造时显式 std::move (最清晰)

std::future 是“唯一”的,shared_future 是“共享”的。要从“唯一”变为“共享”,你必须交出“唯一所有权”。

1
2
3
4
5
6
7
8
9
std::promise<int> p;
std::future<int> f = p.get_future();
assert(f.valid()); // 1. f 是合法的 (持有状态)

// 2. 将 f 的所有权“移动”给 sf
std::shared_future<int> sf(std::move(f));

assert(!f.valid()); // 3. f 此时不再合法 (它已交出所有权)
assert(sf.valid()); // 4. sf 现在是合法的

方法 2:隐式 std::move (从右值)

如果 std::future 是一个临时对象(右值),C++会自动进行移动

p.get_future() 返回的就是一个临时的 std::future 对象。

1
2
3
std::promise<std::string> p;
// 1. p.get_future() 返回的临时 future 被自动移动构造 sf
std::shared_future<std::string> sf(p.get_future());

方法 3:使用 .share() 成员函数 (最便捷)

std::future 提供了一个 .share() 成员函数,它为你执行 std::move 并返回一个新的 std::shared_future。

1
2
3
4
5
6
7
std::promise<int> p;
std::shared_future<int> sf = p.get_future().share();

// 这种方式在类型很复杂时特别有用,可以配合 auto:
std::promise<std::map<MyKey, MyData>::iterator> p;
// auto 会自动推导出那个非常长的 shared_future 类型
auto sf = p.get_future().share();

通过使用 std::shared_future,你可以轻松地实现多个线程等待同一个异步结果的场景,而不需要复杂的同步机制。

使用同步操作简化代码

这里的同步不同于阻塞机制, 是广义上的、指代线程间协调结果传递的机制,特别是非互斥量的同步工具。

它指的是使用 std::future、std::promise、std::packaged_task 等这些工具,以一种更高层、更抽象的方式来管理线程间的协作,从而简化代码结构。

使用“future”的函数化编程 (FP)

在并发上下文中,FP 模式的核心思想是避免共享可变状态, 尽量使用纯函数 (Pure Function)。 - 纯函数指的是一个函数,其输出只依赖于其输入参数,并且不会改变任何外部状态(没有“副作用”)。例如 sin(x)、sqrt(x)、3 + 3。

如果你的代码主要由纯函数构成,那么并发将变得极其简单。

没有共享的可变数据 = 没有数据竞争 = 不需要互斥量 = 没有死锁

两个 sin(x) 和 cos(y) 的调用可以安全地在两个线程上完全并行执行,因为它们不共享任何东西。

C++11 通过 Lambda、std::bind 和 auto 使得编写 FP 风格的代码变得更加容易。更重要的是, 还有 std::future 这个 FP 并发模式的“粘合剂

在 FP 模式中,如果函数不共享内存,它们如何协同工作呢? 答案是:一个函数的输出成为另一个函数的输入

std::future(期望)就是实现这种“数据流”的完美工具。它充当了一个异步通信通道

它允许一个线程(任务A)的计算结果(future)被另一个线程(任务B)作为依赖项来等待。

线程B等待的是“数据”(future.get()),而不是“锁”(mutex.lock())。这是一种更高级、更声明式的同步。

示例:并行快速排序

快速排序是一种经典的排序算法。它的基本思想是: 1. 选择一个“基准”元素 (pivot)。 2. 将数组划分为两部分:小于基准的元素和大于基准的元素。 3. 递归地对这两部分进行排序。 4. 最后将排序好的两部分与基准元素合并。

首先是一个简单的顺序版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
template<typename T>
std::list<T> sequential_quick_sort(std::list<T> input)
{
if(input.empty())
{
return input;
}
std::list<T> result;
result.splice(result.begin(),input,input.begin()); // 1
T const& pivot=*result.begin(); // 2

auto divide_point=std::partition(input.begin(),input.end(),
[&](T const& t){return t<pivot;}); // 3

std::list<T> lower_part;
lower_part.splice(lower_part.end(),input,input.begin(),
divide_point); // 4
auto new_lower(
sequential_quick_sort(std::move(lower_part))); // 5
auto new_higher(
sequential_quick_sort(std::move(input))); // 6

result.splice(result.end(),new_higher); // 7
result.splice(result.begin(),new_lower); // 8
return result;
}

现在,我们使用 std::async 和 std::future 将其并行化。

思路:第4步(排序低区)和第5步(排序高区)是完全独立的,它们可以并行执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
if(input.empty()) { return input; }

std::list<T> result;
result.splice(result.begin(), input, input.begin());
T const& pivot = *result.begin();

auto divide_point = std::partition(...);

std::list<T> lower_part;
lower_part.splice(..., divide_point);

// 1. 【变化点 1】:异步执行“低区”排序
std::future<std::list<T>> new_lower(
std::async(&parallel_quick_sort<T>, std::move(lower_part)));

// 2. 【变化点 2】:当前线程“复用”自身,同步执行“高区”排序
auto new_higher(
parallel_quick_sort(std::move(input)));

// 3. 拼接“高区”(它已经完成了)
result.splice(result.end(), new_higher);

// 4. 【变化点 3】:等待“低区”结果并拼接
result.splice(result.begin(), new_lower.get());

return result;
}
std::future 允许我们用 FP 风格编写并发代码。我们不再思考“哪个线程在何时锁定了哪个互斥量”,而是转为思考“任务A依赖于任务B的结果”。

std::future 充当了 B 和 A 之间的“依赖通道”,A 通过 B.get() 来声明这种依赖关系,C++运行时会自动处理底层的阻塞和唤醒,极大简化了代码。

使用消息传递的同步操作