5. 基于锁的并发设计结构

ZaynPei Lv6

为并发设计

首先, 什么是并发数据结构?一个并发数据结构是指一个可以被多个线程同时访问的数据结构。

也就是说, 其最低要求是线程安全 (Thread Safety)

这意味着,即使多个线程并发地对数据结构执行操作(无论是相同还是不同的操作),数据结构也必须保持其内部一致性:

  • 无数据丢失或损坏。
  • 所有的不变量 (Invariants) 必须始终保持(或者说,没有线程能看到不变量被破坏的中间状态)。
  • 无数据竞争 (Data Races)。

然而, 仅仅“线程安全”是不够的:许多简单的方法可以实现线程安全(例如,用一个巨大的互斥锁锁住整个数据结构),但这样做往往会牺牲性能。

因此, 本节的核心论点是:设计并发数据结构的真正意义不仅仅在于线程安全,更在于允许真正的并行处理,从而提高程序的性能, 高效地利用多核处理器, 实现”真正的并发”。

“真并发” (True Concurrency):目标是让多个线程能够在同一时间点上有效地数据结构的不同部分执行不同的(非冲突的)操作上取得进展。这与仅仅保证“一次只有一个线程能安全访问”形成了鲜明对比。

序列化 (Serialization), 或者说串行化则是并发的天敌:当你使用单一的全局互斥量来保护整个数据结构时(就像第3章、第4章中的简单栈和队列实现),虽然保证了线程安全,但也引入了序列化。

序列化意味着:无论有多少线程想要访问数据结构,它们都必须排队等待获取那唯一的锁。在任何时刻,只有一个线程能够实际工作。这实际上将并发访问变成了串行访问(一个接一个),完全抵消了使用多线程带来的潜在性能优势。CPU 核心闲置,线程都在等待

总之, 这一章设计的核心目标是:

  • 减少保护区域 (Reduce Protected Sections):尽量缩小必须持有锁的代码范围。
  • 减少序列化 (Reduce Serialization):尽量避免让所有线程都争抢同一个锁。
  • 提升并发访问的潜力 (Increase Potential for Concurrency):通过更精细的控制,允许多个线程在不冲突的情况下同时操作数据结构。

这需要我们超越简单的单锁模型,仔细思考如何分解数据结构,使用更细粒度的锁或其他同步机制,以最大限度地减少线程等待,最大化并行处理的机会。

指导原则第一方面:确保访问安全 (Ensure Safety)

要想设计真正并发的数据结构,首先必须确保访问安全/线程安全。这意味着无论有多少线程同时访问数据结构,都不会导致数据损坏或不一致。

这部分原则主要是对第3章中已经讨论过的线程安全问题的回顾和强调,因为安全是并发设计的基石。

  1. 确保不变量不被破坏时可见 (Protect Invariants):

    • 核心:必须保证任何线程在任何时候都不会访问到数据结构处于“中间状态”(即不变量暂时被破坏时)的数据。
    • 实现:通常通过锁(互斥量)来保护那些修改数据结构并可能暂时破坏不变量的代码段。
  2. 小心接口造成的条件竞争 (Design Interfaces Carefully):

    • 核心:即使每个单独的成员函数都是线程安全的(内部加锁),函数之间的组合调用也可能产生条件竞争(例如第3章 stack 的 empty/top 问题)。

    • 建议:接口应该提供原子性的操作,完成一个完整的逻辑功能,而不是提供一系列需要用户组合起来才能完成功能的“步骤”函数。

  3. 注意异常安全 (Ensure Exception Safety):

    • 核心:当数据结构的操作(特别是在持有锁时)抛出异常,必须保证数据结构的状态仍然是有效的(不变量未被永久破坏),并且锁能够被正确释放。

    • 实现:

      • 使用 RAII 锁管理(如 std::lock_guard, std::unique_lock)来确保锁在异常时自动释放。

      • 仔细设计操作步骤,确保在可能抛出异常的操作(如内存分配、拷贝/移动用户数据)失败时,数据结构能回滚到一致状态,或者至少保持有效状态。

  4. 死锁风险降至最低 (Minimize Deadlock Risk):

    • 核心:死锁是并发设计中的常见陷阱,尤其是在使用多个锁时。

    • 建议:

      • 限制锁的范围:尽可能缩短持有锁的时间。

      • 避免嵌套锁:一个线程已持有锁A时,避免再去获取锁B。

      • 按固定顺序加锁:如果必须获取多个锁,所有线程都按同一顺序获取。

      • 避免在持有锁时调用用户代码:防止用户代码尝试获取其他锁导致死锁。

  5. 考虑特殊成员函数:

    • 构造函数/析构函数:通常需要独占访问。用户必须保证在构造完成前析构开始后,没有其他线程访问该对象。

    • 拷贝构造/赋值/swap():如果你的数据结构支持这些操作,作为设计者,你需要明确它们在并发环境下的行为:它们是线程安全的吗?它们需要独占访问吗?用户在使用这些操作时需要注意什么?

指导原则第二方面:确保真正的并发访问 (Enable True Concurrency)

这部分原则是本章的新重点,关注如何在保证安全的前提下,最大化数据结构的并行处理能力。

  1. 锁的范围能否缩小? (Minimize Lock Scope)

    • 问题:当前持有锁执行的操作中,是否有部分可以安全地移到锁的范围之外执行?

    • 示例:内存分配 (new)、数据的拷贝/准备等耗时操作,如果可以,应在获取锁之前或释放锁之后进行(如在 push 之前 make_shared 的例子)。

  2. 能否使用多个锁? (Fine-grained Locking)

    • 问题:数据结构的不同部分是否可以由不同的互斥量来保护?

    • 目标:实现细粒度锁。如果线程A操作数据结构的A部分(锁A),线程B操作B部分(锁B),它们就可以并行执行。

    • 挑战:增加了复杂性,需要仔细管理多个锁,并警惕死锁(如果一个操作需要同时获取多个锁)。

  3. 是否所有操作都需要同级锁? (Different Lock Types)

    • 问题:对于“只读”操作和“读写”操作,是否需要相同的(独占)锁保护?

    • 目标:利用读写锁(如 std::shared_mutex)。允许多个“读者”线程并发访问,只有“写者”线程需要独占访问。

    • 适用场景:“读多写少”的数据结构(如查询表)。

  4. 能否通过修改结构来增加并发? (Modify Structure for Concurrency)

    • 问题:当前数据结构的内部组织方式是否本身就限制了并发?能否通过简单的修改来减少冲突点?

    • 示例:后面将要介绍的队列实现,通过引入一个“哑节点”,将 push(操作尾部)和 pop(操作头部)的操作目标分离开,从而允许它们在大部分时间里使用不同的锁并行执行。

所有这些并发指导原则都服务于一个核心思想:如何在保证安全(通过必要的锁和同步)的前提下,最大限度地减少线程因锁而产生的等待(序列化),从而最大化线程能够并行执行(真并发)的机会?

这是一个权衡 (Trade-off) 的过程。更细粒度的锁、更复杂的同步机制可以带来更高的并发潜力,但也显著增加了设计的复杂性、出错(死锁、竞争)的风险以及可能的额外开销。设计者需要根据数据结构的具体使用场景和性能需求来做出明智的选择。

基于锁的并发数据结构

首先, 基于锁的设计天然就面临着挑战:

  • 锁的本质是互斥 (Mutual Exclusion):它通过阻止并发访问来保证安全。
  • 并发的目标是并行 (Parallelism):我们希望允许多个线程同时工作。

这两者本身就存在一定的矛盾。简单地使用一个大锁保护整个数据结构虽然安全,但会完全序列化访问,牺牲并发性。因此,本节的目标是探索如何在必要的保护(使用锁)与最大化的并发之间找到平衡点。

