版本

  • MySQL 8.0.19

准备

在 MySQL 8.0.14 版本 InnoDB 引擎发布了一个新的特性 Parallel read of index (并行索引读取), 主要用于并行的读取索引数据, 目前仅仅支持 SELECT COUNT() 和 CHECK TABLE 操作, InnoDB 后续对于其他操作还会有更多的优化支持. 通过这个并行索引读取框架, InnoDB 可以支持同步、异步的并发读取索引数据, 异步的读取索引数据可以用来实现逻辑预读操作. 在此之前的预读逻辑, InnoDB 只有线性预读和随机预读这两种物理预读处理方法, 而对于 B+ tree 这种树形结构显然逻辑预读才更合适.

请注意此篇分析基于 MySQL 8.0.19,从 8.0.14 feature 引入开始, Parallel Read of index 进行了多次代码优化重构, 但其主要的设计思想没有改变. 目前 Parallel read of index 框架仍处于不稳定的阶段, 后续我们会持续针对新版本更新对应的内容.

并行索引读取

参数

  • innodb_parallel_read_threads: 当前并行读取的 worker 线程数量.

innodb_parallel_read_threads 是 session 级别的变量, 假如需要打开并行扫描框架即:

1
2
3
set local innodb_parallel_read_threads=4;

select count(*) from sbtest.table;

设计思想

Parallel read of index 主要利用当前的多核硬件优势, 针对当前可以并行读取的逻辑例如 SELECT COUNT() 或者 CHECK TABLE, 其主要逻辑是收集数据叶子节点的 Page Number, 使用多个 worker 并行读取数据 Page, 利用不同的回调函数来处理获取后的 rows. 目前 SELET COUNT() 和 CHECK TABLE 都是同步读取, 但 InnoDB 依然提供了接口处理对应的异步读取, 后续会针对需要异步读取的场景提供更多的优化路径.

实现

row_scan_index_for_mysql()

row_scan_index_for_mysql() 作为 SELECT COUNT() 和 CHECK TABLE 的入口函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 /* 扫描索引数据 */
----------------------------
| row_scan_index_for_mysql() |
----------------------------
|
|
| /* SELECT COUNT() */
| ----------------------------------------
--> | row_mysql_parallel_select_count_star() |
| ----------------------------------------
|
|
| /* CHECK TABLE */
| ------------------------
--> | parallel_check_table() |
------------------------

基本数据结构

  • Parallel_reader::Scan_range: 代表当前并行扫描的范围.

  • Parallel_reader::Config 并行扫描的 configuration.

  • Parallel_reader::Scan_ctx 并行扫描的上下文 (context).

  • Parallel_reader::Ctx 并行读取的执行上下文 (Parallel reader execution context)

  • Parallel_reader 并行扫描 reader

SELECT COUNT()

我们以全表扫描 SELECT COUNT() 为例, 根据源码分析 Parallel Read 的原理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/* SELECT COUNT() 的入口函数 */
dberr_t row_mysql_parallel_select_count_star(
trx_t *trx, std::vector<dict_index_t *> &indexes, size_t max_threads,
ulint *n_rows) {
ut_a(!indexes.empty());
using Shards = Counter::Shards<Parallel_reader::MAX_THREADS>;

Shards n_recs;
Counter::clear(n_recs);

struct Check_interrupt {
byte m_pad[INNOBASE_CACHE_LINE_SIZE - (sizeof(size_t) + sizeof(void *))];
size_t m_count{};
const buf_block_t *m_prev_block{};
};

Check_interrupt checker[Parallel_reader::MAX_THREADS] = {};

/* 初始化并行 reader. */
Parallel_reader reader(max_threads);

ib::info() << "Parallel scan: " << max_threads;

/* 设置扫描的 range, ::Scan_range 可以支持范围扫描. */
const Parallel_reader::Scan_range FULL_SCAN;

// clang-format off
bool success{};

/* 迭代需要扫描的索引. */
for (auto index : indexes) {
/* 初始化扫描配置. */
Parallel_reader::Config config(FULL_SCAN, index);

/* 通过 add_scan() 将待扫描的上下文 Parallel_reader::Scan_ctx (context)
* 注册至 reader. add_scan() 的第三个参数是一个 lambda 函数, 在扫描一条
* record 后调用. */
success =
reader.add_scan(trx, config, [&](const Parallel_reader::Ctx *ctx) {
Counter::inc(n_recs, ctx->m_thread_id);

auto &check = checker[ctx->m_thread_id];

if (ctx->m_block != check.m_prev_block) {
check.m_prev_block = ctx->m_block;

++check.m_count;

if (!(check.m_count % 64) && trx_is_interrupted(trx)) {
return (DB_INTERRUPTED);
}
}
return (DB_SUCCESS);
});

if (!success) {
break;
}
}
// clang-format on

/* 执行 reader.run() 开始并行扫描. */
auto err = success ? reader.run() : DB_ERROR;

if (err == DB_SUCCESS) {
/* 输出统计 */
Counter::for_each(n_recs, [=](const Counter::Type n) {
if (n > 0) {
*n_rows += n;
ib::info() << "n: " << n;
}
});
}

return (err);
}

