Author: ruze
Tokudb数据节点写盘主要是由后台线程异步完成的:
数据在磁盘上是序列化过的,序列化的过程就是把一个数据结构转换成字节流。
写数据包括两个阶段:
tokudb序列化和压缩单位是partition,对于internal节点,就是把msg buffer序列化并压缩;对于leaf节点,就是把basement node序列化并压缩。
一个节点(node)在磁盘上是如何存储的呢? 节点数据在写盘时会被写到某个offset开始的位置,这个offset是从blocktable里面分配的一个空闲的空间。我们后面会专门写一篇有关btt(Block Translation Table)和block table的文章。 一个node的数据包含:header,pivot key和partition三部分:
有趣的是,压缩算法的信息是存放在partition压缩buffer的第一个字节。所以,tokudb支持FT索引内部同时使用多种压缩算法。
Tokudb读盘的过程是在cachetable里通过调用get_and_pin系列函数实现
数据从磁盘读到内存之前需要进行解压缩,然后对解压缩好的buffer进行反序列化,转换成内存数据结构。反序列化是使用序列化相反的方法把数据解析出来。
前面提过序列化和压缩的单位是partition,反序列化和解压缩的单位也是partition。
酱,节点数据就可以被FT层访问了。
这里顺便提一下BTT (Block Translation Table),这个表记录了节点(blocknum)在FT文件存储位置(offset)的映射关系。
为什么要引入这个表?Tokudb刷脏时,数据被写到一个新的空闲位置,避免了in-place update,简化recovery过程。
toku_ftnode_flush_callback是调用get_and_pin系列函数提供的flush_callback回调,checkpoint线程(也包含checkpoint thread pool的线程,在checkpoint过程中帮助前景线程做节点数据的回写)或evictor线程在这个函数里面会调用toku_serialize_ftnode_to做序列化和压缩工作。
toku_serialize_ftnode_to比较简单,首先调用toku_serialize_ftnode_to_memory执行序列化和压缩,然后调用blocktable.realloc_on_disk,为blocknum分配一个新的offset,最后调用pwrite把压缩的buffer写到盘上,回写完成清node->dirty标记。
这里单独说一下toku_serialize_ftnode_to_memory的第6个参数in_parallel,true表示并行处理序列化和压缩过程,false表示串行处理。
toku_ftnode_flush_callback通常是在evictor或者checkpoint线程上下文调用的,不影响前景线程服务客户端,这个参数一般是false,只有在loader场景下是true。
toku_serialize_ftnode_to (int fd, BLOCKNUM blocknum, FTNODE node, FTNODE_DISK_DATA* ndd, bool do_rebalancing, FT ft, bool for_checkpoint) {
size_t n_to_write;
size_t n_uncompressed_bytes;
char *compressed_buf = nullptr;
// because toku_serialize_ftnode_to is only called for
// in toku_ftnode_flush_callback, we pass false
// for in_parallel. The reasoning is that when we write
// nodes to disk via toku_ftnode_flush_callback, we
// assume that it is being done on a non-critical
// background thread (probably for checkpointing), and therefore
// should not hog CPU,
//
// Should the above facts change, we may want to revisit
// passing false for in_parallel here
//
// alternatively, we could have made in_parallel a parameter
// for toku_serialize_ftnode_to, but instead we did this.
int r = toku_serialize_ftnode_to_memory(
node,
ndd,
ft->h->basementnodesize,
ft->h->compression_method,
do_rebalancing,
toku_drd_unsafe_fetch(&toku_serialize_in_parallel),
&n_to_write,
&n_uncompressed_bytes,
&compressed_buf
);
if (r != 0) {
return r;
}
// If the node has never been written, then write the whole buffer, including the zeros
invariant(blocknum.b>=0);
DISKOFF offset;
// Dirties the ft
ft->blocktable.realloc_on_disk(blocknum, n_to_write, &offset,
ft, fd, for_checkpoint);
tokutime_t t0 = toku_time_now();
toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
tokutime_t t1 = toku_time_now();
tokutime_t io_time = t1 - t0;
toku_ft_status_update_flush_reason(node, n_uncompressed_bytes, n_to_write, io_time, for_checkpoint);
toku_free(compressed_buf);
node->dirty = 0; // See #1957. Must set the node to be clean after serializing it so that it doesn't get written again on the next checkpoint or eviction.
return 0;
}
序列化和压缩过程是在toku_serialize_ftnode_to_memory实现,这个函数比较长,我们分成3段来看。
toku_serialize_ftnode_to_memory的第5个参数do_rebalancing表示leaf节点在写回之前是否要做rebalance,这个参数是在toku_ftnode_flush_callback指定的,如果写回的是数据节点本身,那么是需要做rebalance的。
toku_serialize_ftnode_to_memory首先确保整个数据节点都在内存中,这么做是因为节点的partition数据是依次顺序存放的;然后根据do_rebalancing决定是否要对leaf节点做rebalance;接着是一大段内存分配:
这里有个小的优化,并没有为每个partition申请compressed的buffer,而是申请了一个足够大的buffer,每个partition使用其中的一段。uncompressed的buffer也是一样处理的。
足够大的buffer是什么意思呢?
使用不同压缩算法,压缩之后的最大可能长度是不同的。
分配好buffer之后,调用serialize_and_compress_in_parallel或者serialize_and_compress_serially进行序列化和压缩。
int toku_serialize_ftnode_to_memory(FTNODE node,
FTNODE_DISK_DATA* ndd,
unsigned int basementnodesize,
enum toku_compression_method compression_method,
bool do_rebalancing,
bool in_parallel, // for loader is true, for toku_ftnode_flush_callback, is false
/*out*/ size_t *n_bytes_to_write,
/*out*/ size_t *n_uncompressed_bytes,
/*out*/ char **bytes_to_write)
// Effect: Writes out each child to a separate malloc'd buffer, then compresses
// all of them, and writes the uncompressed header, to bytes_to_write,
// which is malloc'd.
//
// The resulting buffer is guaranteed to be 512-byte aligned and the total length is a multiple of 512 (so we pad with zeros at the end if needed).
// 512-byte padding is for O_DIRECT to work.
{
toku_ftnode_assert_fully_in_memory(node);
if (do_rebalancing && node->height == 0) {
toku_ftnode_leaf_rebalance(node, basementnodesize);
}
const int npartitions = node->n_children;
// Each partition represents a compressed sub block
// For internal nodes, a sub block is a message buffer
// For leaf nodes, a sub block is a basement node
toku::scoped_calloc sb_buf(sizeof(struct sub_block) * npartitions);
struct sub_block *sb = reinterpret_cast<struct sub_block *>(sb_buf.get());
XREALLOC_N(npartitions, *ndd);
//
// First, let's serialize and compress the individual sub blocks
//
// determine how large our serialization and compression buffers need to be.
size_t serialize_buf_size = 0, compression_buf_size = 0;
for (int i = 0; i < node->n_children; i++) {
sb[i].uncompressed_size = serialize_ftnode_partition_size(node, i);
sb[i].compressed_size_bound = toku_compress_bound(compression_method, sb[i].uncompressed_size);
serialize_buf_size += sb[i].uncompressed_size;
compression_buf_size += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
}
// give each sub block a base pointer to enough buffer space for serialization and compression
toku::scoped_malloc serialize_buf(serialize_buf_size);
toku::scoped_malloc compression_buf(compression_buf_size);
for (size_t i = 0, uncompressed_offset = 0, compressed_offset = 0; i < (size_t) node->n_children; i++) {
sb[i].uncompressed_ptr = reinterpret_cast<char *>(serialize_buf.get()) + uncompressed_offset;
sb[i].compressed_ptr = reinterpret_cast<char *>(compression_buf.get()) + compressed_offset;
uncompressed_offset += sb[i].uncompressed_size;
compressed_offset += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
invariant(uncompressed_offset <= serialize_buf_size);
invariant(compressed_offset <= compression_buf_size);
}
// do the actual serialization now that we have buffer space
struct serialize_times st = { 0, 0 };
if (in_parallel) {
serialize_and_compress_in_parallel(node, npartitions, compression_method, sb, &st);
} else {
serialize_and_compress_serially(node, npartitions, compression_method, sb, &st);
}
serialize_and_compress_serially就是串行调用serialize_and_compress_partition进行序列化和压缩。
static void
serialize_and_compress_serially(FTNODE node,
int npartitions,
enum toku_compression_method compression_method,
struct sub_block sb[],
struct serialize_times *st) {
for (int i = 0; i < npartitions; i++) {
serialize_and_compress_partition(node, i, compression_method, &sb[i], st);
}
}
serialize_and_compress_in_parallel使用了threadpool来并行执行序列化和压缩,每个partition由一个专门的线程来处理。当前上下文也可以执行序列化和压缩,所以threadpool只创建了(npartitions-1)个线程。
threadpool线程执行的函数也是serialize_and_compress_partition;threadpool线程和当前上下文之间是使用work进行同步的。
static void *
serialize_and_compress_worker(void *arg) {
struct workset *ws = (struct workset *) arg;
while (1) {
struct serialize_compress_work *w = (struct serialize_compress_work *) workset_get(ws);
if (w == NULL)
break;
int i = w->i;
serialize_and_compress_partition(w->node, i, w->compression_method, &w->sb[i], &w->st);
}
workset_release_ref(ws);
return arg;
}
static void
serialize_and_compress_in_parallel(FTNODE node,
int npartitions,
enum toku_compression_method compression_method,
struct sub_block sb[],
struct serialize_times *st) {
if (npartitions == 1) {
serialize_and_compress_partition(node, 0, compression_method, &sb[0], st);
} else {
int T = num_cores;
if (T > npartitions)
T = npartitions;
if (T > 0)
T = T - 1;
struct workset ws;
ZERO_STRUCT(ws);
workset_init(&ws);
struct serialize_compress_work work[npartitions];
workset_lock(&ws);
for (int i = 0; i < npartitions; i++) {
work[i] = (struct serialize_compress_work) { .base = ,
.node = node,
.i = i,
.compression_method = compression_method,
.sb = sb,
.st = { .serialize_time = 0, .compress_time = 0} };
workset_put_locked(&ws, &work[i].base);
}
workset_unlock(&ws);
toku_thread_pool_run(ft_pool, 0, &T, serialize_and_compress_worker, &ws);
workset_add_ref(&ws, T);
serialize_and_compress_worker(&ws);
workset_join(&ws);
workset_destroy(&ws);
// gather up the statistics from each thread's work item
for (int i = 0; i < npartitions; i++) {
st->serialize_time += work[i].st.serialize_time;
st->compress_time += work[i].st.compress_time;
}
}
}
回到toku_serialize_ftnode_to_memory,序列化partition之后就是序列化pivot key的过程。 sb_node_info存放pivot key压缩数据的信息:
前面提到,压缩后的size是由压缩算法决定,不同的压缩算法压缩之后最大可能的size是不同的。
toku_serialize_ftnode_to_memory调用serialize_and_compress_sb_node_info把pivot key信息序列化并压缩。
pivot key的compressed buffer头8个字节分别存储pivot key的compressed size和uncompressed size,从第9个字节开始才是压缩的字节流;而checksum是针对整个compressed buffer做的。
//
// Now lets create a sub-block that has the common node information,
// This does NOT include the header
//
// determine how large our serialization and copmression buffers need to be
struct sub_block sb_node_info;
sub_block_init(&sb_node_info);
size_t sb_node_info_uncompressed_size = serialize_ftnode_info_size(node);
size_t sb_node_info_compressed_size_bound = toku_compress_bound(compression_method, sb_node_info_uncompressed_size);
toku::scoped_malloc sb_node_info_uncompressed_buf(sb_node_info_uncompressed_size);
toku::scoped_malloc sb_node_info_compressed_buf(sb_node_info_compressed_size_bound + 8); // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
sb_node_info.uncompressed_size = sb_node_info_uncompressed_size;
sb_node_info.uncompressed_ptr = sb_node_info_uncompressed_buf.get();
sb_node_info.compressed_size_bound = sb_node_info_compressed_size_bound;
sb_node_info.compressed_ptr = sb_node_info_compressed_buf.get();
// do the actual serialization now that we have buffer space
serialize_and_compress_sb_node_info(node, &sb_node_info, compression_method, &st);
//
// At this point, we have compressed each of our pieces into individual sub_blocks,
// we can put the header and all the subblocks into a single buffer and return it.
//
// update the serialize times, ignore the header for simplicity. we captured all
// of the partitions' serialize times so that's probably good enough.
toku_ft_status_update_serialize_times(node, st.serialize_time, st.compress_time);
序列化pivot key之后,toku_serialize_ftnode_to_memory计算节点node压缩前size和压缩后的size。 计算方法很简单:partition的size总和 + pivot key的size + header的size + 4个字节的overhead(pivot key的checksum)。
节点node压缩之后的size是为分配压缩后的数据buffer,为了支持direct I/O,分配的buffer和buffer size必须是512对齐的。
分配的buffer size记在n_bytes_to_write返回给调用函数;压缩之后的数据存储在bytes_to_write指向的buffer中。
节点node压缩之前的size,就是为了返回给调用函数,记在n_uncompressed_bytes参数中。
// The total size of the node is:
// size of header + disk size of the n+1 sub_block's created above
uint32_t total_node_size = (serialize_node_header_size(node) // uncompressed header
+ sb_node_info.compressed_size // compressed nodeinfo (without its checksum)
+ 4); // nodeinfo's checksum
uint32_t total_uncompressed_size = (serialize_node_header_size(node) // uncompressed header
+ sb_node_info.uncompressed_size // uncompressed nodeinfo (without its checksum)
+ 4); // nodeinfo's checksum
// store the BP_SIZESs
for (int i = 0; i < node->n_children; i++) {
uint32_t len = sb[i].compressed_size + 4; // data and checksum
BP_SIZE (*ndd,i) = len;
BP_START(*ndd,i) = total_node_size;
total_node_size += sb[i].compressed_size + 4;
total_uncompressed_size += sb[i].uncompressed_size + 4;
}
// now create the final serialized node
uint32_t total_buffer_size = roundup_to_multiple(512, total_node_size); // make the buffer be 512 bytes.
char *XMALLOC_N_ALIGNED(512, total_buffer_size, data);
char *curr_ptr = data;
前面提到节点node序列化的过程分为3个阶段:
前2个阶段都讨论过了,header的部分是调用serialize_node_header实现的。
到这里其他部分的序列化和压缩工作都做好了,header的序列化直接在前面分配好的压缩后数据buffer上进行,不需要压缩,也不必分配sub_block数据结构。
header处理完,直接把pivot key的sub_block的compressed_ptr数据和checksum拷贝过来。
pivot key处理完,直接把每个partition的compressed_ptr和checksum依次拷贝过来。
pad的部分写0。
// write the header
struct wbuf wb;
wbuf_init(&wb, curr_ptr, serialize_node_header_size(node));
serialize_node_header(node, *ndd, &wb);
assert(wb.ndone == wb.size);
curr_ptr += serialize_node_header_size(node);
// now write sb_node_info
memcpy(curr_ptr, sb_node_info.compressed_ptr, sb_node_info.compressed_size);
curr_ptr += sb_node_info.compressed_size;
// write the checksum
*(uint32_t *)curr_ptr = toku_htod32(sb_node_info.xsum);
curr_ptr += sizeof(sb_node_info.xsum);
for (int i = 0; i < npartitions; i++) {
memcpy(curr_ptr, sb[i].compressed_ptr, sb[i].compressed_size);
curr_ptr += sb[i].compressed_size;
// write the checksum
*(uint32_t *)curr_ptr = toku_htod32(sb[i].xsum);
curr_ptr += sizeof(sb[i].xsum);
}
// Zero the rest of the buffer
memset(data + total_node_size, 0, total_buffer_size - total_node_size);
assert(curr_ptr - data == total_node_size);
*bytes_to_write = data;
*n_bytes_to_write = total_buffer_size;
*n_uncompressed_bytes = total_uncompressed_size;
invariant(*n_bytes_to_write % 512 == 0);
invariant(reinterpret_cast<unsigned long long>(*bytes_to_write) % 512 == 0);
return 0;
}
假若一个node包含2个partition,它的序列化结构如下所示:
由于tokudb支持partial fetch(只读某几个partition)和partial evict(即把clean节点的部分partition释放掉),反序列化过程相比序列化过程略复杂一些。
fetch callback通过bfe这个hint告诉toku_deserialize_ftnode_from需要读那些partition。
bfe有五种类型:
只有在ft search高度>1以上的中间节点时,read_all_partitions会被设置成true,走老的代码路径deserialize_ftnode_from_fd,一次性把所有partition都读到内存中。
其他情况会调用read_ftnode_header_from_fd_into_rbuf_if_small_enough,把节点的header读到内存中,然后反序列化header并设置ndd(每个partition的offset和size);解压缩和反序列化pivot key设置pivot信息;根据bfe读取需要的partition。
节点的header,pivot key和partition都有自己的checksum信息,解析每个部分时都要确认checksum是匹配的。
enum ftnode_fetch_type {
ftnode_fetch_none = 1, // no partitions needed.
ftnode_fetch_subset, // some subset of partitions needed
ftnode_fetch_prefetch, // this is part of a prefetch call
ftnode_fetch_all, // every partition is needed
ftnode_fetch_keymatch, // one child is needed if it holds both keys
};
int
toku_deserialize_ftnode_from (int fd,
BLOCKNUM blocknum,
uint32_t fullhash,
FTNODE *ftnode,
FTNODE_DISK_DATA* ndd,
ftnode_fetch_extra *bfe
)
// Effect: Read a node in. If possible, read just the header.
{
int r = 0;
struct rbuf rb = RBUF_INITIALIZER;
// each function below takes the appropriate io/decompression/deserialize statistics
if (!bfe->read_all_partitions) {
read_ftnode_header_from_fd_into_rbuf_if_small_enough(fd, blocknum, bfe->ft, &rb, bfe);
r = deserialize_ftnode_header_from_rbuf_if_small_enough(ftnode, ndd, blocknum, fullhash, bfe, &rb, fd);
} else {
// force us to do it the old way
r = -1;
}
if (r != 0) {
// Something went wrong, go back to doing it the old way.
r = deserialize_ftnode_from_fd(fd, blocknum, fullhash, ftnode, ndd, bfe, NULL);
}
toku_free(rb.buf);
return r;
}
deserialize_ftnode_header_from_rbuf_if_small_enough比较长,基本是toku_serialize_ftnode_to_memory的相反过程。
header部分是不压缩的,直接解析,比较magic number,解析node->n_children和ndd等。
然后比较header的checksum
node->n_children = rbuf_int(rb);
// Guaranteed to be have been able to read up to here. If n_children
// is too big, we may have a problem, so check that we won't overflow
// while reading the partition locations.
unsigned int nhsize;
nhsize = serialize_node_header_size(node); // we can do this because n_children is filled in.
unsigned int needed_size;
needed_size = nhsize + 12; // we need 12 more so that we can read the compressed block size information that follows for the nodeinfo.
if (needed_size > rb->size) {
r = toku_db_badformat();
goto cleanup;
}
XMALLOC_N(node->n_children, node->bp);
XMALLOC_N(node->n_children, *ndd);
// read the partition locations
for (int i=0; i<node->n_children; i++) {
BP_START(*ndd,i) = rbuf_int(rb);
BP_SIZE (*ndd,i) = rbuf_int(rb);
}
uint32_t checksum;
checksum = toku_x1764_memory(rb->buf, rb->ndone);
uint32_t stored_checksum;
stored_checksum = rbuf_int(rb);
if (stored_checksum != checksum) {
dump_bad_block(rb->buf, rb->size);
r = TOKUDB_BAD_CHECKSUM;
goto cleanup;
}
接着处理pivot key,比较pivot key部分的checksum,解压缩,反序列化,设置pivot信息。
// Finish reading compressed the sub_block
const void **cp;
cp = (const void **) &sb_node_info.compressed_ptr;
rbuf_literal_bytes(rb, cp, sb_node_info.compressed_size);
sb_node_info.xsum = rbuf_int(rb);
// let's check the checksum
uint32_t actual_xsum;
actual_xsum = toku_x1764_memory((char *)sb_node_info.compressed_ptr-8, 8+sb_node_info.compressed_size);
if (sb_node_info.xsum != actual_xsum) {
r = TOKUDB_BAD_CHECKSUM;
goto cleanup;
}
// Now decompress the subblock
{
toku::scoped_malloc sb_node_info_buf(sb_node_info.uncompressed_size);
sb_node_info.uncompressed_ptr = sb_node_info_buf.get();
tokutime_t decompress_t0 = toku_time_now();
toku_decompress(
(Bytef *) sb_node_info.uncompressed_ptr,
sb_node_info.uncompressed_size,
(Bytef *) sb_node_info.compressed_ptr,
sb_node_info.compressed_size
);
tokutime_t decompress_t1 = toku_time_now();
decompress_time = decompress_t1 - decompress_t0;
// at this point sb->uncompressed_ptr stores the serialized node info.
r = deserialize_ftnode_info(&sb_node_info, node);
if (r != 0) {
goto cleanup;
}
}
最后是根据bfe读取需要的partition,读partition是通过调用pf_callback实现的。
// Now we have the ftnode_info. We have a bunch more stuff in the
// rbuf, so we might be able to store the compressed data for some
// objects.
// We can proceed to deserialize the individual subblocks.
// setup the memory of the partitions
// for partitions being decompressed, create either message buffer or basement node
// for partitions staying compressed, create sub_block
setup_ftnode_partitions(node, bfe, false);
// We must capture deserialize and decompression time before
// the pf_callback, otherwise we would double-count.
t1 = toku_time_now();
deserialize_time = (t1 - t0) - decompress_time;
// do partial fetch if necessary
if (bfe->type != ftnode_fetch_none) {
PAIR_ATTR attr;
r = toku_ftnode_pf_callback(node, *ndd, bfe, fd, &attr, NULL);
if (r != 0) {
goto cleanup;
}
}
deserialize_ftnode_from_fd的部分留给读者自行分析。