本节的核心策略是:最小化锁的影响

对于基于锁的数据结构,提高并发性的关键策略在于最小化锁的持有时间范围

  • 持有锁的时间最短:只在绝对必要(访问或修改共享状态)时才持有锁。耗时的操作(如内存分配、复杂计算、I/O)应尽可能在锁外完成。

  • 锁的粒度尽可能小 (Fine-grained Locking):如果数据结构的不同部分可以独立修改,考虑使用多个锁分别保护不同部分,而不是用一个锁保护所有部分。

线程安全栈——使用锁

这一节的目标是重新审视我们在第3章实现的那个线程安全的栈,并使用上一节提出的指导原则来对其进行严格的分析,特别是评估它的安全性并发性

首先,我们回顾一下这个栈的实现。它本质上是用一个 std::mutex 包装了一个标准的 std::stack。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#include <exception>
#include <stack> // 引入 std::stack
#include <mutex>
#include <memory> // 为了 std::shared_ptr

struct empty_stack: std::exception
{
const char* what() const throw(); // 假设已实现
};

template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m; // mutable 允许 const 成员函数 lock
public:
threadsafe_stack(){} // 默认构造

// 拷贝构造函数
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m); // 锁住源对象
data = other.data; // 在锁内执行拷贝
}

threadsafe_stack& operator=(const threadsafe_stack&) = delete; // 禁止赋值

// push 操作
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value)); // 1. 可能调用 T 的移动构造
}

// pop 操作 (返回 shared_ptr)
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack(); // 2. 检查空 (在锁内)
// 3. 创建 shared_ptr (可能调用 T 的拷贝/移动构造)
std::shared_ptr<T> const res(
std::make_shared<T>(std::move(data.top())));
data.pop(); // 4. 从栈中移除 (noexcept)
return res;
}

// pop 操作 (写入引用)
void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack(); // 检查空 (在锁内)
value = std::move(data.top()); // 5. 移动赋值给 value (可能调用 T 的移动赋值)
data.pop(); // 6. 从栈中移除 (noexcept)
}

// empty 操作
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};

安全性分析

  1. 不变量保护: 通过在每一个成员函数(push, pop, empty, 拷贝构造)的入口处使用 std::lock_guard<std::mutex> lock(m);,该实现保证了在任何时刻只有一个线程能够访问底层的 std::stack<T> data。 因此,当一个线程正在修改 data(例如 push 或 pop)并可能暂时破坏 std::stack 的内部不变量时,没有其他线程能够看到这种中间状态。

  2. 接口条件竞争:

    • empty()/pop() 竞争:pop() 函数在获取锁之后、执行任何操作之前,会显式地检查 data.empty() 。因此,即使外部调用者先检查了 empty(),pop() 内部的检查也能保证操作的安全性,避免了 TOCTTOU 竞争。(已处理)

    • top()/pop() 竞争:标准 std::stack 将这两个操作分开,导致竞争。这里的 threadsafe_stack 通过提供合并的 pop 操作(直接返回弹出的值,无论是通过 shared_ptr 还是引用)来避免了这个接口竞争。(已处理)

  3. 异常安全:

    • 锁管理: 使用 std::lock_guard 保证了即使在操作中(如内存分配或用户类型操作)抛出异常,互斥锁 m 也会被自动、安全地释放。

    • push(T new_value): 异常可能来自 data.push() 内部(内存分配失败或 T 的移动构造函数抛异常)。而 std::stack::push 通常提供强异常保证(失败时栈不变)。因此 push 是异常安全的。

    • pop() (shared_ptr 版本):

      • empty_stack 异常 (标记 2): 抛出前未修改数据,安全。
      • make_shared<T>(...) (标记 3): 可能因内存分配失败或 T 的拷贝/移动构造函数抛异常。如果发生,data.pop() (标记 4) 不会被执行,栈保持不变。标准库保证 make_shared 失败时无内存泄漏。因此是异常安全的(强保证)。
      • data.pop() (标记 4): 本身保证不抛异常 (noexcept)。
    • pop(T& value) (引用版本):

      • empty_stack 异常: 安全。
      • value = std::move(data.top()) (标记 5): 可能因 T 的移动赋值运算符抛异常。如果发生,data.pop() (标记 6) 不会被执行,栈保持不变。因此是异常安全的(强保证)。
      • data.pop() (标记 6): 本身保证不抛异常 (noexcept)。
    • empty(): 只读操作,使用 lock_guard,异常安全。

    • 结论: 该栈设计具有良好的异常安全性。

  4. 死锁风险:

    • 主要风险点:在持有互斥锁 m 的同时,调用了用户提供的代码:
      • push 中的 T 的移动构造函数 (间接在 data.push 内), 虽然是隐式调用。
      • pop (shared_ptr) 中的 T 的拷贝/移动构造函数 (在 make_shared 内), 同样是隐式调用。
      • pop (引用) 中的 T 的移动赋值运算符。
      • 拷贝构造函数中的 T 的拷贝赋值运算符 (在 data = other.data 内)。
      • ( 因为T的类型是用户定义的,用户可能在这些操作中尝试获取其他锁,从而引发死锁 )
    • 死锁场景:如果这些用户代码(直接或间接地)尝试再次锁定同一个 threadsafe_stack 实例的互斥锁 m,就会发生死锁(如果用 std::mutex 则是未定义行为)。
    • 责任:实际上, 避免这种由用户代码引起的死锁是用户的责任,数据结构本身无法完全阻止。用户不应该在 T 的操作中对包含它的栈进行操作。(存在风险,依赖用户)
  5. 特殊成员函数:

    • 拷贝构造: 明确定义了,并且通过锁住源对象 (other.m) 来保证拷贝过程的线程安全。
    • 赋值: 被显式删除 (= delete),避免了复杂的、难以保证线程安全的赋值语义。
    • 构造/析构: 它们本身不是线程安全的。用户必须确保在对象完全构造好之前没有其他线程访问它,并且在对象开始析构后也没有线程再访问它。这是标准的C++对象生命周期规则,适用于所有对象,不仅仅是并发数据结构。(用户责任

并发性分析

  1. 锁的范围: 几乎每个成员函数都立即获取锁,并在函数结束时才释放锁。锁的范围覆盖了函数的整个执行过程

  2. 多个锁: 只使用了一个互斥量 m 来保护整个 std::stack data。

  3. 不同锁级别: 没有使用读写锁等不同级别的锁。

  4. 结构修改: 没有对 std::stack 的内部结构进行修改以提高并发性,只是简单地包装

结论: 这个设计通过将所有操作完全序列化 (Serialization) 来保证线程安全。在任何时刻,最多只有一个线程可以对栈执行任何操作(无论是 push, pop 还是 empty)。

因此并发性极差。它没有利用多核处理器的能力。如果多个线程频繁访问栈,它们大部分时间都会阻塞在等待锁上。

而且, 这个实现还存在缺乏等待机制的问题. 当栈为空时,pop 操作会抛出 empty_stack 异常。如果一个“消费者”线程需要等待“生产者”线程向栈中 push 数据,它该怎么办?

目前的实现只能够不断循环调用 try_pop 或 empty (忙等待/轮询), 不断调用 pop 并捕获异常。

这些方案浪费CPU资源,并且可能因为频繁的锁竞争而降低整体性能。

更好的方案则需要数据结构内部提供高效的等待机制,例如使用条件变量。

总之, threadsafe_stack 是一个线程安全数据结构的基本示例。它满足了安全性的基本要求(虽然有用户代码导致死锁的风险)。

然而,它在并发性方面表现很差,因为它将所有操作都序列化了。并且,它缺乏有效的等待机制,不适合典型的生产者-消费者场景。

线程安全队列——使用锁和条件变量

本节实际上是重新审视并分析了第4章已经实现的那个队列, 比起栈, 它引入了条件变量来处理线程等待的问题。

这个队列的核心是使用一个 std::mutex 来保护底层的 std::queue,并使用一个 std::condition_variable 来实现等待功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
template<typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;

public:
threadsafe_queue() {}

// Push 操作
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(std::move(new_value)); // 原文可能是 data,应为 new_value
data_cond.notify_one(); // 1. 唤醒一个等待者
}

