条件变量和多线程数据共享

ZaynPei Lv6

条件变量 (Condition Variable) 是一种同步原语,它允许一个或多个线程等待(阻塞),直到另一个线程修改了某个共享状态并通知它们。

它的核心思想不是为了“锁住”资源,而是提供一种“等待-通知” (Wait-Notify) 机制。线程可以高效地等待某个特定条件 (Condition) 变为真,而无需通过循环不断地检查(这种低效的方式称为忙等待 (Busy-Waiting))。

为了安全地检查和修改这个共享状态,条件变量必须与一个互斥锁 (Mutex) 协同工作。

一个典型的条件变量工作流程包含以下三个核心部分:

  • 互斥锁 (std::mutex):用于保护被检查的共享数据(即“条件”)。
  • 共享数据/条件:一个或多个线程需要等待其状态发生改变的变量。例如,一个表示任务队列是否为空的布尔值或整数。
  • 条件变量 (std::condition_variable):负责阻塞等待线程唤醒它们

其主要操作有两个:

  • wait(lock): 等待操作。调用该函数的线程会执行以下原子操作:
    • 释放传入的 lock(互斥锁)
    • 阻塞当前线程,使其进入等待状态。
    • 当被其他线程通过 notify 唤醒时,它会重新获取 lock,然后 wait 函数才会返回。
  • notify_one() / notify_all(): 通知操作。
    • notify_one(): 唤醒一个正在等待的线程。具体唤醒哪一个是不确定的。
    • notify_all(): 唤醒所有正在等待的线程。

生产者-消费者模型

这是并发编程中最经典的模型之一,用于解耦生产者(创建数据或任务的线程)和消费者(处理数据或任务的线程)。 - 生产者 (Producer):负责生成数据并将其放入一个共享的缓冲区(如队列)。 - 消费者 (Consumer):负责从缓冲区中取出数据并进行处理。 - 共享缓冲区 (Shared Buffer):连接生产者和消费者的中间数据结构。

这个模型需要解决以下两个核心同步问题: - 缓冲区为空时:消费者不能进行消费,必须等待,直到生产者放入了新的数据。 - 缓冲区为满时:生产者不能继续生产,必须等待,直到消费者取走了数据,为新数据腾出空间。 - 互斥访问:任何时刻,只能有一个线程(无论是生产者还是消费者)在访问缓冲区,以避免数据损坏。

互斥锁可以解决第3个问题,而条件变量则完美地解决了第1和第2个问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>

template<typename T>
class BlockingQueue {
public:
// 构造函数,指定队列容量
explicit BlockingQueue(size_t capacity) : capacity_(capacity) {}

// 生产者调用:向队列中放入一个元素
void produce(const T& item) {
std::unique_lock<std::mutex> lock(mtx_); // std::unique_lock 是一个 RAII 风格的锁管理器,它在构造时自动加锁,在析构时(即离开作用域时)自动解锁,非常安全

// 等待直到队列不满
// 使用 Lambda 谓词,自动处理虚假唤醒
cond_producer_.wait(lock, [this] {
return buffer_.size() < capacity_;
});

// 将元素放入缓冲区
buffer_.push(item);
std::cout << "生产者 " << std::this_thread::get_id() << " 生产了 " << item << ", 队列大小: " << buffer_.size() << std::endl;

// 通知一个等待的消费者
cond_consumer_.notify_one();
}

// 消费者调用:从队列中取出一个元素
T consume() {
std::unique_lock<std::mutex> lock(mtx_);

// 等待直到队列不空, 等待时会释放 lock 锁, 让其他线程有机会修改共享状态
cond_consumer_.wait(lock, [this] {
return !buffer_.empty();
});

// 从缓冲区取出元素
T item = buffer_.front();
buffer_.pop();
std::cout << "消费者 " << std::this_thread::get_id() << " 消费了 " << item << ", 队列大小: " << buffer_.size() << std::endl;

// 通知一个等待的生产者
cond_producer_.notify_one();

return item;
}

private:
size_t capacity_;
std::queue<T> buffer_; // 底层使用 std::queue 存储数据(共享资源)
std::mutex mtx_; // 互斥锁,用于保护对 buffer_ 的访问
std::condition_variable cond_producer_; // 用于生产者等待的条件变量
std::condition_variable cond_consumer_; // 用于消费者等待的条件变量
};

