6. 无锁并发数据结构设计

ZaynPei Lv6

无锁并发数据结构的定义

首先, 在探讨无锁和有锁之前, 我们先回顾一下阻塞 (Blocking) vs. 非阻塞 (Non-blocking)

阻塞 (Blocking) 数据结构/算法:这是我们前面章节(特别是第3、4、6章)主要讨论的方式。它们依赖于阻塞式的同步原语,如: - 互斥量 (std::mutex):调用 lock() 可能导致线程被挂起。 - 条件变量 (std::condition_variable):调用 wait() 会挂起线程。 - 期望 (std::future):调用 get() 或 wait() 可能挂起线程

当一个线程因为等待某个条件(如锁被释放、通知到达、结果就绪)而被阻塞时,操作系统通常会完全挂起该线程,剥夺它的CPU时间片,并将其交给其他可运行的线程。只有当等待的条件满足时,操作系统才会将其唤醒重新调度

这种方式的优点是CPU资源不会在等待时被浪费。 缺点是线程挂起和唤醒本身有上下文切换开销;可能导致死锁、优先级反转等问题。

非阻塞 (Non-blocking) 数据结构/算法:不使用任何可能导致线程被操作系统长期挂起的阻塞式库函数。线程即使在等待,通常也是在执行指令(例如,在一个循环中检查某个条件)。

例如之前介绍的自旋锁 (Spinlock), 优点是避免了线程挂起和唤醒的开销, 但缺点是如果等待时间较长, 会浪费CPU资源.

非阻塞数据结构 (Non-blocking ≠ Lock-Free)

我们回顾一下之前实现的自旋锁 (Spinlock):

1
2
3
4
5
6
7
8
9
10
class spinlock_mutex {
std::atomic_flag flag = ATOMIC_FLAG_INIT;
public:
void lock() {
while(flag.test_and_set(std::memory_order_acquire)); // 循环直到成功设置 flag
}
void unlock() {
flag.clear(std::memory_order_release);
}
};

显然, 它是非阻塞的. lock() 函数没有调用任何会让操作系统挂起线程的函数。它只是在一个 while 循环中不断地执行 test_and_set 原子操作,消耗CPU时间片,直到成功获取锁(test_and_set 返回 false)。

然而,它仍然是一个。在任何时刻,只有一个线程能够成功退出 lock() 循环并进入临界区

如果持有锁的线程被操作系统意外挂起(例如,时间片用完),所有其他正在自旋等待锁的线程将无法取得任何进展,它们只能继续空转,直到持有锁的线程被唤醒并释放锁。这违反了无锁的定义。

因此, 非阻塞数据结构/算法 (Non-blocking Data Structures/Algorithms) 并不等同于无锁数据结构/算法 (Lock-Free Data Structures/Algorithms)。

无锁数据结构 (Lock-Free)

这是比“非阻塞”更强的保证: 如果一个数据结构的操作是无锁的,它保证系统整体(即所有正在操作该数据结构的线程中,至少有一个)总能在有限步骤内完成其操作,而不管其他线程的速度如何,或者是否有线程被挂起。

换句话说, 系统作为一个整体总是在前进不会因为某个线程被挂起而导致所有线程都卡住

实现特征:通常严重依赖原子操作,特别是比较并交换 (Compare-and-Swap, CAS),并且经常包含循环。 > CAS 循环失败的原因必须是因为其他线程修改了数据(即其他线程取得了进展),而不是因为持有锁的线程被挂起。

可能的缺点:线程饥饿 (Starvation). 无锁不保证每个线程都能在有限时间内完成操作。 可能存在一个“运气不好”的线程,它每次尝试 CAS 操作时,都正好被其他线程抢先修改了数据,导致它永远在循环重试,无法完成自己的任务。

无等待数据结构 (Wait-Free)

这是最强的非阻塞保证: 如果一个数据结构的操作是无等待的,它保证每一个线程都能在有限步骤内完成其操作,而不管其他线程的速度如何是否被挂起、或者是否存在争用

换句话说:每个线程都有自己取得进展的保证,没有饥饿问题

实现难度:无等待算法通常比无锁算法复杂得多。它们需要保证即使在最坏的争用情况下,每个线程的操作也能在固定的步数内完成。这往往需要非常精巧的设计,有时甚至在没有争用的情况下也需要执行更多的步骤。

这种算法在实际应用中较少见,因为它们的复杂性和开销通常超过了它们所提供的好处。因此, 本章后续内容将主要关注无锁数据结构/算法的设计与实现。

无锁数据结构的利与弊 (Why pursue Lock-Free?)

优点 (Pros):

  • 最大化并发 (Maximize Concurrency):理论上,无锁允许线程在大部分时间里并行执行,避免了锁带来的序列化瓶颈。无等待则提供了更强的并发保证。

  • 鲁棒性 (Robustness):如果一个线程在操作无锁结构的中途崩溃或被杀死,它不会破坏数据结构的整体可用性(只可能丢失该线程正在进行的操作)。其他线程仍然可以继续安全地访问和修改数据结构。这对于高可用系统很重要。(相比之下,持有锁的线程崩溃会导致锁永远无法释放)。

  • 无死锁 (No Deadlock):因为没有锁,所以不会发生因锁依赖关系导致的死锁。

缺点 (Cons):

  • 设计极其复杂 (Complexity):正确实现无锁(尤其是无等待)算法非常困难。需要深刻理解原子操作、内存模型、指令重排以及各种微妙的竞争条件。

  • ABA 问题 (ABA Problem):CAS 操作可能因为一个值被修改(A->B)然后又被改回(B->A)而错误地成功,它完全不知道这种情况的发生,尽管底层数据的状态可能已经完全不同。

  • 性能开销 (Performance Overhead):

    • 原子操作本身:原子操作通常比非原子操作慢

    • 缓存一致性开销 (Cache Ping-Pong):在高争用下,多个 CPU 核心频繁地通过原子操作修改同一个内存位置(例如,无锁栈的 head 指针),会导致缓存行在不同核心之间不断失效和同步(“乒乓”),这会成为严重的性能瓶颈,甚至可能比基于锁的实现更慢。

      • 例如, A 线程修改了 head 指针, 导致 B 线程的缓存行失效, B 线程再次修改 head 指针, 导致 A 线程的缓存行失效, 如此反复.
    • 活锁 (Livelock):在无锁(非无等待)算法中,多个线程可能陷入不断重试但都无法成功的状态,互相干扰,导致虽然 CPU 在忙碌但系统整体没有进展。(类似两个人过独木桥,互相谦让但都过不去)。活锁通常是暂时的,取决于线程调度,但会影响性能。

  • 内存管理困难 (Memory Management):如何安全地回收(删除)无锁结构中的节点是一个巨大的挑战,因为你无法确定何时没有其他线程再持有指向该节点的指针。

结论:无锁设计并非总是更好。它是一种权衡。只有在锁成为明显瓶颈,并且你有能力正确、高效地实现并充分测试无锁算法时,才值得考虑。在许多情况下,精心设计的基于锁的方案(如第6章的细粒度锁)可能已经足够好,甚至性能更优。性能测量 (Profiling) 是做出最终决定的关键。

无锁数据结构示例

在本节中,我们将通过几个经典的无锁数据结构示例,来说明无锁设计的基本思路和实现方法。这些示例将涵盖无锁栈、无锁队列等常见数据结构。

