数据库内核月报

数据库内核月报 - 2017 / 06

MySQL · 源码分析 · Tokudb序列化和反序列化过程

Author: ruze

序列化和写盘

Tokudb数据节点写盘主要是由后台线程异步完成的:

数据在磁盘上是序列化过的,序列化的过程就是把一个数据结构转换成字节流。

写数据包括两个阶段:

tokudb序列化和压缩单位是partition,对于internal节点,就是把msg buffer序列化并压缩;对于leaf节点,就是把basement node序列化并压缩。

一个节点(node)在磁盘上是如何存储的呢? 节点数据在写盘时会被写到某个offset开始的位置,这个offset是从blocktable里面分配的一个空闲的空间。我们后面会专门写一篇有关btt(Block Translation Table)和block table的文章。 一个node的数据包含:header,pivot key和partition三部分:

有趣的是,压缩算法的信息是存放在partition压缩buffer的第一个字节。所以,tokudb支持FT索引内部同时使用多种压缩算法。

反序列化和读盘

Tokudb读盘的过程是在cachetable里通过调用get_and_pin系列函数实现

数据从磁盘读到内存之前需要进行解压缩,然后对解压缩好的buffer进行反序列化,转换成内存数据结构。反序列化是使用序列化相反的方法把数据解析出来。

前面提过序列化和压缩的单位是partition,反序列化和解压缩的单位也是partition。

酱,节点数据就可以被FT层访问了。

序列化和压缩过程详解

这里顺便提一下BTT (Block Translation Table),这个表记录了节点(blocknum)在FT文件存储位置(offset)的映射关系。

为什么要引入这个表?Tokudb刷脏时,数据被写到一个新的空闲位置,避免了in-place update,简化recovery过程。

toku_ftnode_flush_callback是调用get_and_pin系列函数提供的flush_callback回调,checkpoint线程(也包含checkpoint thread pool的线程,在checkpoint过程中帮助前景线程做节点数据的回写)或evictor线程在这个函数里面会调用toku_serialize_ftnode_to做序列化和压缩工作。

toku_serialize_ftnode_to比较简单,首先调用toku_serialize_ftnode_to_memory执行序列化和压缩,然后调用blocktable.realloc_on_disk,为blocknum分配一个新的offset,最后调用pwrite把压缩的buffer写到盘上,回写完成清node->dirty标记。

这里单独说一下toku_serialize_ftnode_to_memory的第6个参数in_parallel,true表示并行处理序列化和压缩过程,false表示串行处理。

toku_ftnode_flush_callback通常是在evictor或者checkpoint线程上下文调用的,不影响前景线程服务客户端,这个参数一般是false,只有在loader场景下是true。

toku_serialize_ftnode_to (int fd, BLOCKNUM blocknum, FTNODE node, FTNODE_DISK_DATA* ndd, bool do_rebalancing, FT ft, bool for_checkpoint) {

    size_t n_to_write;
    size_t n_uncompressed_bytes;
    char *compressed_buf = nullptr;

    // because toku_serialize_ftnode_to is only called for
    // in toku_ftnode_flush_callback, we pass false
    // for in_parallel. The reasoning is that when we write
    // nodes to disk via toku_ftnode_flush_callback, we
    // assume that it is being done on a non-critical
    // background thread (probably for checkpointing), and therefore
    // should not hog CPU,
    //
    // Should the above facts change, we may want to revisit
    // passing false for in_parallel here
    //
    // alternatively, we could have made in_parallel a parameter
    // for toku_serialize_ftnode_to, but instead we did this.
    int r = toku_serialize_ftnode_to_memory(
        node,
        ndd,
        ft->h->basementnodesize,
        ft->h->compression_method,
        do_rebalancing,
        toku_drd_unsafe_fetch(&toku_serialize_in_parallel),
        &n_to_write,
        &n_uncompressed_bytes,
        &compressed_buf
        );
    if (r != 0) {
        return r;
    }

    // If the node has never been written, then write the whole buffer, including the zeros
    invariant(blocknum.b>=0);
    DISKOFF offset;

    // Dirties the ft
    ft->blocktable.realloc_on_disk(blocknum, n_to_write, &offset,
                                   ft, fd, for_checkpoint);

    tokutime_t t0 = toku_time_now();
    toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
    tokutime_t t1 = toku_time_now();

    tokutime_t io_time = t1 - t0;
    toku_ft_status_update_flush_reason(node, n_uncompressed_bytes, n_to_write, io_time, for_checkpoint);

    toku_free(compressed_buf);
    node->dirty = 0;  // See #1957.   Must set the node to be clean after serializing it so that it doesn't get written again on the next checkpoint or eviction.
    return 0;
}