int main() {
// 创建一个容量为 5 的阻塞队列
BlockingQueue<int> bq(5);

// 创建两个生产者线程
std::thread producer1([&]() {
for (int i = 0; i < 10; ++i) {
bq.produce(i);
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟生产耗时
}
});
std::thread producer2([&]() {
for (int i = 10; i < 20; ++i) {
bq.produce(i);
std::this_thread::sleep_for(std::chrono::milliseconds(150));
}
});

// 创建两个消费者线程
std::thread consumer1([&]() {
for (int i = 0; i < 10; ++i) {
bq.consume();
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 模拟消费耗时
}
});
std::thread consumer2([&]() {
for (int i = 0; i < 10; ++i) {
bq.consume();
std::this_thread::sleep_for(std::chrono::milliseconds(250));
}
});

// 等待所有线程结束
producer1.join();
producer2.join();
consumer1.join();
consumer2.join();

return 0;
}
上述示例的cond_producer_.wait(lock, ...)是生产者的核心等待逻辑。wait 函数会检查传入的 Lambda 表达式 [this] { return buffer_.size() < capacity_; }。如果条件为真 (队列不满):wait 函数立即返回,线程继续向下执行; 如果条件为假 (队列已满):线程会原子地释放 lock, 进入阻塞/等待状态,等待被 notify 唤醒。

notify_one() 的核心作用是唤醒一个正在等待的线程。它就像一个信号,告诉等待中的线程:“你等待的条件可能已经满足了,快醒来检查一下吧!”. 唤醒的线程会重新获取 lock 锁,然后检查条件是否满足。如果条件满足,线程会继续执行;如果条件不满足,线程会再次进入等待状态。

例如, 如果没有 cond_consumer_.notify_one(),那些因队列为空而睡眠的消费者线程将永远不会知道有新数据到来,它们会一直“睡”下去,导致程序死锁。

虚假唤醒

虚假唤醒指的是,一个正在条件变量上等待 (cv.wait()) 的线程,在没有任何其他线程调用 notify_one() 或 notify_all() 的情况下,被意外地唤醒。

换句话说,线程“无缘无故”地从等待状态中醒来,但它所等待的那个条件 (Condition) 实际上仍然不满足。

这是一个真实存在且需要正确处理的并发问题。POSIX 标准和 C++ 标准都明确允许这种情况发生,因此程序员必须在代码中防范它。这种情况出现的原因通常与操作系统内核的线程调度实现有关(例如系统中断:等待中的线程可能会被一些不相关的系统事件(如 POSIX 信号)中断,导致其从内核的等待队列中被唤醒。)

处理虚假唤醒的“黄金法则”是:永远在循环中调用 wait()。下面的一种错误情况是:

1
2
3
4
5
6
7
8
// 线程可能会在虚假唤醒后错误地继续执行
std::unique_lock<std::mutex> lock(mtx);
if (buffer_.empty()) { // 一开始检查过是空, 进入函数体, 等待被 notify 唤醒
cond_consumer_.wait(lock); // 如果在这里被虚假唤醒,if 语句已执行过,不会再次检查, 但是如果是虚假唤醒, 完全有可能还是空
}
// 危险!程序可能在这里尝试从空队列中取数据
T item = buffer_.front();
buffer_.pop();
正确的做法是使用 while 循环来包裹 wait, 确保在被 notify 唤醒后, 条件再次被检查:
1
2
3
4
5
6
7
8
9
std::unique_lock<std::mutex> lock(mtx);
// 使用 while 循环来包裹 wait
while (buffer_.empty()) {
cond_consumer_.wait(lock);
}
// 安全!从 wait 返回后,循环条件会再次被检查。
// 只有当 buffer_.empty() 为 false 时,循环才会退出。
T item = buffer_.front();
buffer_.pop();
更推荐的做法是使用 wait 的谓词版本: C++ 标准库为我们提供了更优雅的解决方案, wait 函数有一个重载版本,可以接受一个谓词 (Predicate),通常是一个 Lambda 表达式。
1
2
3
4
5
6
7
8
9
std::unique_lock<std::mutex> lock(mtx);
// 这个 wait 内部已经为我们实现好了 while 循环
// 它只会在 lambda 表达式返回 true 时才会返回
cond_consumer_.wait(lock, [this] {
return !buffer_.empty();
});
// 绝对安全!
T item = buffer_.front();
buffer_.pop();
这个版本在功能上等价于 while 循环,但代码更简洁,意图更清晰,并且能有效避免程序员忘记写循环。

On this page
条件变量和多线程数据共享