无锁栈 (Lock-Free Stack)

这一节的目标是展示如何使用第5章介绍的原子操作(特别是 compare_exchange_weak)来构建一个无锁的线程安全栈。这个过程将揭示无锁设计的核心思想以及它所带来的主要挑战之一——内存管理。

基本思路:链表原子头指针 - 数据结构:和许多栈实现一样,我们选择单向链表作为底层结构。栈顶元素位于链表的头部。 - 核心指针:只需要一个指向链表头节点的指针 head。 - 原子性要求:因为多个线程可能同时尝试修改 head(push 新节点或 pop 移除节点),所以 head 指针必须是原子的:std::atomic<node*> head;

无锁 push 操作

push 操作的目标是将一个新节点原子性地添加到链表的头部。单线程步骤回顾:

  • 创建新节点 new_node。
  • new_node->next = head;
  • head = new_node;

并发挑战:步骤 2 和 3 之间存在竞争。如果线程 A 读取了 head (H1),然后线程 B 快速完成了 push (将 head 改为 H2),接着线程 A 执行步骤 3 将 head 设为 new_node_A(其 next 仍然指向 H1),那么线程 B 的 push 操作就丢失了!(此时的新head是 new_node_A, 而不是 new_node_B) > 本质上还是读-改-写的竞争。

无锁解决方案 (CAS):使用 compare_exchange_weak 原子地执行步骤 2 和 3。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
template<typename T>
class lock_free_stack
{
private:
struct node
{
T data;
node* next;

node(T const& data_): // 1. 构造函数直接存数据
data(data_) // (这里假设 T 的拷贝构造是安全的)
{}
};

std::atomic<node*> head; // 原子头指针
public:
void push(T const& data)
{
// 2. 创建新节点 (注意:这里在锁外完成耗时操作)
node* const new_node = new node(data);

// 3. 读取当前的 head,并设置 new_node 的 next
// 这里的 load 可以是 relaxed,因为 CAS 会重新检查
new_node->next = head.load();

// 4. 关键:CAS 循环
// 尝试原子地将 head 从 "new_node->next"(期望值) 更新为 "new_node"(新值)
while(!head.compare_exchange_weak(new_node->next, new_node));
}
// ... pop() 待实现 ...
};
- 节点构造:新节点 new_node 在进入原子操作循环之前就完全构造好了,包括其数据 data。这一点至关重要:一旦 head 指向 new_node,它就可能被其他线程 pop,所以它必须是“完整的”。 - 设置 next:读取当前的 head 指针,并将其设置为新节点的 next 指针。 - CAS 循环:head.compare_exchange_weak(expected, desired) 是核心。 - expected: new_node->next (即上一步读取到的 head 值)。 - desired: new_node (我们想要设置的新 head)。 - 原子操作: - 如果 head 的当前值 仍然等于 expected(意味着从上一步读取后没有其他线程修改过 head),那么就将 head 更新为 desired (new_node),操作成功,compare_exchange_weak 返回 true,while 循环结束。 - 如果 head 的当前值 不等于 expected(意味着在上一步读取后,有其他线程已经修改了 head,比如另一个 push 或 pop),那么操作失败,compare_exchange_weak 返回 false。并且,compare_exchange_weak 会自动将 expected (即 new_node->next) 更新为 head 当前的值

  • 循环重试:当 CAS 失败时,new_node->next 已经被自动更新为最新的 head 值,while 循环条件为 true,因此会再次尝试 compare_exchange_weak,使用更新后的 expected 值。这个循环会一直进行,直到 CAS 成功为止。

为何使用 weak? 因为操作已经在 while 循环中,即使 compare_exchange_weak 发生“伪失败”(即值匹配但仍然返回 false),循环也会自动重试,不会影响正确性。在某些平台上,weak 在循环中可能比 strong 更高效。

无锁 pop 操作

pop 操作的目标是原子性地移除并返回头节点的数据。单线程步骤回顾:

  • old_head = head;
  • head = old_head->next;
  • result = old_head->data;
  • delete old_head;
  • return result;

并发挑战:

  • Use-After-Free:如果线程 A 和 B 同时读取 head (H1),然后 A 成功将 head 更新为 H1->next 并 delete H1,此时 B 仍然持有指向 H1 的指针,如果 B 尝试访问 H1->next,就会访问已删除内存(未定义行为)。
    • 例如B先读取head为H1, 然后被操作系统挂起 (这是操作系统调度层面的, 无法避免); A完成pop并delete H1; B恢复后尝试访问H1->next, 导致错误.
  • Double Pop:如果未使用原子操作,两个线程可能都认为自己成功 pop 了同一个节点。

无锁解决方案 (CAS + 暂时忽略删除):使用 CAS 来原子地更新 head,确保只有一个线程能成功“声明”对头节点的所有权。暂时不处理删除节点的问题(即故意造成内存泄露),以避免 Use-After-Free。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
template<typename T>
class lock_free_stack
{
private:
struct node
{
std::shared_ptr<T> data; // 1. 使用 shared_ptr 管理数据 (为了异常安全和方便返回)
node* next;

node(T const& data_):
data(std::make_shared<T>(data_)) // 2. 在 push 时就创建 shared_ptr
{}
};

std::atomic<node*> head;
public:
void push(T const& data) { /* ... 如上 ... */ }

std::shared_ptr<T> pop()
{
node* old_head = head.load();
// 3. 循环直到成功将 head 从 old_head 更新为 old_head->next
// 必须检查 old_head 是否为 null (空栈)
while(old_head &&
!head.compare_exchange_weak(old_head, old_head->next));

// 4. 如果 old_head 非空 (pop 成功),返回其数据;否则返回空 shared_ptr
return old_head ? old_head->data : std::shared_ptr<T>();

// !!! 警告:这里没有 delete old_head,存在内存泄露 !!!
}
};
  • shared_ptr 数据 (①, ②):为了解决第3章讨论过的 pop 返回值时的异常安全问题,我们在 push 时就将数据存储在 shared_ptr 中。这样 pop 操作只需要返回(或 swap)shared_ptr,这个操作本身是安全的,不会因为拷贝 T 而抛异常导致数据丢失。

  • CAS 循环:

    • old_head = head.load() 读取当前头。
    • while 循环条件首先检查 old_head 是否为 nullptr(空栈)。如果是,循环不会执行,函数直接返回空 shared_ptr。如果非空,尝试 compare_exchange_weak(old_head, old_head->next)。
      • 成功:head 被原子地更新为下一个节点,compare_exchange_weak 返回 true,使 while 条件为 false,循环终止。当前线程成功“拥有”了 old_head。
      • 失败:意味着 head 在 load 后被其他线程修改了。CAS 会将 old_head 更新为当前的 head 值。循环继续,用新的 old_head 重试。
  • 返回数据:如果 old_head 不是 nullptr(意味着 CAS 最终成功了或者一开始栈就不空),就返回存储在 old_head 中的 shared_ptr

内存泄露:最关键的问题是,成功弹出的 old_head 节点从未被 delete!我们暂时牺牲了内存来换取避免 Use-After-Free 导致的未定义行为。

安全性分析与总结