序列化和压缩过程是在toku_serialize_ftnode_to_memory实现,这个函数比较长,我们分成3段来看。

partition序列化和压缩

toku_serialize_ftnode_to_memory的第5个参数do_rebalancing表示leaf节点在写回之前是否要做rebalance,这个参数是在toku_ftnode_flush_callback指定的,如果写回的是数据节点本身,那么是需要做rebalance的。

toku_serialize_ftnode_to_memory首先确保整个数据节点都在内存中,这么做是因为节点的partition数据是依次顺序存放的;然后根据do_rebalancing决定是否要对leaf节点做rebalance;接着是一大段内存分配:

这里有个小的优化,并没有为每个partition申请compressed的buffer,而是申请了一个足够大的buffer,每个partition使用其中的一段。uncompressed的buffer也是一样处理的。

足够大的buffer是什么意思呢?

使用不同压缩算法,压缩之后的最大可能长度是不同的。

分配好buffer之后,调用serialize_and_compress_in_parallel或者serialize_and_compress_serially进行序列化和压缩。

int toku_serialize_ftnode_to_memory(FTNODE node,
                                    FTNODE_DISK_DATA* ndd,
                                    unsigned int basementnodesize,
                                    enum toku_compression_method compression_method,
                                    bool do_rebalancing,
                                    bool in_parallel, // for loader is true, for toku_ftnode_flush_callback, is false
                            /*out*/ size_t *n_bytes_to_write,
                            /*out*/ size_t *n_uncompressed_bytes,
                            /*out*/ char  **bytes_to_write)
