Overview

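RocksDB performs writes through the Put() interface. Internally it maintains a queue of writers, but only the Leader at the head of the queue may write; the Leader can merge its own write task with those of the Writers queued behind it and handle them as one group.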
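The queue-and-leader idea can be sketched as a small, single-threaded C++ model. Everything here is hypothetical: PendingWriter, FormGroup and max_group_bytes are illustrative names, not RocksDB's WriteThread API, and the two stopping rules (a byte budget, and not pulling a sync write into a group led by a non-sync write) are only loosely modeled on the checks RocksDB makes when a leader builds its group.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical model of a queued write (not RocksDB's WriteThread::Writer).
struct PendingWriter {
  int id;
  std::size_t batch_bytes;  // size of this writer's WriteBatch
  bool sync;                // models WriteOptions::sync
};

// The writer at the head of the queue becomes the leader and absorbs the
// writers queued behind it until a stopping rule fires. Returns the group,
// leader first, and removes its members from the queue.
std::vector<PendingWriter> FormGroup(std::deque<PendingWriter>* queue,
                                     std::size_t max_group_bytes) {
  std::vector<PendingWriter> group;
  if (queue->empty()) return group;

  const PendingWriter leader = queue->front();
  queue->pop_front();
  group.push_back(leader);
  std::size_t total_bytes = leader.batch_bytes;

  while (!queue->empty()) {
    const PendingWriter next = queue->front();
    // Do not pull a sync write into a group led by a non-sync write.
    if (next.sync && !leader.sync) break;
    // Stop once the group would exceed the (hypothetical) byte budget.
    if (total_bytes + next.batch_bytes > max_group_bytes) break;
    total_bytes += next.batch_bytes;
    group.push_back(next);
    queue->pop_front();
  }
  assert(!group.empty());
  return group;
}
```

Calling FormGroup() repeatedly drains the queue one group at a time; each call's first element plays the Leader role described above.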

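To prevent data loss, RocksDB writes to the WAL file first and only then to the Memtable. After the Leader has finished the WAL write, if allow_concurrent_memtable_write is set to true and the group contains more than one Writer, RocksDB writes the Memtable in parallel: the Leader wakes up the Writers behind it and sets their state to STATE_PARALLEL_MEMTABLE_WRITER, after which each Writer performs its own Memtable write.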

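If allow_concurrent_memtable_write is not set, the Leader writes to the Memtable on behalf of the whole group. When it is done, it wakes up the other Writers and sets their state to STATE_COMPLETED, then designates a new Leader at the head of the queue. Finally, each Writer returns the Status of its own write.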
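The decision the Leader makes after the WAL write, as described in the two paragraphs above, can be summarized as a small function. This is a sketch under those assumptions only: WriterState, FollowerPlan and PlanFollowers are hypothetical names, and the real WriteImpl() makes this choice inside a much larger function, so treat it as an approximation.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical mirror of two of the Writer states named in the text.
enum class WriterState { kParallelMemtableWriter, kCompleted };

struct FollowerPlan {
  bool parallel;                        // do followers insert their own batches?
  std::vector<WriterState> followers;   // state handed to each follower
};

// group_size counts the Leader too; the followers are the other
// group_size - 1 Writers. In the serial case the Leader has already inserted
// every batch before it wakes the followers with kCompleted.
FollowerPlan PlanFollowers(std::size_t group_size,
                           bool allow_concurrent_memtable_write) {
  assert(group_size >= 1);
  FollowerPlan plan;
  plan.parallel = allow_concurrent_memtable_write && group_size > 1;
  const WriterState state = plan.parallel
                                ? WriterState::kParallelMemtableWriter
                                : WriterState::kCompleted;
  for (std::size_t i = 1; i < group_size; ++i) {
    plan.followers.push_back(state);
  }
  return plan;
}
```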

Put() is implemented in db/db_impl_write.cc.

Implementation

RocksDB's Put() assembles a WriteBatch and then calls Write():

// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
               const Slice& key, const Slice& value) {
  // Build a WriteBatch and store the key/value pair in it
  WriteBatch batch(key.size() + value.size() + 24);
  Status s = batch.Put(column_family, key, value);
  if (!s.ok()) {
    return s;
  }
  return Write(opt, &batch);
}
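Put() only wraps the key/value pair in a WriteBatch. To make that concrete, here is a simplified sketch of the batch's byte layout as we understand it: a 12-byte header (8-byte sequence number, 4-byte record count) followed by records made of a tag byte, an optional varint32 column family ID, and varint32-length-prefixed key and value. The sketch::WriteBatch class and its helpers are hypothetical stand-ins, not RocksDB's classes, and the tag values 0x1 and 0x5 are assumed to correspond to kTypeValue and kTypeColumnFamilyValue.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

namespace sketch {

constexpr std::size_t kHeader = 12;           // 8-byte sequence + 4-byte count
constexpr char kTypeValue = 0x1;              // assumed tag values
constexpr char kTypeColumnFamilyValue = 0x5;

// Varint32: 7 bits per byte, least significant group first, high bit set on
// every byte except the last.
void PutVarint32(std::string* dst, uint32_t v) {
  while (v >= 0x80) {
    dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

void PutLengthPrefixed(std::string* dst, const std::string& s) {
  PutVarint32(dst, static_cast<uint32_t>(s.size()));
  dst->append(s);
}

class WriteBatch {
 public:
  explicit WriteBatch(std::size_t reserved_bytes = 0) {
    rep_.reserve(reserved_bytes > kHeader ? reserved_bytes : kHeader);
    rep_.resize(kHeader, '\0');  // sequence = 0, count = 0
  }

  void Put(uint32_t column_family_id, const std::string& key,
           const std::string& value) {
    SetCount(Count() + 1);
    if (column_family_id == 0) {  // default column family: no ID stored
      rep_.push_back(kTypeValue);
    } else {
      rep_.push_back(kTypeColumnFamilyValue);
      PutVarint32(&rep_, column_family_id);
    }
    PutLengthPrefixed(&rep_, key);
    PutLengthPrefixed(&rep_, value);
  }

  // The count lives in header bytes 8..11, little-endian.
  uint32_t Count() const {
    uint32_t n = 0;
    for (int i = 3; i >= 0; --i) {
      n = (n << 8) | static_cast<uint8_t>(rep_[8 + i]);
    }
    return n;
  }

  const std::string& Data() const { return rep_; }

 private:
  void SetCount(uint32_t n) {
    for (int i = 0; i < 4; ++i) {
      rep_[8 + i] = static_cast<char>((n >> (8 * i)) & 0xff);
    }
  }

  std::string rep_;
};

}  // namespace sketch
```

In this model, the extra 24 bytes that DB::Put() reserves would presumably leave room for the 12-byte header plus the tag and length prefixes of a single record; the reservation is only a capacity hint, not a size limit.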

Here we skip straight to Write(), which simply calls WriteImpl():

Status DBImpl::WriteImpl(const WriteOptions& write_options,
                         WriteBatch* my_batch, WriteCallback* callback,
                         uint64_t* log_used, uint64_t log_ref,
                         bool disable_memtable, uint64_t* seq_used,
                         size_t batch_cnt,
                         PreReleaseCallback* pre_release_callback) {
  // ...
  // With two write queues and the Memtable disabled, only the WAL is written
  if (two_write_queues_ && disable_memtable) {
    return WriteImplWALOnly(write_options, my_batch, callback, log_used,
                            log_ref, seq_used, batch_cnt, pre_release_callback);
  }

  // If enable_pipelined_write is on, take the pipelined write path
  if (immutable_db_options_.enable_pipelined_write) {
    return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
                              log_ref, disable_memtable, seq_used);
  }
  // ...
  // Create a new Writer task
  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        disable_memtable, batch_cnt, pre_release_callback);
  // Add the Writer to a write group (JoinBatchGroup is covered in detail in a
  // later post)
  write_thread_.JoinBatchGroup(&w);
  // STATE_PARALLEL_MEMTABLE_WRITER: this Writer inserts its own batch into the
  // Memtable in parallel with the rest of the group
  if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
    // Only if this Writer actually needs to write to the Memtable
    if (w.ShouldWriteToMemtable()) {
      ColumnFamilyMemTablesImpl column_family_memtables(
          versions_->GetColumnFamilySet());
      // Write to the Memtable
      w.status = WriteBatchInternal::InsertInto(
          &w, w.sequence, &column_family_memtables, &flush_scheduler_,
          write_options.ignore_missing_column_families, 0 /*log_number*/, this,
          true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt);
    }
  }

  // STATE_COMPLETED: the write has already been done and the group Leader has
  // updated the sequence, so just return the status
  if (w.state == WriteThread::STATE_COMPLETED) {
    if (log_used != nullptr) {
      *log_used = w.log_used;
    }
    if (seq_used != nullptr) {
      *seq_used = w.sequence;
    }
    return w.FinalStatus();
  }

  // STATE_GROUP_LEADER: this Writer is the Leader at the head of the queue
  // 1. Write the WAL file first
  // 2. Then write the Memtable, depending on whether concurrent Memtable
  //    writes are allowed
  // 2.1 If allowed, wake up the other Writers so each inserts its own batch
  // 2.2 Otherwise, the Leader inserts the whole group itself
  assert(w.state == WriteThread::STATE_GROUP_LEADER);
  // ...
  if (!two_write_queues_ || !disable_memtable) {
    // Pre-process the write
    status = PreprocessWrite(write_options, &need_log_sync, &write_context);
  }
  // ...
  // If pre-processing succeeded
  if (status.ok()) {
    // ...
    if (!two_write_queues_) {
      if (status.ok() && !write_options.disableWAL) {
        // Write the WAL file
        status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
                            need_log_dir_sync, last_sequence + 1);
      }
    } else {
      if (status.ok() && !write_options.disableWAL) {
        // Write the WAL through the concurrent path used with two write queues
        status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
                                      seq_inc);
      }
      // ...
    }
    assert(last_sequence != kMaxSequenceNumber);
    const SequenceNumber current_sequence = last_sequence + 1;
    last_sequence += seq_inc;

    // If the WAL write succeeded, write to the Memtable
    if (status.ok()) {
      if (!parallel) {
        // Parallel Memtable writes are not in effect: the Leader inserts the
        // whole group
        w.status = WriteBatchInternal::InsertInto(
            write_group, current_sequence, column_family_memtables_.get(),
            &flush_scheduler_, write_options.ignore_missing_column_families,
            0 /*recovery_log_number*/, this, parallel, seq_per_batch_);
      } else {
        // ...
        // Set the other Writers in the group to STATE_PARALLEL_MEMTABLE_WRITER,
        // waking them up so each inserts its own batch into the Memtable
        write_thread_.LaunchParallelMemTableWriters(&write_group);
        in_parallel_group = true;

        // The Leader inserts its own batch into the Memtable
        if (w.ShouldWriteToMemtable()) {
          // ...
          w.status = WriteBatchInternal::InsertInto(
              &w, w.sequence, &column_family_memtables, &flush_scheduler_,
              write_options.ignore_missing_column_families, 0 /*log_number*/,
              this, true /*concurrent_memtable_writes*/, seq_per_batch_,
              w.batch_cnt);
        }
      }
      // ...
    }
  }
  // ...

  // If the WAL must be synced to disk
  if (need_log_sync) {
    // ...
    if (two_write_queues_) {
      if (manual_wal_flush_) {
        status = FlushWAL(true);
      } else {
        status = SyncWAL();
      }
    }
  }
  // ...
  return status;
}
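The lines `current_sequence = last_sequence + 1` and `last_sequence += seq_inc` reserve one contiguous range of sequence numbers for the whole group, and in the parallel path each Writer inserts starting from its own w.sequence. The sketch below shows one way that range can be split across the group, assuming each Writer consumes one number per record, or one per sub-batch when seq_per_batch_ is set, and that Writers which skip the Memtable consume none in the per-record case. GroupMember and AssignSequences are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

using SequenceNumber = uint64_t;

// Hypothetical summary of one Writer in a write group.
struct GroupMember {
  uint32_t record_count;   // records in its WriteBatch
  uint32_t batch_cnt;      // sub-batches, used when seq_per_batch is true
  bool write_to_memtable;  // models Writer::ShouldWriteToMemtable()
};

// Hands each member the first sequence number of its slice of the group's
// range and returns the new last_sequence (i.e. last_sequence + seq_inc).
SequenceNumber AssignSequences(const std::vector<GroupMember>& group,
                               SequenceNumber last_sequence, bool seq_per_batch,
                               std::vector<SequenceNumber>* first_seqs) {
  SequenceNumber next = last_sequence + 1;  // current_sequence in WriteImpl()
  for (const GroupMember& m : group) {
    first_seqs->push_back(next);
    if (seq_per_batch) {
      next += m.batch_cnt;
    } else if (m.write_to_memtable) {
      next += m.record_count;
    }
  }
  return next - 1;
}
```

With last_sequence = 100 and three Writers holding 2, 3 and 1 records, the Writers start at 101, 103 and 106, and the new last_sequence is 106.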

To avoid losing data, RocksDB writes the WAL file before the Memtable. Let's look at how the WAL is written, in WriteToWAL():

Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
                          log::Writer* log_writer, uint64_t* log_used,
                          bool need_log_sync, bool need_log_dir_sync,
                          SequenceNumber sequence) {
  // ...
  // Merge the group's batches into one
  WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_,
                                        &write_with_wal, &to_be_cached_state);
  // Write the merged batch to the WAL file
  status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
  // need_log_sync is set when WriteOptions::sync == true; every WAL file in
  // logs_ is then synced to disk
  if (status.ok() && need_log_sync) {
    // Sync each WAL file
    for (auto& log : logs_) {
      status = log.writer->file()->Sync(immutable_db_options_.use_fsync);
      if (!status.ok()) {
        break;
      }
    }
    if (status.ok() && need_log_dir_sync) {
      status = directories_.GetWalDir()->Fsync();
    }
  }
  // ...
  return status;
}
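MergeBatch() combines the group's batches so that a single WAL record covers the whole group. Below is a minimal sketch of such a merge, assuming the 12-byte header layout (8-byte sequence, 4-byte count) and treating each batch's records as opaque bytes: the counts are summed, the record sections are concatenated, and the group's first sequence number is stamped into the merged header. MakeBatch and MergeBatches are hypothetical helpers, not RocksDB functions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

constexpr std::size_t kHeader = 12;  // assumed: 8-byte sequence + 4-byte count

// Little-endian fixed-width helpers for the header fields.
void EncodeFixed(std::string* dst, std::size_t offset, uint64_t v, int bytes) {
  for (int i = 0; i < bytes; ++i) {
    (*dst)[offset + i] = static_cast<char>((v >> (8 * i)) & 0xff);
  }
}

uint64_t DecodeFixed(const std::string& src, std::size_t offset, int bytes) {
  uint64_t v = 0;
  for (int i = bytes - 1; i >= 0; --i) {
    v = (v << 8) | static_cast<uint8_t>(src[offset + i]);
  }
  return v;
}

// Hypothetical helper: a batch whose record section is opaque bytes.
std::string MakeBatch(uint32_t count, const std::string& records) {
  std::string rep(kHeader, '\0');
  EncodeFixed(&rep, 8, count, 4);
  return rep + records;
}

// Concatenates record sections, sums the counts and stamps the group's first
// sequence number, producing one buffer that can be written as one WAL record.
std::string MergeBatches(const std::vector<std::string>& batches,
                         uint64_t first_sequence) {
  std::string merged(kHeader, '\0');
  uint64_t total_count = 0;
  for (const std::string& b : batches) {
    assert(b.size() >= kHeader);
    total_count += DecodeFixed(b, 8, 4);
    merged.append(b, kHeader, std::string::npos);
  }
  EncodeFixed(&merged, 0, first_sequence, 8);
  EncodeFixed(&merged, 8, total_count, 4);
  return merged;
}
```

Writing one merged record per group is what lets a single WAL append (and, with sync, a single fsync) cover many concurrent Put() calls.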

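Once the WAL write has succeeded, the batch is written into the Memtable. Here is a brief look at InsertInto(), which performs the Memtable write for a single Writer's batch: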

Status WriteBatchInternal::InsertInto(
    const WriteBatch* batch, ColumnFamilyMemTables* memtables,
    FlushScheduler* flush_scheduler, bool ignore_missing_column_families,
    uint64_t log_number, DB* db, bool concurrent_memtable_writes,
    SequenceNumber* next_seq, bool* has_valid_writes, bool seq_per_batch) {
  // Build a MemTableInserter that applies the batch to the Memtables
  MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler,
                            ignore_missing_column_families, log_number, db,
                            concurrent_memtable_writes, has_valid_writes,
                            seq_per_batch);
  // Replay the batch through the inserter, i.e. write it into the Memtable
  Status s = batch->Iterate(&inserter);
  // ...
  return s;
}
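MemTableInserter receives each record of the batch through callbacks such as PutCF() and DeleteCF(), starting from the batch's sequence number. The toy inserter below assumes the case where seq_per_batch is false, so every record consumes one sequence number; it ignores column families, and ToyInserter, MemEntry and the std::vector standing in for the Memtable are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

using SequenceNumber = uint64_t;
enum class ValueType { kTypeDeletion, kTypeValue };

struct MemEntry {
  std::string key;
  SequenceNumber sequence;
  ValueType type;
  std::string value;
};

// Hypothetical inserter: each callback stamps the current sequence number on
// the record and then advances it (the seq_per_batch == false case).
class ToyInserter {
 public:
  explicit ToyInserter(SequenceNumber batch_sequence)
      : sequence_(batch_sequence) {}

  void PutCF(const std::string& key, const std::string& value) {
    Add(ValueType::kTypeValue, key, value);
  }
  void DeleteCF(const std::string& key) {
    Add(ValueType::kTypeDeletion, key, "");
  }

  const std::vector<MemEntry>& entries() const { return entries_; }
  SequenceNumber sequence() const { return sequence_; }

 private:
  void Add(ValueType type, const std::string& key, const std::string& value) {
    entries_.push_back(MemEntry{key, sequence_, type, value});
    ++sequence_;  // one sequence number per record
  }

  SequenceNumber sequence_;
  std::vector<MemEntry> entries_;  // stands in for the Memtable
};
```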

batch->Iterate(&inserter) is where the Memtable writes actually happen:

Status WriteBatch::Iterate(Handler* handler) const {
  // ...
  while ((s.ok() || UNLIKELY(s.IsTryAgain())) && !input.empty() &&
         handler->Continue()) {
    if (LIKELY(!s.IsTryAgain())) {
      tag = 0;
      column_family = 0;  // default
      // Read the next record from the batch
      s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
                                   &blob, &xid);
      if (!s.ok()) {
        return s;
      }
    } else {
      assert(s.IsTryAgain());
      s = Status::OK();
    }

    // Dispatch on the record type
    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
        // ...
        s = handler->PutCF(column_family, key, value);
        // ...
        break;
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
        // ...
        s = handler->DeleteCF(column_family, key);
        // ...
        break;
      // ... (other record types)
    }
  }
  // ...
}
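Iterate() is essentially a decoding loop that reads one record at a time and dispatches on its tag. A simplified, self-contained version is sketched below. It assumes a record layout of tag byte, optional varint32 column family ID, and varint32-prefixed key and value after a 12-byte header; it models only Put and Delete, returns bool instead of Status, and omits the TryAgain retry path. Handler, RecordingHandler and Iterate here are illustrative stand-ins, not RocksDB's classes, and the tag values are assumptions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <string_view>
#include <vector>

// Simplified stand-in for WriteBatch::Handler.
class Handler {
 public:
  virtual ~Handler() = default;
  virtual bool PutCF(uint32_t cf, std::string_view key,
                     std::string_view value) = 0;
  virtual bool DeleteCF(uint32_t cf, std::string_view key) = 0;
};

constexpr std::size_t kHeader = 12;  // assumed: 8-byte sequence + 4-byte count
enum : uint8_t {                     // assumed tag values
  kTypeDeletion = 0x0,
  kTypeValue = 0x1,
  kTypeColumnFamilyDeletion = 0x4,
  kTypeColumnFamilyValue = 0x5,
};

bool GetVarint32(std::string_view* in, uint32_t* v) {
  uint32_t result = 0;
  for (int shift = 0; shift <= 28 && !in->empty(); shift += 7) {
    const uint8_t byte = static_cast<uint8_t>(in->front());
    in->remove_prefix(1);
    result |= static_cast<uint32_t>(byte & 0x7f) << shift;
    if ((byte & 0x80) == 0) {
      *v = result;
      return true;
    }
  }
  return false;  // truncated or over-long varint
}

bool GetLengthPrefixed(std::string_view* in, std::string_view* out) {
  uint32_t len = 0;
  if (!GetVarint32(in, &len) || in->size() < len) return false;
  *out = in->substr(0, len);
  in->remove_prefix(len);
  return true;
}

// Decodes records one by one and dispatches on the tag.
// Returns false on a malformed or unknown record.
bool Iterate(const std::string& rep, Handler* handler) {
  if (rep.size() < kHeader) return false;
  std::string_view input(rep);
  input.remove_prefix(kHeader);
  while (!input.empty()) {
    const uint8_t tag = static_cast<uint8_t>(input.front());
    input.remove_prefix(1);
    uint32_t cf = 0;  // default column family
    std::string_view key, value;
    switch (tag) {
      case kTypeColumnFamilyValue:
        if (!GetVarint32(&input, &cf)) return false;
        [[fallthrough]];
      case kTypeValue:
        if (!GetLengthPrefixed(&input, &key) ||
            !GetLengthPrefixed(&input, &value)) return false;
        if (!handler->PutCF(cf, key, value)) return false;
        break;
      case kTypeColumnFamilyDeletion:
        if (!GetVarint32(&input, &cf)) return false;
        [[fallthrough]];
      case kTypeDeletion:
        if (!GetLengthPrefixed(&input, &key)) return false;
        if (!handler->DeleteCF(cf, key)) return false;
        break;
      default:
        return false;
    }
  }
  return true;
}

// Example handler that records each call.
class RecordingHandler : public Handler {
 public:
  bool PutCF(uint32_t cf, std::string_view key,
             std::string_view value) override {
    ops.push_back("put " + std::to_string(cf) + " " + std::string(key) + "=" +
                  std::string(value));
    return true;
  }
  bool DeleteCF(uint32_t cf, std::string_view key) override {
    ops.push_back("del " + std::to_string(cf) + " " + std::string(key));
    return true;
  }
  std::vector<std::string> ops;
};
```

Swapping RecordingHandler for an inserter is the same move InsertInto() makes when it passes a MemTableInserter to Iterate().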

All of the handler operations are implemented in db/write_batch.cc. Taking Delete as an example, DeleteCF() is implemented by DeleteImpl():

Status DeleteImpl(uint32_t column_family_id, const Slice& key,
                  const Slice& value, ValueType delete_type) {
  // ...
  // Get the Memtable of the column family
  MemTable* mem = cf_mems_->GetMemTable();
  // Insert through MemTable::Add(); note that the ValueType passed in is
  // delete_type, so the delete is stored as an entry of its own
  bool mem_res =
      mem->Add(sequence_, delete_type, key, value,
               concurrent_memtable_writes_, get_post_process_info(mem));
  // ...
}

The Memtable write here is simply a call to MemTable::Add(); its details were covered in the earlier post on RocksDB's Memtable.
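Because a delete reaches MemTable::Add() with delete_type instead of kTypeValue, a deletion is stored as just another entry (a tombstone) rather than removing anything. The sketch below illustrates why that works, using a std::map in place of RocksDB's skiplist and ordering entries by user key ascending and sequence number descending, so the newest version visible at a snapshot is found first. RocksDB packs the sequence number and type into an 8-byte trailer of the internal key; here they are separate fields, and InternalKey, Entry and ToyMemTable are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>
#include <string>

using SequenceNumber = uint64_t;
enum class ValueType : uint8_t { kTypeDeletion = 0x0, kTypeValue = 0x1 };

// Ordering: user key ascending, then sequence descending (newest first).
struct InternalKey {
  std::string user_key;
  SequenceNumber seq;
  bool operator<(const InternalKey& o) const {
    if (user_key != o.user_key) return user_key < o.user_key;
    return seq > o.seq;
  }
};

struct Entry {
  ValueType type;
  std::string value;
};

class ToyMemTable {
 public:
  // A delete is just another entry whose type is kTypeDeletion.
  void Add(SequenceNumber seq, ValueType type, const std::string& key,
           const std::string& value) {
    table_[InternalKey{key, seq}] = Entry{type, value};
  }

  // Returns the value visible at snapshot `snap`, or nullopt if the key is
  // absent or its newest visible entry is a tombstone.
  std::optional<std::string> Get(const std::string& key,
                                 SequenceNumber snap) const {
    // First entry for `key` whose sequence is <= snap.
    auto it = table_.lower_bound(InternalKey{key, snap});
    if (it == table_.end() || it->first.user_key != key) return std::nullopt;
    if (it->second.type == ValueType::kTypeDeletion) return std::nullopt;
    return it->second.value;
  }

 private:
  std::map<InternalKey, Entry> table_;
};
```

A read at a snapshot older than the delete still sees the old value, which is one reason a delete cannot simply erase the key from the Memtable.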

Pipelined Writes

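Implementation: PipelinedWriteImpl() in db/db_impl_write.cc. When the enable_pipelined_write option is turned on, RocksDB supports pipelined writes. The flow is similar to the one described above: the Leader writes the WAL file, and then a Memtable Writer Leader is designated to carry out the group's Memtable writes in parallel. Meanwhile a Leader is chosen for the next group, so the next group's write can begin while the current group's Memtable writes are still running; this overlap is what gives the pipelining effect. A MEMTABLE_WRITER_LEADER behaves much like a GROUP_LEADER: it completes its own Memtable write and wakes up the other Writers in the group so that they complete their Memtable writes in parallel.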
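The benefit of pipelining can be estimated with an idealized two-stage model (an assumption for illustration, not a measurement of RocksDB): every group passes through a WAL stage and then a Memtable stage, each stage serves one group at a time in order, and pipelining lets group i+1's WAL write overlap group i's Memtable write. GroupCost and both functions are hypothetical, and time is in arbitrary units.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical per-group cost of each stage, in arbitrary time units.
struct GroupCost {
  int wal;
  int memtable;
};

// Without pipelining, the next group's WAL write waits for the previous
// group's Memtable write to finish.
int SerialFinishTime(const std::vector<GroupCost>& groups) {
  int t = 0;
  for (const GroupCost& g : groups) t += g.wal + g.memtable;
  return t;
}

// With pipelining, group i+1's WAL write may overlap group i's Memtable write.
int PipelinedFinishTime(const std::vector<GroupCost>& groups) {
  int wal_done = 0;  // when the WAL stage becomes free
  int mem_done = 0;  // when the Memtable stage becomes free
  for (const GroupCost& g : groups) {
    wal_done += g.wal;  // WAL writes stay strictly sequential
    // The Memtable stage needs both this group's WAL write and the previous
    // group's Memtable write to be finished.
    mem_done = std::max(wal_done, mem_done) + g.memtable;
  }
  return mem_done;
}
```

For three groups that each need 2 units of WAL work and 3 units of Memtable work, the serial schedule takes 15 units while the pipelined one finishes at 11; the gain shrinks when one stage dominates the other.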