在异常安全方面: - push:new node(data) 可能抛异常(内存分配或 make_shared),但此时栈未被修改,安全。

  • pop:主要操作是原子操作和 shared_ptr 操作,通常不抛异常。空栈情况已处理。安全。

  • 数据竞争:head 指针通过原子操作和 CAS 保护,没有数据竞争。对 node->data 的访问通过返回 shared_ptr 保证安全。对 node->next 的访问发生在 CAS 成功“拥有”节点之后(暂时安全,因为没 delete)。暂时没有数据竞争(但有内存泄露)。

  • 死锁:没有使用锁,无死锁。

在并发进展方面:push 和 pop 都使用了 while 循环 + CAS。

这是无锁 (Lock-Free) 的:如果多个线程竞争,CAS 保证了至少有一个线程的操作会成功(修改了 head),从而使系统整体取得进展。

但它不是无等待 (Wait-Free) 的:一个线程可能运气不好,每次 CAS 都失败,导致饥饿 (Starvation)。

总之, 本节成功地使用原子 head 指针CAS 循环实现了无锁的 push 和 pop 操作,解决了 head 指针上的竞争问题,并保证了基本的线程安全(在忽略内存泄露的前提下)。

然而,内存泄露是不可接受的。pop 操作无法安全删除节点的问题,是无锁数据结构设计中面临的核心挑战之一。如何安全地回收这些节点,同时避免 Use-After-Free,将是后续小节要解决的关键问题,涉及更复杂的内存管理技术(如基于计数的回收、风险指针、引用计数)。

停止内存泄露:使用无锁数据结构管理内存

在上一节中,我们实现了一个无锁栈,但留下了一个严重的内存泄露问题:pop 操作只是将节点从链表中移除,但从未 delete 它。这样做是为了避免使用后释放(Use-After-Free) 的未定义行为——即一个线程删除了节点 N,而另一个线程仍然持有指向 N 的指针并试图访问 N->next。

本节的目标就是解决这个内存泄露问题,探讨如何在无锁环境中安全地回收不再使用的节点内存。

核心问题:何时可以安全删除

根源:pop 操作的核心在于 compare_exchange_weak(old_head, old_head->next) 循环。在这个循环中,线程会先 load 一个 head 指针 (old_head),然后在尝试 CAS 之前,可能会需要读取 old_head->next。

危险:如果在线程 T1 读取了 old_head (指向节点 N) 之后、读取 old_head->next 之前,另一个线程 T2 成功地 pop 了节点 N 并将其 delete,那么 T1 对 old_head->next 的访问就是 Use-After-Free。

安全删除条件:一个节点 N 只有在确定没有任何线程(无论是正在 pop 循环中,还是将来可能访问它)会再持有指向它并试图解引用的指针时,才能被安全地 delete。

一个草率的尝试: 基于全局计数的回收

这是一种尝试性的、简单但有缺陷的解决方案。

核心思想:用一个原子计数器 threads_in_pop 来追踪当前有多少个线程正在执行 pop 函数

当一个线程成功 pop 出一个节点 old_head 后,它暂时不删除该节点。

它检查 threads_in_pop 的值。如果只有它自己在 pop (threads_in_pop == 1),那么似乎可以安全删除。

如果还有其他线程在 pop(它们可能仍持有指向 old_head 或其他待删除节点的指针),则将 old_head 添加到一个全局的“待删除”链表 (to_be_deleted) 中,推迟删除。

当一个线程发现自己是最后一个离开 pop 函数的线程时(即它将 threads_in_pop 减为 0),它负责清空并删除整个“待删除”链表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
template<typename T>
class lock_free_stack
{
private:
// ... node 结构体和 head 原子指针同上 ...
std::atomic<unsigned> threads_in_pop; // 原子变量计数器, 是全局的
std::atomic<node*> to_be_deleted; // 待删除链表头指针

static void delete_nodes(node* nodes)
{
while(nodes) // 删除整个链表
{
node* next = nodes->next; // 先保存下一个节点
delete nodes; // 删除当前节点
nodes = next; // 继续下一个节点
}
}

void try_reclaim(node* old_head) // 尝试回收节点内存
{
if(threads_in_pop==1) // 如果只有当前线程在 pop
{
node* nodes_to_delete = to_be_deleted.exchange(nullptr); // 取走全局待删除链表
if(!--threads_in_pop) // 再次检查是否只有一个线程调用pop()
{
delete_nodes(nodes_to_delete); // 删除待删除链表
}
else if(nodes_to_delete) // 在此期间有其他线程在 pop, 这些新线程可能已经看到了 nodes_to_delete 中的某些节点, 删除它们不安全, 因此需要再将取走的节点放回待删除链表
{
chain_pending_nodes(nodes_to_delete); // 放回待删除链表
}
delete old_head; // 删除自己刚刚 pop 出来的节点。注意这里仍然是安全的,因为只有本线程成功通过 CAS 获取了它。
}
else // 多个线程在 pop
{
chain_pending_node(old_head); // 把 old_head 加入待删除链表 8
--threads_in_pop;
}
}

void chain_pending_nodes(node* nodes) // 将多个节点加入待删除链表
{
node* last = nodes;
while (node* const next = last->next) // 让next指针指向链表的末尾
{
last = next;
}
chain_pending_nodes(nodes, last); // 此时的last是链表的最后一个节点
}

void chain_pending_nodes(node* first,node* last) // 将[first,last]区间的节点加入待删除链表
{
last->next = to_be_deleted; // 将待删除链表接在last后面
while(!to_be_deleted.compare_exchange_weak( // 用循环来保证last->next的正确性
last->next,first));
}

void chain_pending_node(node* n) // 将单个节点加入待删除链表
{
chain_pending_nodes(n,n); // 调用上面的函数, 传入相同的节点作为first和last
}
public:
std::shared_ptr<T> pop()
{
++threads_in_pop; // 2 在做事之前,计数值加1
node* old_head=head.load();
while(old_head &&
!head.compare_exchange_weak(old_head,old_head->next));
std::shared_ptr<T> res;
if(old_head)
{
res.swap(old_head->data); // 从节点中直接提取数据,而非拷贝指针
}
try_reclaim(old_head);
return res;
}
};
该方案的致命缺陷:高争用下的内存泄露

这个方案依赖于存在一个“静止” (quiescent) 状态,即 threads_in_pop 能够降到 1 或 0 的时刻

然而, 在一个繁忙的高争用系统中,可能永远都有多个线程在同时执行 pop 操作。threads_in_pop 可能永远不会降到 1 以下。

后果是:try_reclaim 函数中的检查 1 (threads_in_pop == 1) 永远为 false。所有被 pop 的节点都会被不断地添加到 to_be_deleted 链表,但检查 2 (!–threads_in_pop) 永远不会满足,因此 delete_nodes永远不会被调用。to_be_deleted 链表无限增长,导致严重的内存泄露。

因此, 基于全局线程计数的内存回收方案虽然想法简单,但在实际的高并发(高争用)场景下是不可靠的,会因无法找到安全的回收时机而导致内存泄露。

这表明我们需要更健壮的内存管理技术,这些技术不依赖于全局的“静止”状态,而是能够精确地追踪哪些节点确实不再被任何线程引用

这直接引出了后续两种更高级(也更复杂)的解决方案:

  • 风险指针 (Hazard Pointers):线程显式地“声明”它们正在使用的指针

  • 引用计数 (Reference Counting):为每个节点维护引用计数

解决方案 1:风险指针 (Hazard Pointers)

之所以存在内存泄漏, 根本问题在于,一个线程(回收者)无法知道是否有其他线程(读取者)仍然持有指向它想要 delete 的节点的指针,并且可能在稍后解引用该指针。