// Effect: Writes out each child to a separate malloc'd buffer, then compresses
//   all of them, and writes the uncompressed header, to bytes_to_write,
//   which is malloc'd.
//
//   The resulting buffer is guaranteed to be 512-byte aligned and the total length is a multiple of 512 (so we pad with zeros at the end if needed).
//   512-byte padding is for O_DIRECT to work.
{
    toku_ftnode_assert_fully_in_memory(node);

    if (do_rebalancing && node->height == 0) {
        toku_ftnode_leaf_rebalance(node, basementnodesize);
    }
    const int npartitions = node->n_children;

    // Each partition represents a compressed sub block
    // For internal nodes, a sub block is a message buffer
    // For leaf nodes, a sub block is a basement node
    toku::scoped_calloc sb_buf(sizeof(struct sub_block) * npartitions);
    struct sub_block *sb = reinterpret_cast<struct sub_block *>(sb_buf.get());
    XREALLOC_N(npartitions, *ndd);

    //
    // First, let's serialize and compress the individual sub blocks
    //

    // determine how large our serialization and compression buffers need to be.
    size_t serialize_buf_size = 0, compression_buf_size = 0;
    for (int i = 0; i < node->n_children; i++) {
        sb[i].uncompressed_size = serialize_ftnode_partition_size(node, i);
        sb[i].compressed_size_bound = toku_compress_bound(compression_method, sb[i].uncompressed_size);
        serialize_buf_size += sb[i].uncompressed_size;
        compression_buf_size += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
    }

    // give each sub block a base pointer to enough buffer space for serialization and compression
    toku::scoped_malloc serialize_buf(serialize_buf_size);
    toku::scoped_malloc compression_buf(compression_buf_size);
    for (size_t i = 0, uncompressed_offset = 0, compressed_offset = 0; i < (size_t) node->n_children; i++) {
        sb[i].uncompressed_ptr = reinterpret_cast<char *>(serialize_buf.get()) + uncompressed_offset;
        sb[i].compressed_ptr = reinterpret_cast<char *>(compression_buf.get()) + compressed_offset;
        uncompressed_offset += sb[i].uncompressed_size;
        compressed_offset += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
        invariant(uncompressed_offset <= serialize_buf_size);
        invariant(compressed_offset <= compression_buf_size);
    }

    // do the actual serialization now that we have buffer space
    struct serialize_times st = { 0, 0 };
    if (in_parallel) {
        serialize_and_compress_in_parallel(node, npartitions, compression_method, sb, &st);
    } else {
        serialize_and_compress_serially(node, npartitions, compression_method, sb, &st);
    }

serialize_and_compress_serially就是串行调用serialize_and_compress_partition进行序列化和压缩。

static void
serialize_and_compress_serially(FTNODE node,
                                int npartitions,
                                enum toku_compression_method compression_method,
                                struct sub_block sb[],
                                struct serialize_times *st) {
    for (int i = 0; i < npartitions; i++) {
        serialize_and_compress_partition(node, i, compression_method, &sb[i], st);
    }
}

serialize_and_compress_in_parallel使用了threadpool来并行执行序列化和压缩,每个partition由一个专门的线程来处理。当前上下文也可以执行序列化和压缩,所以threadpool只创建了(npartitions-1)个线程。

threadpool线程执行的函数也是serialize_and_compress_partition;threadpool线程和当前上下文之间是使用work进行同步的。

static void *
serialize_and_compress_worker(void *arg) {
    struct workset *ws = (struct workset *) arg;
    while (1) {
        struct serialize_compress_work *w = (struct serialize_compress_work *) workset_get(ws);
        if (w == NULL)
            break;
        int i = w->i;
        serialize_and_compress_partition(w->node, i, w->compression_method, &w->sb[i], &w->st);
    }
    workset_release_ref(ws);
    return arg;
}

static void
serialize_and_compress_in_parallel(FTNODE node,
                                   int npartitions,
                                   enum toku_compression_method compression_method,
                                   struct sub_block sb[],
                                   struct serialize_times *st) {
    if (npartitions == 1) {
        serialize_and_compress_partition(node, 0, compression_method, &sb[0], st);
    } else {
        int T = num_cores;
        if (T > npartitions)
            T = npartitions;
        if (T > 0)
            T = T - 1;
        struct workset ws;
        ZERO_STRUCT(ws);
        workset_init(&ws);
        struct serialize_compress_work work[npartitions];
        workset_lock(&ws);
        for (int i = 0; i < npartitions; i++) {
            work[i] = (struct serialize_compress_work) { .base = ,
                                                         .node = node,
                                                         .i = i,
                                                         .compression_method = compression_method,
                                                         .sb = sb,
                                                         .st = { .serialize_time = 0, .compress_time = 0} };
            workset_put_locked(&ws, &work[i].base);
        }
        workset_unlock(&ws);
        toku_thread_pool_run(ft_pool, 0, &T, serialize_and_compress_worker, &ws);
        workset_add_ref(&ws, T);
        serialize_and_compress_worker(&ws);
        workset_join(&ws);
        workset_destroy(&ws);

        // gather up the statistics from each thread's work item
        for (int i = 0; i < npartitions; i++) {
            st->serialize_time += work[i].st.serialize_time;
            st->compress_time += work[i].st.compress_time;
        }
    }
}

pivot key序列化和压缩

回到toku_serialize_ftnode_to_memory,序列化partition之后就是序列化pivot key的过程。 sb_node_info存放pivot key压缩数据的信息:

前面提到,压缩后的size是由压缩算法决定,不同的压缩算法压缩之后最大可能的size是不同的。

toku_serialize_ftnode_to_memory调用serialize_and_compress_sb_node_info把pivot key信息序列化并压缩。

pivot key的compressed buffer头8个字节分别存储pivot key的compressed size和uncompressed size,从第9个字节开始才是压缩的字节流;而checksum是针对整个compressed buffer做的。

    //
    // Now lets create a sub-block that has the common node information,
    // This does NOT include the header
    //
    // determine how large our serialization and copmression buffers need to be
    struct sub_block sb_node_info;
    sub_block_init(&sb_node_info);
    size_t sb_node_info_uncompressed_size = serialize_ftnode_info_size(node);
    size_t sb_node_info_compressed_size_bound = toku_compress_bound(compression_method, sb_node_info_uncompressed_size);
    toku::scoped_malloc sb_node_info_uncompressed_buf(sb_node_info_uncompressed_size);
    toku::scoped_malloc sb_node_info_compressed_buf(sb_node_info_compressed_size_bound + 8); // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
    sb_node_info.uncompressed_size = sb_node_info_uncompressed_size;
    sb_node_info.uncompressed_ptr = sb_node_info_uncompressed_buf.get();
    sb_node_info.compressed_size_bound = sb_node_info_compressed_size_bound;
    sb_node_info.compressed_ptr = sb_node_info_compressed_buf.get();

    // do the actual serialization now that we have buffer space
    serialize_and_compress_sb_node_info(node, &sb_node_info, compression_method, &st);

    //
    // At this point, we have compressed each of our pieces into individual sub_blocks,
    // we can put the header and all the subblocks into a single buffer and return it.
    //

    // update the serialize times, ignore the header for simplicity. we captured all
    // of the partitions' serialize times so that's probably good enough.
    toku_ft_status_update_serialize_times(node, st.serialize_time, st.compress_time);

header序列化

序列化pivot key之后,toku_serialize_ftnode_to_memory计算节点node压缩前size和压缩后的size。 计算方法很简单:partition的size总和 + pivot key的size + header的size + 4个字节的overhead(pivot key的checksum)。

节点node压缩之后的size是为分配压缩后的数据buffer,为了支持direct I/O,分配的buffer和buffer size必须是512对齐的。

分配的buffer size记在n_bytes_to_write返回给调用函数;压缩之后的数据存储在bytes_to_write指向的buffer中。

节点node压缩之前的size,就是为了返回给调用函数,记在n_uncompressed_bytes参数中。

    // The total size of the node is:
    // size of header + disk size of the n+1 sub_block's created above
    uint32_t total_node_size = (serialize_node_header_size(node) // uncompressed header
                                 + sb_node_info.compressed_size   // compressed nodeinfo (without its checksum)
                                 + 4);                            // nodeinfo's checksum
    uint32_t total_uncompressed_size = (serialize_node_header_size(node) // uncompressed header
                                 + sb_node_info.uncompressed_size   // uncompressed nodeinfo (without its checksum)
                                 + 4);                            // nodeinfo's checksum
    // store the BP_SIZESs
    for (int i = 0; i < node->n_children; i++) {
        uint32_t len         = sb[i].compressed_size + 4; // data and checksum
        BP_SIZE (*ndd,i) = len;
        BP_START(*ndd,i) = total_node_size;
        total_node_size += sb[i].compressed_size + 4;
        total_uncompressed_size += sb[i].uncompressed_size + 4;
    }

    // now create the final serialized node
    uint32_t total_buffer_size = roundup_to_multiple(512, total_node_size); // make the buffer be 512 bytes.
    char *XMALLOC_N_ALIGNED(512, total_buffer_size, data);
    char *curr_ptr = data;

前面提到节点node序列化的过程分为3个阶段:

前2个阶段都讨论过了,header的部分是调用serialize_node_header实现的。

到这里其他部分的序列化和压缩工作都做好了,header的序列化直接在前面分配好的压缩后数据buffer上进行,不需要压缩,也不必分配sub_block数据结构。

header处理完,直接把pivot key的sub_block的compressed_ptr数据和checksum拷贝过来。

pivot key处理完,直接把每个partition的compressed_ptr和checksum依次拷贝过来。

pad的部分写0。

    // write the header
    struct wbuf wb;
    wbuf_init(&wb, curr_ptr, serialize_node_header_size(node));
    serialize_node_header(node, *ndd, &wb);
    assert(wb.ndone == wb.size);
    curr_ptr += serialize_node_header_size(node);

    // now write sb_node_info
    memcpy(curr_ptr, sb_node_info.compressed_ptr, sb_node_info.compressed_size);
    curr_ptr += sb_node_info.compressed_size;
    // write the checksum
    *(uint32_t *)curr_ptr = toku_htod32(sb_node_info.xsum);
    curr_ptr += sizeof(sb_node_info.xsum);

    for (int i = 0; i < npartitions; i++) {
        memcpy(curr_ptr, sb[i].compressed_ptr, sb[i].compressed_size);
        curr_ptr += sb[i].compressed_size;
        // write the checksum
        *(uint32_t *)curr_ptr = toku_htod32(sb[i].xsum);
        curr_ptr += sizeof(sb[i].xsum);
    }
    // Zero the rest of the buffer
    memset(data + total_node_size, 0, total_buffer_size - total_node_size);

    assert(curr_ptr - data == total_node_size);
    *bytes_to_write = data;
    *n_bytes_to_write = total_buffer_size;
    *n_uncompressed_bytes = total_uncompressed_size;

    invariant(*n_bytes_to_write % 512 == 0);
    invariant(reinterpret_cast<unsigned long long>(*bytes_to_write) % 512 == 0);
    return 0;
}

