Database Kernel Monthly Report

Database Kernel Monthly Report - 2018 / 06

MySQL · RocksDB · Introduction to Column Family

Author: diaoliang

Overview

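The Column Family feature was added in RocksDB 3.0. With it, every key-value pair is associated with a Column Family, and the default Column Family is named "default". Column Families mainly give RocksDB a way to partition data logically. In terms of implementation, different Column Families share the WAL, but each has its own Memtable and SST files. This means we can quickly and conveniently set different options for different Column Families, and quickly drop a Column Family.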
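To make the "shared WAL, separate Memtable" idea concrete, here is a minimal toy model. It is only a sketch and not RocksDB code: ToyDB and all of its members are hypothetical names, the "memtable" is a plain std::map, and nothing is persisted.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Toy model only: ToyDB and its members are hypothetical, not RocksDB APIs.
class ToyDB {
 public:
  ToyDB() { memtables_["default"]; }  // the default column family always exists

  void CreateColumnFamily(const std::string& cf) { memtables_[cf]; }

  // Dropping a column family discards only that family's memtable.
  void DropColumnFamily(const std::string& cf) {
    assert(cf != "default");
    memtables_.erase(cf);
  }

  // Every write is appended to the single shared WAL first, then applied to
  // the memtable of the column family it belongs to.
  bool Put(const std::string& cf, const std::string& key,
           const std::string& value) {
    auto it = memtables_.find(cf);
    if (it == memtables_.end()) return false;  // unknown column family
    wal_.push_back(WalRecord{cf, key, value});
    it->second[key] = value;
    return true;
  }

  // Returns true and fills *value when the key exists in that column family.
  bool Get(const std::string& cf, const std::string& key,
           std::string* value) const {
    auto it = memtables_.find(cf);
    if (it == memtables_.end()) return false;
    auto kv = it->second.find(key);
    if (kv == it->second.end()) return false;
    *value = kv->second;
    return true;
  }

  std::size_t WalSize() const { return wal_.size(); }

 private:
  struct WalRecord {
    std::string cf, key, value;
  };
  std::vector<WalRecord> wal_;  // one log shared by all column families
  std::map<std::string, std::map<std::string, std::string>> memtables_;
};
```

In this model the same key can hold different values in different column families, and dropping a column family is cheap because it only discards that family's own state.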

Main API

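First, creating a Column Family. There are two ways to do this. One is to pass the list of Column Families to DB::Open when opening the DB (missing ones are created when the create_missing_column_families option is set). The other is to call CreateColumnFamily directly once the DB has been created and opened.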

DB::Open(const DBOptions& db_options, const std::string& name, const std::vector<ColumnFamilyDescriptor>& column_families, std::vector<ColumnFamilyHandle*>* handles, DB** dbptr);
DB::CreateColumnFamily(const ColumnFamilyOptions& options, const std::string& column_family_name, ColumnFamilyHandle** handle);

Either way, the caller ends up with a ColumnFamilyHandle to work with.

Dropping a Column Family is simple: pass the ColumnFamilyHandle obtained earlier back to RocksDB, which uses it to drop the Column Family.

DropColumnFamily(ColumnFamilyHandle* column_family);

Implementation

All Column Families are managed by a structure called ColumnFamilySet, and each Column Family is represented by a ColumnFamilyData.

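Let's start with ColumnFamilySet. It manages Column Families with two kinds of data structures: hash maps (column_families_ and column_family_data_) and a circular doubly-linked list headed by dummy_cfd_. The maps store the mapping from Column Family name to ID, and from ID to ColumnFamilyData. Note that internally RocksDB identifies each Column Family by a uint32 ID. max_column_family_ records the largest ID handed out so far, so the ID is simply an increasing integer.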

class ColumnFamilySet {
 public:
  // ColumnFamilySet supports iteration (nested iterator class elided)
.................................

  ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id,
                                       Version* dummy_version,
                                       const ColumnFamilyOptions& options);
  iterator begin() { return iterator(dummy_cfd_->next_); }
  iterator end() { return iterator(dummy_cfd_); }
...............................
 private:
  friend class ColumnFamilyData;
  // helper function that gets called from cfd destructor
  // REQUIRES: DB mutex held
  void RemoveColumnFamily(ColumnFamilyData* cfd);

  // column_families_ and column_family_data_ need to be protected:
  // * when mutating both conditions have to be satisfied:
  // 1. DB mutex locked
  // 2. thread currently in single-threaded write thread
  // * when reading, at least one condition needs to be satisfied:
  // 1. DB mutex locked
  // 2. accessed from a single-threaded write thread
  std::unordered_map<std::string, uint32_t> column_families_;
  std::unordered_map<uint32_t, ColumnFamilyData*> column_family_data_;

  uint32_t max_column_family_;
  ColumnFamilyData* dummy_cfd_;
  // We don't hold the refcount here, since default column family always exists
  // We are also not responsible for cleaning up default_cfd_cache_. This is
  // just a cache that makes common case (accessing default column family)
  // faster
  ColumnFamilyData* default_cfd_cache_;

..................................
};
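The interplay of the two maps and the ID counter can be modeled in a few lines. The sketch below is a simplified stand-in, not RocksDB code: ToyCFSet, ToyCFData, Remove and GetData are hypothetical names, locking is omitted, and the map stores values rather than pointers. It assumes that a new ID is obtained by incrementing max_column_family_, which means an ID is never handed out twice in this model.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>

// Hypothetical, minimal payload standing in for ColumnFamilyData.
struct ToyCFData {
  uint32_t id;
  std::string name;
};

class ToyCFSet {
 public:
  // IDs only ever grow, even after a column family is removed.
  uint32_t GetNextColumnFamilyID() { return ++max_column_family_; }

  void Create(const std::string& name, uint32_t id) {
    assert(column_families_.find(name) == column_families_.end());
    column_families_.emplace(name, id);
    column_family_data_.emplace(id, ToyCFData{id, name});
    max_column_family_ = std::max(max_column_family_, id);
  }

  // Erases the entry from both maps; max_column_family_ is left untouched.
  void Remove(const std::string& name) {
    auto it = column_families_.find(name);
    if (it == column_families_.end()) return;
    column_family_data_.erase(it->second);
    column_families_.erase(it);
  }

  // Lookup by name goes name -> ID -> data. Returns nullptr if unknown.
  const ToyCFData* GetData(const std::string& name) const {
    auto it = column_families_.find(name);
    if (it == column_families_.end()) return nullptr;
    auto data = column_family_data_.find(it->second);
    return data == column_family_data_.end() ? nullptr : &data->second;
  }

 private:
  std::unordered_map<std::string, uint32_t> column_families_;
  std::unordered_map<uint32_t, ToyCFData> column_family_data_;
  uint32_t max_column_family_ = 0;
};
```

For example, after creating "default" with ID 0 and "a" with ID 1, removing "a" and then creating "b" gives "b" the ID 2 rather than reusing 1.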

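Next is ColumnFamilyData, the structure that represents a single Column Family and stores its information. It holds the ID and name, as well as all Versions belonging to this Column Family (the list headed by dummy_versions_). The next_/prev_ pointers are the links of the doubly-linked list that ColumnFamilySet uses to chain all Column Families together.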

class ColumnFamilyData {
 public:
  ~ColumnFamilyData();

  // thread-safe
  uint32_t GetID() const { return id_; }
  // thread-safe
  const std::string& GetName() const { return name_; }

  // Ref() can only be called from a context where the caller can guarantee
  // that ColumnFamilyData is alive (while holding a non-zero ref already,
  // holding a DB mutex, or as the leader in a write batch group).
  void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }

  // Unref decreases the reference count, but does not handle deletion
  // when the count goes to 0.  If this method returns true then the
  // caller should delete the instance immediately, or later, by calling
  // FreeDeadColumnFamilies().  Unref() can only be called while holding
  // a DB mutex, or during single-threaded recovery.
  bool Unref() {
    int old_refs = refs_.fetch_sub(1, std::memory_order_relaxed);
    assert(old_refs > 0);
    return old_refs == 1;
  }
..............................

 private:
  friend class ColumnFamilySet;
  ColumnFamilyData(uint32_t id, const std::string& name,
                   Version* dummy_versions, Cache* table_cache,
                   WriteBufferManager* write_buffer_manager,
                   const ColumnFamilyOptions& options,
                   const ImmutableDBOptions& db_options,
                   const EnvOptions& env_options,
                   ColumnFamilySet* column_family_set);

  uint32_t id_;
  const std::string name_;
  Version* dummy_versions_;  // Head of circular doubly-linked list of versions.
  Version* current_;         // == dummy_versions->prev_
......................................................

  // Thread's local copy of SuperVersion pointer
  // This needs to be destructed before mutex_
  std::unique_ptr<ThreadLocalPtr> local_sv_;

  // pointers for a circular linked list. we use it to support iterations over
  // all column families that are alive (note: dropped column families can also
  // be alive as long as client holds a reference)
  ColumnFamilyData* next_;
  ColumnFamilyData* prev_;
...................................

  ColumnFamilySet* column_family_set_;
..................................
};
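The Ref()/Unref() contract above puts the burden of deletion on the caller: Unref() only reports that the last reference is gone. A minimal sketch of the caller's side follows. ToyRefCounted and ReleaseAndMaybeDelete are hypothetical names used for illustration, and the sketch ignores the DB mutex requirement stated in the comments above.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical class that copies only the reference-counting part.
class ToyRefCounted {
 public:
  void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }

  // Returns true when the caller just released the last reference and is
  // therefore responsible for deleting the object.
  bool Unref() {
    int old_refs = refs_.fetch_sub(1, std::memory_order_relaxed);
    assert(old_refs > 0);
    return old_refs == 1;
  }

  int refs() const { return refs_.load(std::memory_order_relaxed); }

 private:
  std::atomic<int> refs_{0};
};

// Hypothetical helper: release one reference and delete the object if it was
// the last one. Returns true if the object was deleted.
inline bool ReleaseAndMaybeDelete(ToyRefCounted* obj) {
  if (obj->Unref()) {
    delete obj;
    return true;
  }
  return false;
}
```

With two references held, the first release leaves the object alive and only the second one deletes it.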

Then there is ColumnFamilyHandleImpl, the structure returned to the caller. It is mainly a wrapper around ColumnFamilyData.

// ColumnFamilyHandleImpl is the class that clients use to access different
// column families. It has non-trivial destructor, which gets called when client
// is done using the column family
class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
 public:
  // create while holding the mutex
  ColumnFamilyHandleImpl(
      ColumnFamilyData* cfd, DBImpl* db, InstrumentedMutex* mutex);
  // destroy without mutex
  virtual ~ColumnFamilyHandleImpl();
  virtual ColumnFamilyData* cfd() const { return cfd_; }
......................................

 private:
  ColumnFamilyData* cfd_;
  DBImpl* db_;
  InstrumentedMutex* mutex_;
};

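Next we analyze the implementation by following how a Column Family is created and deleted, starting from DBImpl::CreateColumnFamilyImpl. This function first calls GetNextColumnFamilyID to obtain the (auto-incremented) ID for the new Column Family, then calls LogAndApply to record and apply the change, and finally returns the wrapped ColumnFamilyHandle to the caller.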

Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
                                      const std::string& column_family_name,
                                      ColumnFamilyHandle** handle) {
.......................................

  {
...................................
    VersionEdit edit;
    edit.AddColumnFamily(column_family_name);
    uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
    edit.SetColumnFamily(new_id);
    edit.SetLogNumber(logfile_number_);
    edit.SetComparatorName(cf_options.comparator->Name());

    // LogAndApply will both write the creation in MANIFEST and create
    // ColumnFamilyData object
    {  // write thread
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      // LogAndApply will both write the creation in MANIFEST and create
      // ColumnFamilyData object
      s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit,
                                 &mutex_, directories_.GetDbDir(), false,
                                 &cf_options);
      write_thread_.ExitUnbatched(&w);
    }
    if (s.ok()) {
........................................
      *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Created column family [%s] (ID %u)",
                     column_family_name.c_str(), (unsigned)cfd->GetID());
    }
.............................................
  }  // InstrumentedMutexLock l(&mutex_)

.................................
  return s;
}

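LogAndApply eventually calls ColumnFamilySet::CreateColumnFamily (through VersionSet::CreateColumnFamily). This function does three main things:

  1. Creates the ColumnFamilyData object
  2. Adds the newly created CFD to the doubly-linked list
  3. Updates the corresponding maps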

// under a DB mutex AND write thread
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd = new ColumnFamilyData(
      id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
      *db_options_, env_options_, this);
  column_families_.insert({name, id});
  column_family_data_.insert({id, new_cfd});
  max_column_family_ = std::max(max_column_family_, id);
  // add to linked list
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
  if (id == 0) {
    default_cfd_cache_ = new_cfd;
  }
  return new_cfd;
}
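The linked-list part of this function is the classic "circular list with a dummy head" technique: inserting before the dummy node appends at the tail, and iteration runs from dummy->next_ until it reaches the dummy again. A standalone sketch follows. ToyNode and ToyList are hypothetical names, and the nodes are owned by the caller.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical node type; only the link fields mirror the original.
struct ToyNode {
  std::string name;
  ToyNode* next_;
  ToyNode* prev_;
};

class ToyList {
 public:
  ToyList() : dummy_{"", nullptr, nullptr} {
    dummy_.next_ = &dummy_;  // an empty list points at itself
    dummy_.prev_ = &dummy_;
  }
  ToyList(const ToyList&) = delete;
  ToyList& operator=(const ToyList&) = delete;

  // Same four pointer updates as in CreateColumnFamily: link the node in
  // just before the dummy head, i.e. at the tail of the list.
  void PushBack(ToyNode* n) {
    n->next_ = &dummy_;
    ToyNode* prev = dummy_.prev_;
    n->prev_ = prev;
    prev->next_ = n;
    dummy_.prev_ = n;
  }

  // Unlinking needs no special cases because the list is never empty:
  // the dummy node is always present.
  static void Unlink(ToyNode* n) {
    n->prev_->next_ = n->next_;
    n->next_->prev_ = n->prev_;
    n->next_ = n;
    n->prev_ = n;
  }

  // Walk from the first real node until we are back at the dummy head.
  std::vector<std::string> Names() const {
    std::vector<std::string> out;
    for (const ToyNode* n = dummy_.next_; n != &dummy_; n = n->next_) {
      out.push_back(n->name);
    }
    return out;
  }

 private:
  ToyNode dummy_;
};
```

The begin()/end() pair of ColumnFamilySet shown earlier follows the same shape: begin is dummy_cfd_->next_ and end is dummy_cfd_ itself.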
    

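Now let's look at how a Column Family is deleted. All deletions eventually call ColumnFamilySet::RemoveColumnFamily, which removes the Column Family from the two maps. One might ask why it is not removed from the doubly-linked list as well. The reason is that ColumnFamilyData is reference counted, so the object is only actually destroyed once its reference count drops to zero, and that is the point at which it is unlinked from the list.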

// under a DB mutex AND from a write thread
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
  auto cfd_iter = column_family_data_.find(cfd->GetID());
  assert(cfd_iter != column_family_data_.end());
  column_family_data_.erase(cfd_iter);
  column_families_.erase(cfd->GetName());
}
    

So let's look at the destructor of ColumnFamilyData. It unlinks the object from the doubly-linked list and releases its current Version (current_).

// DB mutex held
ColumnFamilyData::~ColumnFamilyData() {
  assert(refs_.load(std::memory_order_relaxed) == 0);
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

  if (!dropped_ && column_family_set_ != nullptr) {
    // If it's dropped, it's already removed from column family set
    // If column_family_set_ == nullptr, this is dummy CFD and not in
    // ColumnFamilySet
    column_family_set_->RemoveColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }
..............................
}
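The two-phase lifecycle (a drop removes the entry from the maps, and only the final Unref unlinks it from the list) can be sketched as follows. This is a toy model under simplifying assumptions: ToySet and ToyCFD are hypothetical names, there is a single map keyed by ID, the reference count is a plain int, and no locking is modeled.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>

struct ToyCFD;

// Hypothetical stand-in for ColumnFamilySet: one lookup map plus the list.
struct ToySet {
  std::unordered_map<uint32_t, ToyCFD*> by_id;
  ToyCFD* dummy;  // head of the circular list, never in by_id
  ToySet();
  ~ToySet();
  ToyCFD* Create(uint32_t id);
  int CountLinked() const;
};

struct ToyCFD {
  uint32_t id;
  ToySet* set;  // nullptr for the dummy head
  int refs = 0;
  bool dropped = false;
  ToyCFD* next;
  ToyCFD* prev;

  ToyCFD(uint32_t i, ToySet* s) : id(i), set(s), next(this), prev(this) {}

  // Phase 1 (drop): no longer reachable through lookups, but still linked
  // and alive while someone holds a reference.
  void SetDropped() {
    dropped = true;
    set->by_id.erase(id);
  }

  bool Unref() {
    assert(refs > 0);
    return --refs == 0;
  }

  // Phase 2 (destruction): unlink from the list; erase from the map only if
  // the object was never dropped.
  ~ToyCFD() {
    assert(refs == 0);
    prev->next = next;
    next->prev = prev;
    if (!dropped && set != nullptr) set->by_id.erase(id);
  }
};

inline ToySet::ToySet() : dummy(new ToyCFD(0, nullptr)) {}
// Assumes every real ToyCFD has already been deleted by its last owner.
inline ToySet::~ToySet() { delete dummy; }

inline ToyCFD* ToySet::Create(uint32_t id) {
  ToyCFD* cfd = new ToyCFD(id, this);
  cfd->refs = 1;  // the reference owned by the DB itself
  by_id[id] = cfd;
  cfd->next = dummy;
  cfd->prev = dummy->prev;
  dummy->prev->next = cfd;
  dummy->prev = cfd;
  return cfd;
}

inline int ToySet::CountLinked() const {
  int n = 0;
  for (ToyCFD* c = dummy->next; c != dummy; c = c->next) ++n;
  return n;
}
```

In this model a dropped column family disappears from by_id immediately, yet CountLinked() still counts it until the last holder calls Unref() and deletes it.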

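Finally, let's look at how Column Families are stored on disk. Column Family information is stored in the MANIFEST file. Writing it is simple (covered in an earlier article) and no different from other information in the MANIFEST, so here we focus on how the data is read and initialized. All of this happens in VersionSet::Recover, so let's look at that function.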

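The main logic reads the MANIFEST and uses the Column Family information found on disk to initialize the in-memory ColumnFamilySet. In effect it replays all earlier create/drop operations: each add record creates a ColumnFamilyData through CreateColumnFamily, and each drop record releases the corresponding one, so that the on-disk state is rebuilt in memory.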

while (reader.ReadRecord(&record, &scratch) && s.ok()) {
      VersionEdit edit;
      s = edit.DecodeFrom(record);
      if (!s.ok()) {
        break;
      }

      // Not found means that user didn't supply that column
      // family option AND we encountered column family add
      // record. Once we encounter column family drop record,
      // we will delete the column family from
      // column_families_not_found.
      bool cf_in_not_found =
          column_families_not_found.find(edit.column_family_) !=
          column_families_not_found.end();
      // in builders means that user supplied that column family
      // option AND that we encountered column family add record
      bool cf_in_builders =
          builders.find(edit.column_family_) != builders.end();

      // they can't both be true
      assert(!(cf_in_not_found && cf_in_builders));

      ColumnFamilyData* cfd = nullptr;

      if (edit.is_column_family_add_) {
        if (cf_in_builders || cf_in_not_found) {
          s = Status::Corruption(
              "Manifest adding the same column family twice");
          break;
        }
        auto cf_options = cf_name_to_options.find(edit.column_family_name_);
        if (cf_options == cf_name_to_options.end()) {
          column_families_not_found.insert(
              {edit.column_family_, edit.column_family_name_});
        } else {
          cfd = CreateColumnFamily(cf_options->second, &edit);
          cfd->set_initialized();
          builders.insert(
              {edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
        }
      } else if (edit.is_column_family_drop_) {
        if (cf_in_builders) {
          auto builder = builders.find(edit.column_family_);
          assert(builder != builders.end());
          delete builder->second;
          builders.erase(builder);
          cfd = column_family_set_->GetColumnFamily(edit.column_family_);
          if (cfd->Unref()) {
            delete cfd;
            cfd = nullptr;
          } else {
            // who else can have reference to cfd!?
            assert(false);
          }
        } else if (cf_in_not_found) {
          column_families_not_found.erase(edit.column_family_);
        } else {
          s = Status::Corruption(
              "Manifest - dropping non-existing column family");
          break;
        }
      } else if (!cf_in_not_found) {
        if (!cf_in_builders) {
          s = Status::Corruption(
              "Manifest record referencing unknown column family");
          break;
        }

        cfd = column_family_set_->GetColumnFamily(edit.column_family_);
        // this should never happen since cf_in_builders is true
        assert(cfd != nullptr);

        // if it is not column family add or column family drop,
        // then it's a file add/delete, which should be forwarded
        // to builder
        auto builder = builders.find(edit.column_family_);
        assert(builder != builders.end());
        builder->second->version_builder()->Apply(&edit);
      }

      if (cfd != nullptr) {
        if (edit.has_log_number_) {
          if (cfd->GetLogNumber() > edit.log_number_) {
            ROCKS_LOG_WARN(
                db_options_->info_log,
                "MANIFEST corruption detected, but ignored - Log numbers in "
                "records NOT monotonically increasing");
          } else {
            cfd->SetLogNumber(edit.log_number_);
            have_log_number = true;
          }
        }
        if (edit.has_comparator_ &&
            edit.comparator_ != cfd->user_comparator()->Name()) {
          s = Status::InvalidArgument(
              cfd->user_comparator()->Name(),
              "does not match existing comparator " + edit.comparator_);
          break;
        }
      }

      if (edit.has_prev_log_number_) {
        previous_log_number = edit.prev_log_number_;
        have_prev_log_number = true;
      }

      if (edit.has_next_file_number_) {
        next_file = edit.next_file_number_;
        have_next_file = true;
      }

      if (edit.has_max_column_family_) {
        max_column_family = edit.max_column_family_;
      }

      if (edit.has_last_sequence_) {
        last_sequence = edit.last_sequence_;
        have_last_sequence = true;
      }
    }
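The core of this loop is a replay of add/drop records with three corruption checks. The sketch below isolates that idea. It is a simplified model, not the RocksDB implementation: ToyEdit and Replay are hypothetical names, and it deliberately leaves out the column_families_not_found bookkeeping, the version builders, and the log number, comparator and sequence handling.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical, heavily reduced record; the real VersionEdit carries more.
struct ToyEdit {
  enum Kind { kAdd, kDrop, kOther } kind;
  uint32_t cf_id;
  std::string cf_name;  // only meaningful for kAdd
};

// Replays the edits in order. On success, *live maps every surviving column
// family ID to its name. On failure, returns false and sets *error.
inline bool Replay(const std::vector<ToyEdit>& edits,
                   std::map<uint32_t, std::string>* live,
                   std::string* error) {
  live->clear();
  for (const ToyEdit& e : edits) {
    const bool known = live->count(e.cf_id) != 0;
    switch (e.kind) {
      case ToyEdit::kAdd:
        if (known) {
          *error = "adding the same column family twice";
          return false;
        }
        (*live)[e.cf_id] = e.cf_name;
        break;
      case ToyEdit::kDrop:
        if (!known) {
          *error = "dropping non-existing column family";
          return false;
        }
        live->erase(e.cf_id);
        break;
      case ToyEdit::kOther:
        if (!known) {
          *error = "record referencing unknown column family";
          return false;
        }
        // A real replay would forward file add/delete records to a builder.
        break;
    }
  }
  return true;
}
```

Replaying "add 0, add 1, other 1, drop 1, add 2" leaves column families 0 and 2 alive, while a drop record for an ID that was never added is reported as corruption.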