// Wait and Pop 操作 (阻塞,写入引用)
void wait_and_pop(T& value) // 2
{
std::unique_lock<std::mutex> lk(mut); // 必须用 unique_lock
// 等待,直到 lambda 返回 true (队列非空)
data_cond.wait(lk, [this]{ return !data_queue.empty(); });
value = std::move(data_queue.front());
data_queue.pop();
}

// Wait and Pop 操作 (阻塞,返回 shared_ptr)
std::shared_ptr<T> wait_and_pop() // 3
{
std::unique_lock<std::mutex> lk(mut); // 必须用 unique_lock
// 4. 等待
data_cond.wait(lk, [this]{ return !data_queue.empty(); });
// 可能抛异常点 1: make_shared 分配内存
// 可能抛异常点 2: T 的拷贝/移动构造
std::shared_ptr<T> res(
std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop(); // 不抛异常
return res;
}

// Try Pop 操作 (非阻塞,写入引用)
bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut); // 非阻塞,可用 lock_guard
if(data_queue.empty())
return false; // 不等待,立即返回
value = std::move(data_queue.front()); // 可能调用 T 的移动赋值
data_queue.pop();
return true;
}

// Try Pop 操作 (非阻塞,返回 shared_ptr)
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut); // 非阻塞,可用 lock_guard
if(data_queue.empty())
return std::shared_ptr<T>(); // 5. 返回空指针表示失败
// 可能抛异常点 1: make_shared
// 可能抛异常点 2: T 的拷贝/移动构造
std::shared_ptr<T> res(
std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop();
return res;
}

// Empty 操作
bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};

与 threadsafe_stack 的对比与分析

这个队列的设计与上述的栈有很多相似之处,但也引入了关键的不同.

相似性:

  • 仍然使用单一互斥量 (mut) 保护整个底层容器 (data_queue)。

  • 接口设计上,同样通过合并 front/pop 操作到 wait_and_pop/try_pop 中,避免了接口固有的条件竞争。

  • try_pop 的逻辑与栈的 pop 非常相似(除了失败时不抛异常,而是返回 false 或 nullptr)。

  • 同样存在因调用用户代码(T的构造/赋值)而导致的死锁风险

  • 构造/析构函数同样不是线程安全的,需要用户保证生命周期管理

关键不同点:

  • 等待机制: wait_and_pop 使用 std::condition_variable::wait 来高效地等待队列非空。这解决了栈实现中消费者需要“忙等待”或轮询的问题。

  • 通知机制: push 操作 (标记 ①) 在添加元素后调用 data_cond.notify_one() 来唤醒一个可能正在 wait_and_pop 中等待的消费者线程。

  • 锁类型: wait_and_pop 必须使用 std::unique_lock,因为 wait 操作需要在等待期间释放锁,并在被唤醒后重新获取锁。push, try_pop, empty 等非阻塞操作则可以使用更高效的 std::lock_guard。

然而, wait_and_pop 引入条件变量的同时也带来了一个新的异常安全挑战

假设多个消费者线程阻塞在 wait_and_pop 的 data_cond.wait() 上。

一个生产者线程调用 push,然后调用 notify_one()唤醒了一个消费者线程 C1。C1 从 wait 返回,成功获取了锁,并且检查到队列非空。

问题点: 在 C1 执行 std::make_shared<T>(...)value = std::move(...) 时,如果 T 类型的构造函数或赋值运算符抛出异常, 后果是 C1 抛出异常,data_queue.pop() 不会被执行,数据项仍然留在队列中。

但是,唤醒 C1 的那个 notify_one 信号已经被“消耗”了。

如果没有其他生产者再 push 新数据并发送新的 notify_one 信号,那么其他原本在等待的消费者线程(C2, C3…)将永远不会被唤醒来处理那个留在队列中的数据项!它们会永久阻塞

