Overview

Version: v5.15.10

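Previously we analyzed RocksDB's default write path. Setting enable_pipelined_write in the options switches to the pipelined write path instead. In the default path, a batch group must finish its WAL write and then its memtable write before the next leader is chosen. The pipelined path does not wait for the memtable write: as soon as the WAL write completes, the next leader can be chosen and start writing the next batch, which is what produces the pipelining effect.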
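To make the difference concrete, here is a minimal timing sketch. It is a toy model, not RocksDB code: the function names, the fixed per-group costs `wal` and `mem`, and the assumption that memtable writes proceed one group at a time are all illustrative.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical toy model (not RocksDB code): every batch group costs
// `wal` time units for its WAL write and `mem` units for its memtable write.

// Default path: the next leader is chosen only after the current group
// has finished both its WAL write and its memtable write.
std::uint64_t SerialFinishTime(std::uint64_t groups, std::uint64_t wal,
                               std::uint64_t mem) {
  return groups * (wal + mem);
}

// Pipelined path: the next group's WAL write may start as soon as the
// previous WAL write is done. Assumption: memtable writes are handled
// one group at a time, in order.
std::uint64_t PipelinedFinishTime(std::uint64_t groups, std::uint64_t wal,
                                  std::uint64_t mem) {
  std::uint64_t wal_done = 0;  // time at which the latest WAL write finished
  std::uint64_t mem_done = 0;  // time at which the latest memtable write finished
  for (std::uint64_t i = 0; i < groups; ++i) {
    wal_done += wal;
    // A group's memtable write starts once its own WAL write and the
    // previous group's memtable write have both finished.
    mem_done = std::max(wal_done, mem_done) + mem;
  }
  return mem_done;
}
```

With 4 groups, a WAL cost of 3 and a memtable cost of 2, `SerialFinishTime(4, 3, 2)` returns 20 while `PipelinedFinishTime(4, 3, 2)` returns 14, because each memtable write overlaps with the following group's WAL write. Real workloads have variable costs and extra synchronization overhead, so the actual gain will differ.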

Implementation

The default write path dispatches to the pipelined path, so let's go straight to the pipelined implementation in db/db_impl_write.cc:PipelinedWriteImpl():

Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
                                  WriteBatch* my_batch, WriteCallback* callback,
                                  uint64_t* log_used, uint64_t log_ref,
                                  bool disable_memtable, uint64_t* seq_used) {
  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  WriteContext write_context;
  // Create a new writer
  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        disable_memtable);
  // Add the new writer to the batch queue
  write_thread_.JoinBatchGroup(&w);
  // Check whether the state is STATE_GROUP_LEADER, i.e. this writer leads a batch group
  if (w.state == WriteThread::STATE_GROUP_LEADER) {
    // ...
    // The leader picks other writers to join the batch group
    last_batch_group_size_ =
        write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group);
    const SequenceNumber current_sequence =
        write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;
    size_t total_count = 0;
    size_t total_byte_size = 0;

    // ...
    // The WAL needs to be written
    if (w.ShouldWriteToWAL()) {
      // ...