数据库内核月报

数据库内核月报 - 2018 / 05

MySQL · RocksDB · MANIFEST文件介绍

Author: diaoliang

概述

在RocksDB中MANIFEST保存了存储引擎的内部的一些状态元数据,简单来说当系统异常重启,或者程序异常被退出之后,RocksDB需要有一种机制能够恢复到一个一致性的状态, 而这个一致性的状态就是靠MANIFEST来保证的.

MANIFEST在RocksDB中是一个单独的文件,而这个文件所保存的数据基本是来自于VersionEdit这个结构.

MANIFEST包含了两个文件,一个log文件一个包含最新MANIFEST文件名的文件,Manifest的log文件名是这样 MANIFEST-(seqnumber),这个seq会一直增长.只有当 超过了指定的大小之后,MANIFEST会刷新一个新的文件,当新的文件刷新到磁盘(并且文件名更新)之后,老的文件会被删除掉.这里可以认为每一次MANIFEST的更新都代表一次snapshot.

下面就是MANIFEST的基本文件组成:

MANIFEST = { CURRENT, MANIFEST-<seq-no>* } 
CURRENT = File pointer to the latest manifest log
MANIFEST-<seq no> = Contains snapshot of RocksDB state and subsequent modifications

在RocksDB中任意时间存储引擎的状态都会保存为一个Version(也就是SST的集合),而每次对Version的修改都是一个VersionEdit,而最终这些VersionEdit就是 组成manifest-log文件的内容.

下面就是MANIFEST的log文件的基本构成:

version-edit      = Any RocksDB state change
version           = { version-edit* }
manifest-log-file = { version, version-edit* }
                  = { version-edit* }

实现

整个MANIFEST涉及到三个数据结构分别是VersionEdit/Version/VersionSet,其中前两个上面已经有介绍,而最后一个VersionSet顾名思义表示一堆Version的集合,其实就是 记录了各个版本的信息用来管理整个Version.

class VersionSet {
 public:
  VersionSet(const std::string& dbname, const ImmutableDBOptions* db_options,
             const EnvOptions& env_options, Cache* table_cache,
             WriteBufferManager* write_buffer_manager,
             WriteController* write_controller);
  ~VersionSet();
.......................
 private:
  struct ManifestWriter;

  friend class Version;
.................................
  // Opened lazily
  unique_ptr<log::Writer> descriptor_log_;
  // generates a increasing version number for every new version
  uint64_t current_version_number_;

  // Queue of writers to the manifest file
  std::deque<ManifestWriter*> manifest_writers_;
..........................................
  

这里最关键的两个数据结构是descriptor_log_和manifest_writers_,前一个表示了当前manifest-log文件,后一个表示需要写入到manifest-log文件中的内容.

下面就是ManifestWriter的结构,可以看到其中包含了一个VersionEdit的数组,这个数组就是即将要写入到manifest-log文件中的内容.

// this is used to batch writes to the manifest file
struct VersionSet::ManifestWriter {
  Status status;
  bool done;
  InstrumentedCondVar cv;
  ColumnFamilyData* cfd;
  const autovector<VersionEdit*>& edit_list;

  explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
                          const autovector<VersionEdit*>& e)
      : done(false), cv(mu), cfd(_cfd), edit_list(e) {}
};

然后我们来看RocksDB如何来创建以及写入文件,下面所有的代码都是包含在VersionSet::LogAndApply这个函数中.

首先在每次LogAndApply的时候都会创建一个新的ManifesWriter加入到manifest_writers_队列中.这里只有当之前保存在队列中 的所有Writer都写入完毕之后才会加入到队列,否则就会等待.

  // queue our request
  ManifestWriter w(mu, column_family_data, edit_list);
  manifest_writers_.push_back(&w);
  while (!w.done && &w != manifest_writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }

接下来就是保存对应的数据到batch_edits中(manifest_writers_).

autovector<VersionEdit*> batch_edits;
....................................
 if (w.edit_list.front()->IsColumnFamilyManipulation()) {
    // no group commits for column family add or drop
    LogAndApplyCFHelper(w.edit_list.front());
    batch_edits.push_back(w.edit_list.front());
  } else {
    v = new Version(column_family_data, this, current_version_number_++);
........................................................
    for (const auto& writer : manifest_writers_) {
      if (writer->edit_list.front()->IsColumnFamilyManipulation() ||
          writer->cfd->GetID() != column_family_data->GetID()) {
        break;
      }
      last_writer = writer;
      for (const auto& edit : writer->edit_list) {
...........................................
        batch_edits.push_back(edit);
      }
    }
    builder->SaveTo(v->storage_info());
  }

然后就是创建新的manifest-log文件的逻辑.这里可以看到要么是第一次进入,要么文件大小大于option对应的值才会创建新的文件

