异步/并发编程模型

ZaynPei Lv6

在C++并发编程中,核心挑战之一是在不同线程间安全、高效地传递数据。特别是当一个线程(生产者/Provider)需要将一个计算结果(值或异常)传递给另一个线程(消费者/Consumer)时。std::future, std::promise, std::packaged_task, 和 std::async 共同构成了一个异步编程模型,其设计哲学是关注点分离 (Separation of Concerns),将任务的执行、结果的传递和线程的管理清晰地解耦。

核心组件:数据通信的 foundational elements

std::future 和 std::promise 是实现异步数据传递最基础的两个构建块,它们分别代表了共享状态的“读取端”和“写入端”。

std::future:未来的凭证/读取端口

std::future 是一个模板类,可以看作是一个异步结果的代理或占位符。当你启动一个异步任务时,你会立即得到一个 std::future 对象。这个对象本身并没有包含计算结果,但它承诺在未来的某个时刻,你可以通过它来获取结果。

主要操作: - get():这是 std::future 最核心的函数, 它会阻塞当前线程,直到异步任务完成并返回结果(或抛出异常)。 - get() 只能被调用一次。调用后,std::future 对象的状态会变为无效。 - wait():阻塞当前线程,直到结果可用,但不获取结果. 这个函数可以多次调用。 - wait_for() / wait_until():等待一段时间或等到一个指定的时间点。如果在超时前结果准备好了,它会返回 std::future_status::ready。 - valid():检查 std::future 对象是否与一个共享状态关联,即是否有效。在调用 get() 之后,valid() 会返回 false。

你不能直接创建一个 std::future,它总是通过以下三种方式之一获得: - std::promise 的 get_future() 方法。 - std::packaged_task 的 get_future() 方法。 - std::async 函数的返回值。

std::promise:一个承诺/写入端口

std::promise 对象可以被看作是一个可以被写入一次的容器,它承诺在未来的某个时刻会提供一个 T 类型的值(或异常), 从而使关联的 std::future 变为就绪状态。

主要操作:

  • 关联Future: 调用std::future get_future()来创建共享状态,并返回一个与该 promise 相关联的 std::future 对象。此方法对于每个 promise 对象只能调用一次。
  • 设置结果:
    • void set_value(const T& value) / void set_value(T&& value): 将一个值存入共享状态,并使关联的 std::future 变为就绪状态。
    • void set_exception(std::exception_ptr p): 将一个异常存入共享状态,并使关联的 std::future 变为就绪状态。
    • 这些设置操作同样是一次性的。对同一个 promise 多次调用 set_value 或 set_exception 会抛出异常。

工作流程: - 在发起任务的线程(消费者线程)中创建一个 std::promise 对象。 - 消费者线程通过调用 promise.get_future() 来获取一个与之关联的 std::future 对象, 并等待该 future 变为就绪状态。 - 消费者线程将 std::promise 对象(通常通过移动 std::move)传递给将要执行异步任务的新线程(生产者线程)。 - 通常通过 std::thread 的构造函数传递 promise, 此时工作函数需要接收 promise 对象作为参数。 - 新线程执行计算,当得到结果后,调用 promise.set_value(result) 将结果存入 promise。如果发生错误,可以调用 promise.set_exception(exception_ptr) 来存入一个异常。 - 一旦 set_value() 或 set_exception() 被调用,与之关联的 std::future 就会变为“就绪” (ready) 状态。 - 此时,在消费者线程中对 future 调用的 get() 将会立即返回结果(或抛出异常),不再阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <iostream>
#include <thread>
#include <future>
#include <chrono>

void compute_task(std::promise<int> p) {
try {
std::cout << "Worker thread is performing calculations..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
int result = 42;
// 承诺兑现,将结果放入 promise
p.set_value(result);
} catch (...) {
// 如果发生异常,可以将异常放入 promise
p.set_exception(std::current_exception());
}
}

int main() { // 消费者线程
// 1. 创建一个 promise 对象
std::promise<int> p; // 这里的int是任务的返回类型
// 2. 从 promise 获取 future,这是结果的“读取端”
std::future<int> f = p.get_future(); // 连接 promise 和 future
// 3. 创建一个新线程,并将 promise 的所有权转移给它
std::thread worker(compute_task, std::move(p)); // compute_task函数需要接收promise对象作为参数

std::cout << "Main thread is waiting for the result..." << std::endl;
// 4. 在主线程中调用 get() 等待结果
int result = f.get(); // // 这里会阻塞,直到 worker 线程调用了 p.set_value()

std::cout << "The result from worker thread is: " << result << std::endl;
worker.join();
return 0;
}

