概述

Rocksdb的事务分为TransactionDB(PessimisticTransactionDB)和OptimisticTransactionDB,分别对应并发控制中的悲观锁和乐观锁。

概念

在了解Rocksdb的PessimisticTransaction具体细节之前,我们先来介绍几点事务的概念和SQL标准定义的事务隔离级别:

现象:

  • Dirty Reads:一个事务还未提交,另外一个事务访问此事务修改的数据,并使用,读取了事务中间状态数据。
  • Nonrepeatable Reads:一个事务读取同一条记录两次,由于两次读取间隔期间,另一个事务对数据进行了修改,使得事务两次读取的结果不一致,
  • Phantoms:事务A读取与搜索条件相匹配的若干行。事务B以插入或删除行等方式来修改事务A的结果集,然后再提交。

事务隔离级别:

  • Read Uncommitted:这是最低的事务隔离级别,所有事务都可以看到其他未提交事务的执行结果,存在脏读。
  • Read Committed:这个级别保证事务内读到的每一条数据行都是已经被commit的,不会存在脏读。
  • Repeatable Read:事务对操作的所有的行加读锁,对需要更新的行加写锁,这样其他事务不能操作这些行,确保事务读取数据时,多次操作会看到同样的数据行。
  • Serializable:串行化,强制事务之间进行排序,不会互相冲突。

源码分析

先来看一个Rocksdb的examples里有一些关于事务的例子,我们通过这些例子来逐一分析,首先看一个最简单事务例子

1
2
3
4
5
6
7
8
9
10
11
12
TransactionDB* txn_db;
// 新建一个事务DB,Rocksdb的事务DB默认为Pessimistic
Status s = TransactionDB::Open(options, path, &txn_db);
// 创建一个事务
Transaction* txn = txn_db->BeginTransaction(write_options, txn_options);
// 写入key
s = txn->Put(“key”, “value”);
s = txn->Delete(“key2”);
s = txn->Merge(“key3”, “value”);
// 提交事务
s = txn->Commit();
delete txn;

这是一个简单的PessimisticTransaction事务例子,我们来看看具体的接口实现:

Put接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
// 尝试锁住Key
Status s =
TryLock(column_family, key, false /* read_only */, true /* exclusive */);

if (s.ok()) {
// 通过事务内部的WriteBatch写入Key, 这个WriteBatch是这个事务独占的
s = GetBatchForWrite()->Put(column_family, key, value);
if (s.ok()) {
num_puts_++;
}
}

return s;
}

Put接口直接写入该事务独有的WriteBatch,假如在事务之外的写入:

1
s = txn_db->Put(write_options, "xyz", "zzz");

Rocksdb对于这种写入会在内部构造一个Transaction,然后直接Commit。

1
2
3
4
5
6
7
8
9
10
11
12
13
Status PessimisticTransactionDB::Put(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, const Slice& val) {
//...
Transaction* txn = BeginInternalTransaction(options);
//...
s = txn->PutUntracked(column_family, key, val);

if (s.ok()) {
s = txn->Commit();
}
//...
}

Get接口

1
2
3
4
5
6
Status TransactionBaseImpl::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* pinnable_val) {
return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
pinnable_val);
}

Get接口优先读该事务的WriteBatch,然后再读DB。

Commit接口

Commit接口首先判断事务是否超时,然后通过正常的写入流程:首先写WAL然后写入Memtable,以此来提交事务。

Rocksdb的事务隔离级别

Read Committed

Rocksdb支持的事务隔离级别分别是Read CommittedRepeatable Read,Rocksdb利用Snapshot来实现Read Committed。

1
2
3
4
5
6
7
8
9
10
txn = txn_db->BeginTransaction(write_options);
txn->SetSnapshot();

// 在事务之外写入一个Key
db->Put(write_options, “key1”, “value0”);

// 通过事务更新这个Key
s = txn->Put(“key1”, “value1”);
s = txn->Commit();
// 事务的提交将会失败,即使事务之外的那次写入先于事务的写入。

通过使用SetSnapshot()来实现Read Committed,我们再来回顾一下Read Committed的概念: 事务读取所有的数据都必须是Commited的。我们来看一下Read committed更为详细的定义:

This isolation level guarantees that any data read is committed at the moment it is read. Thus it does not allows dirty read. The transaction hold a read or write lock on the current row, and thus prevent other rows from reading, updating or deleting it.

所以在SetSnapshot()之后这次事务的commit是失败的,那SetSnapshot是如何来实现的呢? 这是Snapshot的定义:

1
2
3
4
5
6
7
8
9
10
class SnapshotImpl : public Snapshot {
public:
// 记录一个sequence number
SequenceNumber number_; // const after creation
//
SequenceNumber min_uncommitted_ = 0;

private:
// 双向链表...
};

Snapshot其实就是记录一个sequence number的双向链表,在Rocksdb中每一条记录都有一个sequence number,从0开始全局唯一递增。

在PessimisticTransactionDB中,前面我们介绍了Put接口一开始会调用TryLock()尝试锁住所有的key,在TryLock接口中会对Snapshot进行检查,假如存在key被修改的情况,Put()就会失败,从而导致整个事务fail,这就符合了Read Committed的定义。

Repeatable Read

This is the most restrictive isolation level. The transaction holds read locks on all rows it references and write locks on all rows it inserts, updates, or deletes. Since other transaction cannot read, update or delete these rows, consequently it avoids non repeatable read.

Repeatable read要求在事务内多次读取的结果必须一致,在Rocksdb中提供GetForUpdate()来实现,

1
2
3
4
5
6
7
Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key, std::string* value,
bool exclusive) {
Status s = TryLock(column_family, key, true /* read_only */, exclusive);
// ...
}

GetForUpdate读取该Key并对其上锁独占,这就确保了后面的读取结果不会被其他事务修改,以此来达到Repeated Read的要求。

OptimisticTransactionDB

与PessimisticTransaction不同的是OptimisticTransaction的Put不需要使用TryLock()加锁Key, 只需要在Commit阶段通过CheckKeysForConflicts()进行冲突检测。