风险指针的思路:反转责任。不再让回收者猜测,而是要求读取者在即将访问一个可能被回收的指针之前,先显式地声明:“我现在正在使用这个指针 P,请暂时不要删除它!”。这个声明就是“风险指针”。

具体机制为:

  • 风险指针列表:存在一个全局可见的列表(通常是固定大小的数组),其中每个活跃的读取线程都“拥有”一个或多个风险指针槽位 (Hazard Pointer Slots)。每个槽位可以存储一个 void*(或原子化的 std::atomic<void*>)。(在后续的示例代码中,我们假设每个线程只有一个风险指针槽位)。

    • 虽然这里是void*,但实际存储的通常是指向数据结构节点(如 stack::node)的指针。
  • 读取者设置风险指针:当一个读取线程(例如在 pop 中)从共享结构(如 head 指针)读取了一个节点指针 ptr,并且在解引用它(例如访问 ptr->next)之前,它必须将 ptr 的值写入它自己的风险指针槽位。

  • 读取者清除风险指针:当读取线程不再需要访问 ptr 时(例如,它成功地将 head CAS 到了下一个节点,或者它决定放弃当前操作),它必须将自己的风险指针槽位清除(例如,设置为 nullptr)。

  • 回收者检查风险指针:当一个线程(回收者)成功地将一个节点 N 从数据结构中移除(例如 pop 成功)并想要 delete N 时,它不能立即删除。它必须遍历所有线程的所有风险指针槽位

    • 如果节点 N 的地址出现在任何一个风险指针槽位中,说明有其他线程正在(或即将)使用它。回收者不能删除 N,必须将其放入一个“待回收”列表 (reclaim_later)。
    • 如果节点 N 的地址没有出现在任何风险指针槽位中,说明此刻没有线程声明正在使用它,回收者可以安全地 delete N。
  • 处理“待回收”列表:回收者需要定期地(或者在每次 pop 成功后)尝试处理“待回收”列表。对于列表中的每个节点,它再次执行步骤 4 和 5(检查所有风险指针)。如果检查通过,就将其从列表中移除并 delete。

下面是使用风险指针技术实现的 pop 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
template<typename T> // 假设在 lock_free_stack 类中
class lock_free_stack {
// ... node, head ...

// 假设这些辅助函数已存在:
std::atomic<void*>& get_hazard_pointer_for_current_thread(); // 获取当前线程的风险指针槽位引用
bool outstanding_hazard_pointers_for(node* p); // 检查是否有任何线程的风险指针指向节点 p
void reclaim_later(node* p); // 将节点 p 添加到待回收列表
void delete_nodes_with_no_hazards(); // 处理待回收列表,删除没有风险指针指向的节点

public:
std::shared_ptr<T> pop() {
// 先获取当前线程自己的风险指针槽位的引用
std::atomic<void*>& hp = get_hazard_pointer_for_current_thread(); // 风险指针槽位
node* old_head = head.load(); // 初始读取 head

// 外层循环:处理 CAS 失败重试
do {
node* temp;
// 1. 内层循环:安全地设置风险指针
do {
temp = old_head; // 保存读取到的 head
hp.store(old_head); // 设置风险指针!
old_head = head.load(); // 重新读取 head
} while (old_head != temp); // 如果 head 在设置 HP 期间改变了,重试内层循环

// 到这里,HP 已安全设置为 old_head (head 在期间未变)
// 现在可以尝试 CAS 更新 head
} while (old_head && // 检查 old_head 非空 (处理空栈)
!head.compare_exchange_strong(old_head, old_head->next)); // CAS 操作

hp.store(nullptr); // 2. CAS 成功或栈空,清除风险指针

std::shared_ptr<T> res;
if (old_head) { // 如果成功 pop 了一个节点
res.swap(old_head->data); // 移出数据

// 3. 检查是否有其他线程的 HP 指向 old_head
if (outstanding_hazard_pointers_for(old_head)) {
reclaim_later(old_head); // 4. 如果有,推迟删除
} else {
delete old_head; // 5. 如果没有,立即删除
}
delete_nodes_with_no_hazards(); // 6. 尝试清理待回收列表
}
return res;
}
};
- 内层循环 (标记 ①):这是关键的安全保障。从 head.load() 读取 old_head 到 hp.store(old_head) 之间存在时间窗口。如果在此期间 head 被改变(其他线程 pop 了 old_head 并可能删除了它),那么 hp.store 就会设置一个指向无效内存的风险指针。 - 解决方案:在设置 HP 后立即重新读取 head,并与设置 HP 前读取的值 (temp) 进行比较。如果两者不同,说明 head 在此期间被修改了,设置的 HP 可能无效,必须重试内层循环,直到成功设置 HP 且 head 在此期间保持不变。

  • 外层循环:这是标准的 CAS 循环,用于原子地更新 head 指针。如果 CAS 失败(compare_exchange_strong 返回 false),它会自动将 old_head 更新为当前 head 的值,然后外层循环会继续,重新执行内层循环以安全地设置新的风险指针。

  • 清除 HP (标记 ②):一旦 CAS 成功(当前线程“拥有”了 old_head)或者发现栈为空 (old_head 为 nullptr),就不再需要保护对 old_head 的访问了,必须清除风险指针,允许其他线程回收它(如果是其他线程 pop 的)。

  • 回收决策 (标记 ③, ④, ⑤):在成功 pop 后,调用 outstanding_hazard_pointers_for 检查所有其他线程的 HP。根据结果决定是立即 delete 还是调用 reclaim_later。

  • 清理 (标记 ⑥):调用 delete_nodes_with_no_hazards 尝试清理全局的“待回收”列表。

辅助函数

get_hazard_pointer_for_current_thread() (获取当前线程的风险指针槽位): 这个函数的目标是为调用它的线程分配并返回一个专属的风险指针槽位(类型为 std::atomic<void*>&),该线程可以用这个槽位来声明它正在使用的指针。实现的关键在于线程本地存储 (thread_local) 和槽位分配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 全局风险指针数组 (固定大小)
unsigned const max_hazard_pointers = 100;
struct hazard_pointer {
std::atomic<std::thread::id> id; // 槽位所有者的线程ID
std::atomic<void*> pointer; // 存储的风险指针
};
hazard_pointer hazard_pointers[max_hazard_pointers]; // 全局数组, 存储所有线程的风险指针槽位

// RAII 类,用于管理线程对 HP 槽位的所有权
class hp_owner {
hazard_pointer* hp; // 指向该线程拥有的槽位

public:
hp_owner(hp_owner const&) = delete;
hp_owner operator=(hp_owner const&) = delete;

hp_owner() : hp(nullptr) {
// 构造时:遍历全局数组,尝试获取一个空闲槽位
for (unsigned i = 0; i < max_hazard_pointers; ++i) {
std::thread::id old_id; // 期望 ID 是空的 (默认构造)
// 6. 原子地尝试将槽位 i 的 id 从空改为当前线程 id
if (hazard_pointers[i].id.compare_exchange_strong(
old_id, std::this_thread::get_id()))
{
hp = &hazard_pointers[i]; // 获取成功,保存槽位指针
break; // 7. 停止搜索
}
}
// 1. 如果找不到空闲槽位,抛异常 (资源耗尽)
if (!hp) {
throw std::runtime_error("No hazard pointers available");
}
}

// 返回该线程拥有的风险指针槽位的引用
std::atomic<void*>& get_pointer() {
return hp->pointer;
}

// 2. 析构时:释放槽位所有权
~hp_owner() {
hp->pointer.store(nullptr); // 8. 清空指针
hp->id.store(std::thread::id()); // 9. 清空所有者 ID
}
};

