概述

版本:v5.15.10

Rocksdb的写入操作是通过Put()接口完成的。在Rocksdb中维护一个队列,但只有队头的Leader可以写入, 队头可以将自己和后面的Writer任务合并操作。

为了防止数据丢失,Rocksdb设计首先写入WAL文件,再写入Memtable,在队头Leader完成WAL写入任务后,假如参数allow_concurrent_memtable_write被设置为true整个队列的数量大于1, Rocksdb就会采取并行写Memtable的策略,即Leader唤醒后面的Writer任务,将它们的状态置为返回为STATE_PARALLEL_MEMTABLE_WRITER, 后面的Writer就会各自完成自己的Memtable写入任务。

假如Rocksdb没有设置allow_concurrent_memtable_write参数, Leader会代为写入Memtable,完成之后唤醒其他Writer任务,将它们的返回状态设置为STATE_COMPLETED, 然后指定一个新的队头Leader,最后Writer各自返回写入的状态.

具体实现

写入的具体实现在db/db_impl_write.cc:WriteImpl(). Rocksdb的Put()接口组装一个batch之后,调用Write()

1
2
3
4
5
6
7
8
9
10
11
12
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
// 构造一个batch, 然后将key, value组合起来存放在batch中
WriteBatch batch(key.size() + value.size() + 24);
Status s = batch.Put(column_family, key, value);
if (!s.ok()) {
return s;
}
return Write(opt, &batch);
}

这里我们直接来看Write()的实现,即Write()调用WriteImpl():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// The main write queue. This is the only write queue that updates LastSequence.
// When using one write queue, the same sequence also indicates the last
// published sequence.
Status DBImpl::WriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
uint64_t* log_used, uint64_t log_ref,
bool disable_memtable, uint64_t* seq_used,
size_t batch_cnt,
PreReleaseCallback* pre_release_callback) {
// 如果关闭了Memtable选项, 只需要写入WAL文件
if (two_write_queues_ && disable_memtable) {
return WriteImplWALOnly(write_options, my_batch, callback, log_used,
log_ref, seq_used, batch_cnt, pre_release_callback);
}

// 是否打开了pipelined写入方式
if (immutable_db_options_.enable_pipelined_write) {
return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
log_ref, disable_memtable, seq_used);
}

// 新建一个writer
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
disable_memtable, batch_cnt, pre_release_callback);

// 将writer加入一个batch队列中
write_thread_.JoinBatchGroup(&w);
// 判断返回的状态是否为并行写memtable的leader
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
// 应该写入memtable
if (w.ShouldWriteToMemtable()) {
PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_GUARD(write_memtable_time);

ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt);

PERF_TIMER_START(write_pre_and_post_process_time);
}

// 检查memtable的写入任务是否全部已经完成,假如完成,最后一个writer以leader的角色退出,
// 并会选出下一个batch的leader
if (write_thread_.CompleteParallelMemTableWriter(&w)) {
// we're responsible for exit batch group
for (auto* writer : *(w.write_group)) {
if (!writer->CallbackFailed() && writer->pre_release_callback) {
assert(writer->sequence != kMaxSequenceNumber);
Status ws = writer->pre_release_callback->Callback(writer->sequence,
disable_memtable);
if (!ws.ok()) {
status = ws;
break;
}
}
}
// 假如还有writer在进行写入,自己就以follwer的角色退出,仅设置自己的状态
auto last_sequence = w.write_group->last_sequence;
versions_->SetLastSequence(last_sequence);
MemTableInsertStatusCheck(w.status);
write_thread_.ExitAsBatchGroupFollower(&w);
}
assert(w.state == WriteThread::STATE_COMPLETED);
// STATE_COMPLETED conditional below handles exit

status = w.FinalStatus();
}
// 假如退出时的状态为STATE_COMPLETED, 则写入任务已经完成,则退出
if (w.state == WriteThread::STATE_COMPLETED) {
if (log_used != nullptr) {
*log_used = w.log_used;
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
// write is complete and leader has updated sequence
return w.FinalStatus();
}
// 下面处理状态为STATE_GROUP_LEADER,即作为整个batch的leader的情况
assert(w.state == WriteThread::STATE_GROUP_LEADER);

// leader选择其他writer加入整个batch中
last_batch_group_size_ =
write_thread_.EnterAsBatchGroupLeader(&w, &write_group);

if (status.ok()) {
// ...
if (!two_write_queues_) {
if (status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
// 写入WAL文件
status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
need_log_dir_sync, last_sequence + 1);
}
} else {
if (status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
// LastAllocatedSequence is increased inside WriteToWAL under
// wal_write_mutex_ to ensure ordered events in WAL
status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
seq_inc);
} else {
// Otherwise we inc seq number for memtable writes
last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
}
}
assert(last_sequence != kMaxSequenceNumber);
const SequenceNumber current_sequence = last_sequence + 1;
last_sequence += seq_inc;

