数据库内核月报

数据库内核月报 - 2018 / 12

MySQL · RocksDB · 数据的读取(二)

Author: diaoliang

概述

上一篇文章中我们介绍了在RocksDB中如何在内存中查找对应的数据,这一篇我们将会详细介绍当内存中的数据不存在时,RocksDB如何在磁盘上查找对应的数据.

源码分析

依旧是从DBImpl::GetImpl开始,上一篇文章中我们分析这个函数只分析了Memtable相关的代码,这次我们来看当memtable没有查找到之后,RocksDB是如何处理的.我们可以看到当MemTable中没有找到对应的数据之后(包括删除),RocksDB将会进入对于sst的查找.

  if (!done) {
    PERF_TIMER_GUARD(get_from_output_files_time);
    sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context,
                     &range_del_agg, value_found, nullptr, nullptr, callback,
                     is_blob_index);
    RecordTick(stats_, MEMTABLE_MISS);
  }

从上面的代码我们可以看到直接从当前的version(sv->current)调用Get方法,因此接下来我们就来详细看这个函数。 这个函数简单来说就是根据所需要查找的key,然后选择对应的文件,这里每次会返回一个文件(key在sst的key范围内),然后循环查找.

先来看查找之前的初始化

  GetContext get_context(
      user_comparator(), merge_operator_, info_log_, db_statistics_,
      status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
      value, value_found, merge_context, range_del_agg, this->env_, seq,
      merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob);

  // Pin blocks that we read to hold merge operands
  if (merge_operator_) {
    pinned_iters_mgr.StartPinning();
  }

  FilePicker fp(
      storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
      storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
      user_comparator(), internal_comparator());
  FdWithKeyRange* f = fp.GetNextFile();

第一个是GetContext结构,这个类只要是根据传递进来的文件元信息来查找对应的key.然后是FilePicker,这个类主要是根据传递进来的key来选择对应的文件.这里最重要就是GetNextFile这个函数,我们来看这个函数。

这个函数他会遍历所有的level,然后再遍历每个level的所有的文件,这里会对level 0的文件做一个特殊处理,这是因为只有level0的sst的range不是有序的,因此我们每次查找需要查找所有的文件,也就是会一个个的遍历.

而在非level0,我们只需要按照二分查找来得到对应的文件即可,如果二分查找不存在,那么我就需要进入下一个level进行查找.

FdWithKeyRange* GetNextFile() {
    while (!search_ended_) {  // Loops over different levels.
      while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
        // Loops over all files in current level.
        FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_];
        hit_file_level_ = curr_level_;
        is_hit_file_last_in_level_ =
            curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
        int cmp_largest = -1;
        if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
          // Check if key is within a file's range. If search left bound and
          // right bound point to the same find, we are sure key falls in
          // range.
          assert(
              curr_level_ == 0 ||
              curr_index_in_curr_level_ == start_index_in_curr_level_ ||
              user_comparator_->Compare(user_key_,
                ExtractUserKey(f->smallest_key)) <= 0);

          int cmp_smallest = user_comparator_->Compare(user_key_,
              ExtractUserKey(f->smallest_key));
          if (cmp_smallest >= 0) {
            cmp_largest = user_comparator_->Compare(user_key_,
                ExtractUserKey(f->largest_key));
          }

          // Setup file search bound for the next level based on the
          // comparison results
          if (curr_level_ > 0) {
            file_indexer_->GetNextLevelIndex(curr_level_,
                                            curr_index_in_curr_level_,
                                            cmp_smallest, cmp_largest,
                                            &search_left_bound_,
                                            &search_right_bound_);
          }
          // Key falls out of current file's range
          if (cmp_smallest < 0 || cmp_largest > 0) {
            if (curr_level_ == 0) {
              ++curr_index_in_curr_level_;
              continue;
            } else {
              // Search next level.
              break;
            }
          }
        }
        returned_file_level_ = curr_level_;
        if (curr_level_ > 0 && cmp_largest < 0) {
          // No more files to search in this level.
          search_ended_ = !PrepareNextLevel();
        } else {
          ++curr_index_in_curr_level_;
        }
        return f;
      }
      // Start searching next level.
      search_ended_ = !PrepareNextLevel();
    }
    // Search ended.
    return nullptr;
  }