// 3. 获取风险指针的接口函数
std::atomic<void*>& get_hazard_pointer_for_current_thread() {
// 4. 每个线程第一次调用时会创建一个 hp_owner 实例
thread_local static hp_owner hazard;
// 5. 返回该线程专属的风险指针槽位
return hazard.get_pointer();
}
- thread_local static hp_owner hazard (④): 这是核心。thread_local 确保每个线程都有自己独立的 hazard 对象。static 确保它只被构造一次(在该线程首次调用此函数时)。

  • hp_owner 构造函数 (⑥, ⑦, ①): 当线程首次调用 get_hazard_pointer_for_current_thread() 时,hazard 对象被构造。构造函数会遍历全局 hazard_pointers 数组,使用 compare_exchange_strong 原子地尝试“认领”一个 id 为空的槽位。一旦成功,它就保存指向该槽位的指针 hp 并退出。如果遍历完都找不到空槽位(说明活跃线程数超过了 max_hazard_pointers),则抛出异常。

  • hp_owner 析构函数 (②, ⑧, ⑨): 当线程退出时,其 thread_local 的 hazard 对象会被析构。析构函数负责将风险指针清空 (⑧),并将槽位的 id 设回空 (⑨),以便其他新线程可以复用这个槽位。

  • 返回值 (⑤): 函数最终返回 hazard.get_pointer(),这是对该线程已成功认领的那个 std::atomic<void*> 槽位的引用。

outstanding_hazard_pointers_for(void* p) (检查指定指针是否危险): 这个函数由回收者调用,用于判断一个它想要删除的节点指针 p 当前是否被任何其他线程的风险指针所引用。

1
2
3
4
5
6
7
8
9
10
bool outstanding_hazard_pointers_for(void* p) {
// 遍历全局风险指针数组
for (unsigned i = 0; i < max_hazard_pointers; ++i) {
// 原子地读取槽位 i 的指针值,并与 p 比较
if (hazard_pointers[i].pointer.load() == p) {
return true; // 找到了匹配,说明指针 p 当前是危险的
}
}
return false; // 遍历完没有找到匹配,指针 p 当前是安全的
}
- 逻辑: 非常直接。它线性扫描 hazard_pointers 数组中的每一个槽位。对每个槽位,它原子地 (load()) 读取当前存储的指针值,并与传入的、想要删除的节点指针 p 进行比较。 - 效率: 这是一个 O(N) 操作,其中 N 是 max_hazard_pointers。这是风险指针机制的主要性能开销点之一,尤其是在 N 很大时。

reclaim_later(T* data) (将节点加入待回收列表): 当 pop 操作成功移除一个节点 old_head,并且 outstanding_hazard_pointers_for(old_head) 返回 true 时,回收者不能立即删除 old_head,必须调用此函数将其放入“待回收”列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 通用删除函数模板
template<typename T>
void do_delete(void* p) {
delete static_cast<T*>(p);
}

// 待回收链表的节点结构
struct data_to_reclaim {
void* data; // 指向要删除的原始节点
std::function<void(void*)> deleter; // 正确的删除器 (类型擦除)
data_to_reclaim* next; // 指向链表下一个节点

// 1. 构造函数:保存指针和对应的删除器
template<typename T>
data_to_reclaim(T* p) :
data(p),
deleter(&do_delete<T>), // 使用函数指针指向正确的删除函数
next(nullptr)
{}

// 2. 析构函数:调用正确的删除器
~data_to_reclaim() {
deleter(data);
}
};

// 全局待回收链表的头指针 (原子)
std::atomic<data_to_reclaim*> nodes_to_reclaim;

// 3. 将节点原子地添加到待回收链表头部
void add_to_reclaim_list(data_to_reclaim* node) {
node->next = nodes_to_reclaim.load(); // 读取当前头
// 使用 CAS 循环尝试更新头指针
while (!nodes_to_reclaim.compare_exchange_weak(node->next, node));
}

// 4. reclaim_later 接口函数 (模板)
template<typename T>
void reclaim_later(T* data) {
// 5. 创建 data_to_reclaim 节点 (包含指针和删除器),并添加到链表
add_to_reclaim_list(new data_to_reclaim(data));
}
- data_to_reclaim 结构: 这是待回收链表的节点。 - data: 存储指向原始待删除节点(例如 lock_free_stack::node)的 void。 - deleter (①): 关键。因为回收机制是通用的 (void),我们需要一种方法在最终删除时调用正确类型的 delete。这里使用了 std::function (或函数指针) 来存储一个类型擦除的删除器。构造函数 (①) 会根据传入的 T* 自动设置正确的 do_delete。 - next: 指向链表中的下一个 data_to_reclaim 节点。

  • 析构函数 (②): 当 data_to_reclaim 对象本身被 delete 时(在 delete_nodes_with_no_hazards 中),它的析构函数会调用存储的 deleter 来真正释放原始节点 data 的内存。

  • nodes_to_reclaim: 一个原子指针,指向待回收链表的头节点。必须是原子的,因为多个线程可能同时调用 reclaim_later。

  • add_to_reclaim_list (③): 实现了一个无锁的链表头插法。它读取当前的头,设置新节点的 next 指向它,然后使用 CAS 循环尝试原子地更新 nodes_to_reclaim 指针。

  • reclaim_later (④, ⑤): 模板函数,负责创建 data_to_reclaim 节点(分配内存并设置好数据指针和删除器),然后调用 add_to_reclaim_list 将其加入全局链表。

delete_nodes_with_no_hazards() (尝试清理待回收列表): 这个函数由回收者线程(通常是在 pop 成功后)调用,尝试遍历待回收列表,并删除那些当前不再被任何风险指针引用的节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void delete_nodes_with_no_hazards() {
// 6. 原子地取走整个待回收链表,并将全局头设为 nullptr
data_to_reclaim* current = nodes_to_reclaim.exchange(nullptr);

while (current) { // 遍历取走的链表
data_to_reclaim* const next = current->next; // 先保存下一个节点

// 7. 检查当前节点的数据指针是否还在任何 HP 槽位中
if (!outstanding_hazard_pointers_for(current->data)) {
// 8. 如果不在,安全删除 data_to_reclaim 节点
// (其析构函数会调用 deleter 删除原始数据)
delete current;
} else {
// 9. 如果还在,不能删除,将其重新添加回全局待回收链表
add_to_reclaim_list(current);
}
current = next; // 处理下一个节点
}
}
- 原子夺取链表 (⑥): nodes_to_reclaim.exchange(nullptr) 是关键的第一步。它原子地将全局链表头设为 nullptr,并返回之前的头指针。这确保了只有一个线程在处理这一批待回收节点,避免了对链表本身的并发修改(除了 add_to_reclaim_list 的头插)。

  • 遍历与检查 (⑦): 对取下的链表进行遍历。对于每个 data_to_reclaim 节点 current,调用 outstanding_hazard_pointers_for(current->data) 检查其存储的原始数据指针是否危险。

  • 安全删除 (⑧): 如果检查结果为 false(不危险),就 delete current。这会触发 data_to_reclaim 的析构函数,进而调用正确的 deleter 删除原始节点。

  • 重新添加 (⑨): 如果检查结果为 true(危险),就调用 add_to_reclaim_list(current) 将这个 data_to_reclaim 节点重新放回全局待回收链表,等待下一次清理。

