条件变量 (Condition Variable)
是一种同步原语,它允许一个或多个线程等待(阻塞),直到另一个线程修改了某个共享状态并通知它们。
它的核心思想不是为了“锁住”资源,而是提供一种“等待-通知”
(Wait-Notify) 机制。线程可以高效地等待某个特定条件 (Condition)
变为真,而无需通过循环不断地检查(这种低效的方式称为忙等待
(Busy-Waiting))。
为了安全地检查和修改这个共享状态,条件变量必须与一个互斥锁 (Mutex)
协同工作。
一个典型的条件变量工作流程包含以下三个核心部分:
- 互斥锁
(std::mutex):用于保护被检查的共享数据(即“条件”)。
- 共享数据/条件:一个或多个线程需要等待其状态发生改变的变量。例如,一个表示任务队列是否为空的布尔值或整数。
- 条件变量
(std::condition_variable):负责阻塞等待线程和唤醒它们。
其主要操作有两个:
- wait(lock):
等待操作。调用该函数的线程会执行以下原子操作:
- 释放传入的 lock(互斥锁)。
- 阻塞当前线程,使其进入等待状态。
- 当被其他线程通过 notify 唤醒时,它会重新获取 lock,然后 wait
函数才会返回。
- notify_one() / notify_all():
通知操作。
- notify_one():
唤醒一个正在等待的线程。具体唤醒哪一个是不确定的。
- notify_all(): 唤醒所有正在等待的线程。
生产者-消费者模型
这是并发编程中最经典的模型之一,用于解耦生产者(创建数据或任务的线程)和消费者(处理数据或任务的线程)。
- 生产者 (Producer):负责生成数据并将其放入一个共享的缓冲区(如队列)。
- 消费者 (Consumer):负责从缓冲区中取出数据并进行处理。 - 共享缓冲区
(Shared Buffer):连接生产者和消费者的中间数据结构。
这个模型需要解决以下两个核心同步问题: -
缓冲区为空时:消费者不能进行消费,必须等待,直到生产者放入了新的数据。 -
缓冲区为满时:生产者不能继续生产,必须等待,直到消费者取走了数据,为新数据腾出空间。
-
互斥访问:任何时刻,只能有一个线程(无论是生产者还是消费者)在访问缓冲区,以避免数据损坏。
互斥锁可以解决第3个问题,而条件变量则完美地解决了第1和第2个问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
| #include <iostream> #include <thread> #include <vector> #include <queue> #include <mutex> #include <condition_variable>
template<typename T> class BlockingQueue { public: explicit BlockingQueue(size_t capacity) : capacity_(capacity) {}
void produce(const T& item) { std::unique_lock<std::mutex> lock(mtx_);
cond_producer_.wait(lock, [this] { return buffer_.size() < capacity_; });
buffer_.push(item); std::cout << "生产者 " << std::this_thread::get_id() << " 生产了 " << item << ", 队列大小: " << buffer_.size() << std::endl;
cond_consumer_.notify_one(); }
T consume() { std::unique_lock<std::mutex> lock(mtx_);
cond_consumer_.wait(lock, [this] { return !buffer_.empty(); });
T item = buffer_.front(); buffer_.pop(); std::cout << "消费者 " << std::this_thread::get_id() << " 消费了 " << item << ", 队列大小: " << buffer_.size() << std::endl;
cond_producer_.notify_one();
return item; }
private: size_t capacity_; std::queue<T> buffer_; std::mutex mtx_; std::condition_variable cond_producer_; std::condition_variable cond_consumer_; };
int main() { BlockingQueue<int> bq(5);
std::thread producer1([&]() { for (int i = 0; i < 10; ++i) { bq.produce(i); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } }); std::thread producer2([&]() { for (int i = 10; i < 20; ++i) { bq.produce(i); std::this_thread::sleep_for(std::chrono::milliseconds(150)); } });
std::thread consumer1([&]() { for (int i = 0; i < 10; ++i) { bq.consume(); std::this_thread::sleep_for(std::chrono::milliseconds(200)); } }); std::thread consumer2([&]() { for (int i = 0; i < 10; ++i) { bq.consume(); std::this_thread::sleep_for(std::chrono::milliseconds(250)); } });
producer1.join(); producer2.join(); consumer1.join(); consumer2.join();
return 0; }
|
上述示例的
cond_producer_.wait(lock, ...)是生产者的核心等待逻辑。wait
函数会检查传入的 Lambda 表达式 [this] { return buffer_.size() <
capacity_; }。如果条件为真 (队列不满):wait
函数立即返回,线程继续向下执行; 如果条件为假
(队列已满):线程会原子地释放 lock, 进入阻塞/等待状态,等待被 notify
唤醒。
notify_one()
的核心作用是唤醒一个正在等待的线程。它就像一个信号,告诉等待中的线程:“你等待的条件可能已经满足了,快醒来检查一下吧!”.
唤醒的线程会重新获取 lock
锁,然后检查条件是否满足。如果条件满足,线程会继续执行;如果条件不满足,线程会再次进入等待状态。
例如, 如果没有
cond_consumer_.notify_one(),那些因队列为空而睡眠的消费者线程将永远不会知道有新数据到来,它们会一直“睡”下去,导致程序死锁。
虚假唤醒
虚假唤醒指的是,一个正在条件变量上等待 (cv.wait())
的线程,在没有任何其他线程调用 notify_one() 或 notify_all()
的情况下,被意外地唤醒。
换句话说,线程“无缘无故”地从等待状态中醒来,但它所等待的那个条件
(Condition) 实际上仍然不满足。
这是一个真实存在且需要正确处理的并发问题。POSIX 标准和 C++
标准都明确允许这种情况发生,因此程序员必须在代码中防范它。这种情况出现的原因通常与操作系统内核的线程调度实现有关(例如系统中断:等待中的线程可能会被一些不相关的系统事件(如
POSIX 信号)中断,导致其从内核的等待队列中被唤醒。)
处理虚假唤醒的“黄金法则”是:永远在循环中调用
wait()。下面的一种错误情况是:
1 2 3 4 5 6 7 8
| std::unique_lock<std::mutex> lock(mtx); if (buffer_.empty()) { cond_consumer_.wait(lock); }
T item = buffer_.front(); buffer_.pop();
|
正确的做法是使用 while
循环来包裹 wait, 确保在被 notify 唤醒后, 条件再次被检查:
1 2 3 4 5 6 7 8 9
| std::unique_lock<std::mutex> lock(mtx);
while (buffer_.empty()) { cond_consumer_.wait(lock); }
T item = buffer_.front(); buffer_.pop();
|
更推荐的做法是使用 wait 的谓词版本: C++
标准库为我们提供了更优雅的解决方案, wait
函数有一个重载版本,可以接受一个谓词 (Predicate),通常是一个 Lambda
表达式。
1 2 3 4 5 6 7 8 9
| std::unique_lock<std::mutex> lock(mtx);
cond_consumer_.wait(lock, [this] { return !buffer_.empty(); });
T item = buffer_.front(); buffer_.pop();
|
这个版本在功能上等价于 while
循环,但代码更简洁,意图更清晰,并且能有效避免程序员忘记写循环。