1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
| Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, WriteBatch* my_batch, WriteCallback* callback, uint64_t* log_used, uint64_t log_ref, bool disable_memtable, uint64_t* seq_used) { PERF_TIMER_GUARD(write_pre_and_post_process_time); StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
WriteContext write_context; WriteThread::Writer w(write_options, my_batch, callback, log_ref, disable_memtable); write_thread_.JoinBatchGroup(&w); if (w.state == WriteThread::STATE_GROUP_LEADER) { last_batch_group_size_ = write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group); const SequenceNumber current_sequence = write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1; size_t total_count = 0; size_t total_byte_size = 0; if (w.ShouldWriteToWAL()) { w.status = WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync, need_log_dir_sync, current_sequence); }
write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status); }
WriteThread::WriteGroup memtable_write_group; if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) { PERF_TIMER_GUARD(write_memtable_time); assert(w.status.ok()); write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group); if (memtable_write_group.size > 1 && immutable_db_options_.allow_concurrent_memtable_write) { write_thread_.LaunchParallelMemTableWriters(&memtable_write_group); } else { memtable_write_group.status = WriteBatchInternal::InsertInto( memtable_write_group, w.sequence, column_family_memtables_.get(), &flush_scheduler_, write_options.ignore_missing_column_families, 0 , this, seq_per_batch_); versions_->SetLastSequence(memtable_write_group.last_sequence); write_thread_.ExitAsMemTableWriter(&w, memtable_write_group); } } if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { assert(w.ShouldWriteToMemtable()); ColumnFamilyMemTablesImpl column_family_memtables( versions_->GetColumnFamilySet()); w.status = WriteBatchInternal::InsertInto( &w, w.sequence, &column_family_memtables, &flush_scheduler_, write_options.ignore_missing_column_families, 0 , this, true ); if (write_thread_.CompleteParallelMemTableWriter(&w)) { MemTableInsertStatusCheck(w.status); versions_->SetLastSequence(w.write_group->last_sequence); write_thread_.ExitAsMemTableWriter(&w, *w.write_group); } } if (seq_used != nullptr) { *seq_used = w.sequence; } assert(w.state == WriteThread::STATE_COMPLETED); return w.FinalStatus(); }
|