性能问题与优化

主要瓶颈:outstanding_hazard_pointers_for 需要遍历所有可能的风险指针槽位(max_hazard_pointers 个),并且 delete_nodes_with_no_hazards 对待回收链表中的每个节点都要执行一次这个遍历。在高线程数和高回收压力下,这会非常慢。

优化策略:

  • 批量回收 (Batching):delete_nodes_with_no_hazards 不在每次 pop 后都调用,而是等到“待回收”链表的长度达到某个阈值(例如 R * max_hazard_pointers,R > 1)时才触发一次。这可以摊销扫描 HP 列表的成本。
  • 线程本地回收列表 (Thread-local Reclaim Lists):每个线程将自己 pop 的节点放入自己的待回收列表。只有当本地列表达到阈值时,才执行 HP 检查并尝试回收。这避免了对全局回收列表头部的争用。线程退出时需要将本地列表合并到全局列表或其他机制处理。

总结: 风险指针是一种有效的无锁内存回收机制。它通过让读取线程显式声明正在使用的指针,使得回收线程能够安全地判断何时可以删除节点。

优点:解决了 Use-After-Free 问题,使得无锁数据结构可行。相比某些计数方法,对读取路径的性能影响相对可控(主要是设置/清除 HP 的原子操作)。

缺点:实现复杂,回收操作(扫描 HP 列表)开销较大,存在固定大小(HP 列表)的限制,需要合理配置 max_hazard_pointers。

解决方案2: 引用计数 (Reference Counting)

继风险指针之后,本节介绍了另一种解决无锁内存管理问题的主流技术——引用计数 (Reference Counting)。

核心思想:为每一个可能被共享的节点维护一个引用计数器。这个计数器追踪当前有多少“引用”(指针)指向该节点。当引用计数降为 0 时,表明没有任何指针再指向该节点,因此可以安全地将其删除。

与风险指针的区别在于, 风险指针是读取者主动声明“我正在用这个”节点,回收者被动检查所有声明。而引用计数是每个节点被动记录有多少引用指向它,回收者(通常是最后一个释放引用的线程)根据计数值主动决定是否删除。

挑战:如何原子性地、无锁地管理这个引用计数?简单的 ++count 和 –count 本身就需要同步。

方案 1:使用 std::shared_ptr

最直接的想法是利用 C++ 内置的、自带原子引用计数std::shared_ptr

实现思路:将链表的 head 指针和每个节点的 next 指针都改为 std::shared_ptr<node> 类型。同时,head 本身需要是原子的,即 std::atomic<std::shared_ptr<node>> head;(或者使用之前介绍的 shared_ptr 原子自由函数)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
template<typename T>
class lock_free_stack // 基于 shared_ptr 的理想化版本
{
private:
struct node {
std::shared_ptr<T> data;
std::shared_ptr<node> next; // next 也是 shared_ptr
node(T const& data_) : data(std::make_shared<T>(data_)) {}
};

std::shared_ptr<node> head; // head 也是 shared_ptr (假设其原子操作无锁)
public:
void push(T const& data) {
std::shared_ptr<node> const new_node = std::make_shared<node>(data);
new_node->next = std::atomic_load(&head); // 原子加载 head
// 原子 CAS 更新 head
while (!std::atomic_compare_exchange_weak(&head,
&new_node->next, new_node));
}
std::shared_ptr<T> pop() {
std::shared_ptr<node> old_head = std::atomic_load(&head);
// 原子 CAS 更新 head
while (old_head && !std::atomic_compare_exchange_weak(&head,
&old_head, old_head->next));
// 如果 pop 成功 (old_head 非空),返回其 data
// 内存管理完全由 shared_ptr 自动处理!
return old_head ? old_head->data : std::shared_ptr<T>();
}
// ... 析构函数不再需要手动清理 ...
};
优点:代码极其简洁!内存管理(节点的分配和回收)完全由 shared_ptr 自动处理。当最后一个指向某节点的 shared_ptr 被销毁(无论是 head 更新还是 pop 返回的 shared_ptr 最终被销毁),该节点会被自动 delete。

致命缺点:此方案依赖于 std::atomic_is_lock_free(&some_shared_ptr) 返回 true,即平台必须提供对 shared_ptr 的无锁原子操作。C++ 标准不保证这一点!在许多平台上,对 shared_ptr 的原子操作实际上是基于锁的(内部使用互斥量)。如果原子操作有锁,那么整个数据结构就不再是无锁的了,违背了本章的目标。

方案 2:手动实现分离引用计数 (Split Reference Counting)

如果不能依赖无锁的 shared_ptr 原子操作,我们就需要手动实现引用计数。

挑战:直接在节点内放一个 std::atomic<int> count 并不能完全解决问题。在 pop 中,存在一个微妙的竞争:初始化 count=1, 线程 T1 创建局部指针读取 head 指向节点 N,线程 T2 也读取 head 指向 N, 但是现在 count 并没有增加, 因为两个线程都只是读取 head, 并没有去尝试修改所指节点的值, 也就没有机会去增加 count。现在 T1 成功 CAS 将 head 指向 N->next,然后 T1 原子地递减 N 的 count 到 0 并 delete N。此时 T2 仍然持有指向 N 的指针,但 N 已被删除!

解决方案:分离引用计数, 将引用计数分为两部分,以区分“指针引用”和“内部访问引用”。

  • 外部计数 (External Count):追踪有多少指针(head 指针其他节点的 next 指针线程临时持有的指针)指向该节点。
    • 这个计数在每次读取节点指针时增加(例如,在 pop 中读取 head 指针时增加)。
    • 在节点被成功从数据结构中移除(pop 成功 CAS 更新 head)后,外部计数会减少,因为数据结构不再持有对该节点的引用。
  • 内部计数 (Internal Count):追踪有多少线程正在内部访问该节点(例如,在 pop 的 CAS 循环中读取了该节点指针准备访问其 next)。
    • 这个计数在每次线程准备访问节点时增加(例如,在 pop 中成功增加 head 的外部计数后,准备访问该节点时增加)。
    • 在节点访问完成后减少。

