Ch12-LevelDB 之 Major Compaction

Ch12-LevelDB 之 Major Compaction

July 10, 2022
LevelDB
leveldb

在 Minor Compaction 完成的时候,会对新生成的 version 计算一次 compaction_level_ 和 compaction_score_。这两个参数决定了当前 version 的 是否需要做 Major Compaction。

1. 整体介绍 #

整体流程代码描述如下

void DBImpl::BackgroundCompaction() {
    Compaction* c;
    if (is_manual) {
    } else {
        // 选择 Compaction 文件
        c = versions_->PickCompaction();
    }

    if (c == nullptr) {
        // Nothing to do
    } else if (!is_manual && c->IsTrivialMove()) {
        /* 满足条件后将 level 层的文件移动到 level+1 层
         * level 层的文件个数只有一个
         * level 层文件与 level+1 层文件没有重叠
         * level 层文件与 level+2 层的文件重叠部分的文件大小不超过阈值
         */
        FileMetaData* f = c->input(0, 0);
        c->edit()->RemoveFile(c->level(), f->number);
        c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest);
        status = versions_->LogAndApply(c->edit(), &mutex_);
    } else {
        CompactionState* compact = new CompactionState(c);
        // 执行 Compaction 操作
        status = DoCompactionWork(compact);
        CleanupCompaction(compact);
        c->ReleaseInputs();
        // 删除过期文件
        RemoveObsoleteFiles();
    }
}

Major Compaction 步骤如下

  • 选择 Compaction 所需要的初始文件
  • 扩展 Compaction 所需要的所有文件
  • 执行 Compaction 操作
    • 如果可以使用 TrivialMove,如果可以则直接移动 level 文件完成 Compaction
    • 获取 MergingIterator 做归并排序
    • 将归并好的数据借助 TableBuilder 写入 SSTable
  • 删除不用的文件

2. 准备工作 #

判断 Compaction 类型,具体函数见 VersionSet::PickCompaction(),而打分逻辑见 VersionSet::Finalize()

类型 说明
size_compaction 要求 current_->compaction_score_ >= 1
seek_compaction 要求 current_->file_to_compact_ != nullptr
Compaction* VersionSet::PickCompaction() {
    Compaction* c;

    const bool size_compaction = (current_->compaction_score_ >= 1);
    const bool seek_compaction = (current_->file_to_compact_ != nullptr);
    // ...

    return c;
}

3. 选择 Compaction 初始文件 #

具体函数见 VersionSet::PickCompaction()

类型 说明
size_compaction 当前 level 所有 files_ 中,若其中 file 最大 key 小于上次合并时最大 key,则该 file 将参与 Compaction
seek_compaction current_->file_to_compact_ 指向的那个文件
Compaction* VersionSet::PickCompaction() {
    // ...
    if (size_compaction) {
        level = current_->compaction_level_;
        c = new Compaction(options_, level);
        // ...
        for (size_t i = 0; i < current_->files_[level].size(); i++) {
            FileMetaData* f = current_->files_[level][i];
            // 初始文件的最大 key 要大于该层上次合并时,所有参与合并文件的最大 key
            if (compact_pointer_[level].empty() || icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
                c->inputs_[0].push_back(f);
                break;
            }
        }
        if (c->inputs_[0].empty()) {
            c->inputs_[0].push_back(current_->files_[level][0]);
        }
    } else if (seek_compaction) {
        level = current_->file_to_compact_level_;
        c = new Compaction(options_, level);
        c->inputs_[0].push_back(current_->file_to_compact_);
    }

    // ...
    return c;
}

步骤如下:

  • 如果是 size_compaction
    • 前提说明:在打分确定 current_->compaction_score_ 的时候就已经确定了 current_->compaction_level_
    • 根据 current_->compaction_level_ 取出对应 level 的所有 file
      • 如果该 file meta 中的 largest 小于上次 Compaction 中所有 file 的 largest,便将该 file 加入到待 compaction 的 file 列表中
      • 如果待 compaction 的列表仍然为空,那么则将当前 level 的第一个 file 加入到待 compaction 的 file 列表
  • 如果是 seek_compaction
    • 将让 current_->file_to_compact_##allowed_seeks=0 的那个 file 加入到待 compaction 的 file 列表

有个小细节,无论是 size_compaction 还是 seek_compaction 都会确定接下来 Compaction 的 level。

通过判断 level==0 层文件数目和 level !=0 层数据总 size 来确定是否 compaction 是比较容易理解的,但是为什么还需要一个不通过 size 来判断的 compaction type 呢?我们来假设这么一种极端场景,每个 level 都达到了 compaction 的条件,那么一旦触发 compaction 就会产生类似雪崩的现象,为了避免这种现象的产生,让 compaction 提前进行就是个不错的优化项。基于此 LevelDB 引入了 seek 次数这个优化项。LevelDB 认为若干次读放大现象所耗费的时间可能会比做一次 Compaction 更加耗时,那么当对某些文件读取的次数超过 allowed_seeks(经验值)时,就触发 Compaction,这个 Compaction 名称就是 Seek Compaction。

