概述

因为RocksDB采用LSM-Tree的算法实现,用户使用RocksDB进行插入或删除操作key-value,首先写入内存的数据结构—Memtable,一旦一个Memtable被写满,它就会转为Immutable,然后被替换为一个新的Memtable.

Memtable 和 Immutable Memtable

在RocksDB中,Memtable 和 Immutable memtable都位于内存, 它俩属于一样的数据结构,唯一的区别是Memtable可读可写,而Immutable memtable是只读的,不允许写入。RocksDB引入了Column family的概念,在一个Column family中只有一个Memtable,但允许存在多个Immutable memtable. 与LevelDB不同的是,RocksDB实现多个数据结构类型的Memtable,默认的是SkipList,即跳跃表。用户可以根据不同的需求,选择不同的Memtable实现,参考Wiki-Memtable

Memtable 与Immutable Memtable的转换

在一个column family中,维护一个Memtable数据结构mem_和一个Immutable Memtable的列表imm_:

1
2
MemTable* mem_;
MemTableList imm_;

当达到限定条件,需要将Memtable转为Immutable Memtable时:首先构造一个新的Memtable, 然后将旧的Memtable添加至imm_,增加新的Memtable的引用计数,最后将新的Memtable添加至column family:

1
2
3
4
5
new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
// ...
cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
new_mem->Ref();
cfd->SetMemtable(new_mem);

Memtable的数据结构

我们以默认的SkipList来分析Memtable, 源码在db/memtable.cc,在一个Memtable中维护两个SkipList,其中范围删除插入range_del_table_,其余的操作写入table_。首先来看Memtable定义的操作接口: Add()Get()

Add()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
bool MemTable::Add(SequenceNumber s, ValueType type,
const Slice& key, /* user key */
const Slice& value, bool allow_concurrent,
MemTablePostProcessInfo* post_process_info) {
// 一条key-value Entry的数据格式
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
uint32_t key_size = static_cast<uint32_t>(key.size());
uint32_t val_size = static_cast<uint32_t>(value.size());
uint32_t internal_key_size = key_size + 8;
const uint32_t encoded_len = VarintLength(internal_key_size) +
internal_key_size + VarintLength(val_size) +
val_size;
char* buf = nullptr;
// 通过判断key-value的类型来选择memtable, 范围删除的kv插入range_del_table_
std::unique_ptr<MemTableRep>& table =
type == kTypeRangeDeletion ? range_del_table_ : table_;
KeyHandle handle = table->Allocate(encoded_len, &buf);
//...
// 是否允许并发插入
if (!allow_concurrent) {
// 是否制定了函数提取key的前缀
if (insert_with_hint_prefix_extractor_ != nullptr &&
insert_with_hint_prefix_extractor_->InDomain(key_slice)) {
// ...
bool res = table->InsertWithHint(handle, &insert_hints_[prefix]);
} else {
// 插入key-value pair
bool res = table->Insert(handle);
}
} else {
// 插入key-value pair
bool res = table->InsertConcurrently(handle);
}
return true;
}

Add()函数将用户的key和value封装成一个buf,然后根据不同的条件调用table->Insert()插入至Memtable。table就是Memtable的工厂类实现,默认SkiplistRep, 即通过调用SkipListInsert()完成key的插入。

Get()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context,
RangeDelAggregator* range_del_agg, SequenceNumber* seq,
const ReadOptions& read_opts, ReadCallback* callback,
bool* is_blob_index) {
// 在range_del_table_上初始化一个迭代器
std::unique_ptr<InternalIterator> range_del_iter(
NewRangeTombstoneIterator(read_opts));
Status status = range_del_agg->AddTombstones(std::move(range_del_iter));
if (!status.ok()) {
*s = status;
return false;
}

Slice user_key = key.user_key();
// 利用前缀提取过滤判断key是否存在
bool const may_contain =
nullptr == prefix_bloom_
? false
: prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key));
if (prefix_bloom_ && !may_contain) {
// 如果前缀过滤器通知这个key不存在
PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
*seq = kMaxSequenceNumber;
} else {
Saver saver;
saver.status = s;
saver.found_final_value = &found_final_value;
saver.merge_in_progress = &merge_in_progress;
saver.key = &key;
saver.value = value;
saver.seq = kMaxSequenceNumber;
saver.mem = this;
saver.merge_context = merge_context;
saver.range_del_agg = range_del_agg;
// ...
// 进入默认的SkipList进行查找,利用SkipList的Iterator查找
table_->Get(key, &saver, SaveValue);

*seq = saver.seq;
}
// ...
return found_final_value;
}

Memtable的Get()调用了SkipListRepGet()接口,通过SkipList的FindGreaterOrEqual()来查找。查找出来的key会被传入的回调函数SaveValue()根据type处理,例如kTypeDeletion就返回NotFound()

我们顺便来看一下SkipList的实现: memtable/skiplist.h

SkipList数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class SkipList {
private:
struct Node;

public:

explicit SkipList(Comparator cmp, Allocator* allocator,
int32_t max_height = 12, int32_t branching_factor = 4);
void Insert(const Key& key);
bool Contains(const Key& key) const;
uint64_t EstimateCount(const Key& key) const;

// 用来迭代SkipList的迭代器
class Iterator {
// ...
};

private:
// ...
Comparator const compare_;
Allocator* const allocator_; // Allocator used for allocations of nodes
Node* const head_;
std::atomic<int> max_height_; // Height of the entire list
};

SkipList本质上是一个带有分层的有序链表:

Node的数据结构

1
2
3
4
5
6
7
8
9
10
11
12
template<typename Key, class Comparator>
struct SkipList<Key, Comparator>::Node {
explicit Node(const Key& k) : key(k) { }

Key const key;
// ...

private:
// 存放该节点的next_节点的数组
// 数组大小为该节点的height,当调用NewNode()分配内存初始化整个数组
std::atomic<Node*> next_[1];
};

SkipList的插入

这个就是Memtable的Add()最后写入Skiplist调用的接口Insert()。插入的时候随机生成一个高度height,将构造的新的节点从第0层到height层分别插入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
template<typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
// 这里对连续插入做了优化,prev_[0]记录了上次插入的Node,prev_height_记录了上次
// 插入的最大层数。如果新的key大于prev_[0]的值并且小于prev_[0]的Next节点的值,即
// prev_[0]就是新的key的前节点。
if (!KeyIsAfterNode(key, prev_[0]->NoBarrier_Next(0)) &&
(prev_[0] == head_ || KeyIsAfterNode(key, prev_[0]))) {
//...
// 按照层级将每层的前节点补齐。
for (int i = 1; i < prev_height_; i++) {
prev_[i] = prev_[0];
}
} else {
// 逐层查找该key待插入位置的前节点
FindLessThan(key, prev_);
}

// skiplist不允许重复插入
assert(prev_[0]->Next(0) == nullptr || !Equal(key, prev_[0]->Next(0)->key));

int height = RandomHeight();
// 如果当前节点的高度大于最高节点,则高出部分的的前节点都是头节点。
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
prev_[i] = head_;
}
// ...
}

Node* x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// 从0层开始插入,直到height层,原理就是简单的链表插入
x->NoBarrier_SetNext(i, prev_[i]->NoBarrier_Next(i));
prev_[i]->SetNext(i, x);
}
// 这里记录了插入的节点和height,为了连续插入做优化
prev_[0] = x;
prev_height_ = height;
}

SkipList的迭代

SkipList的Iterator迭代比较简单,直接从第0层的单链表根据next_[0]顺序迭代。