假若一个node包含2个partition,它的序列化结构如下所示:

image.png

反序列化和解压缩过程详解

由于tokudb支持partial fetch(只读某几个partition)和partial evict(即把clean节点的部分partition释放掉),反序列化过程相比序列化过程略复杂一些。

fetch callback通过bfe这个hint告诉toku_deserialize_ftnode_from需要读那些partition。

bfe有五种类型:

只有在ft search高度>1以上的中间节点时,read_all_partitions会被设置成true,走老的代码路径deserialize_ftnode_from_fd,一次性把所有partition都读到内存中。

其他情况会调用read_ftnode_header_from_fd_into_rbuf_if_small_enough,把节点的header读到内存中,然后反序列化header并设置ndd(每个partition的offset和size);解压缩和反序列化pivot key设置pivot信息;根据bfe读取需要的partition。

节点的header,pivot key和partition都有自己的checksum信息,解析每个部分时都要确认checksum是匹配的。

enum ftnode_fetch_type {
    ftnode_fetch_none = 1, // no partitions needed.
    ftnode_fetch_subset, // some subset of partitions needed
    ftnode_fetch_prefetch, // this is part of a prefetch call
    ftnode_fetch_all, // every partition is needed
    ftnode_fetch_keymatch, // one child is needed if it holds both keys
};