这个问题目前有三种可能的解决方案:

  • notify_all(): 在 push 中总是调用 notify_all()。当 C1 异常退出时,其他线程 C2, C3… 也会被唤醒,其中一个会成功处理数据。但是显然效率低下(惊群效应 Thundering Herd)。

  • 在 catch 中重新 notify_one(): 在 wait_and_pop 内部加上 try…catch 可能抛异常的操作,在 catch 块中调用 notify_one() 唤醒另一个线程,然后 throw; 重新抛出异常。缺点是代码复杂。

  • 在队列中存储 std::shared_ptr: 通过将数据项包装在 std::shared_ptr 中,可以确保即使在异常情况下,数据项也能被正确处理。

  • ```cpp template class threadsafe_queue{ private: mutable std::mutex mut; std::queue<std::shared_ptr> data_queue; std::condition_variable data_cond; public: threadsafe_queue() {}

      void wait_and_pop(T& value)
      {
          std::unique_lock<std::mutex> lk(mut);
          data_cond.wait(lk,[this]{return !data_queue.empty();});
          value=std::move(*data_queue.front());  // 1
          data_queue.pop();
      }
    
      bool try_pop(T& value)
      {
          std::lock_guard<std::mutex> lk(mut);
          if(data_queue.empty())
          return false;
          value=std::move(*data_queue.front());  // 2
          data_queue.pop();
          return true;
      }
    
      std::shared_ptr<T> wait_and_pop()
      {
          std::unique_lock<std::mutex> lk(mut);
          data_cond.wait(lk,[this]{return !data_queue.empty();});
          std::shared_ptr<T> res=data_queue.front();  // 3
          data_queue.pop();
          return res;
      }
    
      std::shared_ptr<T> try_pop()
      {
          std::lock_guard<std::mutex> lk(mut);
          if(data_queue.empty())
          return std::shared_ptr<T>();
          std::shared_ptr<T> res=data_queue.front();  // 4
          data_queue.pop();
          return res;
      }
    
      void push(T new_value)
      {
          std::shared_ptr<T> data(
          std::make_shared<T>(std::move(new_value)));  // 5
          std::lock_guard<std::mutex> lk(mut);
          data_queue.push(data);
          data_cond.notify_one();
      }
    
      bool empty() const
      {
          std::lock_guard<std::mutex> lk(mut);
          return data_queue.empty();
      }

    }; 关键改动:

  • push 函数现在在获取锁之前就创建 std::shared_ptr<T>。这包含了可能抛异常的内存分配和 T 的构造。

  • 锁内操作: 在锁保护下,push 只需要将 shared_ptr (这是一个很小的对象) 拷贝/移动到 std::queue 中。这个操作通常很快,并且不太可能抛异常。

  • pop 操作: 在 wait 返回后,pop 操作只需要从队列中拷贝/移动 shared_ptr。拷贝 shared_ptr 是原子操作且不涉及用户代码,非常安全。之后如果需要返回 T& ,则需要解引用并移动赋值,这里仍然存在 T 的移动赋值可能抛异常的风险,但原始数据(shared_ptr)已被安全取出。

优点: - 解决了异常安全问题: wait_and_pop 在锁内执行的操作(拷贝 shared_ptr)基本不会抛异常,大大降低了“丢失唤醒”的风险。 - 提高了性能/并发潜力: 将耗时的内存分配 (make_shared) 和 T 的构造移到了 push 函数的锁范围之外。这意味着 push 操作持有锁的时间大大缩短,其他线程(执行 push, pop 或 empty)等待锁的时间也相应减少,提高了并发的可能性。

并发性分析和总结

核心限制: 无论是原先的线程安全队列还是使用指针优化后的队列,它们都仍然使用单一互斥量 (mut) 来保护整个 data_queue。

后果就是所有对队列的操作(push, wait_and_pop, try_pop, empty)在同一时间点仍然只能有一个线程在执行。

尽管使用指针优化后的队列通过减少锁持有时间略微提高了并发潜力,但本质上这个队列仍然是一个序列化访问的结构,并发性受限于单锁瓶颈。

基于单锁和条件变量的 threadsafe_queue (特别是存储 shared_ptr 的版本) 是一个实用且常用的并发数据结构。它安全、解决了等待问题,并且通过优化减少了锁争用。

要实现更高程度的并发(允许多个 push 或多个 pop 同时进行),就需要打破单一锁的限制,采用更细粒度的锁策略。这正是下一节将要探讨的内容,通过重新实现队列的底层链表结构来引入多个锁。

线程安全队列——使用细粒度锁和条件变量

继上一节指出单锁队列并发性不足后,本节的目标是打破单锁瓶颈,通过使用多个锁(细粒度锁)来显著提升队列的并发性能。

首先问题回顾:上一节的队列虽然安全且解决了等待问题,但所有操作(push, pop, empty)都被同一个互斥量 mut 序列化了。即使一个线程想 push(操作队尾),另一个线程想 pop(操作队头),它们也必须排队等待同一个锁

核心思路:队列的 push 和 pop 操作在逻辑上是作用于队列的两端。我们能否设计一种锁策略,使得对队头队尾的操作能够并行进行?

初步尝试:简单的链表与头尾锁

如果依旧使用deque或std::queue作为底层容器,实现细粒度锁会非常复杂,因为这些容器的内部结构并不支持独立地锁定头部和尾部。

因此在底层结构上, 我们选择放弃 std::queue,自己实现一个简单的单向链表来表示队列, head 指针指向第一个节点, tail 指针指向最后一个节点, 每个节点包含数据和指向下一个节点的 next 指针。

细粒度锁的初步想法: - 用 head_mutex 保护 head 指针。pop 操作需要获取 head_mutex(尾进头出) - 用 tail_mutex 保护 tail 指针。push 操作需要获取 tail_mutex。

致命缺陷 (空队列 & 单元素队列):

  • 空队列:head 和 tail 都为 nullptr。push 第一个元素时,需要同时修改 head 和 tail,必须获取两个锁。pop 需要读取 head。
  • 单元素队列:head 和 tail 指向同一个节点。pop 需要读取 head->next (为 nullptr) 并将 head 设为 nullptr。push 需要读取 tail->next (为 nullptr) 并修改 tail->next 指向新节点,然后更新 tail。
    • 冲突:push 和 pop 同时在竞争访问和修改同一个节点的 next 指针!简单的头尾锁无法解决这个冲突。

解决方案:引入哑节点 (Dummy Node)

为了解耦头尾操作,特别是处理空队列和单元素队列的边界情况,引入了“哑节点”技术:

机制:队列永远包含至少一个节点——这个节点就是“哑节点”,它不存储有效数据。tail 指针始终指向这个哑节点head 指针指向队列中的第一个实际数据节点

空队列:当队列为空时,head 指针也指向哑节点 (即 head == tail)。

操作流程 (简化版):

  • push(value):push 主要操作的是 tail 指针和原 tail 指向的节点。
    1. 创建一个新的哑节点 p。
    2. 将 value 存入当前的哑节点(tail 指向的那个)。
    3. 将当前哑节点的 next 指向新的哑节点 p。
    4. 将 tail 指针更新为指向 p。
  • pop():pop 主要操作的是 head 指针。
    1. 读取 head 指向节点的数据。
    2. 将 head 指针更新为 head->next。
    3. (旧的 head 节点会被 unique_ptr 自动删除, 因为它超出了作用域)。

优点:通过哑节点作为缓冲区,push 和 pop 操作(在队列非空时)访问和修改的是不同的节点和指针,为使用不同的锁提供了基础。

细粒度锁与条件变量的完整实现

基于哑节点,现在可以设计真正的细粒度锁队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
#include <memory>               // For std::unique_ptr, std::shared_ptr, std::make_shared
#include <mutex> // For std::mutex, std::lock_guard, std::unique_lock
#include <condition_variable> // For std::condition_variable
template<typename T>
class threadsafe_queue
{
private:

struct node
{
// 数据使用 shared_ptr 包装,以便在锁外安全地复制(返回)。
std::shared_ptr<T> data;
// 使用 unique_ptr 管理下一个节点,实现 RAII 自动内存管理。
std::unique_ptr<node> next;
};

// --- 核心成员变量 ---

std::mutex head_mutex; // 只保护 'head' 指针的访问
std::unique_ptr<node> head; // 队列头(独占所有权)

std::mutex tail_mutex; // 保护 'tail' 指针以及 'tail' 指向的节点
node* tail; // 队列尾(非所有权的观察指针)

std::condition_variable data_cond; // 用于 'wait_and_pop' 的条件变量

// --- 私有辅助函数 (Private Helper Functions) ---

node* get_tail() // 访问 tail 指针的线程安全方法
{
std::lock_guard<std::mutex> tail_lock(tail_mutex); // 先锁定 tail_mutex 再访问 tail
return tail;
}

std::unique_ptr<node> pop_head() // 弹出头节点,返回节点指针 (假设已锁定 head_mutex)
{
std::unique_ptr<node> old_head = std::move(head); //
head = std::move(old_head->next);
return old_head;
}

std::unique_lock<std::mutex> wait_for_data() // 等待直到队列非空, 返回已锁定的 head_mutex
{
// 1. 获取 head_mutex。必须用 unique_lock 以便 'wait' 使用。
std::unique_lock<std::mutex> head_lock(head_mutex);

// 2. 在条件变量上等待。
// lambda 谓词是“队列非空” (head != tail)。
// wait() 会原子地:
// a. 检查谓词,如果为 false (队列空),释放 head_lock 并休眠。
// b. 被唤醒时,重新获取 head_lock,再次检查谓词(防止虚假唤醒)。
// c. 谓词检查 (head.get() != get_tail()) 会在持有 head_lock
// 的情况下调用 get_tail(),这会获取 tail_mutex。
// 锁顺序 (head -> tail) 是安全的,不会死锁。
data_cond.wait(head_lock, [&]{ return head.get() != get_tail(); });

// 3. 此时,队列非空,且 head_lock 仍被持有。
// 通过 std::move 将锁的所有权返回给调用者。
return std::move(head_lock); // 3
}

std::unique_ptr<node> wait_pop_head() // 等待并弹出头节点,返回节点指针。
{
// 1. 等待数据,并获取已锁定的 head_lock。
std::unique_lock<std::mutex> head_lock(wait_for_data());

// 2. 在锁的保护下,调用内部(无锁)的 pop_head()。
return pop_head();
}

std::unique_ptr<node> wait_pop_head(T& value) // 等待并弹出头节点,将数据移入 'value'。
{
// 1. 等待数据,并获取已锁定的 head_lock。
std::unique_lock<std::mutex> head_lock(wait_for_data());

// 2. (关键:异常安全) 先移动数据。
// 如果 T 的移动赋值操作抛异常,pop_head() 不会被调用,
// 锁会释放,但节点仍留在队列中,数据不丢失。
value = std::move(*head->data);

// 3. 数据安全移动后,再弹出节点。
return pop_head();
}

std::unique_ptr<node> try_pop_head() // 尝试弹出头节点,返回节点指针。
{
// 1. 锁定 head_mutex。
std::lock_guard<std::mutex> head_lock(head_mutex);

// 2. 执行线程安全的“空队列”检查 (锁顺序 head -> tail)。
if(head.get() == get_tail())
{
return std::unique_ptr<node>(); // 返回 nullptr
}

// 3. 队列非空,在锁保护下弹出。
return pop_head();
}

std::unique_ptr<node> try_pop_head(T& value) // 尝试弹出头节点,将数据移入 'value'。
{
// 1. 锁定 head_mutex。
std::lock_guard<std::mutex> head_lock(head_mutex);

// 2. 执行线程安全的“空队列”检查。
if(head.get() == get_tail()) // 锁的固定顺序 head -> tail , 避免死锁
{
return std::unique_ptr<node>(); // 返回 nullptr
}

// 3. (关键:异常安全) 先移动数据,再弹出节点。
// 理由同 wait_pop_head(T& value)。
value = std::move(*head->data);

// 4. 弹出节点。
return pop_head();
}


public:

threadsafe_queue():
head(new node), tail(head.get()) // 初始化时创建哑节点, head 和 tail 指向它
{}

// 禁止拷贝和赋值, 因为有独占所有权的指针成员和互斥量成员
threadsafe_queue(const threadsafe_queue& other) = delete;
threadsafe_queue& operator=(const threadsafe_queue& other) = delete;

// --- 公共接口 (Public Interface) ---

void push(T new_value)
{
// --- 1. 锁外准备 (性能优化) ---
// 在锁外创建 T 的实例 (make_shared) 和新节点 (new node)。
// 内存分配是耗时操作,不应在锁内执行。
std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
std::unique_ptr<node> p(new node); // 新的哑节点
node* const new_tail = p.get(); // 保存新节点的原始指针

// --- 2. 锁内操作 (快速) ---
{
// 只锁定 tail_mutex, push 操作与 pop 操作 (锁 head_mutex) 可以并发。
std::lock_guard<std::mutex> tail_lock(tail_mutex);
tail->data = new_data; // 将数据 *填充* 到 *当前的* 虚拟节点中
tail->next = std::move(p); // 将 *新的* 虚拟节点链接到链表末尾, 所有权转移
tail = new_tail; // 更新 tail 指针指向新的虚拟节点
} // tail_lock 在此释放

// --- 3. 通知 (锁外) ---
data_cond.notify_one();
}

std::shared_ptr<T> wait_and_pop() // 等待并弹出一个元素,返回 std::shared_ptr。
{
// 1. 调用辅助函数,它会阻塞直到有数据,并安全地弹出一个节点, 返回旧头节点。
std::unique_ptr<node> const old_head = wait_pop_head(); // 现在old_head管理旧头节点的所有权

// 2. 从弹出的节点中返回数据。
return old_head->data;
} // (RAII: old_head 在此销毁,自动 delete 节点)

void wait_and_pop(T& value) // 等待并弹出一个元素,通过引用写入 'value'。
{
// 1. 调用异常安全的辅助函数。
// 数据在 wait_pop_head(value) 内部已经被移入 'value'。
std::unique_ptr<node> const old_head = wait_pop_head(value);

// 2. (RAII: old_head 在此销毁,自动 delete 节点)
}

std::shared_ptr<T> try_pop() // 尝试弹出头元素,返回 std::shared_ptr<T>。
{
// 1. 调用辅助函数。old_head 要么是弹出的节点,要么是 nullptr。
std::unique_ptr<node> old_head = try_pop_head();

// 2. 在锁外检查结果并返回数据 (或空指针)。
return old_head ? old_head->data : std::shared_ptr<T>();
}

bool try_pop(T& value) // 尝试弹出头元素,通过引用写入 'value'。
{
// 1. 调用异常安全的辅助函数。
std::unique_ptr<node> const old_head = try_pop_head(value);

// 2. 利用 unique_ptr 到 bool 的隐式转换返回成功与否。
// (true: old_head 非空, 弹出成功; false: old_head 为空, 队列空)
return old_head;
}

bool empty() // 公共接口, 检查队列是否为空
{
// 1. 锁定 head_mutex。
std::lock_guard<std::mutex> head_lock(head_mutex);

// 2. 执行与 pop 操作 *完全一致* 的线程安全“空队列”检查。
return (head.get() == get_tail());
}
};
##### 流程一:入队(Push)

  1. 锁外准备(性能优化)
  • 创建 shared_ptr<T> new_data(指向待入队数据 A)。
  • 创建 std::unique_ptr<node> p(指向新的空节点 D2)。
  • 这些耗时操作在锁外完成,减少锁持有时间。
  1. 获取锁
  • 仅获取 tail_mutexstd::lock_guard<std::mutex> tail_lock(tail_mutex);
  1. 修改队列(锁内)
  • tail->data = new_data;:将数据 A 填充到当前虚拟节点 D1。
  • tail->next = std::move(p);:将 D1 链接到新的虚拟节点 D2。
  • tail = new_tail;:更新 tail 指针指向 D2。
  1. 释放锁与通知(锁外)
  • 释放 tail_mutex
  • data_cond.notify_one();:通知一个等待的消费者线程。

流程二:非阻塞出队(try_pop)
  1. 调用辅助函数
  • try_pop() 调用 try_pop_head()
  1. 获取锁
  • 获取 head_mutexstd::lock_guard<std::mutex> head_lock(head_mutex);
  1. 安全检查(队列是否为空)
  • if(head.get() == get_tail()):检查队列是否为空。
    • head.get()(D1),get_tail()(D2,需临时获取 tail_mutex)。
  • 锁顺序始终为 head_mutextail_mutex,避免死锁。
  1. 修改队列(锁内)
  • 调用 pop_head()
    • std::unique_ptr<node> old_head = std::move(head);
    • head = std::move(old_head->next);(head 指向 D2)
  1. 释放锁与返回(锁外)
  • 释放 head_mutex
  • 返回 old_head->data(即 A)。
  1. 内存回收(RAII)
  • old_head 在函数末尾被销毁,自动释放 D1 节点。

流程三:阻塞出队(wait_and_pop)
  1. 进入等待
  • 调用 wait_pop_head(value)wait_for_data()
  • 获取 head_mutexunique_lock)。
  • data_cond.wait(head_lock, ...):谓词为 head.get() != get_tail()
  • 若队列为空,自动释放 head_mutex,线程休眠。
  1. 生产者唤醒
  • 另一个线程调用 push(A),最后执行 data_cond.notify_one()
  1. 消费者苏醒
  • 被唤醒后重新获取 head_mutex
  • 再次检查谓词,队列已非空,继续执行。
  1. 安全弹出(锁内)
  • 持有 head_mutex,先移动数据:value = std::move(*head->data);
  • 若移动赋值抛异常,节点未弹出,数据不丢失。
  • 调用 pop_head(),弹出节点。
  1. 释放
  • 释放 head_mutex
  • old_head(unique_ptr)在函数结束时销毁,自动释放节点。

总结:

通过引入哑节点和分离的头尾锁,可以实现并发性能远超单锁版本的线程安全队列。push 和 pop 在大部分情况下可以并行执行,显著提高了吞吐量。

不过实现极其复杂,需要非常小心地处理锁的范围、边界条件(空队列)以及潜在的竞争条件(如 pop_head 中的 get_tail 时机)。

带有等待功能的版本(wait_and_pop)以及需要处理异常安全(特别是引用版本)时,复杂性进一步增加。

这个例子充分展示了细粒度锁带来的性能优势和设计挑战。

基于锁设计更加复杂的数据结构

在上一节中,我们深入探讨了相对简单的线性数据结构——栈和队列——的并发设计,从单锁演进到了细粒度锁。

本节将目光转向更复杂、接口更多样化的数据结构,例如查找表(类似 map)和链表(类似 list)。设计这些结构面临着新的挑战,但也提供了更多的并发可能性。

复杂数据结构的特点及与栈/队列的区别:

  • 栈/队列:接口相对固定(push, pop, empty 等),主要用于特定的数据流模式。大多数操作都会修改结构
  • 复杂结构 (Map, List, Tree 等):
    • 支持更多样化的操作(查找、插入、删除、更新、遍历、获取大小等)。
    • 使用模式更灵活,通常读取操作可能远多于修改操作(例如查找表)。
    • 内部结构可能更复杂(如树的平衡、哈希表的冲突链/开放寻址)。

这种多样性带来了双重影响: - 更大的并发潜力: - 操作不冲突的可能性增加:例如,在哈希表中,对不同桶的插入操作理论上可以完全并行。对同一个桶的多次读取操作也可以并行(如果使用读写锁)。 - 读多写少:查找表等结构通常读取远多于写入,这天然适合使用读写锁来允许多个读者并发。

  • 更高的设计难度:
    • 保护更困难:需要仔细考虑所有可能的操作组合之间的相互影响。例如insert 操作进行时,find 操作是否安全?delete 操作进行时,其他 delete 或 find 是否安全?如果结构需要内部重组(如哈希表扩容、树再平衡),如何保证这些“全局”操作与其他“局部”操作的安全交互?
    • 接口设计更关键:需要更加警惕接口固有的条件竞争,并仔细设计迭代/遍历机制(这是并发容器的一大难点)。

但不管怎么样, 两个原则依旧要遵守: - 安全第一:必须保证不变量、接口安全、异常安全、避免死锁。 - 追求并发:通过最小化锁的范围和持有时间,以及采用细粒度锁或读写锁等策略,来最大化并行执行的机会。 ### 编写一个使用锁的线程安全查询表

查找表(也称为字典或映射),这是一种更常用的数据结构,用于关联(Key)和(Value)。C++标准库提供了 std::map(基于树)和 std::unordered_map(基于哈希表)。

本节的核心是探讨如何设计一个线程安全且高并发的查找表。

查找表的特点与挑战

  • 读多写少:查找表通常被查询(读取)的频率远高于插入、更新或删除(写入)的频率。
  • 接口更复杂:除了基本的增删改查,还可能需要检查空、获取大小、甚至遍历。

主要挑战则在于接口设计与并发性:std::map 和 std::unordered_map 的接口严重依赖迭代器 (Iterators)。

然而在并发环境下,迭代器非常危险。当一个线程持有指向某个元素的迭代器时,另一个线程可能删除该元素,导致迭代器失效 (Invalidation),继续使用它会导致未定义行为。要安全地管理并发迭代器极其困难,通常需要迭代器自身也持有锁,但这会带来新的复杂性和死锁风险。

为了简化设计并保证安全,本节实现的线程安全查找表不提供传统意义上的迭代器接口。遍历等操作需要通过其他方式(如内部迭代函数或快照)实现。

基本操作与接口设计

一个基本的线程安全查找表需要支持以下操作:

  • 添加/更新键值对:

    • 接口竞争:如果提供单独的 add(key, value) 和 update(key, value) 函数,或者 if (!find(key)) add(key, value); 这样的用户代码模式,都会存在 TOCTTOU 竞争。
    • 解决方案:提供一个原子的 add_or_update_mapping(key, value) 操作, 将添加和更新合并为一个操作, 从而避免条件竞争(在锁内进行检查和修改)。
  • 删除键值对:remove_mapping(key)。

  • 查找值:value_for(key, default_value)。

    • 接口选择:当键不存在时如何处理?
      • 返回默认值(如本节实现)。
      • 返回 std::optional<Value> (C++17) 或 std::pair<Value, bool>
      • 返回智能指针 (std::shared_ptr<Value>),不存在时返回 nullptr。
      • 安全原则:绝不能返回 Value&(值的引用),因为这会违反第3章的原则(不允许受保护数据的引用泄漏出锁的作用域)。

并发策略:从单锁到细粒度锁

设计线程安全查找表的第一步是选择合适的并发控制策略。这里介绍两种常见策略:

策略 1:单锁 (Single Lock), 使用一个 std::mutex 或 std::shared_mutex 保护整个查找表。 - 优点:实现简单,易于保证线程安全。使用 shared_mutex 可以允许多个 value_for (读操作) 并发执行。 - 缺点:写操作(add_or_update, remove)被完全序列化。在任何时刻,最多只有一个线程能修改表,这在高写入负载下会成为严重瓶颈。

策略 2:细粒度锁 (Fine-grained Locking) - 本节重点 - 目标:允许多个写操作(只要它们不冲突)和读写操作并发执行。 - 如何实现? 需要选择合适的底层数据结构, 从而可以构建分离的锁保护不同部分的数据。 - 二叉搜索树 (BST):不适合。所有操作(查找、插入、删除)通常都需要从根节点开始,根节点会成为锁竞争的瓶颈。即使使用“手递手”锁,效果也不理想。 - 有序数组:更不适合。插入和删除可能需要移动大量元素,几乎总是需要锁住整个数组。 - 哈希表 (Hash Table):将数据分散到多个桶 (Buckets) 中, 每个桶可以独立锁定。 - 并发优势:操作(增删改查)通常只影响一个桶。因此,我们可以为每个桶分配各自的锁。 - 效果:对不同桶的操作可以完全并行执行!

细粒度锁哈希表的实现

下面是一个使用细粒度锁实现的线程安全查找表的示例代码。该查找表基于哈希表结构,每个桶都有自己的读写锁,允许多个读者并发访问同一个桶,同时确保写操作的独占性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
template<typename Key, typename Value, typename Hash=std::hash<Key>>
class threadsafe_lookup_table {
private:
// 内部类:代表哈希表的一个桶
class bucket_type {
private:
typedef std::pair<Key, Value> bucket_value;
typedef std::list<bucket_value> bucket_data; // 桶内使用链表处理哈希冲突
typedef typename bucket_data::iterator bucket_iterator; // 桶内元素迭代器, 即指向链表节点的指针

bucket_data data; // 桶内存储的键值对链表
mutable boost::shared_mutex mutex; // 1. 每个桶有自己的读写锁!

// 查找桶内元素的辅助函数 (假设已加锁)
bucket_iterator find_entry_for(Key const& key) const{
return std::find_if(data.begin(),data.end(),
[&](bucket_value const& item) {return item.first==key;}
);
}
public:
// 读操作:查找值
Value value_for(Key const& key, Value const& default_value) const {
boost::shared_lock<boost::shared_mutex> lock(mutex); // 3. 获取桶的【共享锁】
bucket_iterator const found_entry = find_entry_for(key); // 查找元素, 返回迭代器
return (found_entry == data.end()) ? default_value : found_entry->second; // 返回值或默认值
}

// 写操作:添加或更新
void add_or_update_mapping(Key const& key, Value const& value) {
std::unique_lock<boost::shared_mutex> lock(mutex); // 4. 获取桶的【独占锁】
bucket_iterator const found_entry = find_entry_for(key);
if (found_entry == data.end()) {
data.push_back(bucket_value(key, value)); // 如果不存在则添加新键值对
} else {
found_entry->second = value; // 如果存在则更新值
}
}

// 写操作:删除
void remove_mapping(Key const& key) {
std::unique_lock<boost::shared_mutex> lock(mutex); // 5. 获取桶的【独占锁】
bucket_iterator const found_entry = find_entry_for(key);
if (found_entry != data.end()) {
data.erase(found_entry);
}
}
}; // bucket_type 结束

std::vector<std::unique_ptr<bucket_type>> buckets; // 6. 存储所有桶的 vector, 以指针形式管理
Hash hasher; // 哈希函数对象, 重载了 operator()

// 根据 Key 获取对应的桶 (无锁操作)
bucket_type& get_bucket(Key const& key) const { // 7
std::size_t const bucket_index = hasher(key) % buckets.size(); // 计算桶索引
return *buckets[bucket_index]; // 返回对应桶的引用
}

public:
// 构造函数:创建指定数量的桶
threadsafe_lookup_table(unsigned num_buckets = 19, /* ... */) : buckets(num_buckets), /* ... */ {
for (unsigned i = 0; i < num_buckets; ++i) {
buckets[i].reset(new bucket_type); // 初始化每个桶. 这里的reset函数将unique_ptr指向新创建的bucket_type对象
}
}
// 禁止拷贝和赋值
threadsafe_lookup_table(const threadsafe_lookup_table&) = delete;
threadsafe_lookup_table& operator=(const threadsafe_lookup_table&) = delete;

// 公共接口:委托给对应的桶
Value value_for(Key const& key, Value const& default_value = Value()) const {
return get_bucket(key).value_for(key, default_value); // 先通过 get_bucket 获取桶, 再调用桶的 value_for
}
void add_or_update_mapping(Key const& key, Value const& value) {
get_bucket(key).add_or_update_mapping(key, value);
}
void remove_mapping(Key const& key) {
get_bucket(key).remove_mapping(key);
}
// ... 可能的其他接口,如 get_map() ...
};
核心设计:

  • threadsafe_lookup_table 包含一个 std::vector ,其中存储了固定数量的 bucket_type 实例(通过 unique_ptr 管理)。
  • get_bucket(key) 使用哈希函数 hasher 计算键 key 应该属于哪个桶,并返回该桶的引用。这个定位过程是无锁的,因为 buckets 向量在构造后是只读的。真正的锁在 bucket_type 内部。每个桶有自己独立的 boost::shared_mutex (或 std::shared_mutex)。
  • 查找表的三个公共接口只是简单地调用 get_bucket 定位到正确的桶,然后调用该桶的相应成员函数(value_for, add_or_update_mapping, remove_mapping)。
  • 桶内操作:
    • value_for (读) 获取桶的共享锁。
    • add_or_update_mapping / remove_mapping (写) 获取桶的独占锁。
    • 桶内部使用 std::list 来处理哈希冲突(链地址法)。

并发性分析:极高。 - 对不同桶的操作(无论是读还是写)可以完全并行执行,因为它们获取的是不同的锁。 - 对同一个桶的多个读操作 (value_for) 也可以并行执行,因为它们获取的是共享锁。 - 只有对同一个桶的写操作 (add_or_update, remove) 或读写混合操作才需要互斥(通过独占锁)。 - 并发度大约与桶的数量成正比。

异常安全分析:基本异常安全。 - value_for:只读,安全。 - remove_mapping:std::list::erase 对迭代器通常是 noexcept 的,安全。 - add_or_update_mapping: - push_back:std::list::push_back 提供强异常保证,安全。 - found_entry->second = value:如果 Value 的拷贝赋值运算符抛异常,桶内链表的状态可能只更新了一半(如果 Value 是复杂类型),但这不影响数据结构的整体一致性,由用户负责 Value 类型的异常安全。

有时, 还需要获取整个查找表的一致性快照(例如,用于序列化或调试)。此时需要同时阻止对所有桶的修改,才能获得一个原子性的快照

具体实现: 1. 创建一个 std::vector<std::unique_lock<boost::shared_mutex>> locks;, 用于存储所有桶的锁。 2. 按固定顺序(例如,桶的索引从 0 到 N-1)遍历 buckets 向量。 3. 对每一个桶的 mutex 创建一个 std::unique_lock(获取独占锁)并添加到 locks 向量中。现在所有桶都被锁定了。 4. 创建一个新的 std::map<Key, Value> res;, 用于存储快照结果。 5. 遍历所有桶,将每个桶 list 中的所有键值对 insert 到 res 中。 6. 函数返回 res。当函数退出时,locks 向量析构,所有 unique_lock 析构,所有桶的锁被自动释放。

死锁避免:必须按固定顺序(如索引顺序)获取所有桶的锁。如果两个线程同时尝试获取快照,它们会按相同顺序请求锁,最多只有一个线程会阻塞,不会发生死锁。

总结:

基于哈希表桶级细粒度锁(特别是读写锁)是实现高并发线程安全查找表的有效策略。它通过将锁的范围限制在单个桶内,极大地提高了并发度,允许对不同桶的操作完全并行。相比单锁实现,并发性能得到了显著提升。

不过需要仔细处理接口设计(避免迭代器)、哈希冲突(如使用链表)以及需要全局一致性的操作(如快照,需锁住所有桶并注意死锁)。

编写一个使用锁的线程安全链表

继查找表之后,本节将探讨另一种基础但重要的数据结构——链表 (Linked List) 的并发设计。链表的设计,特别是涉及到迭代 (Iteration) 时,面临着与哈希表截然不同的挑战。

与哈希表相似, 链表的核心挑战依旧是并发环境下的迭代器 - 链表特性:链表的操作(插入、删除、查找)通常需要遍历节点。 - 标准迭代器的问题: - 失效 (Invalidation):标准库迭代器通常持有指向容器内部节点的指针或引用。如果一个线程持有迭代器 it 指向节点 N,而另一个线程删除了节点 N,迭代器 it 就失效了。继续使用失效的迭代器是未定义行为。 - 生命周期与锁:要使迭代器在并发环境下安全,迭代器本身可能需要持有它所指向(或将要访问)节点的锁。但这极大地增加了复杂性: - 迭代器的生命周期通常独立于容器的操作,管理锁的释放变得困难。 - 持有锁时间过长:如果迭代器在其整个生命周期内都持有锁,会严重阻塞其他线程。

结论:直接暴露标准库风格的迭代器对于并发链表来说极其危险且难以正确实现。

一种避免暴露迭代器的策略是提供成员函数来进行迭代,例如 for_each,它接受一个用户提供的函数 f,并在内部为链表中的每个元素调用 f。

优点在于容器内部可以管理遍历过程中的锁,用户不需要直接处理迭代器和锁。

主要缺点 (严重): - 违反死锁指导原则:for_each 必须在持有内部锁(至少是当前节点的锁)的情况下,调用用户提供的代码 (f)。如果用户代码 f 尝试获取其他锁(甚至是同一个链表上其他操作所需的锁),就可能导致死锁。 - 引用传递风险:为了让用户代码 f 能够操作元素,for_each 通常需要将元素的引用传递给 f。如果用户代码保存了这个引用,并在 for_each 调用结束后(锁已释放)再去访问它,就会发生数据竞争。 - 拷贝开销:可以通过传递元素的拷贝给 f 来避免引用风险,但这对于大型或昂贵的对象来说开销可能过大。

结论:虽然可行,但这种方法将避免死锁和数据竞争的责任推给了用户,使得接口不够安全和易用。

最终, 为了在允许一定并发性的同时安全地支持遍历和修改,本节采用了节点级锁结合“手递手”锁定的策略。

  • 节点级锁 (Node-Level Locking):每一个链表节点内部都包含自己的 std::mutex
  • “手递手”锁定 (Hand-over-Hand / Lock Coupling):在遍历链表时,线程的操作流程如下:
    • 持有当前节点 current 的锁。
    • 找到下一个节点 next。
    • 锁住 next 节点的锁。
    • 解锁 current 节点的锁。(关键步骤!)
    • 现在 next 节点成为新的 current 节点,继续遍历。

效果:就像爬绳子时“一只手抓住上面,另一只手才松开下面”一样,线程在移动到下一个节点之前,会同时持有当前下一个节点的锁,然后释放前一个。这保证了遍历过程的连续性和安全性,同时允许多个线程在链表的不同位置并发地进行遍历或修改。

细粒度锁链表的实现

下面是一个使用节点级锁和手递手锁定策略实现的线程安全链表的示例代码。该链表支持基本的插入、删除和查找操作,同时允许安全地遍历链表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
template<typename T>
class threadsafe_list
{
struct node // 1. 节点结构
{
std::mutex m; // 每个节点都有自己的锁
std::shared_ptr<T> data; // 使用 shared_ptr 管理数据
std::unique_ptr<node> next; // 使用 unique_ptr 管理下一个节点

node() : next() {} // 2. 哑节点的构造
node(T const& value) : data(std::make_shared<T>(value)) {} // 3. 数据节点的构造
};

node head; // 哑/哨兵头节点 (简化边界处理)

public:
threadsafe_list() {}
~threadsafe_list() { remove_if([](node const&){ return true; }); } // 清理
// 禁止拷贝和赋值
threadsafe_list(threadsafe_list const&) = delete;
threadsafe_list& operator=(threadsafe_list const&) = delete;

// 添加到头部
void push_front(T const& value)
{
std::unique_ptr<node> new_node(new node(value)); // 4. 锁外分配
std::lock_guard<std::mutex> lk(head.m); // 只锁头节点
new_node->next = std::move(head.next); // 5. 链接
head.next = std::move(new_node); // 6. 更新头指针
}

// 内部迭代:为每个元素调用 f
template<typename Function>
void for_each(Function f) // 7
{
node* current = &head;
std::unique_lock<std::mutex> lk(head.m); // 8. 初始锁住头节点
while(node* const next = current->next.get()) // 9. 找到下一个节点
{
std::unique_lock<std::mutex> next_lk(next->m); // 10. 锁住下一个节点
lk.unlock(); // 11. 解锁当前节点 (Hand-over-Hand!)
f(*next->data); // 12. 调用用户函数 (只持有 next 节点的锁)
current = next; // 前进
lk = std::move(next_lk); // 13. 将 next_lk 的所有权转移给 lk,准备下次循环
}
}

// 内部查找:找到第一个满足 p 的元素
template<typename Predicate>
std::shared_ptr<T> find_first_if(Predicate p) // 14
{
node* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while(node* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
lk.unlock();
if(p(*next->data)) // 15. 检查条件 (只持有 next 节点的锁)
{
return next->data; // 16. 找到,返回数据 (shared_ptr)
}
current = next;
lk = std::move(next_lk);
}
return std::shared_ptr<T>(); // 未找到
}

// 内部删除:删除所有满足 p 的元素
template<typename Predicate>
void remove_if(Predicate p) // 17
{
node* current = &head;
std::unique_lock<std::mutex> lk(head.m); // 锁住当前节点 (初始是 head)
while(node* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m); // 锁住下一个节点
if(p(*next->data)) // 18. 检查条件
{
// 需要删除 next 节点
std::unique_ptr<node> old_next = std::move(current->next); // 从 current 断开 next, 此时 old_next 拥有 next 的所有权
current->next = std::move(next->next); // current 跳过 next,指向 next 的下一个
next_lk.unlock(); // 19. 解锁要删除的节点 next
// old_next 在离开作用域时会自动删除节点 (20)
}
else // 不需要删除 next 节点
{
lk.unlock(); // 21. 解锁当前节点 (Hand-over-Hand)
current = next; // 前进
lk = std::move(next_lk); // 移动锁所有权
}
}
}
};
- node 结构 (①):关键在于 std::mutex m,每个节点都有自己的锁。使用 shared_ptr<T> 存数据,便于安全返回和管理生命周期。unique_ptr<node> 管理节点所有权。

  • head 哑节点:构造函数创建了一个 head 节点 (②),它本身不存数据 (data 为空),作为链表的起点。这简化了 push_front 和遍历的逻辑,无需特殊处理空链表或头节点插入/删除。

  • push_front (④, ⑤, ⑥):实现简单高效。只需锁住 head 节点,修改 head.next 和新节点的 next 即可。分配节点在锁外完成。

  • for_each (⑦ - ⑬):完美体现了“手递手”锁定。注意在调用用户函数 f (⑫) 之前解锁了 current 节点 (⑪),调用 f 时只持有 next 节点的锁,减小了持有锁的时间和潜在的死锁风险(如果 f 内部访问链表,只会与当前节点的操作冲突)。

  • find_first_if (⑭ - ⑯):与 for_each 逻辑类似,但找到匹配项后立即返回 (⑯)。返回 shared_ptr<T> 保证了即使节点稍后被删除,返回的数据指针仍然有效。

  • remove_if (⑰ - ㉑):最复杂的操作, 但同样遵循“手递手”锁定模式。

    • 当需要删除 next 节点时 (⑱),它必须持有 current 节点的锁 (lk),因为它需要修改 current->next 来跳过 next 节点。
    • 它也持有 next 节点的锁 (next_lk) 来安全地读取 next->next。
    • 修改完 current->next 后,立即解锁 next_lk (⑲)。
    • 节点的实际删除 (⑳) 发生在 old_next (一个 unique_ptr) 离开作用域时,此时 next 节点的锁已释放。
    • 安全性:虽然 next 节点的锁在节点被删除前释放了,但由于修改 current->next 的操作是在持有 lk(current 的锁)的情况下完成的,没有其他线程可以通过 current 到达那个即将被删除的 next 节点。后续的遍历会直接跳过它。
    • 如果不需要删除 (㉑),则执行与 for_each 相同的“手递手”解锁和前进。

安全性与并发性分析

安全性: - 数据竞争:通过节点级锁和手递手策略避免了对节点内部数据和 next 指针的竞争。 - 死锁:由于总是按从头到尾的顺序获取锁,不可能出现循环等待,不会发生死锁(前提是用户提供的 f 或 p 不会尝试以不兼容的方式锁住链表)。 - 迭代器失效:通过内部迭代函数避免了传统迭代器失效问题。

并发性: - 优点:允许多个线程在链表的不同部分并发操作。例如,线程A可以在链表前端 push_front,同时线程B在链表中部执行 find_first_if,线程C在链表尾部执行 remove_if(只要它们操作的节点不重叠或不相邻太近)。这远优于单锁实现。 - 缺点/瓶颈:“手递手”锁定本质上仍然是顺序依赖的。如果一个线程在处理某个节点时花费了很长时间(例如,用户函数 f 很慢),它会阻塞所有想要通过该节点的其他线程(无论是向前还是尝试从更前面追赶)。它提高了并发度,但并不能实现完全的并行访问

总结:

基于节点级锁和“手递手”策略的线程安全链表是一种在安全性和并发性之间取得较好平衡的方案。它通过细粒度锁显著提高了并发潜力,允许多线程在链表不同区域同时工作; 通过内部迭代函数避免了传统迭代器的并发安全问题; 它通过固定的加锁顺序避免了死锁。

其主要局限在于手递手锁定带来的顺序瓶颈,一个慢操作会阻塞后续线程。并且这种设计的复杂性也远高于单锁实现。