Ch12-LevelDB 之 Major Compaction
July 10, 2022
在 Minor Compaction 完成的时候,会对新生成的 version 计算一次 compaction_level_ 和 compaction_score_。这两个参数决定了当前 version 的 是否需要做 Major Compaction。
1. 整体介绍 #
整体流程代码描述如下
void DBImpl::BackgroundCompaction() {
Compaction* c;
if (is_manual) {
} else {
// 选择 Compaction 文件
c = versions_->PickCompaction();
}
if (c == nullptr) {
// Nothing to do
} else if (!is_manual && c->IsTrivialMove()) {
/* 满足条件后将 level 层的文件移动到 level+1 层
* level 层的文件个数只有一个
* level 层文件与 level+1 层文件没有重叠
* level 层文件与 level+2 层的文件重叠部分的文件大小不超过阈值
*/
FileMetaData* f = c->input(0, 0);
c->edit()->RemoveFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
} else {
CompactionState* compact = new CompactionState(c);
// 执行 Compaction 操作
status = DoCompactionWork(compact);
CleanupCompaction(compact);
c->ReleaseInputs();
// 删除过期文件
RemoveObsoleteFiles();
}
}
Major Compaction 步骤如下
- 选择 Compaction 所需要的初始文件
- 扩展 Compaction 所需要的所有文件
- 执行 Compaction 操作
- 如果可以使用 TrivialMove,如果可以则直接移动 level 文件完成 Compaction
- 获取 MergingIterator 做归并排序
- 将归并好的数据借助 TableBuilder 写入 SSTable
- 删除不用的文件
2. 准备工作 #
判断 Compaction 类型,具体函数见 VersionSet::PickCompaction()
,而打分逻辑见 VersionSet::Finalize()
类型 | 说明 |
---|---|
size_compaction | 要求 current_->compaction_score_ >= 1 |
seek_compaction | 要求 current_->file_to_compact_ != nullptr |
Compaction* VersionSet::PickCompaction() {
Compaction* c;
const bool size_compaction = (current_->compaction_score_ >= 1);
const bool seek_compaction = (current_->file_to_compact_ != nullptr);
// ...
return c;
}
3. 选择 Compaction 初始文件 #
具体函数见 VersionSet::PickCompaction()
类型 | 说明 |
---|---|
size_compaction | 当前 level 所有 files_ 中,若其中 file 最大 key 小于上次合并时最大 key,则该 file 将参与 Compaction |
seek_compaction | current_->file_to_compact_ 指向的那个文件 |
Compaction* VersionSet::PickCompaction() {
// ...
if (size_compaction) {
level = current_->compaction_level_;
c = new Compaction(options_, level);
// ...
for (size_t i = 0; i < current_->files_[level].size(); i++) {
FileMetaData* f = current_->files_[level][i];
// 初始文件的最大 key 要大于该层上次合并时,所有参与合并文件的最大 key
if (compact_pointer_[level].empty() || icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
c->inputs_[0].push_back(f);
break;
}
}
if (c->inputs_[0].empty()) {
c->inputs_[0].push_back(current_->files_[level][0]);
}
} else if (seek_compaction) {
level = current_->file_to_compact_level_;
c = new Compaction(options_, level);
c->inputs_[0].push_back(current_->file_to_compact_);
}
// ...
return c;
}
步骤如下:
- 如果是 size_compaction
- 前提说明:在打分确定 current_->compaction_score_ 的时候就已经确定了 current_->compaction_level_
- 根据 current_->compaction_level_ 取出对应 level 的所有 file
- 如果该 file meta 中的 largest 小于上次 Compaction 中所有 file 的 largest,便将该 file 加入到待 compaction 的 file 列表中
- 如果待 compaction 的列表仍然为空,那么则将当前 level 的第一个 file 加入到待 compaction 的 file 列表
- 如果是 seek_compaction
- 将让 current_->file_to_compact_##allowed_seeks=0 的那个 file 加入到待 compaction 的 file 列表
有个小细节,无论是 size_compaction 还是 seek_compaction 都会确定接下来 Compaction 的 level。
通过判断 level==0 层文件数目和 level !=0 层数据总 size 来确定是否 compaction 是比较容易理解的,但是为什么还需要一个不通过 size 来判断的 compaction type 呢?我们来假设这么一种极端场景,每个 level 都达到了 compaction 的条件,那么一旦触发 compaction 就会产生类似雪崩的现象,为了避免这种现象的产生,让 compaction 提前进行就是个不错的优化项。基于此 LevelDB 引入了 seek 次数这个优化项。LevelDB 认为若干次读放大现象所耗费的时间可能会比做一次 Compaction 更加耗时,那么当对某些文件读取的次数超过 allowed_seeks(经验值)时,就触发 Compaction,这个 Compaction 名称就是 Seek Compaction。
4. 选择 Compaction 文件 #
具体函数见 VersionSet::PickCompaction()
类型 | 说明 |
---|---|
level == 0 | 所有存在交集的 SSTable |
level >= 1 | 根据 SSTable 的交集部分不断扩大参与合并的文件范围,直到 SSTable 不再有交集为止 |
Compaction* VersionSet::PickCompaction() {
Compaction* c;
if (level == 0) {
// 获取 level == 0 中 compaction 中 files_ 的最小值和最大值
GetRange(c->inputs_[0], &smallest, &largest);
// 从 Version 的 level == 0 中获取所有满足条件的 FileMetaData
current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
}
SetupOtherInputs(c);
return c;
}
void VersionSet::SetupOtherInputs(Compaction* c) {
// ...
AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]);
// 获取 level == i 中 compaction 中 files_ 的最小值和最大值
GetRange(c->inputs_[0], &smallest, &largest);
// 给定一个 key 的范围,选择 level == i 中所有和该范围有重叠的 sstable 文件加入集合
current_->GetOverlappingInputs(level + 1, &smallest, &largest, &c->inputs_[1]);
// ...
}
步骤如下
- 对于 level == 0
- 遍历当前 version 中 level==0 的所有 files_,计算得出当前 level 中的 smallest 和 largest
- 遍历当前 version 中 level==0 的所有 files_,将所有存在交集的 file 放入到待 Compaction 的 file 列表中
- 对于其他 level
- 获取到 level + 1 中与 level 中选定的文件有重叠的文件
5. 执行 Compaction 操作 #
Status DBImpl::DoCompactionWork(CompactionState* compact) {
// 构造 MergingIterator
Iterator* input = versions_->MakeInputIterator(compact->compaction);
input->SeekToFirst();
while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
if (!drop) {
compact->current_output()->largest.DecodeFrom(key);
compact->builder->Add(input->key(), input->value());
}
input->Next();
}
// 将 Compaction 结果重新写入 SSTable
status = InstallCompactionResults(compact);
}
具体函数见 Status DBImpl::DoCompactionWork(CompactionState* compact)
- 用合并的输入文件构造 MergingIterator
- 遍历 MergingIterator
- 这个过程就是对输入文件做归并排序的过程
- 如果遍历过程中发现有 imm memtable 文件存在,就会转而先做 minor compaction 并且会唤醒在 MakeRoomForWrite() 中等待 minor compaction 完成的线程
- 借助工具类 TableBuilder 构建 sstable 文件
- 将遍历迭代器产生的 kv 对加入 builder
- 如果当前文件大小超过阈值或和 level+2 层有太多的重叠部分
- 完成对该文件的写入,并打开新的 TableBuilder
6. 删除不用的文件 #
具体函数见 DBImpl::RemoveObsoleteFiles()
满足以下条件的数据项会被抛弃,不会加入到合并后的文件里
- 数据项的类型是删除
- 这个数据项比当前最老的 snapshot 还要老
- level + 2 以上的层都不包含这个 user_key(如果在 Compaction 阶段删掉了,用户读的时候就会读到错误的数据)
- 比这些数据项更老的所有相同 user_key 的数据项都会被抛弃
7. 其他事项 #
7.1 计分方式 #
LevelDB 中采取计分机制来决定下一次压缩应该在哪个层内进行,每次 Version 的变动都会更新 compaction_score_
。
- 计算每一层的计分被存放在
compaction_level_
和compaction_score_
,下次 Compaction 在会在计分最大的层里进行 compaction_score_ >= 1
说明已经触发了压缩的条件,必须要做 Compaction。
7.2 compaction_level_ 和 compaction_score_ 算法 #
void VersionSet::Finalize(Version* v) {
for (int level = 0; level < config::kNumLevels - 1; level++) {
/**
* 这里之所以对 level==0 进行特殊处理,主要基于一下两点
* 1. 使用更大的 write buffer 的情况下,这样就不会做太多的 L0->L1 的合并
* 2. level==0 层文件每次读的时候都要做归并,作者不希望 level==0 层有太多文件
*/
if (level == 0) {
// level==0 级文件数量 / level==0 级压缩阈值
score = v->files_[level].size()/static_cast<double>(config::kL0_CompactionTrigger);
} else {
// level==i 级文件大小总和 / level==i 级大小阈值(大小阈值为 10^i MB)
score = static_cast<double>(TotalFileSize(v->files_[level])) / MaxBytesForLevel(options_, level);
}
if (score > best_score) {
best_level = level;
best_score = score;
}
}
v->compaction_level_ = best_level;
v->compaction_score_ = best_score;
}
// VersionSet::PickCompaction() -> if(current_->compaction_score_ >= 1) { level = current_->compaction_level_; }