int
toku_deserialize_ftnode_from (int fd,
                               BLOCKNUM blocknum,
                               uint32_t fullhash,
                               FTNODE *ftnode,
                               FTNODE_DISK_DATA* ndd,
                               ftnode_fetch_extra *bfe
    )
// Effect: Read a node in.  If possible, read just the header.
{
    int r = 0;
    struct rbuf rb = RBUF_INITIALIZER;

    // each function below takes the appropriate io/decompression/deserialize statistics

    if (!bfe->read_all_partitions) {
        read_ftnode_header_from_fd_into_rbuf_if_small_enough(fd, blocknum, bfe->ft, &rb, bfe);
        r = deserialize_ftnode_header_from_rbuf_if_small_enough(ftnode, ndd, blocknum, fullhash, bfe, &rb, fd);
    } else {
        // force us to do it the old way
        r = -1;
    }
    if (r != 0) {
        // Something went wrong, go back to doing it the old way.
        r = deserialize_ftnode_from_fd(fd, blocknum, fullhash, ftnode, ndd, bfe, NULL);
    }

    toku_free(rb.buf);
    return r;
}

deserialize_ftnode_header_from_rbuf_if_small_enough比较长,基本是toku_serialize_ftnode_to_memory的相反过程。

header部分是不压缩的,直接解析,比较magic number,解析node->n_children和ndd等。

