Ch12-LevelDB 之 Major Compaction
July 10, 2022
在 Minor Compaction 完成的时候,会对新生成的 version 计算一次 compaction_level_ 和 compaction_score_。这两个参数决定了当前 version 的 是否需要做 Major Compaction。
1. 整体介绍 #
整体流程代码描述如下
void DBImpl::BackgroundCompaction() {
    Compaction* c;
    if (is_manual) {
    } else {
        // 选择 Compaction 文件
        c = versions_->PickCompaction();
    }
    if (c == nullptr) {
        // Nothing to do
    } else if (!is_manual && c->IsTrivialMove()) {
        /* 满足条件后将 level 层的文件移动到 level+1 层
         * level 层的文件个数只有一个
         * level 层文件与 level+1 层文件没有重叠
         * level 层文件与 level+2 层的文件重叠部分的文件大小不超过阈值
         */
        FileMetaData* f = c->input(0, 0);
        c->edit()->RemoveFile(c->level(), f->number);
        c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest);
        status = versions_->LogAndApply(c->edit(), &mutex_);
    } else {
        CompactionState* compact = new CompactionState(c);
        // 执行 Compaction 操作
        status = DoCompactionWork(compact);
        CleanupCompaction(compact);
        c->ReleaseInputs();
        // 删除过期文件
        RemoveObsoleteFiles();
    }
}
Major Compaction 步骤如下
- 选择 Compaction 所需要的初始文件
 - 扩展 Compaction 所需要的所有文件
 - 执行 Compaction 操作
- 如果可以使用 TrivialMove,如果可以则直接移动 level 文件完成 Compaction
 - 获取 MergingIterator 做归并排序
 - 将归并好的数据借助 TableBuilder 写入 SSTable
 
 - 删除不用的文件
 
2. 准备工作 #
判断 Compaction 类型,具体函数见 VersionSet::PickCompaction(),而打分逻辑见 VersionSet::Finalize()
| 类型 | 说明 | 
|---|---|
| size_compaction | 要求 current_->compaction_score_ >= 1 | 
| seek_compaction | 要求 current_->file_to_compact_ != nullptr | 
Compaction* VersionSet::PickCompaction() {
    Compaction* c;
    const bool size_compaction = (current_->compaction_score_ >= 1);
    const bool seek_compaction = (current_->file_to_compact_ != nullptr);
    // ...
    return c;
}
3. 选择 Compaction 初始文件 #
具体函数见 VersionSet::PickCompaction()
| 类型 | 说明 | 
|---|---|
| size_compaction | 当前 level 所有 files_ 中,若其中 file 最大 key 小于上次合并时最大 key,则该 file 将参与 Compaction | 
| seek_compaction | current_->file_to_compact_ 指向的那个文件 | 
Compaction* VersionSet::PickCompaction() {
    // ...
    if (size_compaction) {
        level = current_->compaction_level_;
        c = new Compaction(options_, level);
        // ...
        for (size_t i = 0; i < current_->files_[level].size(); i++) {
            FileMetaData* f = current_->files_[level][i];
            // 初始文件的最大 key 要大于该层上次合并时,所有参与合并文件的最大 key
            if (compact_pointer_[level].empty() || icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
                c->inputs_[0].push_back(f);
                break;
            }
        }
        if (c->inputs_[0].empty()) {
            c->inputs_[0].push_back(current_->files_[level][0]);
        }
    } else if (seek_compaction) {
        level = current_->file_to_compact_level_;
        c = new Compaction(options_, level);
        c->inputs_[0].push_back(current_->file_to_compact_);
    }
    // ...
    return c;
}
步骤如下:
- 如果是 size_compaction
- 前提说明:在打分确定 current_->compaction_score_ 的时候就已经确定了 current_->compaction_level_
 - 根据 current_->compaction_level_ 取出对应 level 的所有 file
- 如果该 file meta 中的 largest 小于上次 Compaction 中所有 file 的 largest,便将该 file 加入到待 compaction 的 file 列表中
 - 如果待 compaction 的列表仍然为空,那么则将当前 level 的第一个 file 加入到待 compaction 的 file 列表
 
 
 - 如果是 seek_compaction