注册并行扫描上下文分析 add_scan():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
dberr_t Parallel_reader::add_scan(trx_t *trx,
const Parallel_reader::Config &config,
Parallel_reader::F &&f) {
/* 初始化构造一个 Parallel_reader::Scan_ctx */
auto scan_ctx = std::shared_ptr<Scan_ctx>(
UT_NEW_NOKEY(Scan_ctx(this, m_scan_ctx_id, trx, config, std::move(f))),
[](Scan_ctx *scan_ctx) { UT_DELETE(scan_ctx); });

if (scan_ctx.get() == nullptr) {
/* 假如初始化失败, 即 OOM. */
ib::error() << "Out of memory";
return (DB_OUT_OF_MEMORY);
}

/* 将 scan_ctx 插入 m_scan_ctxs. m_scan_ctxs 存放全局的并行扫描上下文. */
m_scan_ctxs.push_back(scan_ctx);

/* 递增 m_scan_ctx_id. */
++m_scan_ctx_id;

/* 针对 scan_ctx 的 dict_index_t 调用 S lock. */
scan_ctx->index_s_lock();

Parallel_reader::Scan_ctx::Ranges ranges{};
dberr_t err{DB_SUCCESS};

/* 将 scan_ctx 分片 */
err = scan_ctx->partition(config.m_scan_range, ranges, 0);

if (ranges.empty() || err != DB_SUCCESS) {
/* Table is empty. */
scan_ctx->index_s_unlock();
return (err);
}

/* 基于分片创建并行读取的执行上下文 (Parallel_reader::Ctx) .*/
err = scan_ctx->create_contexts(ranges);

/* 针对 dict_index_t 释放 S lock. */
scan_ctx->index_s_unlock();

return (err);
}

我们需要将待扫描数据的 range 分片,然后分配各个 worker 线程, InnoDB 的 B+ 树将数据存放在所有的叶子节点, 即 叶子节点为 level 0, 为了更好的理解 Parallel Read 如何分片,我们根据源代码来分析, Parallel_reader::Scan_ctx::partition 调用 Parallel_reader::Scan_ctx::create_ranges() 来进行分片:

