Author: diaoliang
简而言之,在RocksDB中的读取需要处理的最核心的一个问题就是如何读取最新的数据,这是由于RocksDB是基于LSM,因此在RocksDB中,对于数据的delete以及update,它并不会立即去执行对应的动作,而只是插入一条新的数据,而数据的最终更新(last-write-win)以及删除是在compact的时候来做的.
其实最那就是如何读取到一个数据的最新版本,因此首先我们需要知道在RocksDB中,多个版本的数据是如何保存的。首先我们需要知道在RocksDB中,数据是保存在两个地方,一个是memtable(内存),一个是sstable(磁盘),因此RocksDB读取数据也是依次从这两个地方读取.
在这篇文章里我们只介绍内存中的数据读取.
首先我们来看memtable部分,首先我们知道在RocksDB中,每个version都会一个sequence number,每次写入都会更新这个sequence number,因此相同key的不同版本就是通过这个seq来确定的,这个sequence相当于一个时间戳,这样通过sequence我们就可以得到某一个key的最新数据.
通过上面我们知道由于用户插入或者读取的时候传递进来永远是只有user_key,因此在RocksDB内部还会维护一个internal_key,这个internal_key格式如下:
user_key + sequence + type
对应的代码:
InternalKey(const Slice& _user_key, SequenceNumber s, ValueType t) {
AppendInternalKey(&rep_, ParsedInternalKey(_user_key, s, t));
}
void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {
result->append(key.user_key.data(), key.user_key.size());
PutFixed64(result, PackSequenceAndType(key.sequence, key.type));
}
这里type就是表示当前操作,这里在memtable中,分为三种种操作,下面value就表示是插入,而merge这次暂时忽略,我们以后会详细介绍这个操作:
enum ValueType : unsigned char {
kTypeDeletion = 0x0,
kTypeValue = 0x1,
kTypeMerge = 0x2,
........................
}
然后我们来看不同版本的key被插入的时候,在RocksDB内部是如何组织的。在RocksDB中的不同版本的key是按照下面的逻辑进行排序:
increasing user key (according to user-supplied comparator)
decreasing sequence number
decreasing type (though sequence# should be enough to disambiguate)
那么此时为了读取最新的那条数据,我们只需要读取最大seq的那条数据就可以了.
对应代码就是InternalKeyComparator这个类,可以看到当key相同时说明是相同key的不同版本,因此开始进行后续的处理:
int InternalKeyComparator::Compare(const ParsedInternalKey& a,
const ParsedInternalKey& b) const {
int r = user_comparator_->Compare(a.user_key, b.user_key);
PERF_COUNTER_ADD(user_key_comparison_count, 1);
if (r == 0) {
if (a.sequence > b.sequence) {
r = -1;
} else if (a.sequence < b.sequence) {
r = +1;
} else if (a.type > b.type) {
r = -1;
} else if (a.type < b.type) {
r = +1;
}
}
return r;
}
这里InternalKey对于用户来说是完全透明的,那么当用户来查找对应的user_key的时候,RocksDB又是如何来构建对应的internalkey呢,这里有一个核心的数据结构叫做LookupKey.我们来看这个类的实现:
class LookupKey {
public:
// Initialize *this for looking up user_key at a snapshot with
// the specified sequence number.
LookupKey(const Slice& _user_key, SequenceNumber sequence);
...................................................
private:
// We construct a char array of the form:
// klength varint32 <-- start_
// userkey char[klength] <-- kstart_
// tag uint64
// <-- end_
// The array is a suitable MemTable key.
// The suffix starting with "userkey" can be used as an InternalKey.
const char* start_;
const char* kstart_;
const char* end_;
char space_[200]; // Avoid allocation for short keys
...........................................
};
这里可以看到每次构造lookupkey的时候,必须得传入一个seq,那么这个seq是如何计算的呢,来看代码:
Status DBImpl::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* pinnable_val, bool* value_found,
ReadCallback* callback, bool* is_blob_index) {
...........................................
SequenceNumber snapshot;
if (read_options.snapshot != nullptr) {
// Note: In WritePrepared txns this is not necessary but not harmful either.
// Because prep_seq > snapshot => commit_seq > snapshot so if a snapshot is
// specified we should be fine with skipping seq numbers that are greater
// than that.
snapshot =
reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
} else {
.............................................................
snapshot = last_seq_same_as_publish_seq_
? versions_->LastSequence()
: versions_->LastPublishedSequence();
}
.........................................
// First look in the memtable, then in the immutable memtable (if any).
// s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case.
LookupKey lkey(key, snapshot);
}
通过上面的代码我们可以看到每次调用Get的时候,RocksDB都会构造一个LookupKey,这里我们可以简单的认为这个seq就是当前的version最后一次写成功的seq(以后会介绍这里的publish_seq).
然后上面的代码最终会调用MemTable::Get,在分析这个函数之前我们先来看一个数据结构Saver,这个数据结构用来保存查找内容时的上下文.
struct Saver {
Status* status;
const LookupKey* key;
bool* found_final_value; // Is value set correctly? Used by KeyMayExist
bool* merge_in_progress;
std::string* value;
SequenceNumber seq;
const MergeOperator* merge_operator;
// the merge operations encountered;
MergeContext* merge_context;
RangeDelAggregator* range_del_agg;
MemTable* mem;
Logger* logger;
Statistics* statistics;
bool inplace_update_support;
Env* env_;
ReadCallback* callback_;
bool* is_blob_index;
bool CheckCallback(SequenceNumber _seq) {
if (callback_) {
return callback_->IsCommitted(_seq);
}
return true;
}
};
然后我们来看MemTable::Get这个函数,这个函数最核心的步骤就是构造Saver对象,然后调用MemTableRep::Get,这里注意传递给Get的第三个参数是一个回调函数,后面我们会详细分析这个函数.
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context,
RangeDelAggregator* range_del_agg, SequenceNumber* seq,
const ReadOptions& read_opts, ReadCallback* callback,
bool* is_blob_index) {
...............................................
Saver saver;
saver.status = s;
saver.found_final_value = &found_final_value;
saver.merge_in_progress = &merge_in_progress;
saver.key = &key;
saver.value = value;
saver.seq = kMaxSequenceNumber;
saver.mem = this;
saver.merge_context = merge_context;
saver.range_del_agg = range_del_agg;
saver.merge_operator = moptions_.merge_operator;
saver.logger = moptions_.info_log;
saver.inplace_update_support = moptions_.inplace_update_support;
saver.statistics = moptions_.statistics;
saver.env_ = env_;
saver.callback_ = callback;
saver.is_blob_index = is_blob_index;
table_->Get(key, &saver, SaveValue);
..............................................
}
然后我们来看MemTableRep::Get,首先我们需要知道MemTableRep这个类用来抽象不同的MemTable的实现,也就是说它是一个虚类,然后不同的MemTable实现了它,这里我们只来分析skiplist也就是默认的MemTable实现.
void MemTableRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) {
auto iter = GetDynamicPrefixIterator();
for (iter->Seek(k.internal_key(), k.memtable_key().data());
iter->Valid() && callback_func(callback_args, iter->key());
iter->Next()) {
}
}
上面的函数中最核心的是两个一个是iter->Seek一个是callback_func,我们一个个来,先来分析Seek,可以看到这里Seek的时候传递进去有两个参数,一个是internal_key,一个是memtable_key,那么这两个key分别代表什么呢,我们再次回到LookupKey这个类,可以看到这里memtable_key就是(end_-start_),而internal_key就是(end_-kstart_)
class LookupKey {
public:
// Return a key suitable for lookup in a MemTable.
Slice memtable_key() const {
return Slice(start_, static_cast<size_t>(end_ - start_));
}
// Return an internal key (suitable for passing to an internal iterator)
Slice internal_key() const {
return Slice(kstart_, static_cast<size_t>(end_ - kstart_));
}
然后那么对应的这三个变量又表示什么呢,我们来看LookupKey的构造函数:
LookupKey::LookupKey(const Slice& _user_key, SequenceNumber s) {
size_t usize = _user_key.size();
size_t needed = usize + 13; // A conservative estimate
char* dst;
if (needed <= sizeof(space_)) {
dst = space_;
} else {
dst = new char[needed];
}
start_ = dst;
// NOTE: We don't support users keys of more than 2GB :)
dst = EncodeVarint32(dst, static_cast<uint32_t>(usize + 8));
kstart_ = dst;
memcpy(dst, _user_key.data(), usize);
dst += usize;
EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
dst += 8;
end_ = dst;
}
通过上面的构造函数可以看到在LookupKey中会把全部的internal_key(user_key+seq+type)和RocksDB为user_key所添加的内容指针分别保存起来,也就是memtable_key保存了内部使用的key,而internal_key保存了RocksDB为构造在内部key添加的内容.这里可以看到查找的时候,保存的type是一个特殊的type,这个type其实是kTypeBlobIndex,也就是是值最大的type.那么为什么要这么做呢,我们在分析之前先来看对应的Seek函数.
// Advance to the first entry with a key >= target
virtual void Seek(const Slice& user_key, const char* memtable_key)
override {
if (memtable_key != nullptr) {
iter_.Seek(memtable_key);
} else {
iter_.Seek(EncodeKey(&tmp_, user_key));
}
}
template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::Seek(const char* target) {
node_ = list_->FindGreaterOrEqual(target);
}
这里由于上面的memtable_key肯定不为null,那么就是会调用下面对应的Seek函数,而这个函数最终会调用skiplist的FindGreaterOrEqual函数,这个函数也就是用来定位到大于或者等于memtable_key的位置,此时我们再回忆下一开始介绍的key的排序(InternalKeyComparator::Compare),也就是当Key相同时,按照seq的降序,如果seq相同则按照type的降序,那么此时FindGreaterOrEqual就比较好理解了,也就是会返回小于我们输入seq的值,而当seq相等的话,则会返回小于我们的输入type的值(由于我们传入的是最大的type,因此也就是会直接返回值).那么此时返回的位置有可能key本身就比我们的输入key小,并且我们还需要肯根据不同的type来做不同的操作,那么此时就需要SaveValue回调了.
接下来我们来看对应的callbakc_func(SaveValue)函数,这个函数有两个参数,第一个参数是之前保存的Saver对象,第二个则就是我们在skiplist中定位到的位置.这个函数要做的比较简单,首先就是判断是否得到的key和我们传递进来的key相同,如果不同,则说明查找的key不合法,因此直接返回.这里我们着重来看对于插入和删除的处理.
static bool SaveValue(void* arg, const char* entry) {
......................................................
if (s->mem->GetInternalKeyComparator().user_comparator()->Equal(
Slice(key_ptr, key_length - 8), s->key->user_key())) {
...........................................................
case kTypeValue: {
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadLock();
}
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*(s->status) = Status::OK();
if (*(s->merge_in_progress)) {
if (s->value != nullptr) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), &v,
merge_context->GetOperands(), s->value, s->logger,
s->statistics, s->env_, nullptr /* result_operand */, true);
}
} else if (s->value != nullptr) {
s->value->assign(v.data(), v.size());
}
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadUnlock();
}
*(s->found_final_value) = true;
if (s->is_blob_index != nullptr) {
*(s->is_blob_index) = (type == kTypeBlobIndex);
}
return false;
}
case kTypeDeletion:
case kTypeSingleDeletion:
case kTypeRangeDeletion: {
if (*(s->merge_in_progress)) {
if (s->value != nullptr) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr,
merge_context->GetOperands(), s->value, s->logger,
s->statistics, s->env_, nullptr /* result_operand */, true);
}
} else {
*(s->status) = Status::NotFound();
}
*(s->found_final_value) = true;
return false;
}
}
}
这里我们暂时忽略merge相关的操作(以后我们会重开篇幅来分析merge),那么实现就比较简单了,当查找到对应的值的时候,直接赋值然后返回给用户(设置found_final_value).这里可以看到如果是Delete的话,直接返回NotFound.
下一篇我们将会来介绍RocksDB如何在sstable中查找到对应的数据.