这里的main 线程创建了 promise 和 future,将 promise 移动到子线程,然后自己持有 future 等待结果; worker 线程接收 promise 对象,执行任务,最后通过 p.set_value(42) 履行承诺,这个动作会唤醒在 f.get() 处阻塞的 main 线程。

高级抽象:任务与结果的绑定

std::packaged_task 和 std::async 是在 promise/future 基础上构建的更高级抽象,它们将任务的执行与结果的传递机制更紧密地结合起来。

std::packaged_task<T(Args…)>:可调用对象与未来的封装

std::packaged_task 是一个模板类,它将任意可调用对象 (Callable Object) 与 promise/future 机制进行封装。它的主要目的是将一个函数的执行与其返回值的异步传递自动化。其模板参数是一个函数签名,例如 int(int, double)。

内部机制: - 构造: 当你用一个函数(如 my_func)创建一个 std::packaged_task<T(Args…)> task(my_func) 时,task 内部会创建一个 std::promise。 - 获取Future: 调用 task.get_future() 会返回与这个内部 promise 相关联的 std::future。 - 执行: packaged_task 对象本身是可调用的 (operator())。当你执行 task(args…) 时,它会调用其内部包装的函数 my_func(args…), 捕获 my_func 的返回值, 并自动使用该返回值调用内部 promise 的 set_value() 方法。如果 my_func 抛出异常,它会捕获异常并调用 set_exception()。

工作流程: - 创建一个可调用对象(例如一个函数或 Lambda)并用这个可调用对象来初始化一个 std::packaged_task。 - 通过 task.get_future() 获取与之关联的 std::future。 - 将 packaged_task 对象移动到新线程。 - 在新线程中,像普通函数一样执行这个 task 对象。 - 任务执行完毕后,其返回值会自动被 packaged_task 捕获并传递给关联的 future。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <iostream>
#include <thread>
#include <future>
#include <cmath>

// 一个耗时的计算任务
long long calculate_factorial(int n) {
long long res = 1;
for (int i = 2; i <= n; ++i) {
res *= i;
}
std::cout << "Task finished calculation." << std::endl;
return res;
}

int main() {
// 1. 将函数包装成一个 packaged_task, 模板参数是函数的签名:long long(int)
std::packaged_task<long long(int)> task(calculate_factorial);

// 2. 从 task 获取 future
std::future<long long> f = task.get_future();

// 3. 将 task 移动到新线程
// 注意:这里传递的是 task 本身,而不是函数
std::thread worker(std::move(task), 10); // 计算 10!

std::cout << "Main thread is waiting for the factorial result..." << std::endl;
// 4. 等待并获取结果
long long result = f.get();

std::cout << "Factorial of 10 is: " << result << std::endl;

worker.join();
return 0;
}

相比

std::async

std::async 是一个函数模板,用于以异步方式(可能在一个单独的线程中)启动一个任务。它将线程创建、任务执行和结果返回的所有细节都封装了起来, 相比于直接使用 std::thread,std::async 提供了一种更高层次、更方便的抽象,特别适用于那些需要从异步任务中获取返回值的场景。

简单来说,std::async 做了两件事: - 启动一个可调用对象(如函数、lambda表达式):它允许你安排一个任务去执行,而不必立即等待它完成。 - 返回一个 std::future 对象:这个 std::future 对象是一个“未来的凭证”,它最终会持有异步任务的返回值。你可以通过这个 future 在稍后的时间点获取结果。

这种模型被称为基于任务的并行 (Task-Based Parallelism),你关心的是“要完成什么任务”,而不是“具体要在哪个线程上完成”。

基本用法

std::async 的基本语法如下:

1
2
3
4
template< class Function, class... Args >
std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>>
async( Function&& f, Args&&... args );
// 这里std::decay_t是类型萃取, 用于去掉引用、cv限定符和数组类型, 得到原始类型; 最终std::future的模板参数就是传入函数的返回类型
让我们通过一个简单的例子来理解它的工作方式: 假设我们有一个耗时的计算任务,我们不希望主线程被阻塞。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <iostream>
#include <future>
#include <thread>
#include <chrono>

