Preparation

  • MySQL 8.0.25

Background

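Recent InnoDB code implements a lock-free multi-producer, multi-consumer queue. The design is fairly simple, yet it is a very useful data structure in everyday programming. Like most lock-free code, it is built on the common CAS (compare-and-swap) operation. The CPU provides an atomic CAS instruction, and with it we can implement all kinds of lock-free data structures.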
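This is a minimal illustration of the CAS retry pattern that the queue relies on, and it is not InnoDB code. The C++ sketch below builds a lock-free counter on `std::atomic::compare_exchange_weak`. The names `cas_increment` and `run_cas_demo` are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Lock-free increment built on CAS: read the current value, then try to
// install value + 1 with compare_exchange_weak. If another thread changed the
// counter in between (or the weak CAS fails spuriously), the failed CAS stores
// the value it actually saw into `expected`, and the loop retries.
long cas_increment(std::atomic<long> &counter) {
  long expected = counter.load(std::memory_order_relaxed);
  while (!counter.compare_exchange_weak(expected, expected + 1,
                                        std::memory_order_relaxed)) {
    // `expected` was refreshed by the failed CAS; just try again.
  }
  return expected + 1;  // the value this thread installed
}

// Hypothetical demo: `threads` threads each perform `per_thread` CAS
// increments; no update is lost, so the result is threads * per_thread.
long run_cas_demo(int threads, int per_thread) {
  std::atomic<long> counter{0};
  std::vector<std::thread> workers;
  for (int t = 0; t < threads; ++t) {
    workers.emplace_back([&counter, per_thread] {
      for (int i = 0; i < per_thread; ++i) cas_increment(counter);
    });
  }
  for (auto &w : workers) w.join();
  return counter.load();
}
```

`run_cas_demo(4, 100000)` returns 400000. A failed CAS never overwrites another thread's update. It only retries with the refreshed value. The same load, compare and retry loop appears in `enqueue()` and `dequeue()`.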

Implementation

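The InnoDB lock-free queue class is named mpmc_bq, short for "multiple producer consumer, bounded queue", that is, a bounded multi-producer, multi-consumer queue. Elements still leave the queue in first-in, first-out (FIFO) order. InnoDB implements it as a class template so that it can hold elements of different types. The queue itself is simulated with a fixed-size array used as a ring buffer.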
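The ring is an array, so a position that keeps growing has to be mapped to a slot index. The InnoDB code does this with a bitwise AND. That only matches modulo arithmetic when the number of slots is a power of two. The small sketch below shows the mapping. The helper names `is_power_of_two` and `slot_index` are hypothetical.

```cpp
#include <cassert>
#include <cstddef>

// True if n is a non-zero power of two (exactly one bit set).
bool is_power_of_two(std::size_t n) { return n != 0 && (n & (n - 1)) == 0; }

// Maps an ever-increasing sequence number onto a ring of n slots.
// For a power-of-two n, masking with n - 1 keeps the low bits, which is the
// same as seq % n but needs no division.
std::size_t slot_index(std::size_t seq, std::size_t n) {
  assert(is_power_of_two(n));
  return seq & (n - 1);
}
```

For example, `slot_index(9, 8)` is 1, the same as `9 % 8`. With 12 slots, the mask trick would no longer agree with `%`.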

Overall Approach

Interface

  • enqueue(): add an element to the tail of the queue

  • dequeue(): remove an element from the head of the queue

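Design Approach

Implementation Details

The class mpmc_bq implements the enqueue() and dequeue() interfaces and represents each slot of the queue with a Cell. Besides the payload m_data, every Cell carries a sequence number m_pos. A thread compares m_pos with the current enqueue or dequeue position to tell whether the slot is free, holds an element ready to be read, or has already been taken by another thread: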

```cpp
struct Cell {
  std::atomic<size_t> m_pos;  /* Sequence number of this slot. */
  T m_data;                   /* The element stored in this slot. */
};
```
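The article does not show how the ring is set up. The sketch below assumes the usual initialization for this sequence-number scheme, the same one used by Dmitry Vyukov's bounded MPMC queue. Cell i starts with m_pos = i. m_capacity stores the number of cells minus one, so that `pos & m_capacity` selects a slot and `pos + m_capacity + 1` is the same slot one lap later. The class name `mpmc_bq_sketch` and the accessors `pos_of()` and `mask()` are hypothetical and exist only for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>

// Hypothetical skeleton of the queue's state; not the InnoDB source.
template <typename T>
class mpmc_bq_sketch {
 public:
  struct Cell {
    std::atomic<std::size_t> m_pos;  // sequence number of this slot
    T m_data;                        // the stored element
  };

  // n_elems must be a power of two so that pos & m_capacity == pos % n_elems.
  explicit mpmc_bq_sketch(std::size_t n_elems)
      : m_ring(new Cell[n_elems]), m_capacity(n_elems - 1) {
    assert(n_elems >= 2 && (n_elems & (n_elems - 1)) == 0);
    // Assumption: cell i starts out free for position i.
    for (std::size_t i = 0; i < n_elems; ++i)
      m_ring[i].m_pos.store(i, std::memory_order_relaxed);
  }

  // Hypothetical accessors, only for inspecting the initial state.
  std::size_t pos_of(std::size_t i) const {
    return m_ring[i].m_pos.load(std::memory_order_relaxed);
  }
  std::size_t mask() const { return m_capacity; }

 private:
  std::unique_ptr<Cell[]> m_ring;             // the ring buffer
  std::size_t m_capacity;                     // number of cells - 1
  std::atomic<std::size_t> m_enqueue_pos{0};  // next position to fill
  std::atomic<std::size_t> m_dequeue_pos{0};  // next position to read
};
```

Consider this starting state. The first enqueue at position 0 finds m_pos == 0 in cell 0, which means the cell is free. The first dequeue finds m_pos == 0, which is less than 0 + 1, so an empty queue correctly reports empty.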

enqueue Implementation

```cpp
/** Enqueue an element
@param[in] data Element to insert, it will be copied
@return true on success */
bool enqueue(T const &data) MY_ATTRIBUTE((warn_unused_result)) {
  /* m_enqueue_pos only wraps at MAX(m_enqueue_pos), instead
  we use the capacity to convert the sequence to an array
  index. This is why the ring buffer must be a size which
  is a power of 2. This also allows the sequence to double
  as a ticket/lock. */

  /* Load the current m_enqueue_pos. */
  size_t pos = m_enqueue_pos.load(std::memory_order_relaxed);

  Cell *cell;

  for (;;) {
    /* m_capacity is (number of cells - 1), so pos & m_capacity is pos
    modulo the ring size: the cell for this position. */
    cell = &m_ring[pos & m_capacity];

    size_t seq;

    /* Load the cell's sequence number m_pos. */
    seq = cell->m_pos.load(std::memory_order_acquire);

    /* Difference between cell->m_pos and m_enqueue_pos. */
    intptr_t diff = (intptr_t)seq - (intptr_t)pos;

    /* If they are the same then it means this cell is empty */

    if (diff == 0) {
      /* Claim our spot by moving head. If head isn't the same as we last
      checked then that means someone beat us to the punch. Weak compare is
      faster, but can return spurious results which in this instance is OK,
      because it's in the loop */

      /* cell->m_pos == m_enqueue_pos means the cell is free, so try to
      advance m_enqueue_pos by 1. If the CAS fails, another producer
      claimed this position first (or the weak CAS failed spuriously);
      compare_exchange_weak has then reloaded pos with the current
      m_enqueue_pos, and the loop retries. */
      if (m_enqueue_pos.compare_exchange_weak(pos, pos + 1,
                                              std::memory_order_relaxed)) {
        break;
      }

    } else if (diff < 0) {
      /* The queue is full */

      /* A successful enqueue advances both m_enqueue_pos and cell->m_pos
      by 1. diff < 0 means m_enqueue_pos has wrapped around to a cell
      whose element has not been dequeued yet. */
      return (false);

    } else {
      /* diff > 0: another producer already took this position, so pos
      is stale. Reload m_enqueue_pos. */
      pos = m_enqueue_pos.load(std::memory_order_relaxed);
    }
  }

  cell->m_data = data;

  /* Increment the sequence so that the tail knows it's accessible */

  /* Publish the element by setting cell->m_pos to pos + 1. The release
  store ensures a consumer that acquire-loads m_pos also sees m_data. */
  cell->m_pos.store(pos + 1, std::memory_order_release);

  return (true);
}
```
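The three branches of enqueue() follow from the values cell->m_pos can take. A producer publishes position p by storing p + 1. A consumer frees the cell for the next lap by storing p + m_capacity + 1. Let n = m_capacity + 1 be the number of cells. For the cell selected by pos:

$$
\mathit{diff} = \mathit{seq} - \mathit{pos} =
\begin{cases}
0, & \text{cell is free for position } \mathit{pos}\text{: try the CAS} \\
1 - n \ \text{or}\ -n, & \text{cell still belongs to position } \mathit{pos} - n\text{: queue is full} \\
> 0, & \text{another producer already claimed } \mathit{pos}\text{: reload}
\end{cases}
$$

Take n = 4 as an example. After four enqueues and no dequeue, m_enqueue_pos is 4 and cell 0 holds m_pos = 1. Then diff = 1 - 4 = -3, so enqueue() returns false. Once position 0 is dequeued, cell 0 holds 0 + 3 + 1 = 4. Now diff = 0, and position 4 can reuse the cell.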

dequeue Implementation

```cpp
/** Dequeue an element
@param[out] data Element read from the queue
@return true on success */
bool dequeue(T &data) MY_ATTRIBUTE((warn_unused_result)) {
  Cell *cell;
  /* Load the current m_dequeue_pos. */
  size_t pos = m_dequeue_pos.load(std::memory_order_relaxed);

  for (;;) {
    /* m_capacity is (number of cells - 1), so pos & m_capacity is pos
    modulo the ring size: the cell for this position. */
    cell = &m_ring[pos & m_capacity];

    /* Load the cell's sequence number m_pos. */
    size_t seq = cell->m_pos.load(std::memory_order_acquire);

    /* Difference between cell->m_pos and m_dequeue_pos + 1. */
    auto diff = (intptr_t)seq - (intptr_t)(pos + 1);

    if (diff == 0) {
      /* Claim our spot by moving the head. If head isn't the same as we last
      checked then that means someone beat us to the punch. Weak compare is
      faster, but can return spurious results. Which in this instance is
      OK, because it's in the loop. */

      /* cell->m_pos == m_dequeue_pos + 1 means the cell holds a fully
      enqueued element, because a successful enqueue sets m_pos to its
      position + 1. Try to advance m_dequeue_pos by 1. If the CAS fails,
      another consumer took this cell first (or the weak CAS failed
      spuriously), and pos now holds the refreshed m_dequeue_pos. */
      if (m_dequeue_pos.compare_exchange_weak(pos, pos + 1,
                                              std::memory_order_relaxed)) {
        break;
      }

    } else if (diff < 0) {
      /* The queue is empty */

      /* A successful enqueue sets m_pos to its position + 1, so
      m_pos < m_dequeue_pos + 1 means no element has been published in
      this cell for this position yet. */
      return (false);

    } else {
      /* Under normal circumstances this branch should never be taken. */

      /* If it is taken, pos is stale: another consumer has already
      advanced past this cell. Reload m_dequeue_pos. */
      pos = m_dequeue_pos.load(std::memory_order_relaxed);
    }
  }

  /* Copy out the element's data. */
  data = cell->m_data;

  /* Set the sequence to what the head sequence should be next
  time around */

  /* Set cell->m_pos to m_dequeue_pos + m_capacity + 1, i.e. the position
  a producer will use for this cell on the next lap around the ring. */
  cell->m_pos.store(pos + m_capacity + 1, std::memory_order_release);

  return (true);
}
```
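The sketch below shows the two functions working together. It is self-contained: it restates the enqueue()/dequeue() logic above in a standalone class and drives it from several threads. `BoundedMpmcQueue` and `run_mpmc_demo` are hypothetical names. The sketch drops InnoDB specifics such as `MY_ATTRIBUTE` and its memory allocation. It also assumes the initialization in which cell i starts with m_pos = i.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <thread>
#include <vector>

// Hypothetical standalone restatement of the enqueue/dequeue protocol.
template <typename T>
class BoundedMpmcQueue {
  struct Cell {
    std::atomic<std::size_t> m_pos;  // sequence number
    T m_data;                        // payload
  };
 public:
  explicit BoundedMpmcQueue(std::size_t n)
      : m_ring(new Cell[n]), m_capacity(n - 1) {
    assert(n >= 2 && (n & (n - 1)) == 0);  // must be a power of two
    for (std::size_t i = 0; i < n; ++i)    // cell i is free for position i
      m_ring[i].m_pos.store(i, std::memory_order_relaxed);
  }

  bool enqueue(const T &data) {
    std::size_t pos = m_enqueue_pos.load(std::memory_order_relaxed);
    Cell *cell;
    for (;;) {
      cell = &m_ring[pos & m_capacity];
      std::size_t seq = cell->m_pos.load(std::memory_order_acquire);
      std::intptr_t diff = (std::intptr_t)seq - (std::intptr_t)pos;
      if (diff == 0) {  // free: try to claim position pos
        if (m_enqueue_pos.compare_exchange_weak(pos, pos + 1,
                                                std::memory_order_relaxed))
          break;
      } else if (diff < 0) {  // previous lap not consumed yet: full
        return false;
      } else {  // another producer took pos: reload
        pos = m_enqueue_pos.load(std::memory_order_relaxed);
      }
    }
    cell->m_data = data;
    cell->m_pos.store(pos + 1, std::memory_order_release);  // publish
    return true;
  }

  bool dequeue(T &data) {
    std::size_t pos = m_dequeue_pos.load(std::memory_order_relaxed);
    Cell *cell;
    for (;;) {
      cell = &m_ring[pos & m_capacity];
      std::size_t seq = cell->m_pos.load(std::memory_order_acquire);
      std::intptr_t diff = (std::intptr_t)seq - (std::intptr_t)(pos + 1);
      if (diff == 0) {  // published: try to claim position pos
        if (m_dequeue_pos.compare_exchange_weak(pos, pos + 1,
                                                std::memory_order_relaxed))
          break;
      } else if (diff < 0) {  // nothing published here yet: empty
        return false;
      } else {  // another consumer took pos: reload
        pos = m_dequeue_pos.load(std::memory_order_relaxed);
      }
    }
    data = cell->m_data;
    cell->m_pos.store(pos + m_capacity + 1, std::memory_order_release);  // next lap
    return true;
  }

 private:
  std::unique_ptr<Cell[]> m_ring;
  std::size_t m_capacity;  // number of cells - 1
  std::atomic<std::size_t> m_enqueue_pos{0};
  std::atomic<std::size_t> m_dequeue_pos{0};
};

// Hypothetical demo: returns the sum of every value popped by the consumers.
long long run_mpmc_demo(int producers, int consumers, int per_producer) {
  BoundedMpmcQueue<int> q(64);
  std::atomic<long long> sum{0};
  std::atomic<int> remaining{producers * per_producer};
  std::vector<std::thread> threads;
  for (int p = 0; p < producers; ++p)
    threads.emplace_back([&] {
      for (int i = 1; i <= per_producer; ++i)
        while (!q.enqueue(i)) std::this_thread::yield();  // full: retry
    });
  for (int c = 0; c < consumers; ++c)
    threads.emplace_back([&] {
      int v;
      while (remaining.load() > 0) {
        if (!q.dequeue(v)) { std::this_thread::yield(); continue; }  // empty
        sum += v;
        --remaining;
      }
    });
  for (auto &t : threads) t.join();
  return sum.load();
}
```

When enqueue() reports a full queue, producers spin with `std::this_thread::yield()`. Consumers do the same when the queue is empty. The queue is bounded and never blocks, so retrying or backing off is left to the caller. With 4 producers each pushing 1..10000, every value is popped exactly once. The demo therefore returns 4 × 50005000 = 200020000.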

Summary

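That covers the main technical details of InnoDB's lock-free queue. The same techniques carry over to other lock-free data structures. The key idea is to use an atomic operation such as CAS to coordinate concurrent operations that would otherwise need a mutex, which reduces their overhead.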