1
2
3
4
5
6
7
8
9
10
11
dberr_t Parallel_reader::Scan_ctx::partition(
const Scan_range &scan_range, Parallel_reader::Scan_ctx::Ranges &ranges,
size_t split_level) {
/* ... */

/* 创建分片. */
err = create_ranges(scan_range, m_config.m_index->page, 0, split_level,
ranges, &mtr);

/* ... */
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
dberr_t Parallel_reader::Scan_ctx::create_ranges(const Scan_range &scan_range,
page_no_t page_no,
size_t depth,
const size_t split_level,
Ranges &ranges, mtr_t *mtr) {
ut_ad(index_s_own());
ut_a(max_threads() > 0);
ut_a(page_no != FIL_NULL);

/* Do a breadth first traversal of the B+Tree using recursion. We want to
set up the scan ranges in one pass. This guarantees that the tree structure
cannot change while we are creating the scan sub-ranges.

Once we create the persistent cursor (Range) for a sub-tree we can release
the latches on all blocks traversed for that sub-tree. */

/* 当前扫描的索引 dict_index_t */
const auto index = m_config.m_index;

/* 索引的 root 节点 page. */
page_id_t page_id(index->space, page_no);

Savepoint savepoint({mtr->get_savepoint(), nullptr});

/* 使用 mtr 加上 s latch. */
auto block = block_get_s_latched(page_id, mtr, __LINE__);

/* read_level requested should be less than the tree height. */
ut_ad(m_config.m_read_level <
btr_page_get_level(buf_block_get_frame(block), mtr) + 1);

savepoint.second = block;

ulint offsets_[REC_OFFS_NORMAL_SIZE];
auto offsets = offsets_;

rec_offs_init(offsets_);

page_cur_t page_cursor;

page_cursor.index = index;

/* 假如是全表扫描 scan_range.m_start 为 NULL. */
auto start = scan_range.m_start;

if (start != nullptr) {
page_cur_search(block, index, start, PAGE_CUR_LE, &page_cursor);

if (page_cur_is_after_last(&page_cursor)) {
return (DB_SUCCESS);
} else if (page_cur_is_before_first((&page_cursor))) {
page_cur_move_to_next(&page_cursor);
}
} else {
/* page_cursor 定位当前 Page 的第一个 record. */
page_cur_set_before_first(block, &page_cursor);
/* 跳过 infimum record. */
page_cur_move_to_next(&page_cursor);
}

mem_heap_t *heap{};

/* 是否处于 leaf 节点. */
const auto at_leaf = page_is_leaf(buf_block_get_frame(block));
/* 当前的 level. */
const auto at_level = btr_page_get_level(buf_block_get_frame(block), mtr);

Savepoints savepoints{};

/* 迭代直到 page_cursor 处于当前 Page 的最后一个 record. */
while (!page_cur_is_after_last(&page_cursor)) {
const auto rec = page_cur_get_rec(&page_cursor);

ut_a(at_leaf || rec_get_node_ptr_flag(rec) ||
!dict_table_is_comp(index->table));

if (heap == nullptr) {
heap = mem_heap_create(srv_page_size / 4);
}

offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);

const auto end = scan_range.m_end;

if (end != nullptr && end->compare(rec, index, offsets) <= 0) {
break;
}

page_cur_t level_page_cursor;

/* 假如当前的 level 大于目标 level: m_config.m_read_level, 全表扫描中
* m_config.m_read_level 为0, 即叶子节点. */
if (at_level > m_config.m_read_level) {
/* 通过 rec 获取子节点的 Page Number. */
auto page_no = btr_node_ptr_get_child_page_no(rec, offsets);

if (depth < split_level) {
/* 假如 depth 小于 split_level, 需要从下一层开始 partition .*/
create_ranges(scan_range, page_no, depth + 1, split_level, ranges, mtr);

/* 移动到下一个 record. */
page_cur_move_to_next(&page_cursor);
continue;
}

/* start_range() 会定位到叶子节点根据 range.start 来创建 Page Cursor. */
level_page_cursor = start_range(page_no, mtr, start, savepoints);
} else {
/* In case of root node being the leaf node or in case we've been asked to
read the root node (via read_level) place the cursor on the root node and
proceed. */

if (start != nullptr) {
page_cur_search(block, index, start, PAGE_CUR_GE, &page_cursor);
ut_a(!page_rec_is_infimum(page_cur_get_rec(&page_cursor)));
} else {
page_cur_set_before_first(block, &page_cursor);

/* Skip the infimum record. */
page_cur_move_to_next(&page_cursor);
ut_a(!page_cur_is_after_last(&page_cursor));
}

/* Since we are already at the requested level use the current page
* cursor. */
memcpy(&level_page_cursor, &page_cursor, sizeof(level_page_cursor));
}

if (!page_rec_is_supremum(page_cur_get_rec(&level_page_cursor))) {
/* 创建持久化 Cursor 并插入全局 config.m_scan_range. */
create_range(ranges, level_page_cursor, mtr);
}

/* We've created the persistent cursor, safe to release S latches on
the blocks that are in this range (sub-tree). */
for (auto &savepoint : savepoints) {
mtr->release_block_at_savepoint(savepoint.first, savepoint.second);
}

if (m_depth == 0 && depth == 0) {
m_depth = savepoints.size();
}

savepoints.clear();

/* 假如当前 level 和 m_config.m_read_level 相等直接返回. */
if (at_level == m_config.m_read_level) {
break;
}

start = nullptr;

/* 移动到下一个 record. */
page_cur_move_to_next(&page_cursor);
}

savepoints.push_back(savepoint);

for (auto &savepoint : savepoints) {
mtr->release_block_at_savepoint(savepoint.first, savepoint.second);
}

if (heap != nullptr) {
mem_heap_free(heap);
}

return (DB_SUCCESS);
}