4. 选择 Compaction 文件 #

具体函数见 VersionSet::PickCompaction()

类型 说明
level == 0 所有存在交集的 SSTable
level >= 1 根据 SSTable 的交集部分不断扩大参与合并的文件范围,直到 SSTable 不再有交集为止
Compaction* VersionSet::PickCompaction() {
    Compaction* c;

    if (level == 0) {
        // 获取 level == 0 中 compaction 中 files_ 的最小值和最大值
        GetRange(c->inputs_[0], &smallest, &largest);
        // 从 Version 的 level == 0 中获取所有满足条件的 FileMetaData
        current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
    }

    SetupOtherInputs(c);

    return c;
}

void VersionSet::SetupOtherInputs(Compaction* c) {
    // ...
    AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]);
    // 获取 level == i 中 compaction 中 files_ 的最小值和最大值
    GetRange(c->inputs_[0], &smallest, &largest);
    // 给定一个 key 的范围,选择 level == i 中所有和该范围有重叠的 sstable 文件加入集合
    current_->GetOverlappingInputs(level + 1, &smallest, &largest, &c->inputs_[1]);
    // ...
}

步骤如下

  • 对于 level == 0
    • 遍历当前 version 中 level==0 的所有 files_,计算得出当前 level 中的 smallest 和 largest
    • 遍历当前 version 中 level==0 的所有 files_,将所有存在交集的 file 放入到待 Compaction 的 file 列表中
  • 对于其他 level
    • 获取到 level + 1 中与 level 中选定的文件有重叠的文件

5. 执行 Compaction 操作 #

Status DBImpl::DoCompactionWork(CompactionState* compact) {
    // 构造 MergingIterator
    Iterator* input = versions_->MakeInputIterator(compact->compaction);
    input->SeekToFirst();
    while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
        if (!drop) {
            compact->current_output()->largest.DecodeFrom(key);
            compact->builder->Add(input->key(), input->value());
        }
        input->Next();
    }
    // 将 Compaction 结果重新写入 SSTable
    status = InstallCompactionResults(compact);
}

具体函数见 Status DBImpl::DoCompactionWork(CompactionState* compact)

  • 用合并的输入文件构造 MergingIterator
  • 遍历 MergingIterator
    • 这个过程就是对输入文件做归并排序的过程
    • 如果遍历过程中发现有 imm memtable 文件存在,就会转而先做 minor compaction 并且会唤醒在 MakeRoomForWrite() 中等待 minor compaction 完成的线程
  • 借助工具类 TableBuilder 构建 sstable 文件
    • 将遍历迭代器产生的 kv 对加入 builder
  • 如果当前文件大小超过阈值或和 level+2 层有太多的重叠部分
    • 完成对该文件的写入,并打开新的 TableBuilder

6. 删除不用的文件 #

具体函数见 DBImpl::RemoveObsoleteFiles()

满足以下条件的数据项会被抛弃,不会加入到合并后的文件里

  • 数据项的类型是删除
  • 这个数据项比当前最老的 snapshot 还要老
  • level + 2 以上的层都不包含这个 user_key(如果在 Compaction 阶段删掉了,用户读的时候就会读到错误的数据)
  • 比这些数据项更老的所有相同 user_key 的数据项都会被抛弃

7. 其他事项 #

7.1 计分方式 #

LevelDB 中采取计分机制来决定下一次压缩应该在哪个层内进行,每次 Version 的变动都会更新 compaction_score_

  • 计算每一层的计分被存放在 compaction_level_compaction_score_,下次 Compaction 在会在计分最大的层里进行
  • compaction_score_ >= 1 说明已经触发了压缩的条件,必须要做 Compaction。

7.2 compaction_level_ 和 compaction_score_ 算法 #

void VersionSet::Finalize(Version* v) {
    for (int level = 0; level < config::kNumLevels - 1; level++) {
        /**
         * 这里之所以对 level==0 进行特殊处理,主要基于一下两点
         * 1. 使用更大的 write buffer 的情况下,这样就不会做太多的 L0->L1 的合并
         * 2. level==0 层文件每次读的时候都要做归并,作者不希望 level==0 层有太多文件
         */
        if (level == 0) {
            // level==0 级文件数量 / level==0 级压缩阈值
            score = v->files_[level].size()/static_cast<double>(config::kL0_CompactionTrigger);
        } else {
            // level==i 级文件大小总和 / level==i 级大小阈值(大小阈值为 10^i MB)
            score = static_cast<double>(TotalFileSize(v->files_[level])) / MaxBytesForLevel(options_, level);
        }

        if (score > best_score) {
            best_level = level;
            best_score = score;
        }
    }

    v->compaction_level_ = best_level;
    v->compaction_score_ = best_score;
}

// VersionSet::PickCompaction() -> if(current_->compaction_score_ >= 1) { level = current_->compaction_level_; }

参考文献 #