这里RocksDB使用了一个技巧用来加快二分查找的速度,每次更新sst的时候,RocksDB都会调用FileIndexer::UpdateIndex来更新这样的一个结构,这个结构就是FileIndexer,它主要是用来保存每一个level和level+1的key范围的关联信息,这样当我们在level查找的时候,如果没有查找到信息,那么我们将会迅速得到下一个level需要查找的文件范围.每一个key来进行比较总会有三种情况:

那么我们只需要在初始化索引的时候能够得到当前的sst在下一个level中的位置,就可以根据上面三种类型来确定下一个level我们需要进行二分查找的文件范围.在RocksDB中定义了下面三个值.

    // Point to a left most file in a lower level that may contain a key,
    // which compares greater than smallest of a FileMetaData (upper level)
    int32_t smallest_lb;
    // Point to a left most file in a lower level that may contain a key,
    // which compares greater than largest of a FileMetaData (upper level)
    int32_t largest_lb;
    // Point to a right most file in a lower level that may contain a key,
    // which compares smaller than smallest of a FileMetaData (upper level)
    int32_t smallest_rb;
    // Point to a right most file in a lower level that may contain a key,
    // which compares smaller than largest of a FileMetaData (upper level)
    int32_t largest_rb;

我们通过例子来解释这三个值.假设有下面两个level,4个sst.那么初始化的时候,对应的level1的这个sst对应的四个值分别为. smallest_lb=1;largest_lb=2;smallest_rb=1;largest_rb=2;

        level 1:              [50 - 60]
        level 2:        [1 - 40], [45 - 55], [58 - 80]

此时如果我们查找一个key为49,然后第一次比较,也就是key < level1.sst->smallest,那么我们将会知道我们需要在0和smallest_rb之间来查找,也就是0和1.假设我们查找key是55,也就是 level1.sst->smallest < key < level1.test.largest,此时我们在level2将需要在smallest_rb和largest_rb之间.这里可以看到其实就是计算一个重合的区间。

来看RocksDB如何根据当前level的比较结果来计算下一个level需要二分查找的文件范围:

void FileIndexer::GetNextLevelIndex(const size_t level, const size_t file_index,
                                    const int cmp_smallest,
                                    const int cmp_largest, int32_t* left_bound,
                                    int32_t* right_bound) const {
  assert(level > 0);

  const IndexUnit* index_units = next_level_index_[level].index_units;
  const auto& index = index_units[file_index];

  if (cmp_smallest < 0) {
    *left_bound = (level > 0 && file_index > 0)
                      ? index_units[file_index - 1].largest_lb
                      : 0;
    *right_bound = index.smallest_rb;
  } else if (cmp_smallest == 0) {
    *left_bound = index.smallest_lb;
    *right_bound = index.smallest_rb;
  } else if (cmp_smallest > 0 && cmp_largest < 0) {
    *left_bound = index.smallest_lb;
    *right_bound = index.largest_rb;
  } else if (cmp_largest == 0) {
    *left_bound = index.largest_lb;
    *right_bound = index.largest_rb;
  } else if (cmp_largest > 0) {
    *left_bound = index.largest_lb;
    *right_bound = level_rb_[level + 1];
  } else {
    assert(false);
  }
}

看完上面这些我们继续来看RocksDB对于文件的查找.这里所有对于key的查找都是在table_cache_->Get中.这里我们暂且略过这个函数的实现,最后我们再来详细分析这个函数.

 while (f != nullptr) {
................................

    *status = table_cache_->Get(
        read_options, *internal_comparator(), f->fd, ikey, &get_context,
        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
                        fp.IsHitFileLastInLevel()),
        fp.GetCurrentLevel());
    // TODO: examine the behavior for corrupted key
    if (!status->ok()) {
      return;
    }
