MPMC队列

ZaynPei Lv6

MPMC (Multiple-Producer, Multiple-Consumer),即多生产者、多消费者队列,是并发编程中最灵活的数据结构。它允许:

  • 任意数量的生产者线程同时向队列中添加元素。
  • 任意数量的消费者线程同时从队列中取出元素。

你可以把它想象成一个繁忙的物流中心,有多个卸货平台(生产者)和多个装货平台(消费者),它们可以全天候、并行地工作,而不会导致货物混乱。

应用场景

由于其极高的灵活性,MPMC 队列是许多通用并发系统的基石:

  • 线程池的任务队列 (Task Queue):这是 MPMC 最经典、最核心的应用。系统中的任何线程(生产者)都可以将一个任务(例如一个函数对象)提交到队列中。线程池中任何一个空闲的工作线程(消费者)都可以从队列中获取一个任务来执行。

  • 并行数据处理流水线:在一个复杂的数据处理流程中,如果某个处理阶段本身就是并行的(有多个工作线程),并且下一个阶段也是并行的,那么这两个阶段之间就需要一个 MPMC 队列来传递数据。

  • 高性能日志系统:在大型应用中,多个业务线程(生产者)会高频地产生日志消息。为了不阻塞业务线程,这些日志消息被快速扔进一个 MPMC 队列。后台有多个专门的 I/O 线程(消费者)负责从队列中取出日志并写入文件或网络。

核心挑战:双端竞争与 ABA 问题

MPMC 的实现难度远超其他类型的队列,因为它面临着最复杂的并发挑战。

双端竞争 (Double-Ended Contention)

  • 生产者端:多个生产者线程会同时竞争,试图更新队列的 tail(尾部)指针。

  • 消费者端:多个消费者线程会同时竞争,试图更新队列的 head(头部)指针。

在 SPSC 中,两端都没有竞争。在 MPSC/SPMC 中,只有一端存在竞争。而在 MPMC 中,两端都存在激烈的竞争,这使得简单的原子指针更新变得不可行,必须使用更复杂的同步原语,如 CAS (Compare-And-Swap) 循环。

ABA 问题

这是无锁编程中一个非常著名且致命的陷阱,在 MPMC 的朴素实现中极易出现。

假设有两个线程 T1 和 T2, 线程 T1 读取内存地址 P 的值为 A。然后 T1 被操作系统挂起。

挂起指的是操作系统临时中断某个线程的执行,将其从运行状态切换到等待或休眠状态。被挂起的线程不会继续执行自己的代码,直到操作系统再次调度它恢复运行。在并发场景下,线程被挂起后,其他线程可能会修改共享数据,这就为 ABA 问题的发生创造了条件。

与此同时,线程 T2 修改 P 的值为 B,然后又修改回 A。

T1 恢复执行,它检查 P 的值,发现仍然是 A。T1 错误地认为从它上次读取到现在什么都没有改变,于是继续执行后续操作。

在 MPMC 队列中,这种情况可能发生在 head 或 tail 指针上。这种情况是非常危险的,因为它可能导致数据结构的状态被错误地解释,进而引发数据损坏或程序崩溃。

假设一个基于链表的队列,T1 准备对头节点 head(其地址为 A)执行出队操作。在 T1 被挂起时,T2 可能已经将 A 节点出队、销毁,然后又有一个新节点恰好在相同的内存地址 A 处被分配。当 T1 恢复时,它看到的 head 地址仍然是 A,但这个 A 已经是一个全新的、不相关的节点了。如果 T1 继续操作,就会导致数据损坏或程序崩溃。

实现策略

实现一个正确且高效的 MPMC 无锁队列是世界级的难题。工业界主要采用以下几种经过验证的算法。

经典算法:Michael-Scott 队列

这是教科书中最常介绍的 MPMC 无锁队列算法,基于无锁链表实现。

  • 数据结构:一个单向链表,包含 head 和 tail 两个原子指针。

  • 哨兵节点 (Sentinel Node):队列初始化时包含一个“哑节点”(dummy node),head 和 tail 都指向它。这个哨兵节点简化了边界条件(空队列/单元素队列),并有效地将生产者和消费者的竞争点分离开。

  • 入队 (Enqueue):

    1. 创建一个新节点。

    2. 使用 CAS 循环,尝试将新节点链接到当前 tail 节点的 next 指针上。

    3. 成功后,再尝试更新 tail 指针指向新的尾节点。

  • 出队 (Dequeue):

    1. 使用 CAS 循环,尝试将 head 指针移动到它的下一个节点 (head->next)。

    2. 成功后,旧的 head(现在是哨兵节点)就可以被安全地回收了。

解决 ABA 问题:通常通过“标记指针” (Tagged Pointer) 或版本计数器 (Version Counter) 来解决。即将指针和 一个计数器打包成一个更大的原子类型(如 128 位),每次修改指针时都增加计数器。CAS 操作需要同时比较指针和计数器,确保两者都未被改变。

现代高性能实现 (基于环形缓冲区)

虽然 Michael-Scott 队列是无界的(unbounded),但其性能通常受限于内存分配和链表遍历。现代很多高性能 MPMC 队列是有界的(bounded),并且基于环形缓冲区,因为这能更好地利用 CPU 缓存。

其实现远比 SPSC 环形缓冲区复杂,核心思想是给每个槽位 (slot) 加上版本号或序列号,以协调生产者和消费者。