然后比较header的checksum

    node->n_children = rbuf_int(rb);
    // Guaranteed to be have been able to read up to here.  If n_children
    // is too big, we may have a problem, so check that we won't overflow
    // while reading the partition locations.
    unsigned int nhsize;
    nhsize =  serialize_node_header_size(node); // we can do this because n_children is filled in.
    unsigned int needed_size;
    needed_size = nhsize + 12; // we need 12 more so that we can read the compressed block size information that follows for the nodeinfo.
    if (needed_size > rb->size) {
        r = toku_db_badformat();
        goto cleanup;
    }

    XMALLOC_N(node->n_children, node->bp);
    XMALLOC_N(node->n_children, *ndd);
    // read the partition locations
    for (int i=0; i<node->n_children; i++) {
        BP_START(*ndd,i) = rbuf_int(rb);
        BP_SIZE (*ndd,i) = rbuf_int(rb);
    }

    uint32_t checksum;
    checksum = toku_x1764_memory(rb->buf, rb->ndone);
    uint32_t stored_checksum;
    stored_checksum = rbuf_int(rb);
    if (stored_checksum != checksum) {
        dump_bad_block(rb->buf, rb->size);
        r = TOKUDB_BAD_CHECKSUM;
        goto cleanup;
    }

接着处理pivot key,比较pivot key部分的checksum,解压缩,反序列化,设置pivot信息。

    // Finish reading compressed the sub_block
    const void **cp;
    cp = (const void **) &sb_node_info.compressed_ptr;
    rbuf_literal_bytes(rb, cp, sb_node_info.compressed_size);
    sb_node_info.xsum = rbuf_int(rb);
    // let's check the checksum
    uint32_t actual_xsum;
    actual_xsum = toku_x1764_memory((char *)sb_node_info.compressed_ptr-8, 8+sb_node_info.compressed_size);
    if (sb_node_info.xsum != actual_xsum) {
        r = TOKUDB_BAD_CHECKSUM;
        goto cleanup;
    }

    // Now decompress the subblock
    {
        toku::scoped_malloc sb_node_info_buf(sb_node_info.uncompressed_size);
        sb_node_info.uncompressed_ptr = sb_node_info_buf.get();
        tokutime_t decompress_t0 = toku_time_now();
        toku_decompress(
            (Bytef *) sb_node_info.uncompressed_ptr,
            sb_node_info.uncompressed_size,
            (Bytef *) sb_node_info.compressed_ptr,
            sb_node_info.compressed_size
            );
        tokutime_t decompress_t1 = toku_time_now();
        decompress_time = decompress_t1 - decompress_t0;

        // at this point sb->uncompressed_ptr stores the serialized node info.
        r = deserialize_ftnode_info(&sb_node_info, node);
        if (r != 0) {
            goto cleanup;
        }
    }

最后是根据bfe读取需要的partition,读partition是通过调用pf_callback实现的。

    // Now we have the ftnode_info.  We have a bunch more stuff in the
    // rbuf, so we might be able to store the compressed data for some
    // objects.
    // We can proceed to deserialize the individual subblocks.

    // setup the memory of the partitions
    // for partitions being decompressed, create either message buffer or basement node
    // for partitions staying compressed, create sub_block
    setup_ftnode_partitions(node, bfe, false);

    // We must capture deserialize and decompression time before
    // the pf_callback, otherwise we would double-count.
    t1 = toku_time_now();
    deserialize_time = (t1 - t0) - decompress_time;

    // do partial fetch if necessary
    if (bfe->type != ftnode_fetch_none) {
        PAIR_ATTR attr;
        r = toku_ftnode_pf_callback(node, *ndd, bfe, fd, &attr, NULL);
        if (r != 0) {
            goto cleanup;
        }
    }

deserialize_ftnode_from_fd的部分留给读者自行分析。