.......................
 }

当table_cache_->Get返回之后,我们需要根据get_context来判断返回的结果


switch (get_context.State()) {
      case GetContext::kNotFound:
        // Keep searching in other files
        break;
      case GetContext::kMerge:
        break;
      case GetContext::kFound:
        if (fp.GetHitFileLevel() == 0) {
          RecordTick(db_statistics_, GET_HIT_L0);
        } else if (fp.GetHitFileLevel() == 1) {
          RecordTick(db_statistics_, GET_HIT_L1);
        } else if (fp.GetHitFileLevel() >= 2) {
          RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
        }
        return;
      case GetContext::kDeleted:
        // Use empty error message for speed
        *status = Status::NotFound();
        return;
      case GetContext::kCorrupt:
        *status = Status::Corruption("corrupted key for ", user_key);
        return;
      case GetContext::kBlobIndex:
        ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
        *status = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "rocksdb::blob_db::BlobDB instead.");
        return;
    }

如果没有发现对应的值则进入下一次文件查找

f = fp.GetNextFile();

最后我们来详细分析最核心的函数TableCache::Get,这个函数不仅仅是返回对应的查找结果,并且还会cache相应的文件信息,并且如果row_cache打开,他还会做row cache.这里row cache就是对当前的所需要查找的key在当前sst中对应的value进行cache.

先来看如果打开了row cache,RocksDB将会如何处理,首先它会计算row cache的key.通过下面的代码我们可以看到row cache的key就是fd_number+seq_no+user_key.

    uint64_t fd_number = fd.GetNumber();
    auto user_key = ExtractUserKey(k);
    // We use the user key as cache key instead of the internal key,
    // otherwise the whole cache would be invalidated every time the
    // sequence key increases. However, to support caching snapshot
    // reads, we append the sequence number (incremented by 1 to
    // distinguish from 0) only in this case.
    uint64_t seq_no =
        options.snapshot == nullptr ? 0 : 1 + GetInternalKeySeqno(k);

    // Compute row cache key.
    row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(),
                             row_cache_id_.size());
    AppendVarint64(&row_cache_key, fd_number);
    AppendVarint64(&row_cache_key, seq_no);
    row_cache_key.TrimAppend(row_cache_key.Size(), user_key.data(),
                             user_key.size());

然后就是在row cache中进行一次查找.如果有对应的值则直接返回结果,否则则将会在对应的sst读取传递进来的key.

    if (auto row_handle =
            ioptions_.row_cache->Lookup(row_cache_key.GetUserKey())) {
      Cleanable value_pinner;
      auto release_cache_entry_func = [](void* cache_to_clean,
                                         void* cache_handle) {
        ((Cache*)cache_to_clean)->Release((Cache::Handle*)cache_handle);
      };
      auto found_row_cache_entry = static_cast<const std::string*>(
          ioptions_.row_cache->Value(row_handle));
....................................
      done = true;       
    } else {
      // Not found, setting up the replay log.
      RecordTick(ioptions_.statistics, ROW_CACHE_MISS);
      row_cache_entry = &row_cache_entry_buffer;
    }

接下来就是需要在对应的sst文件读取对应的key的值,这里可以看到每一个fd都包含了一个TableReader的结构,这个结构就是用来保存文件的内容.而我们的table_cache主要就是缓存这个结构.

 Status s;
  TableReader* t = fd.table_reader;
  Cache::Handle* handle = nullptr;
  if (!done && s.ok()) {
    if (t == nullptr) {
      s = FindTable(env_options_, internal_comparator, fd, &handle,
                    options.read_tier == kBlockCacheTier /* no_io */,
                    true /* record_read_stats */, file_read_hist, skip_filters,
                    level);
      if (s.ok()) {
        t = GetTableReaderFromHandle(handle);
      }
    }
   ..........................
  }

