Ch10-LevelDB 之 Put
June 20, 2022
Put 流程
Put 写入到 WAL 中,就表示写入成功了,所以代码中可以看到 Put 操作的第一步就是将操作封装成 Log Writer。如果 MemTable 还没来的及 Compaction 到 SSTable 就宕机了,那么在下次启动 Open 的时候,会将其恢复回来写入到 SSTable 中;
// include/leveldb/db.h
class LEVELDB_EXPORT DB {
virtual Status Put(const WriteOptions& options, const Slice& key, const Slice& value) = 0;
}
// db/db_impl.h
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
return DB::Put(o, key, val);
}
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}
LevelDB 的所有写操作都会被封装成 Writer,而 Writer 又会被放到 DB 维护的 writers_ 队列中。
struct DBImpl::Writer {
explicit Writer(port::Mutex* mu): batch(nullptr), sync(false), done(false), cv(mu) {}
Status status;
WriteBatch* batch;
bool sync;
bool done;
port::CondVar cv;
};
class DBImpl : public DB {
private:
std::deque<Writer*> writers_ GUARDED_BY(mutex_);
}
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
...
writers_.push_back(&w);
...
// MemTable 是否达到 Flush 阈值,
Status status = MakeRoomForWrite(updates == nullptr);
if (status.ok() && updates != nullptr) {
/* 合并 Writer
* 1. 队头 sync 必须是 false
* 2. 当前队列中的 batch_size 小于 1MB(可能会被调整)
*/
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
...
// 写入 WAL
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
...
// Flush WAL 和 写入到 MemTable
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}