// 一个模拟耗时计算的函数
int time_consuming_calculation(int x) {
std::cout << "Worker thread is calculating..." << std::endl;
// 模拟耗时2秒
std::this_thread::sleep_for(std::chrono::seconds(2));
return x * x;
}

int main() {
std::cout << "Main thread started." << std::endl;

// 1. 启动异步任务
// 通过 std::async 启动 time_consuming_calculation 函数,并传递参数 5。
// 这会立即返回一个 std::future<int> 对象。
std::future<int> future_result = std::async(time_consuming_calculation, 5);

// 2. 主线程继续执行其他任务
// 在异步任务执行的同时,主线程可以做别的事情。
std::cout << "Main thread is doing other work while waiting for the result..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟其他工作

// 3. 获取异步任务的结果
// 调用 future_result.get() 来获取结果。
// 如果此时异步任务还没有完成,.get() 会阻塞当前线程,直到结果可用。
std::cout << "Main thread is waiting for the result..." << std::endl;
int result = future_result.get(); // .get() 会阻塞直到任务完成

std::cout << "The result is: " << result << std::endl;
std::cout << "Main thread finished." << std::endl;

return 0;
}
- std::async(time_consuming_calculation, 5):这行代码请求异步执行 time_consuming_calculation(5)。它可能会立即创建一个新线程来运行这个函数。 - std::future future_result:std::async 返回一个 std::future 对象。int 类型表示我们期望从异步任务中获得一个整数返回值。 - future_result.get():这是关键部分。当主线程需要计算结果时,它调用 .get()。 - 如果此时工作线程已经计算完毕,.get() 会立即返回结果。 - 如果工作线程仍在计算,.get() 会阻塞主线程,直到计算完成并返回结果。 - 注意:get() 只能被调用一次。再次调用会导致未定义行为。

启动策略 (Launch Policy)

std::async 的行为可以通过一个可选的“启动策略”参数来控制。这是一个非常重要的特性。

1
2
// 明确指定启动策略
auto future = std::async(std::launch::async, my_function);
主要的启动策略有两种: - std::launch::async: 保证异步执行。系统必须创建一个新的线程来执行任务。当你需要真正的并行计算时,这是最常用的策略。 - std::launch::deferred: 延迟执行。任务不会立即在任何线程上启动。相反,它只会在返回的 std::future 对象上调用 .get() 或 .wait() 时,才会在调用 .get() 的那个线程上同步执行。用于实现惰性求值(Lazy Evaluation),即直到你真正需要结果时才进行计算。

默认策略:std::launch::async | std::launch::deferred

这是 std::async 的默认行为。它给予了标准库实现的灵活性,可以根据系统负载或其他条件自行决定是创建一个新线程(async)还是延迟执行(deferred)。然而, 这种不确定性可能导致程序行为难以预测。如果你需要保证并行性,强烈建议明确指定 std::launch::async。

std::async 与 std::thread 的对比

特性 std::thread std::async
抽象层次 低层次,直接操作线程 高层次,关注于”任务”
返回值 没有直接获取返回值的机制(需借助 std::promise 或共享变量) 通过返回的 std::future 对象轻松获取返回值
异常处理 如果线程函数抛出异常且未被捕获,程序会调用 std::terminate 终止 异常会被 std::future 捕获,并在调用 .get() 时重新抛出
线程管理 需要手动调用 join() 或 detach(),否则程序在 std::thread 对象析构时终止 std::future 的析构函数会阻塞,直到异步任务完成,避免了线程被意外销毁
系统开销 总是创建一个新的操作系统线程 (默认策略下) 可能不会创建新线程,由系统决定,可能更高效

总结

默认选择 std::async: 对于大多数“执行一个函数并获取其结果”的场景,std::async 因其简洁、安全(自动异常传递和资源管理)而成为首选。

需要任务队列时选择 std::packaged_task: 当你需要将任务对象化,以便存储、传递和由通用的执行器(如线程池)稍后执行时,std::packaged_task 是正确的工具。

需要手动信号控制时选择 std::promise: 当结果的产生不与单个函数的返回直接对应,而是取决于复杂的逻辑或外部事件时,std::promise 提供了必要的底层控制能力。