上面的代码会直接调用TableCache::FindTable, 这个函数主要是用来实现对应tablereader的读取以及row cache.

Status TableCache::FindTable(const EnvOptions& env_options,
                             const InternalKeyComparator& internal_comparator,
                             const FileDescriptor& fd, Cache::Handle** handle,
                             const bool no_io, bool record_read_stats,
                             HistogramImpl* file_read_hist, bool skip_filters,
                             int level,
                             bool prefetch_index_and_filter_in_cache) {
...................................................
  if (*handle == nullptr) {
    if (no_io) {  // Don't do IO and return a not-found status
      return Status::Incomplete("Table not found in table_cache, no_io is set");
    }
    unique_ptr<TableReader> table_reader;
    s = GetTableReader(env_options, internal_comparator, fd,
                       false /* sequential mode */, 0 /* readahead */,
                       record_read_stats, file_read_hist, &table_reader,
                       skip_filters, level, prefetch_index_and_filter_in_cache);
    if (!s.ok()) {
      assert(table_reader == nullptr);
      RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
      // We do not cache error results so that if the error is transient,
      // or somebody repairs the file, we recover automatically.
    } else {
      s = cache_->Insert(key, table_reader.get(), 1, &DeleteEntry<TableReader>,
                         handle);
      if (s.ok()) {
        // Release ownership of table reader.
        table_reader.release();
      }
    }
  }
  return s;
}

通过上面的代码可以看到实现很简单,就是一般的cache逻辑,读取然后判断是否存在,不存在则插入到cache. 上面的函数会调用 TableCache::GetTableReader,我们来简单看下这个函数.

Status TableCache::GetTableReader(
    const EnvOptions& env_options,
    const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
    bool sequential_mode, size_t readahead, bool record_read_stats,
    HistogramImpl* file_read_hist, unique_ptr<TableReader>* table_reader,
    bool skip_filters, int level, bool prefetch_index_and_filter_in_cache,
    bool for_compaction) {
..........................................
  if (s.ok()) {
...............................................    
    s = ioptions_.table_factory->NewTableReader(
        TableReaderOptions(ioptions_, env_options, internal_comparator,
                           skip_filters, level),
        std::move(file_reader), fd.GetFileSize(), table_reader,
        prefetch_index_and_filter_in_cache);
    TEST_SYNC_POINT("TableCache::GetTableReader:0");
  }
  return s;
} 

可以看到最关键的调用就是调用ioptions_.table_factory->NewTableReader, 这里RocksDB会根据我们配置的不同的sst格式来调用不同的reader,而在RocksDB中默认的格式是基于block.

// Create default block based table factory.
extern TableFactory* NewBlockBasedTableFactory(
    const BlockBasedTableOptions& table_options = BlockBasedTableOptions());

这里我们就不详细分析sst的文件格式了,以后我们会来详细对比这几个文件格式的优劣.这里我们只需要知道最终缓存的tablereader就是一个BlockBasedTable对象(假设使用了基于block的sst format).

当读取完毕TableReader之后,RocksDB就需要从sst文件中get key了,也就是最终的key查找方式是在每个sst format class的Get方法中实现的。

    if (s.ok()) {
      get_context->SetReplayLog(row_cache_entry);  // nullptr if no cache.
      s = t->Get(options, k, get_context, skip_filters);
      get_context->SetReplayLog(nullptr);
    }

和上面一样,这里的get也就是对应的sst format的get.

最后如果查找到key,则开始缓存对应的kv到row_cache.

    size_t charge =
        row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
    void* row_ptr = new std::string(std::move(*row_cache_entry));
    ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
                                &DeleteEntry<std::string>);

这里整个读取流程我们都分析完毕了,不过这里略过了merge,delete range以及不同sst format如何组织以及读取内容,后续我们会详细分析这些略过的内容.