基本思路 (Ticket-Based)是维护两个原子计数器:enqueue_ticket 和 dequeue_ticket。

生产者: 1. 原子地获取并递增 enqueue_ticket,得到一个唯一的入队“票号”。 2. 计算该票号对应的缓冲区索引 idx = ticket % capacity。 3. 自旋等待,直到 buffer[idx] 的版本号等于其票号(这表示消费者已经消费完该槽位的旧数据,可以写入了)。 4. 写入数据。 5. 将 buffer[idx] 的版本号加一,以通知消费者数据已准备好。

消费者: 1. 原子地获取并递增 dequeue_ticket,得到一个唯一的出队“票号”。 2. 计算该票号对应的缓冲区索引 idx = ticket % capacity。 3. 自旋等待,直到 buffer[idx] 的版本号等于其票号(这表示生产者已经写入新数据,可以消费了)。 4. 读取数据。 5. 将 buffer[idx] 的版本号加一,以通知生产者数据已被消费。

这种设计将对 head/tail 指针的直接竞争,转化为了对槽位版本号的等待,在很多场景下能提供更高的吞吐量。

示例代码

下面是基于经典的 Michael-Scott 队列算法 的 MPMC 无锁队列的简化实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#ifndef MPMC_QUEUE_H
#define MPMC_QUEUE_H

#include <atomic>
#include <thread> // For std::this_thread::yield()

template<typename T>
class MPMCQueue {
private:
struct Node {
T data;
std::atomic<Node*> next;

Node(T val) : data(std::move(val)), next(nullptr) {}
}; // 节点结构, 在类内定义, 但是类并不包含节点实例

// 缓存行对齐以避免伪共享
alignas(64) std::atomic<Node*> head_; // 头指针, 指向链表的第一个节点
alignas(64) std::atomic<Node*> tail_; // 尾指针, 指向链表的最后一个节点

public:
MPMCQueue() {
// 关键设计:创建一个哨兵节点 (dummy node), 为了简化边界条件
// 队列初始化时,head 和 tail 都指向这个空节点
Node* sentinel = new Node(T{}); // 这里的T{}是默认构造的哨兵数据
head_.store(sentinel);
tail_.store(sentinel);
}

~MPMCQueue() {
// 清理所有剩余的节点
T dummy;
while (try_dequeue(dummy)) {}
// 删除最后的哨兵节点
delete head_.load();
}

// 禁止拷贝和赋值
MPMCQueue(const MPMCQueue&) = delete;
MPMCQueue& operator=(const MPMCQueue&) = delete;

/**
* @brief [多生产者线程调用] 尝试将一个元素入队。
*/
void enqueue(T value) {
Node* new_node = new Node(std::move(value));

// CAS 循环:持续尝试,直到成功将新节点链接到链表尾部
while (true) {
Node* last = tail_.load(std::memory_order_acquire); // 获取当前尾节点, 一开始是哨兵节点
Node* next = last->next.load(std::memory_order_acquire); // 获取尾节点的下一个节点

// 检查 tail_ 是否在我们读取后被其他线程改变了(一致性检查, 防止 ABA 问题)
if (last == tail_.load(std::memory_order_relaxed)) {
if (next == nullptr) {
// 这是正常情况:tail_ 指向的是真正的尾节点
// 尝试将新节点链接到尾部, 确保只有一个线程能够成功链接到旧的尾部。如果失败(被其他线程抢先链接了),循环重新开始。
if (last->next.compare_exchange_weak(next, new_node, std::memory_order_release)) {
// 链接成功!现在尝试更新 tail_ 指针
// 即使下面这步失败也没关系,其他线程会帮忙推进 tail_
tail_.compare_exchange_weak(last, new_node, std::memory_order_release);
return; // 成功入队
}
} else { // next != nullptr, 即已经有其他线程在添加节点
// 帮助其他线程:tail_ 指针落后了,我们帮它更新到真正的尾节点
tail_.compare_exchange_weak(last, next, std::memory_order_release);
}
}
}
}

/**
* @brief [多消费者线程调用] 尝试从队列中取出一个元素。
*/
bool try_dequeue(T& value) {
// CAS 循环:持续尝试,直到成功取出一个节点
while (true) {
Node* first = head_.load(std::memory_order_acquire);
Node* last = tail_.load(std::memory_order_acquire);
Node* next = first->next.load(std::memory_order_acquire);

if (first == head_.load(std::memory_order_relaxed)) { // 一致性检查,防止 ABA 问题
if (first == last) {
// 队列为空,或者 tail_ 指针落后了
if (next == nullptr) {
return false; // 队列确定为空
}
// tail_ 落后,帮助其他线程推进它
tail_.compare_exchange_weak(last, next, std::memory_order_release);
} else {
// 队列不为空,尝试移动 head_ 指针
// 我们要取出的值在 next 节点中(因为 first 是哨兵节点)
if (head_.compare_exchange_weak(first, next, std::memory_order_release)) {
value = std::move(next->data);

// ### UNSAFE ###
// 危险!这里是这个实现最不安全的地方。
// 在一个真实的 MPMC 队列中,你不能立即删除 'first' 节点,
// 因为其他线程可能仍然持有指向它的指针。
// 必须使用险象指针等技术来确保安全回收。
delete first; // 在此教学实现中,我们简化为直接删除

return true; // 成功出队
}
}
}
}
}
};

#endif // MPMC_QUEUE_H