安全删除条件:一个节点只有在外部计数为 0 且 内部计数为 0 时才能被安全删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
template<typename T>
class lock_free_stack
{
private:
struct node;

struct counted_node_ptr // 这是一个带外部计数的指针
{
int external_count; // 维护的是 ptr 所指节点的外部计数
node* ptr;
};

struct node // 这是节点结构
{
std::shared_ptr<T> data;
std::atomic<int> internal_count; // 这是内部计数, 初始化为0
counted_node_ptr next; // 带外部计数的 next 指针

node(T const& data_):
data(std::make_shared<T>(data_)),
internal_count(0)
{}
};

std::atomic<counted_node_ptr> head; // 带外部计数的 head 指针

void increase_head_count(counted_node_ptr& old_counter) // 增加 head 所指节点的外部计数
{
counted_node_ptr new_counter; // 声明一个新的计数器

do
{
new_counter = old_counter;
++new_counter.external_count;
}
while(!head.compare_exchange_strong(old_counter,new_counter)); // 尝试更新 head

old_counter.external_count = new_counter.external_count; // 更新传入的 old_counter 的外部计数
}

public:
~lock_free_stack()
{
while(pop()); // 清空栈
}

void push(T const& data) // push 操作
{
counted_node_ptr new_node; // 创建新节点
new_node.ptr=new node(data); // 初始化新节点
new_node.external_count=1; // 外部计数初始化为1, 因为 head 会指向它

new_node.ptr->next=head.load(); // 设置 next 指针
while(!head.compare_exchange_weak(new_node.ptr->next,new_node));
}

std::shared_ptr<T> pop()
{
counted_node_ptr old_head = head.load(); // 读取 head
for(;;)
{
increase_head_count(old_head); // 这里增加的 EC 代表的就是当前持有的这个 old_head 临时指针
node* const ptr = old_head.ptr; // 获取节点指针, 只是方便地获取裸指针进行解引用。
if(!ptr)
{
return std::shared_ptr<T>();
}
if(head.compare_exchange_strong(old_head,ptr->next)) // 尝试更新 head
{
std::shared_ptr<T> res;
res.swap(ptr->data);

int const count_increase = old_head.external_count - 2; // 它代表在 CAS 成功移除 head 指针后,除了 head 指针和当前线程的临时引用之外,剩余的外部引用数量。这些主要是其他线程在 increase_head_count 中增加的临时外部引用。

if(ptr->internal_count.fetch_add(count_increase)== -count_increase){ // 节点最终被删除的时刻,是当最后一个持有(无论是外部还是内部)引用的线程,在释放其引用时,发现总计数(合并计算后体现在 internal_count 上)变为 0 的那一刻。
delete ptr;
}

return res; // 返回弹出的数据
}
else if(ptr->internal_count.fetch_sub(1)==1) // CAS 失败, 因为其他线程修改了 head, 准备重来一次
// 而此时 ptr 仍然被当前线程持有, 所以要把它的内部计数减1
{
delete ptr;
}
}
}
};

总结

使用 std::shared_ptr 进行无锁引用计数是最理想的,但依赖于平台对 shared_ptr 原子操作的无锁支持。

手动实现的分离引用计数是一种可行的替代方案,但极其复杂。核心难点是需要原子地同时操作指针和它的外部计数(通常需要 DWCAS 支持才能无锁),并且需要仔细设计内部/外部计数的增减逻辑,以确保在 CAS 成功和失败路径下都能正确判断何时可以安全删除节点。这种复杂性是无锁内存管理的主要障碍之一。

应用于无锁栈上的内存模型

在前面的小节中,我们构建了一个使用分离引用计数的无锁栈,并且所有原子操作都使用了默认的 std::memory_order_seq_cst 内存顺序。虽然这样做保证了正确性(因为 seq_cst 提供了最强的保证),但在性能敏感的无锁编程中,它可能是不必要且低效的。

本节的目标是重新审视这个实现,并根据操作之间真正必要的依赖关系,尝试放松内存顺序要求(例如,使用 acquire, release, relaxed),以期提高性能,同时维持正确性。

放松内存顺序的基本方法是: 1. 识别依赖关系:分析代码,找出哪些操作的结果(特别是内存写入)必须对其他线程的哪些操作(特别是内存读取)可见,以保证程序的逻辑正确性(避免数据竞争、保证读取到有效数据)。 2. 确定同步点:找出负责在线程间建立这种可见性保证的关键原子操作。 3. 选择最小顺序:为每个原子操作选择最弱(最宽松)的内存顺序标签,只要它能满足第1步中识别出的所有必要依赖关系。

分析 push 操作

核心操作:

  • 创建新节点 new_node (包含 data 和 next=nullptr)。
  • new_node->ptr->next = head.load(…) (读取旧 head 以设置 next)。
  • head.compare_exchange_weak(…, new_head, …) (CAS 更新 head)。

依赖关系:

  • push -> pop:当一个 pop 线程通过 increase_head_count 中的 CAS 成功读取到 push 写入的 new_head 值时,它需要能够安全地访问 new_head.ptr->data 和 new_head.ptr->next (这个 next 是在步骤 2 中设置的)。

  • 这意味着 push 中对 new_node (包括其 data 和 next) 的写入必须先行于 (happen-before) pop 线程对这些成员的读取。

同步点:push 中的 head.compare_exchange_weak (CAS) 成功时,是 new_node 对 pop 线程“可见”的时刻。这个 CAS 操作需要建立同步。

内存顺序选择:

  • head.compare_exchange_weak (成功):为了与 pop 中的 acquire 操作同步,这里需要 memory_order_release 语义。它确保在 CAS 之前的所有写入(对 new_node 的初始化)对成功读取该值的 acquire 操作是可见的。

  • head.compare_exchange_weak (失败):如果 CAS 失败,说明 head 被修改了,push 操作会继续循环。失败本身不需要与任何其他线程同步。可以使用 memory_order_relaxed。

  • head.load() (用于设置 next):这次 load 发生在 release CAS 之前。它的结果只用于设置 new_node->ptr->next 和作为 CAS 的 expected 值。它不需要从其他线程“获取”任何信息,也不需要“释放”任何信息。可以使用 memory_order_relaxed。

优化后的 push 实现如下:

1
2
3
4
5
6
7
8
9
10
void push(T const& data) {
counted_node_ptr new_node;
new_node.ptr = new node(data);
new_node.external_count = 1;
// next 指针的设置读取 head 时使用 relaxed
new_node.ptr->next = head.load(std::memory_order_relaxed);
// CAS 循环:成功用 release,失败用 relaxed
while (!head.compare_exchange_weak(new_node.ptr->next, new_node,
std::memory_order_release, std::memory_order_relaxed));
}

分析 pop 操作

pop 的逻辑更复杂,涉及多个原子操作和依赖关系。

核心操作:

  • old_head = head.load(…) (初始加载)。
  • increase_head_count(old_head): 内部循环 CAS (head.compare_exchange_strong) 以增加 external_count。
  • ptr = old_head.ptr。
  • if (ptr == nullptr) … (空栈判断)。
  • head.compare_exchange_strong(old_head, ptr->next, …) (主 CAS,尝试移除节点)。
    • 成功路径: res.swap(ptr->data), ptr->internal_count.fetch_add(count_increase, …) (更新内部计数), delete ptr (如果计数为0)。
    • 失败路径: ptr->internal_count.fetch_add(-1, …) (或 release_ref), delete ptr (如果计数为0)。

依赖关系:

  • pop <- push: pop 需要看到 push 对 ptr->data 和 ptr->next 的写入。

  • pop (访问) <- pop (删除): pop 线程 T1 在访问 ptr->next 或 ptr->data 时,必须确保没有其他 pop 线程 T2 已经将 ptr 删除。

  • pop (修改 ptr->data) -> pop (删除 ptr): 对 ptr->data 的修改 (通过 swap) 必须先行于对 ptr 的 delete 操作(由将引用计数减为 0 的那个线程执行)。