Parallel Read 的创建分片的方法是根据 B+ 树形结构, 使用 btr_node_ptr_get_child_page_no() 获取子节点的 Page Number, 即获取数据叶子节点的 Cursor.

创建 ranges 完成后, 通过 Parallel_reader::Scan_ctx::create_contexts() 根据配置的 worker 线程数目来决定是否设置 ctx->m_split, 然后将 Parallel_reader::Ctx 对象加入工作队列.

并行读取流程

reader.run() 根据设定的 woker 线程数目来创建对应数目的 Parallel_reader::worker 线程, worker 线程会从 m_ctxs 中获取待执行上下文 Parallel_reader::Ctx, 并调用 Parallel_reader::Ctx::traverse 读取 records.

另外 Parallel reader 也会根据需求创建预读线程 Parallel_reader::read_ahead_worker, 并进行物理预读.

并行读取线程

并行读取线程 Parallel_reader::worker 会根据创建的 Parallel_reader::Ctx 来读取对应叶子节点的 record, 并且会根据 trx->read_view 来判断可见性.

并行预读线程

针对 Parallel read 中的 Page, Parallel read of index 提供了预读的功能, 但目前处于关闭 (MySQL 8.0.19) 的阶段.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/* n_pages 默认大小为 FSP_EXTENT_SIZE */
void Parallel_reader::read_ahead_worker(page_no_t n_pages) {
DBUG_EXECUTE_IF("bug28079850", set_error_state(DB_INTERRUPTED););

while (is_active() && !is_error_set()) {
uint64_t dequeue_count{};

Read_ahead_request read_ahead_request;

while (m_read_aheadq.dequeue(read_ahead_request)) {
auto scan_ctx = read_ahead_request.m_scan_ctx;

if (trx_is_interrupted(scan_ctx->m_trx)) {
set_error_state(DB_INTERRUPTED);
break;
}

ut_a(scan_ctx->m_config.m_read_ahead);
ut_a(read_ahead_request.m_page_no != FIL_NULL);

page_id_t page_id(scan_ctx->m_config.m_index->space,
read_ahead_request.m_page_no);

/* 以 page_id 的数据 Page 为基准, 连续读取下 N 个 Page. */
buf_phy_read_ahead(page_id, scan_ctx->m_config.m_page_size, n_pages);

++dequeue_count;
}

m_consumed.fetch_add(dequeue_count, std::memory_order_relaxed);

while (read_ahead_queue_empty() && is_active() && !is_error_set()) {
os_thread_sleep(20);
}
}
}

总结

我们通过 SELECT COUNT() 分析了 InnoDB 实现的 Parallel Read 框架,在并行扫描的过程中,无论是 dict_index_t 还是数据 Page 都是 S 锁. 虽然目前仅支持 CHECK TABLE 和 SELECT COUNT(), 但整个框架支持了足够多的接口,后续应该会支持更多的场景. 例如目前 CHECK TABLE 和 SELECT COUNT() 都是同步的并行读取, 使用 Parallel Read 框架针对 SELECT * 的全表扫描可以优化为异步的逻辑预读.