Author: diaoliang
在RocksDB中,将MemTable刷新到磁盘之后,将会有很多sstable,而这些sstable则是可能包含了相同的key的不同时间的值,这样子就会导致两个问题:
而compact就是用来解决上面两个问题的,简单来说compact就是读取几个sstable然后合并为一个(或者多个)sstable. 而什么时候合并,合并的时候如何来挑选sstable,这个就是compcation strategy.一般来说compact strategy的目的都是为了更低的amplification:
而在RockDB中实现了多种compact strategy,不同的strategy有不同的侧重,这里我们只分析默认的strategy, 那就是leveled-N compaction.
在Leveled compaction中,所有的SSTables被分为很多levels(level0/1/2/3…).
先来看在RocksDB中是什么时候会引起compact.在RocksDB中所有的compact都是在后台线程中进行的,这个线程就是BGWorkCompaction.这个线程只有在两种情况下被调用,一个是 手动compact(RunManualCompaction),一个就是自动(MaybeScheduleFlushOrCompaction),我们主要来看自动的compact,而MaybeScheduleFlushOrCompaction这个函数我们在之前介绍flush的时候已经介绍过了,简单来说就是会在切换WAL(SwitchWAL)或者writebuffer满的时候(HandleWriteBufferFull)被调用.
我们来看在MaybeScheduleFlushOrCompaction中compact的调用.这里可以看到RocksDB中后台运行的compact会有一个限制(max_compactions).而我们可以看到这里还有一个变量 unscheduled_compactions_,这个变量表示需要被compact的columnfamily的队列长度.
while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
unscheduled_compactions_ > 0) {
CompactionArg* ca = new CompactionArg;
ca->db = this;
ca->prepicked_compaction = nullptr;
bg_compaction_scheduled_++;
unscheduled_compactions_--;
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
&DBImpl::UnscheduleCallback);
}
类似flush的逻辑,compact的时候RocksDB也有一个队列叫做DBImpl::compaction_queue_.
std::deque<ColumnFamilyData*> compaction_queue_;
然后我们来看这个队列何时被更新,其中unscheduled_compactions_和队列的更新是同步的,因此只有compaction_queue_更新之后,调用compact后台线程才会进入compact处理.
void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
AddToCompactionQueue(cfd);
++unscheduled_compactions_;
}
}
上面的核心函数是NeedsCompaction,通过这个函数来判断是否有sst需要被compact,因此接下来我们就来详细分析这个函数.当满足下列几个条件之一就将会更新compact队列
bool LevelCompactionPicker::NeedsCompaction(
const VersionStorageInfo* vstorage) const {
if (!vstorage->ExpiredTtlFiles().empty()) {
return true;
}
if (!vstorage->BottommostFilesMarkedForCompaction().empty()) {
return true;
}
if (!vstorage->FilesMarkedForCompaction().empty()) {
return true;
}
for (int i = 0; i <= vstorage->MaxInputLevel(); i++) {
if (vstorage->CompactionScore(i) >= 1) {
return true;
}
}
return false;
}
因此接下来我们来分析最核心的CompactionScore,这里将会涉及到两个变量,这两个变量分别保存了level以及每个level所对应的score(这里score越高表示compact优先级越高),而score小于1则表示不需要compact.
std::vector<double> compaction_score_;
std::vector<int> compaction_level_;
这两个vector是在VersionStorageInfo::ComputeCompactionScore中被更新,因此我们来看这个函数,这个函数中会对level-0和其他的level区别处理。 首先来看level-0的处理:
之所以要做第三步,主要还是为了防止level-0的文件size过大,那么当它需要compact的时候有可能会需要和level-1 compact,那么此时就有可能会有一个很大的compact.
if (level == 0) {
int num_sorted_runs = 0;
uint64_t total_size = 0;
for (auto* f : files_[level]) {
if (!f->being_compacted) {
total_size += f->compensated_file_size;
num_sorted_runs++;
}
}
.........................
score = static_cast<double>(num_sorted_runs) /
mutable_cf_options.level0_file_num_compaction_trigger;
if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
score = std::max(
score, static_cast<double>(total_size) /
mutable_cf_options.max_bytes_for_level_base);
}
}
然后是非level-0的处理,这里也是计算level的文件大小然后再除以MaxBytesForLevel,然后得到当前level的score.
uint64_t level_bytes_no_compacting = 0;
for (auto f : files_[level]) {
if (!f->being_compacted) {
level_bytes_no_compacting += f->compensated_file_size;
}
}
score = static_cast<double>(level_bytes_no_compacting) /
MaxBytesForLevel(level);
上面我们看到有一个MaxBytesForLevel,这个函数的作用就是得到当前level的最大的文件大小.而这个函数实现也很简单.
uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
// Note: the result for level zero is not really used since we set
// the level-0 compaction threshold based on number of files.
assert(level >= 0);
assert(level < static_cast<int>(level_max_bytes_.size()));
return level_max_bytes_[level];
}
可以看到核心就是level_max_bytes_这个数组,接下来我们就来看这个数组是在哪里被初始化的。level_max_bytes这个数组是在VersionStorageInfo::CalculateBaseBytes 这个函数中被初始化,这里RocksDB有一个option叫做level_compaction_dynamic_level_bytes,这个配置如果被设置,那么level_max_bytes将会这样 设置(这里我们只关注level):
Target_Size(Ln+1) = Target_Size(Ln) * max_bytes_for_level_multiplier * max_bytes_for_level_multiplier_additional[n].
举个例子,如果max_bytes_for_level_base=1024,max_bytes_for_level_multiplier=10,然后max_bytes_for_level_multiplier_additional未设置,那么L1, L2,L3的大小限制分别为1024,10240,102400.
下面是对应代码.
if (!ioptions.level_compaction_dynamic_level_bytes) {
base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;
// Calculate for static bytes base case
for (int i = 0; i < ioptions.num_levels; ++i) {
if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
level_max_bytes_[i] = options.max_bytes_for_level_base;
} else if (i > 1) {
level_max_bytes_[i] = MultiplyCheckOverflow(
MultiplyCheckOverflow(level_max_bytes_[i - 1],
options.max_bytes_for_level_multiplier),
options.MaxBytesMultiplerAdditional(i - 1));
} else {
level_max_bytes_[i] = options.max_bytes_for_level_base;
}
}
}
然后我们来看如果设置了level_compaction_dynamic_level_bytes会如何来计算.如果设置了dynamic,那么就说明每次计算出来的每个level的最大值都是不一样的, 首先我们要知道调用CalculateBaseBytes是在每次创建version的时候。因此他是这样计算的.最大的level(num_levels -1 )的大小限制是不计入计算的,然后就是这样计算.
Target_Size(Ln-1) = Target_Size(Ln) / max_bytes_for_level_multiplier
举个例子,假设调用CalculateBaseBytes的时候,max_bytes_for_level_base是1G,然后num_levels = 6,然后当前最大的level的大小为256G,那么从L1-L6的大小是 0, 0, 0.276GB, 2.76GB, 27.6GB 和 276GB.
首先计算第一个非空的level.
for (int i = 1; i < num_levels_; i++) {
uint64_t total_size = 0;
for (const auto& f : files_[i]) {
total_size += f->fd.GetFileSize();
}
if (total_size > 0 && first_non_empty_level == -1) {
first_non_empty_level = i;
}
if (total_size > max_level_size) {
max_level_size = total_size;
}
}
得到最小的那个非0的level的size.
uint64_t base_bytes_max = options.max_bytes_for_level_base;
uint64_t base_bytes_min = static_cast<uint64_t>(
base_bytes_max / options.max_bytes_for_level_multiplier);
// Try whether we can make last level's target size to be max_level_size
uint64_t cur_level_size = max_level_size;
for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
// Round up after dividing
cur_level_size = static_cast<uint64_t>(
cur_level_size / options.max_bytes_for_level_multiplier);
}
找到base_level_size,一般来说也就是cur_level_size.
// Find base level (where L0 data is compacted to).
base_level_ = first_non_empty_level;
while (base_level_ > 1 && cur_level_size > base_bytes_max) {
--base_level_;
cur_level_size = static_cast<uint64_t>(
cur_level_size / options.max_bytes_for_level_multiplier);
}
if (cur_level_size > base_bytes_max) {
// Even L1 will be too large
assert(base_level_ == 1);
base_level_size = base_bytes_max;
} else {
base_level_size = cur_level_size;
}
然后给level_max_bytes_ 赋值
uint64_t level_size = base_level_size;
for (int i = base_level_; i < num_levels_; i++) {
if (i > base_level_) {
level_size = MultiplyCheckOverflow(
level_size, options.max_bytes_for_level_multiplier);
}
// Don't set any level below base_bytes_max. Otherwise, the LSM can
// assume an hourglass shape where L1+ sizes are smaller than L0. This
// causes compaction scoring, which depends on level sizes, to favor L1+
// at the expense of L0, which may fill up and stall.
level_max_bytes_[i] = std::max(level_size, base_bytes_max);
}
}
分析完毕何时会触发Compact,那么我们接下来来分析如何Compact.其中Compact的所有操作都在DBImpl::BackgroundCompaction中进行,因此接下来我们来分析 这个函数. 首先是从compaction_queue_队列中读取第一个需要compact的column family.
// cfd is referenced here
auto cfd = PopFirstFromCompactionQueue();
// We unreference here because the following code will take a Ref() on
// this cfd if it is going to use it (Compaction class holds a
// reference).
// This will all happen under a mutex so we don't have to be afraid of
// somebody else deleting it.
if (cfd->Unref()) {
delete cfd;
// This was the last reference of the column family, so no need to
// compact.
return Status::OK();
}
然后就是选取当前CF中所需要compact的内容.
c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
从上面可以看到PickCompaction这个函数,而这个函数会根据设置的不同的Compact策略调用不同的方法,这里我们只看默认的LevelCompact的对应函数.
Compaction* LevelCompactionBuilder::PickCompaction() {
// Pick up the first file to start compaction. It may have been extended
// to a clean cut.
SetupInitialFiles();
if (start_level_inputs_.empty()) {
return nullptr;
}
assert(start_level_ >= 0 && output_level_ >= 0);
// If it is a L0 -> base level compaction, we need to set up other L0
// files if needed.
if (!SetupOtherL0FilesIfNeeded()) {
return nullptr;
}
// Pick files in the output level and expand more files in the start level
// if needed.
if (!SetupOtherInputsIfNeeded()) {
return nullptr;
}
// Form a compaction object containing the files we picked.
Compaction* c = GetCompaction();
TEST_SYNC_POINT_CALLBACK("LevelCompactionPicker::PickCompaction:Return", c);
return c;
}
这里PickCompaction分别调用了三个主要的函数.
先来看SetupInitialFiles,这个函数他会遍历所有的level,然后来选择对应需要compact的input和output.
这里可看到,他会从之前计算好的的compact信息中得到对应的score.
void LevelCompactionBuilder::SetupInitialFiles() {
// Find the compactions by size on all levels.
bool skipped_l0_to_base = false;
for (int i = 0; i < compaction_picker_->NumberLevels() - 1; i++) {
start_level_score_ = vstorage_->CompactionScore(i);
start_level_ = vstorage_->CompactionScoreLevel(i);
assert(i == 0 || start_level_score_ <= vstorage_->CompactionScore(i - 1));
................................................................
}
只有当score大于一才有必要进行compact的处理(所有操作都在上面的循环中).这里可以看到如果是level0的话,那么output_level 则是vstorage_->base_level(),否则就是level+1. 这里base_level()可以认为就是level1或者是最小的非空的level(之前CalculateBaseBytes中计算).
if (start_level_score_ >= 1) {
if (skipped_l0_to_base && start_level_ == vstorage_->base_level()) {
// If L0->base_level compaction is pending, don't schedule further
// compaction from base level. Otherwise L0->base_level compaction
// may starve.
continue;
}
output_level_ =
(start_level_ == 0) ? vstorage_->base_level() : start_level_ + 1;
if (PickFileToCompact()) {
// found the compaction!
if (start_level_ == 0) {
// L0 score = `num L0 files` / `level0_file_num_compaction_trigger`
compaction_reason_ = CompactionReason::kLevelL0FilesNum;
} else {
// L1+ score = `Level files size` / `MaxBytesForLevel`
compaction_reason_ = CompactionReason::kLevelMaxLevelSize;
}
break;
} else {
// didn't find the compaction, clear the inputs
......................................................
}
}
}
上面的代码中我们可以看到最终是通过PickFileToCompact来选择input以及output文件.因此我们接下来就来分这个函数.
首先是得到当前level(start_level_)的未compacted的最大大小的文件
// Pick the largest file in this level that is not already
// being compacted
const std::vector<int>& file_size =
vstorage_->FilesByCompactionPri(start_level_);
const std::vector<FileMetaData*>& level_files =
vstorage_->LevelFiles(start_level_);
紧接着就是这个函数最核心的功能了,它会开始遍历当前的输入level的所有待compact的文件,然后选择一些合适的文件然后compact到下一个level.
unsigned int cmp_idx;
for (cmp_idx = vstorage_->NextCompactionIndex(start_level_);
cmp_idx < file_size.size(); cmp_idx++) {
..........................................
}
然后我们来详细分析上面循环中所做的事情 首先选择好文件之后,将会扩展当前文件的key的范围,得到一个”clean cut”的范围, 这里”clean cut”是这个意思,假设我们有五个文件他们的key range分别为:
f1[a1 a2] f2[a3 a4] f3[a4 a6] f4[a6 a7] f5[a8 a9]
如果我们第一次选择了f3,那么我们通过clean cut,则将还会选择f2,f4,因为他们都是连续的. 选择好之后,会再做一次判断,这次是判断是否正在compact的out_level的文件范围是否和我们选择好的文件的key有重合,如果有,则跳过这个文件. 这里之所以会有这个判断,主要原因还是因为compact是会并行的执行的.
int index = file_size[cmp_idx];
auto* f = level_files[index];
// do not pick a file to compact if it is being compacted
// from n-1 level.
if (f->being_compacted) {
continue;
}
start_level_inputs_.files.push_back(f);
start_level_inputs_.level = start_level_;
if (!compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
&start_level_inputs_) ||
compaction_picker_->FilesRangeOverlapWithCompaction(
{start_level_inputs_}, output_level_)) {
// A locked (pending compaction) input-level file was pulled in due to
// user-key overlap.
start_level_inputs_.clear();
continue;
}
选择好输入文件之后,接下来就是选择输出level中需要一起被compact的文件(output_level_inputs). 实现也是比较简单,就是从输出level的所有文件中找到是否有和上面选择好的input中有重合的文件,如果有,那么则需要一起进行compact.
InternalKey smallest, largest;
compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest);
CompactionInputFiles output_level_inputs;
output_level_inputs.level = output_level_;
vstorage_->GetOverlappingInputs(output_level_, &smallest, &largest,
&output_level_inputs.files);
if (!output_level_inputs.empty() &&
!compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
&output_level_inputs)) {
start_level_inputs_.clear();
continue;
}
base_index_ = index;
break;
继续分析PickCompaction,我们知道在RocksDB中level-0会比较特殊,那是因为只有level-0中的文件是无序的,而在上面的操作中, 我们是假设在非level-0,因此接下来我们需要处理level-0的情况,这个函数就是SetupOtherL0FilesIfNeeded.
这里如果start_level_为0,也就是level-0的话,才会进行下面的处理,就是从level-0中得到所有的重合key的文件,然后加入到start_level_inputs中.
if (start_level_ == 0 && output_level_ != 0) {
// Two level 0 compaction won't run at the same time, so don't need to worry
// about files on level 0 being compacted.
assert(compaction_picker_->level0_compactions_in_progress()->empty());
InternalKey smallest, largest;
compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest);
// Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file.
start_level_inputs_.files.clear();
vstorage_->GetOverlappingInputs(0, &smallest, &largest,
&start_level_inputs_.files);
// If we include more L0 files in the same compaction run it can
// cause the 'smallest' and 'largest' key to get extended to a
// larger range. So, re-invoke GetRange to get the new key range
compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest);
if (compaction_picker_->IsRangeInCompaction(
vstorage_, &smallest, &largest, output_level_, &parent_index_)) {
return false;
}
}
假设start_level_inputs被扩展了,那么对应的output也需要被扩展,因为非level0的其他的level的文件key都是不会overlap的. 那么此时就是会调用SetupOtherInputsIfNeeded.
if (output_level_ != 0) {
output_level_inputs_.level = output_level_;
if (!compaction_picker_->SetupOtherInputs(
cf_name_, mutable_cf_options_, vstorage_, &start_level_inputs_,
&output_level_inputs_, &parent_index_, base_index_)) {
return false;
}
compaction_inputs_.push_back(start_level_inputs_);
if (!output_level_inputs_.empty()) {
compaction_inputs_.push_back(output_level_inputs_);
}
// In some edge cases we could pick a compaction that will be compacting
// a key range that overlap with another running compaction, and both
// of them have the same output level. This could happen if
// (1) we are running a non-exclusive manual compaction
// (2) AddFile ingest a new file into the LSM tree
// We need to disallow this from happening.
if (compaction_picker_->FilesRangeOverlapWithCompaction(compaction_inputs_,
output_level_)) {
// This compaction output could potentially conflict with the output
// of a currently running compaction, we cannot run it.
return false;
}
compaction_picker_->GetGrandparents(vstorage_, start_level_inputs_,
output_level_inputs_, &grandparents_);
}
最后就是构造一个compact然后返回.
// Form a compaction object containing the files we picked.
Compaction* c = GetCompaction();
TEST_SYNC_POINT_CALLBACK("LevelCompactionPicker::PickCompaction:Return", c);
return c;
最后再回到BackgroundCompaction中,这里就是在得到需要compact的文件之后,进行具体的compact. 这里我们可以看到核心的数据结构就是CompactionJob,每一次的compact都是一个job,最终对于文件的compact都是在 CompactionJob::run中实现.
CompactionJob compaction_job(
job_context->job_id, c.get(), immutable_db_options_,
env_options_for_compaction_, versions_.get(), &shutting_down_,
preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
GetDataDir(c->column_family_data(), c->output_path_id()), stats_,
&mutex_, &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats);
compaction_job.Prepare();
mutex_.Unlock();
compaction_job.Run();
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
mutex_.Lock();
status = compaction_job.Install(*c->mutable_cf_options());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(
c->column_family_data(), &job_context->superversion_context,
*c->mutable_cf_options(), FlushReason::kAutoCompaction);
}
*made_progress = true;
在RocksDB中,Compact是会多线程并发的执行,而这里怎样并发,并发多少线程都是在CompactionJob中实现的,简单来说,当你的compact的文件range不重合的话,那么都是可以并发执行的。
我们先来看CompactionJob::Prepare函数,在这个函数中主要是做一些执行前的准备工作,首先是取得对应的compact的边界,这里每一个需要并发的compact都被抽象为一个sub compaction.因此在GenSubcompactionBoundaries会解析到对应的sub compaction以及边界.解析完毕之后,则将会把对应的信息全部加入sub_compact_states中。
void CompactionJob::Prepare() {
..........................
if (c->ShouldFormSubcompactions()) {
const uint64_t start_micros = env_->NowMicros();
GenSubcompactionBoundaries();
MeasureTime(stats_, SUBCOMPACTION_SETUP_TIME,
env_->NowMicros() - start_micros);
assert(sizes_.size() == boundaries_.size() + 1);
for (size_t i = 0; i <= boundaries_.size(); i++) {
Slice* start = i == 0 ? nullptr : &boundaries_[i - 1];
Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]);
}
MeasureTime(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
compact_->sub_compact_states.size());
}
......................................
}
因此我们来详细分析GenSubcompactionBoundaries,这个函数比较长,我们来分开分析,首先是遍历所有的需要compact的level,然后取得每一个level的边界(也就是最大最小key)。
void CompactionJob::GenSubcompactionBoundaries() {
...........................
// Add the starting and/or ending key of certain input files as a potential
// boundary
for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
int lvl = c->level(lvl_idx);
if (lvl >= start_lvl && lvl <= out_lvl) {
const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
size_t num_files = flevel->num_files;
.....................
if (lvl == 0) {
// For level 0 add the starting and ending key of each file since the
// files may have greatly differing key ranges (not range-partitioned)
for (size_t i = 0; i < num_files; i++) {
bounds.emplace_back(flevel->files[i].smallest_key);
bounds.emplace_back(flevel->files[i].largest_key);
}
} else {
// For all other levels add the smallest/largest key in the level to
// encompass the range covered by that level
bounds.emplace_back(flevel->files[0].smallest_key);
bounds.emplace_back(flevel->files[num_files - 1].largest_key);
if (lvl == out_lvl) {
// For the last level include the starting keys of all files since
// the last level is the largest and probably has the widest key
// range. Since it's range partitioned, the ending key of one file
// and the starting key of the next are very close (or identical).
for (size_t i = 1; i < num_files; i++) {
bounds.emplace_back(flevel->files[i].smallest_key);
}
}
}
}
}
......................
然后则是对取得的bounds进行排序以及去重.
std::sort(bounds.begin(), bounds.end(),
[cfd_comparator](const Slice& a, const Slice& b) -> bool {
return cfd_comparator->Compare(ExtractUserKey(a),
ExtractUserKey(b)) < 0;
});
// Remove duplicated entries from bounds
bounds.erase(
std::unique(bounds.begin(), bounds.end(),
[cfd_comparator](const Slice& a, const Slice& b) -> bool {
return cfd_comparator->Compare(ExtractUserKey(a),
ExtractUserKey(b)) == 0;
}),
bounds.end());
接近着就来计算理想情况下所需要的subcompactions的个数以及输出文件的个数.
// Group the ranges into subcompactions
const double min_file_fill_percent = 4.0 / 5;
int base_level = v->storage_info()->base_level();
uint64_t max_output_files = static_cast<uint64_t>(std::ceil(
sum / min_file_fill_percent /
MaxFileSizeForLevel(*(c->mutable_cf_options()), out_lvl,
c->immutable_cf_options()->compaction_style, base_level,
c->immutable_cf_options()->level_compaction_dynamic_level_bytes)));
uint64_t subcompactions =
std::min({static_cast<uint64_t>(ranges.size()),
static_cast<uint64_t>(c->max_subcompactions()),
max_output_files});
最后更新boundaries_,这里会根据根据文件的大小,通过平均的size,来吧所有的range分为几份,最终这些都会保存在boundaries_中.
if (subcompactions > 1) {
double mean = sum * 1.0 / subcompactions;
// Greedily add ranges to the subcompaction until the sum of the ranges'
// sizes becomes >= the expected mean size of a subcompaction
sum = 0;
for (size_t i = 0; i < ranges.size() - 1; i++) {
sum += ranges[i].size;
if (subcompactions == 1) {
// If there's only one left to schedule then it goes to the end so no
// need to put an end boundary
continue;
}
if (sum >= mean) {
boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
sizes_.emplace_back(sum);
subcompactions--;
sum = 0;
}
}
sizes_.emplace_back(sum + ranges.back().size);
}
然后我们来看CompactJob::Run的实现,在这个函数中,就是会遍历所有的sub_compact,然后启动线程来进行对应的compact工作,最后等到所有的线程完成,然后退出.
// Launch a thread for each of subcompactions 1...num_threads-1
std::vector<port::Thread> thread_pool;
thread_pool.reserve(num_threads - 1);
for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
&compact_->sub_compact_states[i]);
}
// Always schedule the first subcompaction (whether or not there are also
// others) in the current thread to be efficient with resources
ProcessKeyValueCompaction(&compact_->sub_compact_states[0]);
// Wait for all other threads (if there are any) to finish execution
for (auto& thread : thread_pool) {
thread.join();
}
if (output_directory_) {
output_directory_->Fsync();
}
最后我们可以看到最终compact工作是在CompactionJob::ProcessKeyValueCompaction是实现的,这个函数我们暂时就不分析了,我们只需要知道所有的compact工作都是在这个函数中执行的.