- 将让 current_->file_to_compact_##allowed_seeks=0 的那个 file 加入到待 compaction 的 file 列表
 
 
有个小细节,无论是 size_compaction 还是 seek_compaction 都会确定接下来 Compaction 的 level。
通过判断 level==0 层文件数目和 level !=0 层数据总 size 来确定是否 compaction 是比较容易理解的,但是为什么还需要一个不通过 size 来判断的 compaction type 呢?我们来假设这么一种极端场景,每个 level 都达到了 compaction 的条件,那么一旦触发 compaction 就会产生类似雪崩的现象,为了避免这种现象的产生,让 compaction 提前进行就是个不错的优化项。基于此 LevelDB 引入了 seek 次数这个优化项。LevelDB 认为若干次读放大现象所耗费的时间可能会比做一次 Compaction 更加耗时,那么当对某些文件读取的次数超过 allowed_seeks(经验值)时,就触发 Compaction,这个 Compaction 名称就是 Seek Compaction。
4. 选择 Compaction 文件 #
具体函数见 VersionSet::PickCompaction()
| 类型 | 说明 | 
|---|---|
| level == 0 | 所有存在交集的 SSTable | 
| level >= 1 | 根据 SSTable 的交集部分不断扩大参与合并的文件范围,直到 SSTable 不再有交集为止 | 
Compaction* VersionSet::PickCompaction() {
    Compaction* c;
    if (level == 0) {
        // 获取 level == 0 中 compaction 中 files_ 的最小值和最大值
        GetRange(c->inputs_[0], &smallest, &largest);
        // 从 Version 的 level == 0 中获取所有满足条件的 FileMetaData
        current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
    }
    SetupOtherInputs(c);
    return c;
}
void VersionSet::SetupOtherInputs(Compaction* c) {
    // ...
    AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]);
    // 获取 level == i 中 compaction 中 files_ 的最小值和最大值
    GetRange(c->inputs_[0], &smallest, &largest);
    // 给定一个 key 的范围,选择 level == i 中所有和该范围有重叠的 sstable 文件加入集合
    current_->GetOverlappingInputs(level + 1, &smallest, &largest, &c->inputs_[1]);
    // ...
}
步骤如下
- 对于 level == 0
- 遍历当前 version 中 level==0 的所有 files_,计算得出当前 level 中的 smallest 和 largest
 - 遍历当前 version 中 level==0 的所有 files_,将所有存在交集的 file 放入到待 Compaction 的 file 列表中
 
 - 对于其他 level
- 获取到 level + 1 中与 level 中选定的文件有重叠的文件
 
 
5. 执行 Compaction 操作 #
Status DBImpl::DoCompactionWork(CompactionState* compact) {
    // 构造 MergingIterator
    Iterator* input = versions_->MakeInputIterator(compact->compaction);
    input->SeekToFirst();
    while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
        if (!drop) {
            compact->current_output()->largest.DecodeFrom(key);
            compact->builder->Add(input->key(), input->value());
        }
        input->Next();
    }
    // 将 Compaction 结果重新写入 SSTable
    status = InstallCompactionResults(compact);
}
具体函数见 Status DBImpl::DoCompactionWork(CompactionState* compact)
- 用合并的输入文件构造 MergingIterator
 - 遍历 MergingIterator
- 这个过程就是对输入文件做归并排序的过程
 - 如果遍历过程中发现有 imm memtable 文件存在,就会转而先做 minor compaction 并且会唤醒在 MakeRoomForWrite() 中等待 minor compaction 完成的线程
 
 - 借助工具类 TableBuilder 构建 sstable 文件
- 将遍历迭代器产生的 kv 对加入 builder
 
 - 如果当前文件大小超过阈值或和 level+2 层有太多的重叠部分
- 完成对该文件的写入,并打开新的 TableBuilder
 
 
6. 删除不用的文件 #
具体函数见 DBImpl::RemoveObsoleteFiles()
满足以下条件的数据项会被抛弃,不会加入到合并后的文件里
- 数据项的类型是删除
 - 这个数据项比当前最老的 snapshot 还要老
 - level + 2 以上的层都不包含这个 user_key(如果在 Compaction 阶段删掉了,用户读的时候就会读到错误的数据)
 - 比这些数据项更老的所有相同 user_key 的数据项都会被抛弃
 
7. 其他事项 #
7.1 计分方式 #
LevelDB 中采取计分机制来决定下一次压缩应该在哪个层内进行,每次 Version 的变动都会更新 compaction_score_。
- 计算每一层的计分被存放在 
compaction_level_和compaction_score_,下次 Compaction 在会在计分最大的层里进行 compaction_score_ >= 1说明已经触发了压缩的条件,必须要做 Compaction。
7.2 compaction_level_ 和 compaction_score_ 算法 #
void VersionSet::Finalize(Version* v) {
    for (int level = 0; level < config::kNumLevels - 1; level++) {
        /**
         * 这里之所以对 level==0 进行特殊处理,主要基于一下两点
         * 1. 使用更大的 write buffer 的情况下,这样就不会做太多的 L0->L1 的合并
         * 2. level==0 层文件每次读的时候都要做归并,作者不希望 level==0 层有太多文件
         */
        if (level == 0) {
            // level==0 级文件数量 / level==0 级压缩阈值
            score = v->files_[level].size()/static_cast<double>(config::kL0_CompactionTrigger);
        } else {
            // level==i 级文件大小总和 / level==i 级大小阈值(大小阈值为 10^i MB)
            score = static_cast<double>(TotalFileSize(v->files_[level])) / MaxBytesForLevel(options_, level);
        }
        if (score > best_score) {
            best_level = level;
            best_score = score;
        }
    }
    v->compaction_level_ = best_level;
    v->compaction_score_ = best_score;
}
// VersionSet::PickCompaction() -> if(current_->compaction_score_ >= 1) { level = current_->compaction_level_; }