同步点与内存顺序选择:

  • increase_head_count 中的 CAS:
    • 目的:安全地读取 head 并增加外部计数,同时确保读取到的 ptr 是有效的,并且能看到 push 写入的 ptr->next 和 ptr->data。
    • 顺序: 这个 CAS 读取 head。为了与 push 的 release CAS 同步,它在成功时需要 memory_order_acquire 语义。失败时可以relaxed
  • 主 CAS (head.compare_exchange_strong(old_head, ptr->next, …)):
    • 目的: 原子地将 head 指向下下个节点。
    • 依赖: 它需要读取 ptr->next。ptr->next 的可见性已经由 increase_head_count 的 acquire CAS 保证了。
    • 同步: 这个 CAS 写入 head。这个写入需要与后续的 pop 操作同步吗?不一定需要强同步,因为后续的 pop 会执行自己的 acquire 操作。因此,这个 CAS 可以使用 memory_order_relaxed。
  • internal_count.fetch_add(count_increase, …) (成功路径):
    • 目的: 将外部计数转移到内部计数,并检查是否可以删除。
    • 依赖: 必须发生在 res.swap(ptr->data) 之后(序前)。
    • 同步: 如果这个 fetch_add 导致计数为 0,当前线程将执行 delete ptr。为了确保 swap 操作先行于 delete(即使在弱排序内存模型下),这个 fetch_add 需要 memory_order_release 语义。它确保了在 fetch_add 之前的所有内存操作(包括 swap)对于之后(在同一个线程中)的 delete 操作是可见且有序的。
  • internal_count.fetch_add(-1, …) (失败路径,或 release_ref):
    • 目的: 减少当前线程持有的内部引用。如果计数变为 0,则删除节点。
    • 同步: 如果这个操作导致计数为 0,当前线程需要 delete ptr。删除操作需要确保能看到所有先前对该节点的操作。原文的最终实现(清单 7.12)在这里使用了 memory_order_relaxed,但在 delete 之前增加了一个额外的 ptr->internal_count.load(std::memory_order_acquire)。
    • 理由 (推测): 可能是为了确保在执行 delete 之前,确实获取了所有其他线程对该 internal_count 的 release 修改(虽然这里没有明显的配对 release)。或者,这是一种防御性编程,确保在 delete 前有一个 acquire 屏障。这部分逻辑比较微妙,原文解释也略显模糊。但遵循最终代码是安全的。

优化后的 pop 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
std::shared_ptr<T> pop() {
counted_node_ptr old_head = head.load(std::memory_order_relaxed); // relaxed
for (;;) {
increase_head_count(old_head); // 内部 CAS: acquire on success, relaxed on failure
node* const ptr = old_head.ptr;
if (!ptr) { return std::shared_ptr<T>(); }

// 主 CAS 使用 relaxed
if (head.compare_exchange_strong(old_head, ptr->next,
std::memory_order_relaxed))
{
std::shared_ptr<T> res;
res.swap(ptr->data);
int const count_increase = old_head.external_count - 2;
// 成功路径 fetch_add 使用 release
if (ptr->internal_count.fetch_add(count_increase,
std::memory_order_release) == -count_increase)
{
delete ptr;
}
return res;
}
// 失败路径 fetch_add 使用 relaxed
else if (ptr->internal_count.fetch_add(-1,
std::memory_order_relaxed) == 1)
{
// 在 delete 前加一个 acquire load (来自清单 7.12)
ptr->internal_count.load(std::memory_order_acquire);
delete ptr;
}
}
}

总结:

放松内存顺序是一项优化,目标是减少不必要的同步开销,提高性能。它需要非常仔细地分析操作之间的数据依赖和控制依赖,以确定最低要求的内存顺序。

基本原则:

  • 如果一个写操作 W 的结果需要被另一个线程的读操作 R 看到,W 通常需要 release (或 acq_rel, seq_cst),R 通常需要 acquire (或 acq_rel, seq_cst, consume)。

  • 如果一个原子操作的结果只在当前线程内部使用,或者它不需要建立跨线程的先行关系,那么 relaxed 通常是足够的。

  • RMW 操作(如 fetch_add)本身构成了释放序列的一部分,这有助于同步传递。

复杂性:正确地推理和验证放松后的内存顺序极其困难,容易出错。相比之下,seq_cst 虽然可能慢一些,但更容易保证正确性。因此,放松内存顺序应谨慎进行,并最好有充分的性能测试数据支持。

无锁的线程安全队列

与栈的不同之处: - 栈 (LIFO):push 和 pop 都作用于同一个端点(head 指针)。主要挑战在于同步对 head 的并发修改以及弹出节点的内存管理。

  • 队列 (FIFO):
    • push 操作作用于尾部 (tail)。
    • pop 操作作用于头部 (head)。
    • 新的挑战:需要确保对一端的修改(例如 push 修改了 tail 或最后一个节点的 next)能够正确、安全地被另一端的操作(例如 pop 读取 head->next)所观察到。
    • 潜在的优势:由于操作发生在不同端点,理论上 push 和 pop 之间存在更大的并行可能性。

单生产者/单消费者 (SPSC) 的简单情况

如果队列严格限制为只有一个线程执行 push,一个线程执行 pop,那么无锁实现可以非常简单(甚至比无锁栈更简单)。实现思路:

  • 使用哑节点 (Dummy Node)来解耦头尾。

  • head 和 tail 都是原子指针 (std::atomic<node*>)。

  • push (生产者):

    1. 准备新节点 p 和数据 new_data。
    2. 获取当前 old_tail = tail.load()。
    3. 将 new_data 存入 old_tail->data(原子地或非原子地,取决于 data 是否原子)。
    4. 原子地设置 old_tail->next = p(需要 old_tail->next 也是原子的,或者确保只有生产者修改它)。
    5. 原子地更新 tail.store(p)。
  • pop (消费者):

    1. 读取 old_head = head.load()。
    2. 检查是否为空 (old_head == tail.load())。
    3. 原子地更新 head.store(old_head->next)。
    4. 读取 old_head->data。
    5. delete old_head (在 SPSC 中是安全的,因为只有消费者会删除,生产者不会持有旧 head 的引用)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
template<typename T>
class lock_free_queue
{
private:
struct node
{
std::shared_ptr<T> data;
node* next;

node():
next(nullptr)
{}
};

std::atomic<node*> head;
std::atomic<node*> tail;

node* pop_head()
{
node* const old_head=head.load();
if(old_head==tail.load()) // 1
{
return nullptr;
}
head.store(old_head->next);
return old_head;
}
public:
lock_free_queue():
head(new node),tail(head.load())
{}

lock_free_queue(const lock_free_queue& other)=delete;
lock_free_queue& operator=(const lock_free_queue& other)=delete;

~lock_free_queue()
{
while(node* const old_head=head.load())
{
head.store(old_head->next);
delete old_head;
}
}
std::shared_ptr<T> pop()
{
node* old_head=pop_head();
if(!old_head)
{
return std::shared_ptr<T>();
}

std::shared_ptr<T> const res(old_head->data); // 2
delete old_head;
return res;
}

void push(T new_value)
{
std::shared_ptr<T> new_data(std::make_shared<T>(new_value));
node* p=new node; // 3
node* const old_tail=tail.load(); // 4
old_tail->data.swap(new_data); // 5
old_tail->next=p; // 6
tail.store(p); // 7
}
};

SPSC 安全性分析:在严格的 SPSC 约束下,上述实现是可以工作的。生产者只修改 tail 附近,消费者只修改 head 附近,它们的操作通过 head == tail 检查和 next 指针链条隐式同步。内存回收也简单。

问题:现实世界很少是严格的 SPSC。我们需要 MPMC(多生产者,多消费者)。

多生产者/多消费者 (MPMC) 的复杂情况