if (status.ok()) {
PERF_TIMER_GUARD(write_memtable_time);

if (!parallel) {
// 假如没有开启并行写memtable
w.status = WriteBatchInternal::InsertInto(
write_group, current_sequence, column_family_memtables_.get(),
&flush_scheduler_, write_options.ignore_missing_column_families,
0 /*recovery_log_number*/, this, parallel, seq_per_batch_);
} else {
// ...
// 唤醒其他的writer写入各自的memtable
write_thread_.LaunchParallelMemTableWriters(&write_group);
in_parallel_group = true;

// leader判断是否需要写入memtable, 如果需要,仅完成自己的memtable的写入
if (w.ShouldWriteToMemtable()) {
ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
assert(w.sequence == current_sequence);
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/,
this, true /*concurrent_memtable_writes*/, seq_per_batch_,
w.batch_cnt);
}
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
}
}
PERF_TIMER_START(write_pre_and_post_process_time);

// ...
// 假如开启了并行写,判断目前的memtable写入状况
bool should_exit_batch_group = true;
if (in_parallel_group) {
// CompleteParallelWorker returns true if this thread should
// handle exit, false means somebody else did
should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
}
// 假如Leader作为整个batch的最后一个写入完成的writer, 则设置其他的writer状态并选出下一个batch的
// leader,退出
if (should_exit_batch_group) {
// ...
write_thread_.ExitAsBatchGroupLeader(write_group, status);
}

if (status.ok()) {
status = w.FinalStatus();
}
return status;
}

Rocksdb为了防止数据丢失先写入WAL文件,再写入Memtable,我们来看一下WAL日志文件的写入实现WriteToWAL函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, uint64_t* log_used,
bool need_log_sync, bool need_log_dir_sync,
SequenceNumber sequence) {
// ...
// 合并多个Batch
WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_,
&write_with_wal, &to_be_cached_state);
// 写入WAL文件
status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
// 如果设置了need_log_sync, 即WriteOptions::sync == true, 每条Log都会被刷盘
if (status.ok() && need_log_sync) {
// 将每条Log进行刷盘
for (auto& log : logs_) {
status = log.writer->file()->Sync(immutable_db_options_.use_fsync);
if (!status.ok()) {
break;
}
}
if (status.ok() && need_log_dir_sync) {
status = directories_.GetWalDir()->Fsync();
}
}
// ...
return status;
}

写入WAL文件成功后,接下来写入Memtable,我们简单的来看一下单个Writer的Memtable写入实现InsertInto函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Status WriteBatchInternal::InsertInto(
const WriteBatch* batch, ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler, bool ignore_missing_column_families,
uint64_t log_number, DB* db, bool concurrent_memtable_writes,
SequenceNumber* next_seq, bool* has_valid_writes, bool seq_per_batch) {
// 构造一个Memtable的Inserter
MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler,
ignore_missing_column_families, log_number, db,
concurrent_memtable_writes, has_valid_writes,
seq_per_batch);
// 对Memtable进行写入操作
Status s = batch->Iterate(&inserter);
// ...
return s;
}

batch->Iterate(&inserter)就是对Memtable的具体写入操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Status WriteBatch::Iterate(Handler* handler) const {
// ...
while ((s.ok() || UNLIKELY(s.IsTryAgain())) && !input.empty() &&
handler->Continue()) {
if (LIKELY(!s.IsTryAgain())) {
tag = 0;
column_family = 0; // default
// 从batch中读取数据
s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
&blob, &xid);
if (!s.ok()) {
return s;
}
} else {
assert(s.IsTryAgain());
s = Status::OK();
}

// 根据操作类型进行不同的写入
switch (tag) {
case kTypeColumnFamilyValue:
case kTypeValue:
// ...
s = handler->PutCF(column_family, key, value);
// ...
break;
case kTypeColumnFamilyDeletion:
case kTypeDeletion:
// ...
s = handler->DeleteCF(column_family, key);
// ...
}
// ...
}

所有的handle操作均在db/write_batch.cc,我们挑选其中的Delete操作来看看具体实现细节,DeleteCF()的具体实现函数是DeleteImpl()

1
2
3
4
5
6
7
8
9
10
11
Status DeleteImpl(uint32_t column_family_id, const Slice& key,
const Slice& value, ValueType delete_type) {
// ...
// 获取Column Family的Memtable
MemTable* mem = cf_mems_->GetMemTable();
// 调用Memtable的Add()对Memtable进行写入,注意传入的ValueType参数为delete_type
bool mem_res =
mem->Add(sequence_, delete_type, key, value,
concurrent_memtable_writes_, get_post_process_info(mem));
// ...
}

对Memtable的写入直接调用Add()接口,具体细节Rocksdb中的Memtable已详细介绍。