|
// Pipelined write path: handles the WAL write and the memtable write for
// `my_batch` in two separate stages, each coordinated through write_thread_
// and selected by the state the writer `w` is in.
//
// NOTE(review): this listing is abridged. The `// ...` markers stand for
// omitted code, and wal_write_group, log_writer, need_log_sync and
// need_log_dir_sync are used below without a visible declaration --
// presumably they are declared in the omitted parts; confirm against the
// full source.
//
// Parameters:
//   write_options     forwarded to the Writer; its
//                     ignore_missing_column_families flag is also passed to
//                     both memtable insert calls.
//   my_batch          batch to write; forwarded to the Writer.
//   callback          forwarded to the Writer.
//   log_used          forwarded to WriteToWAL.
//   log_ref           forwarded to the Writer.
//   disable_memtable  forwarded to the Writer.
//   seq_used          if non-null, receives w.sequence before returning.
//
// Returns w.FinalStatus().
Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
uint64_t* log_used, uint64_t log_ref,
bool disable_memtable, uint64_t* seq_used) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
WriteContext write_context;
// Create a new writer for this request.
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
disable_memtable);
// Add the new writer to the batch queue.
write_thread_.JoinBatchGroup(&w);
// Check whether the state is STATE_GROUP_LEADER, i.e. this writer is the
// leader of a batch.
if (w.state == WriteThread::STATE_GROUP_LEADER) {
// ...
// The leader selects other writers to join the batch.
last_batch_group_size_ =
write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group);
// Sequence number handed to WriteToWAL below: one past the value returned
// by UpdateLastSequence().
const SequenceNumber current_sequence =
write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;
// NOTE(review): not referenced in the visible code; presumably used by the
// omitted section below -- confirm.
size_t total_count = 0;
size_t total_byte_size = 0;
// ...
// The write needs to go to the WAL.
if (w.ShouldWriteToWAL()) {
// ...
// Write to the WAL.
w.status = WriteToWAL(wal_write_group, log_writer, log_used,
need_log_sync, need_log_dir_sync, current_sequence);
}
// The batch leader exits once the WAL write is finished; this function is
// described later.
write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);
}
// If the state is STATE_MEMTABLE_WRITER_LEADER, this writer is the leader of
// a memtable batch.
WriteThread::WriteGroup memtable_write_group;
if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
PERF_TIMER_GUARD(write_memtable_time);
// A writer entering the memtable stage as leader is expected to carry an OK
// status.
assert(w.status.ok());
// The leader selects other writers to join the batch.
write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group);
if (memtable_write_group.size > 1 &&
immutable_db_options_.allow_concurrent_memtable_write) {
// If concurrent memtable writes are allowed, the leader wakes the other
// writers so that each one performs its own memtable write.
write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
} else {
// Otherwise the leader of the memtable batch performs the memtable write
// for the whole group.
memtable_write_group.status = WriteBatchInternal::InsertInto(
memtable_write_group, w.sequence, column_family_memtables_.get(),
&flush_scheduler_, write_options.ignore_missing_column_families,
0 /*log_number*/, this, seq_per_batch_);
versions_->SetLastSequence(memtable_write_group.last_sequence);
write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);
}
}
// If the state is STATE_PARALLEL_MEMTABLE_WRITER, this writer is a member of
// a memtable batch.
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
assert(w.ShouldWriteToMemtable());
// Uses a locally constructed ColumnFamilyMemTablesImpl instead of the
// column_family_memtables_ member used in the leader-only path above.
ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
// Write to the memtable.
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*concurrent_memtable_writes*/);
// Check whether all writers in the memtable batch have finished writing.
if (write_thread_.CompleteParallelMemTableWriter(&w)) {
// If all of them have finished, act as the memtable batch leader: set the
// state of the other writers and select the leader of the next memtable
// batch.
MemTableInsertStatusCheck(w.status);
versions_->SetLastSequence(w.write_group->last_sequence);
write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
}
}
// Report the writer's sequence number to the caller when requested.
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
// Every path above is expected to leave the writer in STATE_COMPLETED.
assert(w.state == WriteThread::STATE_COMPLETED);
return w.FinalStatus();
}
|