Author: diaoliang
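Column Families were introduced in RocksDB 3.0. With this feature, every key-value pair is associated with a Column Family, and the default Column Family is named "default". A Column Family mainly gives RocksDB a logical partition of the data. At the implementation level, all Column Families share a single WAL, while each one has its own Memtables and SST files. As a result, different Column Families can be given different options quickly and easily, and an entire Column Family can be dropped quickly.
First, creating a Column Family. There are two ways to do it: pass the desired Column Families when opening the DB, or call CreateColumnFamily directly once the DB has been created and opened.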
DB::Open(const DBOptions& db_options, const std::string& name, const std::vector<ColumnFamilyDescriptor>& column_families, std::vector<ColumnFamilyHandle*>* handles, DB** dbptr);
DB::CreateColumnFamily(const ColumnFamilyOptions& options, const std::string& column_family_name, ColumnFamilyHandle** handle);
Either way, the caller ends up with a ColumnFamilyHandle to work with.
Dropping a Column Family is simple: pass the ColumnFamilyHandle obtained earlier back to RocksDB.
DropColumnFamily(ColumnFamilyHandle* column_family);
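All Column Families are managed by a structure called ColumnFamilySet, and each individual Column Family is represented by a ColumnFamilyData.
Start with ColumnFamilySet. It manages Column Families with two kinds of data structure: maps (column_families_ and column_family_data_) and a circular doubly linked list (headed by dummy_cfd_). The maps store the mapping from a Column Family name to its ID, and from that ID to its ColumnFamilyData. Note that internally RocksDB identifies each Column Family by a uint32 ID; max_column_family_ records the largest ID seen so far, so IDs are simply increasing integers.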
class ColumnFamilySet {
public:
// ColumnFamilySet supports iteration
public:
.................................
ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id,
Version* dummy_version,
const ColumnFamilyOptions& options);
iterator begin() { return iterator(dummy_cfd_->next_); }
iterator end() { return iterator(dummy_cfd_); }
...............................
private:
friend class ColumnFamilyData;
// helper function that gets called from cfd destructor
// REQUIRES: DB mutex held
void RemoveColumnFamily(ColumnFamilyData* cfd);
// column_families_ and column_family_data_ need to be protected:
// * when mutating both conditions have to be satisfied:
// 1. DB mutex locked
// 2. thread currently in single-threaded write thread
// * when reading, at least one condition needs to be satisfied:
// 1. DB mutex locked
// 2. accessed from a single-threaded write thread
std::unordered_map<std::string, uint32_t> column_families_;
std::unordered_map<uint32_t, ColumnFamilyData*> column_family_data_;
uint32_t max_column_family_;
ColumnFamilyData* dummy_cfd_;
// We don't hold the refcount here, since default column family always exists
// We are also not responsible for cleaning up default_cfd_cache_. This is
// just a cache that makes common case (accessing default column family)
// faster
ColumnFamilyData* default_cfd_cache_;
..................................
};
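To make the two-map layout concrete, here is a minimal sketch of the name to ID to data lookup. It is a toy model, not RocksDB code: ToyCfd, ToyRegistry, NextId, Add and Find are hypothetical names introduced only for illustration, and the "bump the largest ID" rule is an assumption based on the description of max_column_family_ above.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>

// Toy stand-in for ColumnFamilyData; it only carries what the lookup needs.
struct ToyCfd {
  uint32_t id;
  std::string name;
};

// Toy registry mirroring the two maps: name -> id and id -> data.
class ToyRegistry {
 public:
  // Hands out the next ID by bumping the largest ID seen so far
  // (an assumption of this sketch).
  uint32_t NextId() { return ++max_id_; }

  void Add(const std::string& name, uint32_t id) {
    name_to_id_[name] = id;
    id_to_cfd_[id] = ToyCfd{id, name};
    if (id > max_id_) max_id_ = id;
  }

  // Lookup by name takes two hops: name -> id, then id -> data.
  const ToyCfd* Find(const std::string& name) const {
    auto it = name_to_id_.find(name);
    if (it == name_to_id_.end()) return nullptr;
    auto cfd = id_to_cfd_.find(it->second);
    return cfd == id_to_cfd_.end() ? nullptr : &cfd->second;
  }

 private:
  std::unordered_map<std::string, uint32_t> name_to_id_;
  std::unordered_map<uint32_t, ToyCfd> id_to_cfd_;
  uint32_t max_id_ = 0;
};
```

Keeping the ID as the primary key means that records on disk only need to carry a small integer rather than the full name.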
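Next is ColumnFamilyData, the structure that represents a single Column Family and holds its state. It carries the ID and name, plus all the versions belonging to this Column Family (dummy_versions_). The next_/prev_ pointers are the links of the doubly linked list that ColumnFamilySet uses to track every Column Family.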
class ColumnFamilyData {
public:
~ColumnFamilyData();
// thread-safe
uint32_t GetID() const { return id_; }
// thread-safe
const std::string& GetName() const { return name_; }
// Ref() can only be called from a context where the caller can guarantee
// that ColumnFamilyData is alive (while holding a non-zero ref already,
// holding a DB mutex, or as the leader in a write batch group).
void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
// Unref decreases the reference count, but does not handle deletion
// when the count goes to 0. If this method returns true then the
// caller should delete the instance immediately, or later, by calling
// FreeDeadColumnFamilies(). Unref() can only be called while holding
// a DB mutex, or during single-threaded recovery.
bool Unref() {
int old_refs = refs_.fetch_sub(1, std::memory_order_relaxed);
assert(old_refs > 0);
return old_refs == 1;
}
..............................
private:
friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name,
Version* dummy_versions, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
const ColumnFamilyOptions& options,
const ImmutableDBOptions& db_options,
const EnvOptions& env_options,
ColumnFamilySet* column_family_set);
uint32_t id_;
const std::string name_;
Version* dummy_versions_; // Head of circular doubly-linked list of versions.
Version* current_; // == dummy_versions->prev_
......................................................
// Thread's local copy of SuperVersion pointer
// This needs to be destructed before mutex_
std::unique_ptr<ThreadLocalPtr> local_sv_;
// pointers for a circular linked list. we use it to support iterations over
// all column families that are alive (note: dropped column families can also
// be alive as long as client holds a reference)
ColumnFamilyData* next_;
ColumnFamilyData* prev_;
...................................
ColumnFamilySet* column_family_set_;
..................................
};
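The Ref()/Unref() contract above leaves deletion to the caller. A minimal sketch of how a caller might honor that contract is shown below; ToyRefCounted and Release are hypothetical names, and the locking requirements stated in the comments above are deliberately left out of this single-threaded illustration.

```cpp
#include <atomic>
#include <cassert>

// Toy object with the same Ref()/Unref() shape as the excerpt above.
class ToyRefCounted {
 public:
  void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }

  // Returns true when the caller has just released the last reference.
  bool Unref() {
    int old_refs = refs_.fetch_sub(1, std::memory_order_relaxed);
    assert(old_refs > 0);
    return old_refs == 1;
  }

  int refs() const { return refs_.load(std::memory_order_relaxed); }

 private:
  std::atomic<int> refs_{0};
};

// Releases one reference and frees the object if it was the last one.
// Returns true when the object was deleted.
inline bool Release(ToyRefCounted* obj) {
  if (obj->Unref()) {
    delete obj;
    return true;
  }
  return false;
}
```

The point of the pattern is that Unref() never frees anything itself; whoever sees it return true is responsible for the delete.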
Then there is ColumnFamilyHandleImpl, the structure handed back to the caller. It is mainly a wrapper around ColumnFamilyData.
// ColumnFamilyHandleImpl is the class that clients use to access different
// column families. It has non-trivial destructor, which gets called when client
// is done using the column family
class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
public:
// create while holding the mutex
ColumnFamilyHandleImpl(
ColumnFamilyData* cfd, DBImpl* db, InstrumentedMutex* mutex);
// destroy without mutex
virtual ~ColumnFamilyHandleImpl();
virtual ColumnFamilyData* cfd() const { return cfd_; }
......................................
private:
ColumnFamilyData* cfd_;
DBImpl* db_;
InstrumentedMutex* mutex_;
};
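Now let's walk through creation and deletion to see how Column Families are implemented, starting with DBImpl::CreateColumnFamilyImpl. The function first calls GetNextColumnFamilyID to obtain the (auto-incremented) ID for the new Column Family, then calls LogAndApply to apply the change, and finally returns the wrapped ColumnFamilyHandle to the caller.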
Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
const std::string& column_family_name,
ColumnFamilyHandle** handle) {
.......................................
{
...................................
VersionEdit edit;
edit.AddColumnFamily(column_family_name);
uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
edit.SetColumnFamily(new_id);
edit.SetLogNumber(logfile_number_);
edit.SetComparatorName(cf_options.comparator->Name());
// LogAndApply will both write the creation in MANIFEST and create
// ColumnFamilyData object
{ // write thread
WriteThread::Writer w;
write_thread_.EnterUnbatched(&w, &mutex_);
// LogAndApply will both write the creation in MANIFEST and create
// ColumnFamilyData object
s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit,
&mutex_, directories_.GetDbDir(), false,
&cf_options);
write_thread_.ExitUnbatched(&w);
}
if (s.ok()) {
........................................
*handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Created column family [%s] (ID %u)",
column_family_name.c_str(), (unsigned)cfd->GetID());
}
.............................................
} // InstrumentedMutexLock l(&mutex_)
.................................
return s;
}
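LogAndApply eventually calls ColumnFamilySet::CreateColumnFamily (via VersionSet::CreateColumnFamily). The function mainly does three things: it constructs the new ColumnFamilyData, registers it in the two maps (updating max_column_family_), and links it at the tail of the doubly linked list.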
// under a DB mutex AND write thread
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
const std::string& name, uint32_t id, Version* dummy_versions,
const ColumnFamilyOptions& options) {
assert(column_families_.find(name) == column_families_.end());
ColumnFamilyData* new_cfd = new ColumnFamilyData(
id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
*db_options_, env_options_, this);
column_families_.insert({name, id});
column_family_data_.insert({id, new_cfd});
max_column_family_ = std::max(max_column_family_, id);
// add to linked list
new_cfd->next_ = dummy_cfd_;
auto prev = dummy_cfd_->prev_;
new_cfd->prev_ = prev;
prev->next_ = new_cfd;
dummy_cfd_->prev_ = new_cfd;
if (id == 0) {
default_cfd_cache_ = new_cfd;
}
return new_cfd;
}
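The linked-list step in the function above can be isolated in a small sketch. It shows a circular doubly linked list with a dummy head, where inserting "before the dummy" appends at the tail and iteration runs from dummy->next back to the dummy. ToyNode, LinkAtTail and ListNames are hypothetical names used only for this illustration.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Toy list node; a freshly built node points at itself, which is also
// the state of an empty list's dummy head.
struct ToyNode {
  std::string name;
  ToyNode* next;
  ToyNode* prev;
  explicit ToyNode(std::string n)
      : name(std::move(n)), next(this), prev(this) {}
};

// Links node just before the dummy head, i.e. at the tail of the list.
inline void LinkAtTail(ToyNode* dummy, ToyNode* node) {
  node->next = dummy;
  ToyNode* prev = dummy->prev;
  node->prev = prev;
  prev->next = node;
  dummy->prev = node;
}

// Walks from dummy->next until the dummy is reached again, which is the
// same begin()/end() convention the iterator in ColumnFamilySet uses.
inline std::vector<std::string> ListNames(const ToyNode* dummy) {
  std::vector<std::string> out;
  for (const ToyNode* n = dummy->next; n != dummy; n = n->next) {
    out.push_back(n->name);
  }
  return out;
}
```

With a dummy head there is no special case for the empty list: the first insert and every later insert run exactly the same four pointer assignments.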
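Now for dropping a Column Family. Every drop eventually reaches ColumnFamilySet::RemoveColumnFamily, which removes the Column Family from the two maps. One might ask why it is not removed from the doubly linked list as well. The reason is that ColumnFamilyData is reference counted, so the object is only really destroyed (and only then unlinked from the list) once its reference count has dropped to zero.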
// under a DB mutex AND from a write thread
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
auto cfd_iter = column_family_data_.find(cfd->GetID());
assert(cfd_iter != column_family_data_.end());
column_family_data_.erase(cfd_iter);
column_families_.erase(cfd->GetName());
}
So let's look at the ColumnFamilyData destructor. It unlinks the object from the doubly linked list and releases the associated Version (current_).
// DB mutex held
ColumnFamilyData::~ColumnFamilyData() {
assert(refs_.load(std::memory_order_relaxed) == 0);
// remove from linked list
auto prev = prev_;
auto next = next_;
prev->next_ = next;
next->prev_ = prev;
if (!dropped_ && column_family_set_ != nullptr) {
// If it's dropped, it's already removed from column family set
// If column_family_set_ == nullptr, this is dummy CFD and not in
// ColumnFamilySet
column_family_set_->RemoveColumnFamily(this);
}
if (current_ != nullptr) {
current_->Unref();
}
..............................
}
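The difference between "dropped" and "destroyed" can be sketched as follows. This is a single-threaded toy model under stated assumptions: ToyCfd, ToySet, SetDropped and LinkedCount are hypothetical names, the set is assumed to hold one reference of its own until the drop, and a drop is assumed to remove the map entries immediately (which matches the destructor comment "If it's dropped, it's already removed from column family set").

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>

struct ToySet;

// Toy column family object: reference counted and linked into a list.
struct ToyCfd {
  uint32_t id;
  std::string name;
  ToySet* set;  // nullptr marks the dummy list head
  int refs = 0;
  bool dropped = false;
  ToyCfd* next = this;
  ToyCfd* prev = this;

  ToyCfd(uint32_t i, std::string n, ToySet* s)
      : id(i), name(std::move(n)), set(s) {}
  ~ToyCfd();
  void Ref() { ++refs; }
  bool Unref() { return --refs == 0; }  // true: caller must delete
  void SetDropped();
};

struct ToySet {
  std::unordered_map<std::string, uint32_t> names;
  std::unordered_map<uint32_t, ToyCfd*> by_id;
  ToyCfd dummy{0, "", nullptr};

  ToyCfd* Create(uint32_t id, const std::string& name) {
    ToyCfd* cfd = new ToyCfd(id, name, this);
    cfd->Ref();  // reference owned by the set (assumption of this sketch)
    names[name] = id;
    by_id[id] = cfd;
    cfd->next = &dummy;
    cfd->prev = dummy.prev;
    dummy.prev->next = cfd;
    dummy.prev = cfd;
    return cfd;
  }
  void Remove(ToyCfd* cfd) {
    by_id.erase(cfd->id);
    names.erase(cfd->name);
  }
  int LinkedCount() const {
    int n = 0;
    for (const ToyCfd* c = dummy.next; c != &dummy; c = c->next) ++n;
    return n;
  }
};

inline ToyCfd::~ToyCfd() {
  assert(refs == 0);
  prev->next = next;  // the list is only left on destruction
  next->prev = prev;
  if (!dropped && set != nullptr) set->Remove(this);
}

inline void ToyCfd::SetDropped() {
  dropped = true;
  set->Remove(this);  // name and ID vanish from the maps at once
}
```

After a drop the name can no longer be looked up, yet the object stays reachable through the list for as long as any holder still has a reference.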
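Finally, let's look at how Column Families are stored on disk. The first thing to be clear about is that Column Family information is kept in the MANIFEST file. Writing it is straightforward (covered in an earlier article) and no different from the other records in the MANIFEST, so here we focus on reading it back and initializing from it. All of that happens inside VersionSet::Recover, so let's look at that function.
The main logic reads the MANIFEST and uses the Column Family information found there to initialize the in-memory ColumnFamilySet. In effect it replays every earlier create/drop operation: CreateColumnFamily is called for each add record, and each drop record releases the corresponding ColumnFamilyData, which rebuilds the on-disk state in memory.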
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (!s.ok()) {
break;
}
// Not found means that user didn't supply that column
// family option AND we encountered column family add
// record. Once we encounter column family drop record,
// we will delete the column family from
// column_families_not_found.
bool cf_in_not_found =
column_families_not_found.find(edit.column_family_) !=
column_families_not_found.end();
// in builders means that user supplied that column family
// option AND that we encountered column family add record
bool cf_in_builders =
builders.find(edit.column_family_) != builders.end();
// they can't both be true
assert(!(cf_in_not_found && cf_in_builders));
ColumnFamilyData* cfd = nullptr;
if (edit.is_column_family_add_) {
if (cf_in_builders || cf_in_not_found) {
s = Status::Corruption(
"Manifest adding the same column family twice");
break;
}
auto cf_options = cf_name_to_options.find(edit.column_family_name_);
if (cf_options == cf_name_to_options.end()) {
column_families_not_found.insert(
{edit.column_family_, edit.column_family_name_});
} else {
cfd = CreateColumnFamily(cf_options->second, &edit);
cfd->set_initialized();
builders.insert(
{edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
}
} else if (edit.is_column_family_drop_) {
if (cf_in_builders) {
auto builder = builders.find(edit.column_family_);
assert(builder != builders.end());
delete builder->second;
builders.erase(builder);
cfd = column_family_set_->GetColumnFamily(edit.column_family_);
if (cfd->Unref()) {
delete cfd;
cfd = nullptr;
} else {
// who else can have reference to cfd!?
assert(false);
}
} else if (cf_in_not_found) {
column_families_not_found.erase(edit.column_family_);
} else {
s = Status::Corruption(
"Manifest - dropping non-existing column family");
break;
}
} else if (!cf_in_not_found) {
if (!cf_in_builders) {
s = Status::Corruption(
"Manifest record referencing unknown column family");
break;
}
cfd = column_family_set_->GetColumnFamily(edit.column_family_);
// this should never happen since cf_in_builders is true
assert(cfd != nullptr);
// if it is not column family add or column family drop,
// then it's a file add/delete, which should be forwarded
// to builder
auto builder = builders.find(edit.column_family_);
assert(builder != builders.end());
builder->second->version_builder()->Apply(&edit);
}
if (cfd != nullptr) {
if (edit.has_log_number_) {
if (cfd->GetLogNumber() > edit.log_number_) {
ROCKS_LOG_WARN(
db_options_->info_log,
"MANIFEST corruption detected, but ignored - Log numbers in "
"records NOT monotonically increasing");
} else {
cfd->SetLogNumber(edit.log_number_);
have_log_number = true;
}
}
if (edit.has_comparator_ &&
edit.comparator_ != cfd->user_comparator()->Name()) {
s = Status::InvalidArgument(
cfd->user_comparator()->Name(),
"does not match existing comparator " + edit.comparator_);
break;
}
}
if (edit.has_prev_log_number_) {
previous_log_number = edit.prev_log_number_;
have_prev_log_number = true;
}
if (edit.has_next_file_number_) {
next_file = edit.next_file_number_;
have_next_file = true;
}
if (edit.has_max_column_family_) {
max_column_family = edit.max_column_family_;
}
if (edit.has_last_sequence_) {
last_sequence = edit.last_sequence_;
have_last_sequence = true;
}
}
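The add/drop bookkeeping in the loop above can be condensed into a small replay sketch. It is a simplified model, not the RocksDB implementation: ToyEdit, EditKind, ReplayResult and Replay are hypothetical names, the maps opened and not_found stand in for builders and column_families_not_found, and log numbers, comparators and sequence numbers are left out entirely.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <vector>

enum class EditKind { kAdd, kDrop, kFileChange };

// Toy stand-in for a decoded VersionEdit record.
struct ToyEdit {
  EditKind kind;
  uint32_t cf_id;
  std::string cf_name;  // only meaningful for kAdd
};

struct ReplayResult {
  bool ok = true;
  std::string error;
  std::map<uint32_t, std::string> opened;     // caller supplied options
  std::map<uint32_t, std::string> not_found;  // in the log, no options given
};

// Replays add/drop records in order, mirroring the three-way branch above.
inline ReplayResult Replay(const std::vector<ToyEdit>& log,
                           const std::set<std::string>& requested) {
  ReplayResult r;
  for (const ToyEdit& e : log) {
    bool in_opened = r.opened.count(e.cf_id) > 0;
    bool in_not_found = r.not_found.count(e.cf_id) > 0;
    if (e.kind == EditKind::kAdd) {
      if (in_opened || in_not_found) {
        r.ok = false;
        r.error = "adding the same column family twice";
        break;
      }
      if (requested.count(e.cf_name) > 0) {
        r.opened[e.cf_id] = e.cf_name;
      } else {
        r.not_found[e.cf_id] = e.cf_name;
      }
    } else if (e.kind == EditKind::kDrop) {
      if (in_opened) {
        r.opened.erase(e.cf_id);
      } else if (in_not_found) {
        r.not_found.erase(e.cf_id);
      } else {
        r.ok = false;
        r.error = "dropping non-existing column family";
        break;
      }
    } else if (!in_not_found && !in_opened) {
      // A file change for a family that was never added is corruption;
      // one for a not_found family is silently skipped.
      r.ok = false;
      r.error = "record referencing unknown column family";
      break;
    }
  }
  return r;
}
```

Anything still left in not_found once the replay finishes is a Column Family that exists on disk but for which the caller supplied no options.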