if (!descriptor_log_ ||
      manifest_file_size_ > db_options_->max_manifest_file_size) {
    pending_manifest_file_number_ = NewFileNumber();
    batch_edits.back()->SetNextFile(next_file_number_.load());
    new_descriptor_log = true;
  } else {
    pending_manifest_file_number_ = manifest_file_number_;
  }

  if (new_descriptor_log) {
    // if we're writing out new snapshot make sure to persist max column family
    if (column_family_set_->GetMaxColumnFamily() > 0) {
      w.edit_list.front()->SetMaxColumnFamily(
          column_family_set_->GetMaxColumnFamily());
    }
  }

如果需要创建新的manifest-log文件,则开始构造对应的文件信息并创建文件.

if (new_descriptor_log) {
      // create manifest file
      ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
                     pending_manifest_file_number_);
      unique_ptr<WritableFile> descriptor_file;
      EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
      s = NewWritableFile(
          env_, DescriptorFileName(dbname_, pending_manifest_file_number_),
          &descriptor_file, opt_env_opts);
      if (s.ok()) {
        descriptor_file->SetPreallocationBlockSize(
            db_options_->manifest_preallocation_size);

        unique_ptr<WritableFileWriter> file_writer(
            new WritableFileWriter(std::move(descriptor_file), opt_env_opts));
        descriptor_log_.reset(
            new log::Writer(std::move(file_writer), 0, false));
        s = WriteSnapshot(descriptor_log_.get());
      }
    }

开始写入对应的VersionEdit的record到文件(最后我们会来看这个record的构成),这里看到写入完成后会调用Sync来刷新内容到磁盘,等这些操作都做完之后,则会更新Current文件也就是更新最新的manifest-log文件名到CURRENT文件中.

      for (auto& e : batch_edits) {
        std::string record;
        if (!e->EncodeTo(&record)) {
          s = Status::Corruption(
              "Unable to Encode VersionEdit:" + e->DebugString(true));
          break;
        }
        TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord",
                         rocksdb_kill_odds * REDUCE_ODDS2);
        s = descriptor_log_->AddRecord(record);
        if (!s.ok()) {
          break;
        }
      }
      if (s.ok()) {
        s = SyncManifest(env_, db_options_, descriptor_log_->file());
      }
.............................
    // If we just created a new descriptor file, install it by writing a
    // new CURRENT file that points to it.
    if (s.ok() && new_descriptor_log) {
      s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,
                         db_directory);
    }

CURRENT文件更新完毕之后,就可以删除老的mainfest文件了.

  // Append the old mainfest file to the obsolete_manifests_ list to be deleted
  // by PurgeObsoleteFiles later.
  if (s.ok() && new_descriptor_log) {
    obsolete_manifests_.emplace_back(
        DescriptorFileName("", manifest_file_number_));
  }

最后则是更新manifest_writers_队列,唤醒之前阻塞的内容.

  // wake up all the waiting writers
  while (true) {
    ManifestWriter* ready = manifest_writers_.front();
    manifest_writers_.pop_front();
    if (ready != &w) {
      ready->status = s;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }
  // Notify new head of write queue
  if (!manifest_writers_.empty()) {
    manifest_writers_.front()->cv.Signal();
  }

文件内容

具体的文件格式可以看RocksDB的wiki,这里我来介绍下对应的源码. 通过上面的分析我们可以看到最终是通过VersionEdit::EncodeTo来序列化数据,而VersionEdit主要包含了比如log_number/last_sequence_这些字段,这里还有一个比较关键的信息被序列化了,那就是FileMetaData,也就是SST文件的元信息.

struct FileMetaData {
  FileDescriptor fd;
  InternalKey smallest;            // Smallest internal key served by table
  InternalKey largest;             // Largest internal key served by table
  SequenceNumber smallest_seqno;   // The smallest seqno in this file
  SequenceNumber largest_seqno;    // The largest seqno in this file

.........................................
  // File size compensated by deletion entry.
  // This is updated in Version::UpdateAccumulatedStats() first time when the
  // file is created or loaded.  After it is updated (!= 0), it is immutable.
  uint64_t compensated_file_size;
  // These values can mutate, but they can only be read or written from
  // single-threaded LogAndApply thread
  uint64_t num_entries;            // the number of entries.
  uint64_t num_deletions;          // the number of deletion entries.
  uint64_t raw_key_size;           // total uncompressed key size.
  uint64_t raw_value_size;         // total uncompressed value size.

  int refs;  // Reference count

  bool being_compacted;        // Is this file undergoing compaction?
  bool init_stats_from_file;   // true if the data-entry stats of this file
                               // has initialized from file.

  bool marked_for_compaction;  // True if client asked us nicely to compact this
                               // file.
};

工具

这里查看MANIFEST文件依旧是使用ldb工具:

pagefault@god ~/tools/polarbase/data/.rocksdb $ ../../bin/ldb manifest_dump --path=./MANIFEST-000001
--------------- Column family "default"  (ID 0) --------------
log number: 13
comparator: <NO COMPARATOR>
--- level 0 --- version# 0 ---
 11:80860[' 
--------------- Column family "__system__"  (ID 1) --------------
log number: 24
comparator: RocksDB_SE_v3.10
--- level 0 --- version# 1 ---
 25:1094[' 
next_file_number 27 last_sequence 190  prev_log_number 0 max_column_family 1