Pipelined Writes in RocksDB
Overview
Version: v5.15.10
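In a previous post we analyzed RocksDB's default write path. Setting enable_pipelined_write in the options switches to the pipelined write mode instead. In the default mode, a batch must finish its WAL write and then its memtable write before the next leader is chosen. In pipelined mode the memtable write does not have to finish first: as soon as the WAL write completes, the next leader can be chosen and start writing the next batch, so consecutive batches overlap like stages of a pipeline.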
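To make the "pipeline" effect concrete, here is a minimal timing sketch under an idealized assumption that is not taken from RocksDB: every batch spends `wal` time units in the WAL stage and `mem` units in the memtable stage, and each stage serves one batch at a time. `DefaultFinishTime` and `PipelinedFinishTime` are hypothetical helpers for illustration only.

```cpp
#include <algorithm>
#include <cassert>

// Idealized model (an assumption, not RocksDB code): each batch needs `wal`
// time units for its WAL write and `mem` units for its memtable write.

// Default mode: the next leader is chosen only after the previous batch has
// finished both its WAL write and its memtable write.
long long DefaultFinishTime(int batches, long long wal, long long mem) {
  assert(batches >= 0 && wal >= 0 && mem >= 0);
  return batches * (wal + mem);
}

// Pipelined mode: the next leader is chosen as soon as the WAL write is done,
// so batch i+1 writes its WAL while batch i is still writing the memtable.
long long PipelinedFinishTime(int batches, long long wal, long long mem) {
  assert(batches >= 0 && wal >= 0 && mem >= 0);
  long long wal_free = 0;  // time at which the WAL stage becomes idle
  long long mem_free = 0;  // time at which the memtable stage becomes idle
  for (int i = 0; i < batches; ++i) {
    wal_free += wal;  // WAL writes run back to back
    // A memtable write starts once its own WAL write is done and the
    // previous batch has left the memtable stage.
    mem_free = std::max(mem_free, wal_free) + mem;
  }
  return mem_free;
}
```

With 4 batches, `wal = 3` and `mem = 2`, the default mode finishes at time 20 and the pipelined mode at time 14, which matches `wal + mem + (batches - 1) * max(wal, mem)` for a two-stage pipeline.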
Implementation
The pipelined path is reached by branching off from the default write path. Let's go straight to the pipelined implementation, PipelinedWriteImpl() in db/db_impl_write.cc:

    Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
Next, let's analyze ExitAsBatchGroupLeader():

    void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
                                             Status status) {
      Writer* leader = write_group.leader;
      Writer* last_writer = write_group.last_writer;
      assert(leader->link_older == nullptr);
      // Propagate memtable write error to the whole group.
      if (status.ok() && !write_group.status.ok()) {
        status = write_group.status;
      }
      // Pipelined write path
      if (enable_pipelined_write_) {
        // Notify writers don't write to memtable to exit.
        for (Writer* w = last_writer; w != leader;) {
          Writer* next = w->link_older;
          w->status = status;
          // A follower that does not need to write to the memtable is set to
          // STATE_COMPLETED and exits
          if (!w->ShouldWriteToMemtable()) {
            CompleteFollower(w, write_group);
          }
          w = next;
        }
        // If the leader does not need to write to the memtable either, it sets
        // its own state to STATE_COMPLETED
        if (!leader->ShouldWriteToMemtable()) {
          CompleteLeader(write_group);
        }
        Writer* next_leader = nullptr;
        // Choose the leader of the next batch.
        // First check whether new writers have joined the waiting queue; if not,
        // insert a dummy (empty) writer into the queue
        Writer dummy;
        Writer* expected = last_writer;
        bool has_dummy = newest_writer_.compare_exchange_strong(expected, &dummy);
        if (!has_dummy) {
          // The insert failed, i.e. new writers are already waiting in the
          // queue, so pick the next batch's leader among them
          next_leader = FindNextLeader(expected, last_writer);
          assert(next_leader != nullptr && next_leader != last_writer);
        }
        // If the group still contains writers (size > 0), link the leader and
        // the remaining writers into the memtable writer queue
        if (write_group.size > 0) {
          if (LinkGroup(write_group, &newest_memtable_writer_)) {
            // The leader can now be different from current writer.
            SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
          }
        }
        // If the dummy was inserted (no new writers had arrived), remove it and
        // use it as the boundary when choosing the next batch's leader
        if (has_dummy) {
          assert(next_leader == nullptr);
          expected = &dummy;
          bool has_pending_writer =
              !newest_writer_.compare_exchange_strong(expected, nullptr);
          if (has_pending_writer) {
            next_leader = FindNextLeader(expected, &dummy);
            assert(next_leader != nullptr && next_leader != &dummy);
          }
        }
        // A leader for the next batch was found
        if (next_leader != nullptr) {
          next_leader->link_older = nullptr;
          // Wake it up by setting its state to STATE_GROUP_LEADER
          SetState(next_leader, STATE_GROUP_LEADER);
        }
        // The leader waits until it is woken up
        AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER |
                               STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
                   &eabgl_ctx);
      } else {
        // Default write path
        // ...
      }
    }
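The excerpt calls FindNextLeader() without showing its body. Following the description in this article (walk back from the newest writer to a boundary writer and take the writer right after it), a minimal sketch could look like the following. `Writer` is a hypothetical one-field stand-in for WriteThread::Writer, and `FindNextLeaderSketch` is an illustrative name, not RocksDB's function.

```cpp
#include <cassert>

// Hypothetical, stripped-down writer: only the pointer from a newer writer
// to the next older writer in the waiting queue is modelled.
struct Writer {
  Writer* link_older = nullptr;
};

// Start at the newest writer `from`, follow link_older towards older writers,
// and stop at the writer whose older neighbour is `boundary` (last_writer or
// the dummy). That writer is the oldest one that joined after the boundary,
// so it becomes the leader of the next batch.
Writer* FindNextLeaderSketch(Writer* from, Writer* boundary) {
  assert(from != nullptr && from != boundary);
  Writer* current = from;
  while (current->link_older != boundary) {
    current = current->link_older;
    assert(current != nullptr);  // boundary must be reachable from `from`
  }
  return current;
}
```

For a queue `last_writer <- a <- b <- c` (with `c` newest), `FindNextLeaderSketch(&c, &last_writer)` returns `&a`. Note that the walk stops at the first pointer equal to `boundary`, so it relies entirely on the boundary's address being unique in the queue.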
A Bug in Pipelined Writes
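The RocksDB developers fixed a bug in pipelined writes. In ExitAsBatchGroupLeader() above, you can see that after each batch completes, the leader tries to insert a dummy writer into the queue before it searches for the next leader.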
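To see why this dummy is needed, let's go over the key logic pipelined writes use to find the next leader: starting from the newest writer, it walks back through the queue to the current batch's last_writer and makes the writer right after last_writer the leader of the next batch. In certain edge cases, however, the address of last_writer is the same as the address of a writer in a later batch (for example, once last_writer has finished, a new writer can be created at the same memory address). The walk then mistakes that later writer for the boundary, so all writers queued between the real last_writer and it are skipped; they are never chosen as leader and stay blocked.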
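The edge case can be replayed sequentially with a small model. Everything below is hypothetical: `Writer` keeps only `link_older`, `Push` is a simplified lock-free push onto a "newest writer" list, there are no real threads, and the reuse of last_writer's memory is modelled by pushing the same `Writer` object a second time. The replay shows how comparing against a stale last_writer address makes the backward walk stop too early.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical, stripped-down writer: only the link to the next older writer.
struct Writer {
  Writer* link_older = nullptr;
};

// Pushes `w` onto a lock-free "newest writer" list (newest -> older links).
void Push(std::atomic<Writer*>& newest, Writer* w) {
  Writer* expected = newest.load();
  do {
    w->link_older = expected;
  } while (!newest.compare_exchange_weak(expected, w));
}

// Walks from the newest writer towards older ones and returns the writer
// whose older neighbour is `boundary`.
Writer* FindNextLeaderSketch(Writer* from, Writer* boundary) {
  assert(from != nullptr && from != boundary);
  Writer* current = from;
  while (current->link_older != boundary) {
    current = current->link_older;
    assert(current != nullptr);
  }
  return current;
}

// Replays the edge case and returns true if the lookup that uses the saved
// last_writer address as its boundary skips the pending writer n1.
bool LastWriterBoundarySkipsWriter() {
  std::atomic<Writer*> newest{nullptr};
  Writer slot;  // memory that first holds last_writer, later a new writer
  Writer n1;    // joins after the group was formed and waits for leadership
  Writer n2;    // joins even later

  Push(newest, &slot);  // the current group ends with last_writer == &slot
  Push(newest, &n1);    // n1 is pending: n1.link_older == &slot
  // last_writer finishes; a new writer is created at the same address
  // (modelled by reusing `slot`) and joins the queue after n1.
  Push(newest, &slot);  // slot.link_older == &n1
  Push(newest, &n2);    // n2.link_older == &slot

  Writer* last_writer = &slot;  // the pointer the leader saved earlier
  Writer* expected = last_writer;
  if (newest.compare_exchange_strong(expected, nullptr)) {
    return false;  // queue looked empty; not the case replayed here
  }
  // `expected` now holds the newest writer (n2). Because n2.link_older equals
  // the stale last_writer address, the walk stops immediately at n2.
  Writer* next_leader = FindNextLeaderSketch(expected, last_writer);
  return next_leader != &n1;  // n1 and the reused writer are skipped
}
```

`LastWriterBoundarySkipsWriter()` returns true: the lookup picks `n2`, while `n1` (the writer that should lead next) and the writer at the reused address are never selected.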
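The fix is to place a dummy writer in between: the old logic that compared against last_writer is dropped, and the search compares against dummy instead. Since dummy is a local variable of the leader, no other writer can share its address while the leader is still inside ExitAsBatchGroupLeader().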
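Replaying the same address reuse with a dummy boundary shows why the fix helps. The model is the same hypothetical one (`Writer`, `Push`, `FindNextLeaderSketch` and `DummyBoundaryFindsWriter` are illustrative names, not RocksDB APIs, and the replay is sequential). Its two steps mirror the `has_dummy` path in the excerpt above: install the dummy while the group is still intact, then remove it and search with `&dummy` as the boundary.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical, stripped-down writer: only the link to the next older writer.
struct Writer {
  Writer* link_older = nullptr;
};

// Pushes `w` onto a lock-free "newest writer" list (newest -> older links).
void Push(std::atomic<Writer*>& newest, Writer* w) {
  Writer* expected = newest.load();
  do {
    w->link_older = expected;
  } while (!newest.compare_exchange_weak(expected, w));
}

// Walks from the newest writer towards older ones and returns the writer
// whose older neighbour is `boundary`.
Writer* FindNextLeaderSketch(Writer* from, Writer* boundary) {
  assert(from != nullptr && from != boundary);
  Writer* current = from;
  while (current->link_older != boundary) {
    current = current->link_older;
    assert(current != nullptr);
  }
  return current;
}

// Replays the address-reuse scenario with a dummy boundary and returns true
// if the oldest pending writer is chosen as the next leader.
bool DummyBoundaryFindsWriter() {
  std::atomic<Writer*> newest{nullptr};
  Writer slot;  // first holds last_writer, later reused by a new writer
  Writer n2;    // joins after the reused writer

  Push(newest, &slot);  // the current group ends with last_writer == &slot
  Writer* last_writer = &slot;

  // Step 1: nobody has queued after last_writer yet, so park a dummy at the
  // head of the list in its place.
  Writer dummy;
  Writer* expected = last_writer;
  bool has_dummy = newest.compare_exchange_strong(expected, &dummy);
  assert(has_dummy);

  // last_writer finishes and a new writer reuses its address; it now links
  // behind the dummy rather than behind any earlier writer. n2 follows.
  Push(newest, &slot);  // slot.link_older == &dummy
  Push(newest, &n2);    // n2.link_older == &slot

  // Step 2: try to remove the dummy. The CAS fails because writers queued
  // after it, so search for the next leader using the dummy as boundary.
  expected = &dummy;
  bool has_pending = !newest.compare_exchange_strong(expected, nullptr);
  assert(has_pending);
  Writer* next_leader = FindNextLeaderSketch(expected, &dummy);
  return next_leader == &slot;  // the oldest pending writer is chosen
}
```

`DummyBoundaryFindsWriter()` returns true. Because `dummy` lives in the leader's own stack frame for the whole hand-off, no queued writer can have its address, so the walk ends at the correct writer even though last_writer's memory was reused.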