Author: ruze
Mysql是基于代价cost来选择索引,如果一个表有好几个索引,optimizer会分别计算每个索引访问的代价,选择代价最小的索引进行访问,这个索引也被称为access path。
Mysql在执行query语句的时候会在server层计算每个可选索引的代价,并选择代价最小的索引作为访问路径(access path)去引擎读取数据。 server层的handler类为引擎层提供一个框架来计算索引的代价。
Tokudb的records_in_range:根据search condition区间的大小做不同的处理。
当start_key和end_key不在同一个basement节点时,keys_range64是通过估算得到记录条数的。 估算的值受FT tree layout影响,FT tree可以是瘦高的,也可以是扁平的,不同的layout计算的结果可能会差别比较大。 而且,tokudb的键值key个数(in_memory_stats.numrows)也是个统计值,是每次在leaf节点做msn apply更新的,这个值也可能不准确。
ha_rows ha_tokudb::records_in_range(uint keynr, key_range* start_key, key_range* end_key) {
DBT *pleft_key, *pright_key;
DBT left_key, right_key;
ha_rows ret_val = HA_TOKUDB_RANGE_COUNT;
DB *kfile = share->key_file[keynr];
uint64_t rows = 0;
int error;
// get start_rows and end_rows values so that we can estimate range
// when calling key_range64, the only value we can trust is the value for less
// The reason is that the key being passed in may be a prefix of keys in the DB
// As a result, equal may be 0 and greater may actually be equal+greater
// So, we call key_range64 on the key, and the key that is after it.
if (!start_key && !end_key) {
error = estimate_num_rows(kfile, &rows, transaction);
if (error) {
ret_val = HA_TOKUDB_RANGE_COUNT;
goto cleanup;
}
ret_val = (rows <= 1) ? 1 : rows;
goto cleanup;
}
if (start_key) {
uchar inf_byte = (start_key->flag == HA_READ_KEY_EXACT) ? COL_NEG_INF : COL_POS_INF;
pack_key(&left_key, keynr, key_buff, start_key->key, start_key->length, inf_byte);
pleft_key = &left_key;
} else {
pleft_key = NULL;
}
if (end_key) {
uchar inf_byte = (end_key->flag == HA_READ_BEFORE_KEY) ? COL_NEG_INF : COL_POS_INF;
pack_key(&right_key, keynr, key_buff2, end_key->key, end_key->length, inf_byte);
pright_key = &right_key;
} else {
pright_key = NULL;
}
// keys_range64 can not handle a degenerate range (left_key > right_key), so we filter here
if (pleft_key && pright_key && tokudb_cmp_dbt_key(kfile, pleft_key, pright_key) > 0) {
rows = 0;
} else {
uint64_t less, equal1, middle, equal2, greater;
bool is_exact;
error = kfile->keys_range64(kfile, transaction, pleft_key, pright_key,
&less, &equal1, &middle, &equal2, &greater, &is_exact);
if (error) {
ret_val = HA_TOKUDB_RANGE_COUNT;
goto cleanup;
}
rows = middle;
}
// MySQL thinks a return value of 0 means there are exactly 0 rows
// Therefore, always return non-zero so this assumption is not made
ret_val = (ha_rows) (rows <= 1 ? 1 : rows);
cleanup:
if (tokudb_debug & TOKUDB_DEBUG_RETURN) {
TOKUDB_HANDLER_TRACE("return %" PRIu64 " %" PRIu64, (uint64_t) ret_val, rows);
}
DBUG_RETURN(ret_val);
}
用户创建的表可能只有数据没有索引,也可能有好几个索引。optimizer选择索引的过程:用search condition找出可用索引的集合,然后尝试用每个可选索引计算代价,也就是计算read_time。这个值是根据records_in_range返回的记录条数计算出来的。 Optimizer会选择代价最小的索引(在server层被称为access path)去引擎里面取数据,访问access path的方式可能是index point query/index range query也可能是full index scan,也可能是full table scan。
选定了索引,server层会把索引信息(keynr)传给引擎,在引擎层创建索引的cursor去读取数据。一般来说,对于full table scan引擎层会隐式转成pk index scan。
我们先来看一下full table scan的函数:
int ha_tokudb::rnd_init(bool scan) {
int error = 0;
range_lock_grabbed = false;
error = index_init(MAX_KEY, 0);
if (error) { goto cleanup;}
if (scan) {
error = prelock_range(NULL, NULL);
if (error) { goto cleanup; }
range_lock_grabbed = true;
}
error = 0;
cleanup:
if (error) {
index_end();
last_cursor_error = error;
}
TOKUDB_HANDLER_DBUG_RETURN(error);
}
int ha_tokudb::rnd_next(uchar * buf) {
ha_statistic_increment(&SSV::ha_read_rnd_next_count);
int error = get_next(buf, 1, NULL, false);
TOKUDB_HANDLER_DBUG_RETURN(error);
}
int ha_tokudb::rnd_end() {
range_lock_grabbed = false;
TOKUDB_HANDLER_DBUG_RETURN(index_end());
}
当search condition非空时,server层可能会选择使用ICP (index condition pushdown),把search condition下推到引擎层来做过滤。
Item* ha_tokudb::idx_cond_push(uint keyno_arg, Item* idx_cond_arg) {
toku_pushed_idx_cond_keyno = keyno_arg;
toku_pushed_idx_cond = idx_cond_arg;
return idx_cond_arg;
}
前面提到full table scan会隐式转为pk index scan,在rnd_init中调用index_init把tokudb_active_index设置为primary_key。 Handler类成员active_index表示当前索引index,这个值等于MAX_KEY(64)表示full table scan。 Tokudb类成员tokudb_active_index表示tokudb当前的索引index,一般来说这个值跟active_index是一样的。 Full table scan是个例外,active_index等于MAX_KEY,tokudb_active_index等于primary_key。 Index_init中最重要的工作就是创建cursor,并且重置bulk fetch信息。bulk fetch将在get_next函数中详细讨论。
int ha_tokudb::index_init(uint keynr, bool sorted) {
int error;
THD* thd = ha_thd();
/*
Under some very rare conditions (like full joins) we may already have
an active cursor at this point
*/
if (cursor) {
int r = cursor->c_close(cursor);
assert(r==0);
remove_from_trx_handler_list();
}
active_index = keynr;
if (active_index < MAX_KEY) {
DBUG_ASSERT(keynr <= table->s->keys);
} else {
DBUG_ASSERT(active_index == MAX_KEY);
keynr = primary_key;
}
tokudb_active_index = keynr;
#if TOKU_CLUSTERING_IS_COVERING
if (keynr < table->s->keys && table->key_info[keynr].option_struct->clustering)
key_read = false;
#endif
last_cursor_error = 0;
range_lock_grabbed = false;
range_lock_grabbed_null = false;
DBUG_ASSERT(share->key_file[keynr]);
cursor_flags = get_cursor_isolation_flags(lock.type, thd);
if (use_write_locks) {
cursor_flags |= DB_RMW;
}
if (get_disable_prefetching(thd)) {
cursor_flags |= DBC_DISABLE_PREFETCHING;
}
if ((error = share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, cursor_flags))) {
last_cursor_error = error;
cursor = NULL; // Safety
goto exit;
}
cursor->c_set_check_interrupt_callback(cursor, tokudb_killed_thd_callback, thd);
memset((void *) &last_key, 0, sizeof(last_key));
add_to_trx_handler_list();
if (thd_sql_command(thd) == SQLCOM_SELECT) {
set_query_columns(keynr);
unpack_entire_row = false;
}
else {
unpack_entire_row = true;
}
invalidate_bulk_fetch();
doing_bulk_fetch = false;
maybe_index_scan = false;
error = 0;
exit:
TOKUDB_HANDLER_DBUG_RETURN(error);
}
初始化cursor之后,server层会调下面四个函数之一去拿<start_key,end_key>区间的range锁。
这四个函数都是调用prelock_range去拿rangelock。
Start_key和end_key就是server层传下来的range区间的起点和终点,是server层的数据结构,prelock_range会生成相应的索引key并获取索引key的rangelock。
Full table scan的时候,rnd_init直接调用prelock_range拿<负无穷,正无穷>区间的rangelock,也就是锁表。 由于full table scan转pk index scan是在引擎内部做隐式转换,sever层并不知道,不走prepare_index_scan。
Rangelock的机制在之前的月报有提到。 如果既不是SERIALIZABLE隔离级别,也不是为写操作读取数据,调用prelock_range是不会真的去拿rangelock锁的。此种情况的rangelock锁是在query成功返回前拿的,防止并发事务更新相应的数据。
static int
c_set_bounds(DBC *dbc, const DBT *left_key, const DBT *right_key, bool pre_acquire, int out_of_range_error) {
if (out_of_range_error != DB_NOTFOUND &&
out_of_range_error != TOKUDB_OUT_OF_RANGE &&
out_of_range_error != 0) {
return toku_ydb_do_error(
dbc->dbp->dbenv,
EINVAL,
"Invalid out_of_range_error [%d] for %s\n",
out_of_range_error,
__FUNCTION__
);
}
if (left_key == toku_dbt_negative_infinity() && right_key == toku_dbt_positive_infinity()) {
out_of_range_error = 0;
}
DB *db = dbc->dbp;
DB_TXN *txn = dbc_struct_i(dbc)->txn;
HANDLE_PANICKED_DB(db);
toku_ft_cursor_set_range_lock(dbc_ftcursor(dbc), left_key, right_key,
(left_key == toku_dbt_negative_infinity()),
(right_key == toku_dbt_positive_infinity()),
out_of_range_error);
if (!db->i->lt || !txn || !pre_acquire)
return 0;
//READ_UNCOMMITTED and READ_COMMITTED transactions do not need read locks.
if (!dbc_struct_i(dbc)->rmw && dbc_struct_i(dbc)->iso != TOKU_ISO_SERIALIZABLE)
return 0;
toku::lock_request::type lock_type = dbc_struct_i(dbc)->rmw ?
toku::lock_request::type::WRITE : toku::lock_request::type::READ;
int r = toku_db_get_range_lock(db, txn, left_key, right_key, lock_type);
return r;
}
如果server层指定了range的start_key和end_key,handler的执行框架会根据execution plan指定的方式访问索引数据。
第一行数据的访问方式:
Index_read函数比较长,举几个常见的场景来说明 1) index point query:
2) index range query:
3) reverse index range query:
int ha_tokudb::index_read(
uchar* buf,
const uchar* key,
uint key_len,
enum ha_rkey_function find_flag) {
invalidate_bulk_fetch();
DBT row;
DBT lookup_key;
int error = 0;
uint32_t flags = 0;
THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data*)thd_get_ha_data(thd, tokudb_hton);
struct smart_dbt_info info;
struct index_read_info ir_info;
HANDLE_INVALID_CURSOR();
// if we locked a non-null key range and we now have a null key, then
// remove the bounds from the cursor
if (range_lock_grabbed &&
!range_lock_grabbed_null &&
index_key_is_null(table, tokudb_active_index, key, key_len)) {
range_lock_grabbed = range_lock_grabbed_null = false;
cursor->c_remove_restriction(cursor);
}
ha_statistic_increment(&SSV::ha_read_key_count);
memset((void *) &row, 0, sizeof(row));
info.ha = this;
info.buf = buf;
info.keynr = tokudb_active_index;
ir_info.smart_dbt_info = info;
ir_info.cmp = 0;
flags = SET_PRELOCK_FLAG(0);
switch (find_flag) {
case HA_READ_KEY_EXACT: /* Find first record else error */ {
pack_key(&lookup_key, tokudb_active_index, key_buff3, key, key_len, COL_NEG_INF);
DBT lookup_bound;
pack_key(&lookup_bound, tokudb_active_index, key_buff4, key, key_len, COL_POS_INF);
ir_info.orig_key = &lookup_key;
error = cursor->c_getf_set_range_with_bound(cursor, flags, &lookup_key, &lookup_bound, SMART_DBT_IR_CALLBACK(key_read), &ir_info);
if (ir_info.cmp) {
error = DB_NOTFOUND;
}
break;
}
case HA_READ_AFTER_KEY: /* Find next rec. after key-record */
pack_key(&lookup_key, tokudb_active_index, key_buff3, key, key_len, COL_POS_INF);
error = cursor->c_getf_set_range(cursor, flags, &lookup_key, SMART_DBT_CALLBACK(key_read), &info);
break;
case HA_READ_BEFORE_KEY: /* Find next rec. before key-record */
pack_key(&lookup_key, tokudb_active_index, key_buff3, key, key_len, COL_NEG_INF);
error = cursor->c_getf_set_range_reverse(cursor, flags, &lookup_key, SMART_DBT_CALLBACK(key_read), &info);
break;
case HA_READ_KEY_OR_NEXT: /* Record or next record */
pack_key(&lookup_key, tokudb_active_index, key_buff3, key, key_len, COL_NEG_INF);
error = cursor->c_getf_set_range(cursor, flags, &lookup_key, SMART_DBT_CALLBACK(key_read), &info);
break;
//
// This case does not seem to ever be used, it is ok for it to be slow
//
case HA_READ_KEY_OR_PREV: /* Record or previous */
pack_key(&lookup_key, tokudb_active_index, key_buff3, key, key_len, COL_NEG_INF);
ir_info.orig_key = &lookup_key;
error = cursor->c_getf_set_range(cursor, flags, &lookup_key, SMART_DBT_IR_CALLBACK(key_read), &ir_info);
if (error == DB_NOTFOUND) {
error = cursor->c_getf_last(cursor, flags, SMART_DBT_CALLBACK(key_read), &info);
}
else if (ir_info.cmp) {
error = cursor->c_getf_prev(cursor, flags, SMART_DBT_CALLBACK(key_read), &info);
}
break;
case HA_READ_PREFIX_LAST_OR_PREV: /* Last or prev key with the same prefix */
pack_key(&lookup_key, tokudb_active_index, key_buff3, key, key_len, COL_POS_INF);
error = cursor->c_getf_set_range_reverse(cursor, flags, &lookup_key, SMART_DBT_CALLBACK(key_read), &info);
break;
case HA_READ_PREFIX_LAST:
pack_key(&lookup_key, tokudb_active_index, key_buff3, key, key_len, COL_POS_INF);
ir_info.orig_key = &lookup_key;
error = cursor->c_getf_set_range_reverse(cursor, flags, &lookup_key, SMART_DBT_IR_CALLBACK(key_read), &ir_info);
if (ir_info.cmp) {
error = DB_NOTFOUND;
}
break;
default:
TOKUDB_HANDLER_TRACE("unsupported:%d", find_flag);
error = HA_ERR_UNSUPPORTED;
break;
}
error = handle_cursor_error(error,HA_ERR_KEY_NOT_FOUND,tokudb_active_index);
if (!error && !key_read && tokudb_active_index != primary_key && !key_is_clustering(&table->key_info[tokudb_active_index])) {
error = read_full_row(buf);
}
trx->stmt_progress.queried++;
track_progress(thd);
cleanup:
TOKUDB_HANDLER_DBUG_RETURN(error);
}
Index_read在调用ydb_cursor.cc中的回调函数时,flags参数初始化为0。ydb_cursor.cc中注册的回调函数会检查tokudb_cursor->rmw标记,如果tokudb_cursor->rmw是0,并且不是SERIALIZABLE隔离级别,函数 query_context_with_input_init会设置context的do_locking字段,告诉toku_ft_cursor_set_range在成功返回前去拿rangelock。
static int
c_getf_set_range_with_bound(DBC *c, uint32_t flag, DBT *key, DBT *key_bound, YDB_CALLBACK_FUNCTION f, void *extra) {
HANDLE_PANICKED_DB(c->dbp);
HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
int r = 0;
QUERY_CONTEXT_WITH_INPUT_S context; //Describes the context of this query.
query_context_with_input_init(&context, c, flag, key, NULL, f, extra);
while (r == 0) {
//toku_ft_cursor_set_range will call c_getf_set_range_callback(..., context) (if query is successful)
r = toku_ft_cursor_set_range(dbc_ftcursor(c), key, key_bound, c_getf_set_range_callback, &context);
if (r == DB_LOCK_NOTGRANTED) {
r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
} else {
break;
}
}
query_context_base_destroy(&context.base);
return r;
}
static int
c_getf_set_range_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only) {
QUERY_CONTEXT_WITH_INPUT super_context = (QUERY_CONTEXT_WITH_INPUT) extra;
QUERY_CONTEXT_BASE context = &super_context->base;
int r;
DBT found_key = { .data = (void *) key, .size = keylen };
//Lock:
// left(key,val) = (input_key, -infinity)
// right(key) = found ? found_key : infinity
// right(val) = found ? found_val : infinity
if (context->do_locking) {
const DBT *left_key = super_context->input_key;
const DBT *right_key = key != NULL ? &found_key : toku_dbt_positive_infinity();
r = toku_db_start_range_lock(context->db, context->txn, left_key, right_key, query_context_determine_lock_type(context), &context->request);
} else {
r = 0;
}
//Call application-layer callback if found and locks were successfully obtained.
if (r==0 && key!=NULL && !lock_only) {
DBT found_val = { .data = (void *) val, .size = vallen };
context->r_user_callback = context->f(&found_key, &found_val, context->f_extra);
r = context->r_user_callback;
}
//Give ft-layer an error (if any) to return from toku_ft_cursor_set_range
return r;
}
取到第一行数据后,server层会根据execution plan来调用 index_next(index_next_same)或者index_prev来取后面的记录。
还有两个读取数据的方法:
这几个函数比较简单,这里只分析index_next函数,感兴趣的朋友可以自行分析其余的函数。 index_next直接调用get_next函数读取下一条记录。
int ha_tokudb::index_next(uchar * buf) {
TOKUDB_HANDLER_DBUG_ENTER("");
ha_statistic_increment(&SSV::ha_read_next_count);
int error = get_next(buf, 1, NULL, key_read);
TOKUDB_HANDLER_DBUG_RETURN(error);
}
Tokudb为range query做了一个优化,被称作bulk fetch。对当前basement节点上落在range区间的key进行批量读取,一次msg apply多次读key操作,同时也减轻leaf节点读写锁争抢,避免频繁拿锁放锁。 Tokudb为提供bulk fetch功能,增加了如下几个数据成员:
class ha_tokudb : public handler {
private:
...
uchar* range_query_buff; // range query buffer
uint32_t size_range_query_buff; // size of the allocated range query buffer
uint32_t bytes_used_in_range_query_buff; // number of bytes used in the range query buffer
uint32_t curr_range_query_buff_offset; // current offset into the range query buffer for queries to read
uint64_t bulk_fetch_iteration;
uint64_t rows_fetched_using_bulk_fetch;
bool doing_bulk_fetch;
...
};
Get_next函数首先判读是否可以从当前的bulk fetch bufffer中读取数据,判读的标准是bytes_used_in_range_query_buff - curr_range_query_buff_offset > 0,表示bulk fetch buffer有数据可以读取。
如果用户禁掉bulk fetch的功能,该如何处理呢?禁掉bulk fetch,第1和第2两个条件都不满足,直接执行invalidate_bulk_fetch,然后检查doing_bulk_fetch标记为false,调用cursor读取数据。这部分比较简单,请读者自行分析。
bulk fetch的处理跟非bulk fetch的处理类似,最大的区别在于cursor->c_getf_XXX方法的回调函数和回调函数参数。它们分别被设置为smart_dbt_bf_callback和struct smart_dbt_bf_info结构的指针。
smart_dbt_bf_info结构告诉回调函数如何缓存当前数据,并且如何读取下一行数据。
typedef struct smart_dbt_bf_info {
ha_tokudb* ha;
bool need_val;
int direction;
THD* thd;
uchar* buf;
DBT* key_to_compare;
} *SMART_DBT_BF_INFO
一个批量读取完成后,get_next会调用read_data_from_range_query_buff,从bulk fetch buffer中取数据。
Get_next成功读取一行数据后,需要判断是否是需要回表读取row。对于pk和cluster index的情况,index中存储了完整数据不需要回表。
int ha_tokudb::get_next(
uchar* buf,
int direction,
DBT* key_to_compare,
bool do_key_read) {
int error = 0;
HANDLE_INVALID_CURSOR();
if (maybe_index_scan) {
maybe_index_scan = false;
if (!range_lock_grabbed) {
error = prepare_index_scan();
}
}
if (!error) {
uint32_t flags = SET_PRELOCK_FLAG(0);
// we need to read the val of what we retrieve if
// we do NOT have a covering index AND we are using a clustering secondary
// key
bool need_val =
(do_key_read == 0) &&
(tokudb_active_index == primary_key ||
key_is_clustering(&table->key_info[tokudb_active_index]));
if ((bytes_used_in_range_query_buff -
curr_range_query_buff_offset) > 0) {
error = read_data_from_range_query_buff(buf, need_val, do_key_read);
} else if (icp_went_out_of_range) {
icp_went_out_of_range = false;
error = HA_ERR_END_OF_FILE;
} else {
invalidate_bulk_fetch();
if (doing_bulk_fetch) {
struct smart_dbt_bf_info bf_info;
bf_info.ha = this;
// you need the val if you have a clustering index and key_read is not 0;
bf_info.direction = direction;
bf_info.thd = ha_thd();
bf_info.need_val = need_val;
bf_info.buf = buf;
bf_info.key_to_compare = key_to_compare;
//
// call c_getf_next with purpose of filling in range_query_buff
//
rows_fetched_using_bulk_fetch = 0;
// it is expected that we can do ICP in the smart_dbt_bf_callback
// as a result, it's possible we don't return any data because
// none of the rows matched the index condition. Therefore, we need
// this while loop. icp_out_of_range will be set if we hit a row that
// the index condition states is out of our range. When that hits,
// we know all the data in the buffer is the last data we will retrieve
while (bytes_used_in_range_query_buff == 0 &&
!icp_went_out_of_range && error == 0) {
if (direction > 0) {
error =
cursor->c_getf_next(
cursor,
flags,
smart_dbt_bf_callback,
&bf_info);
} else {
error =
cursor->c_getf_prev(
cursor,
flags,
smart_dbt_bf_callback,
&bf_info);
}
}
// if there is no data set and we went out of range,
// then there is nothing to return
if (bytes_used_in_range_query_buff == 0 &&
icp_went_out_of_range) {
icp_went_out_of_range = false;
error = HA_ERR_END_OF_FILE;
}
if (bulk_fetch_iteration < HA_TOKU_BULK_FETCH_ITERATION_MAX) {
bulk_fetch_iteration++;
}
error =
handle_cursor_error(
error,
HA_ERR_END_OF_FILE,
tokudb_active_index);
if (error) {
goto cleanup;
}
//
// now that range_query_buff is filled, read an element
//
error =
read_data_from_range_query_buff(buf, need_val, do_key_read);
} else {
struct smart_dbt_info info;
info.ha = this;
info.buf = buf;
info.keynr = tokudb_active_index;
if (direction > 0) {
error =
cursor->c_getf_next(
cursor,
flags,
SMART_DBT_CALLBACK(do_key_read),
&info);
} else {
error =
cursor->c_getf_prev(
cursor,
flags,
SMART_DBT_CALLBACK(do_key_read),
&info);
}
error =
handle_cursor_error(
error,
HA_ERR_END_OF_FILE,
tokudb_active_index);
}
}
}
//
// at this point, one of two things has happened
// either we have unpacked the data into buf, and we
// are done, or we have unpacked the primary key
// into last_key, and we use the code below to
// read the full row by doing a point query into the
// main table.
//
if (!error &&
!do_key_read &&
(tokudb_active_index != primary_key) &&
!key_is_clustering(&table->key_info[tokudb_active_index])) {
error = read_full_row(buf);
}
if (!error) {
THD *thd = ha_thd();
tokudb_trx_data* trx =
static_cast<tokudb_trx_data*>(thd_get_ha_data(thd, tokudb_hton));
trx->stmt_progress.queried++;
track_progress(thd);
if (thd_killed(thd))
error = ER_ABORTING_CONNECTION;
}
cleanup:
return error;
}
下面我们一起看看回调函数smart_dbt_bf_callback的处理。这个函数是fill_range_query_buf简单封装,当成功读取一行索引数据后,把结果缓存到bulk fetch buffer中,并继续读取下一行数据。
static int smart_dbt_bf_callback(
DBT const* key,
DBT const* row,
void* context) {
SMART_DBT_BF_INFO info = (SMART_DBT_BF_INFO)context;
return
info->ha->fill_range_query_buf(
info->need_val,
key,
row,
info->direction,
info->thd,
info->buf,
info->key_to_compare);
}
接下来,让我们把目光聚焦在fill_range_query_buf函数。参数key和value是当前读取到索引key和value;其余的参数是从smart_dbt_bf_info中结构提取出来的server层调用时指定的信息。
如果指定了key_to_compare,需要判断当前读取的key是否等于key_to_compare,因为二级索引的key后面拼了pk,所以这里做的是前缀比较。如果前缀不匹配,表示已经读到一个新key,设置icp_went_out_of_range并退出。
如果server层设置了ICP信息,需要判断当前读取的索引key是否在range范围内。 一般来说,判断是否在range范围内的方法是跟prelocked_right_range(range scan)或者prelocked_left_range(reverse range scan)比较的。 而ICP的情况下,判断是否在range范围内是跟end_range做比较的。 对于索引key不在range范围内的情况,设置icp_went_out_of_range并返回。
如果当前读到的索引key是在range范围内,ICP的情况还要做过滤条件检查。如果满足过滤条件,就存储到bulk fetch buffer中;不满足过滤条件,就跳过这条记录取下一条。
把key存储到bulk fetch buffer中时,需要检查need_val。为true时,先存key后存value;否则,只存key。 Value要存的数据可能是整个row,可能是set_query_columns函数记录的那些字段的数据。如果是第二种情况,需要把相应字段的数据提取出来。 Bulk fetch buffer中的数据按照一定格式存储,先存4个字节的size,接着存data。 当前key的存储位置是在bytes_used_in_range_query_buff偏移位置。 把key/value数据缓存到bulk fetch buffer中以后,还需要更新bytes_used_in_range_query_buff指向下一次写入的位置。
对于非ICP的情况,在fill_range_query_buf函数的最后判断是否超出range范围。这里跟prelocked_right_range(range scan)或者prelocked_left_range(reverse range scan)做比较。这两个值是在prelock_range函数设置的,也是rangelock的范围。
如果当前读取的key属于range范围内,需要继续读取下一条数据到bulk fetch buffer中,fill_range_query_buf返回TOKUDB_CURSOR_CONTINUE告诉toku_ft_search继续读取当前basement节点的下一条数据。 bulk fetch不能跨越basement节点,因为无法保证其他basement节点上是否做过msg apply。
int ha_tokudb::fill_range_query_buf(
bool need_val,
DBT const* key,
DBT const* row,
int direction,
THD* thd,
uchar* buf,
DBT* key_to_compare) {
int error;
//
// first put the value into range_query_buf
//
uint32_t size_remaining =
size_range_query_buff - bytes_used_in_range_query_buff;
uint32_t size_needed;
uint32_t user_defined_size = tokudb::sysvars::read_buf_size(thd);
uchar* curr_pos = NULL;
if (key_to_compare) {
int cmp = tokudb_prefix_cmp_dbt_key(
share->key_file[tokudb_active_index],
key_to_compare,
key);
if (cmp) {
icp_went_out_of_range = true;
error = 0;
goto cleanup;
}
}
// if we have an index condition pushed down, we check it
if (toku_pushed_idx_cond &&
(tokudb_active_index == toku_pushed_idx_cond_keyno)) {
unpack_key(buf, key, tokudb_active_index);
enum icp_result result =
toku_handler_index_cond_check(toku_pushed_idx_cond);
// If we have reason to stop, we set icp_went_out_of_range and get out
// otherwise, if we simply see that the current key is no match,
// we tell the cursor to continue and don't store
// the key locally
if (result == ICP_OUT_OF_RANGE || thd_killed(thd)) {
icp_went_out_of_range = true;
error = 0;
DEBUG_SYNC(ha_thd(), "tokudb_icp_asc_scan_out_of_range");
goto cleanup;
} else if (result == ICP_NO_MATCH) {
// if we are performing a DESC ICP scan and have no end_range
// to compare to stop using ICP filtering as there isn't much more
// that we can do without going through contortions with remembering
// and comparing key parts.
if (!end_range &&
direction < 0) {
cancel_pushed_idx_cond();
DEBUG_SYNC(ha_thd(), "tokudb_icp_desc_scan_invalidate");
}
error = TOKUDB_CURSOR_CONTINUE;
goto cleanup;
}
}
// at this point, if ICP is on, we have verified that the key is one
// we are interested in, so we proceed with placing the data
// into the range query buffer
if (need_val) {
if (unpack_entire_row) {
size_needed = 2*sizeof(uint32_t) + key->size + row->size;
} else {
// this is an upper bound
size_needed =
// size of key length
sizeof(uint32_t) +
// key and row
key->size + row->size +
// lengths of varchars stored
num_var_cols_for_query * (sizeof(uint32_t)) +
// length of blobs
sizeof(uint32_t);
}
} else {
size_needed = sizeof(uint32_t) + key->size;
}
if (size_remaining < size_needed) {
range_query_buff =
static_cast<uchar*>(tokudb::memory::realloc(
static_cast<void*>(range_query_buff),
bytes_used_in_range_query_buff + size_needed,
MYF(MY_WME)));
if (range_query_buff == NULL) {
error = ENOMEM;
invalidate_bulk_fetch();
goto cleanup;
}
size_range_query_buff = bytes_used_in_range_query_buff + size_needed;
}
//
// now we know we have the size, let's fill the buffer, starting with the key
//
curr_pos = range_query_buff + bytes_used_in_range_query_buff;
*reinterpret_cast<uint32_t*>(curr_pos) = key->size;
curr_pos += sizeof(uint32_t);
memcpy(curr_pos, key->data, key->size);
curr_pos += key->size;
if (need_val) {
if (unpack_entire_row) {
*reinterpret_cast<uint32_t*>(curr_pos) = row->size;
curr_pos += sizeof(uint32_t);
memcpy(curr_pos, row->data, row->size);
curr_pos += row->size;
} else {
// need to unpack just the data we care about
const uchar* fixed_field_ptr = static_cast<const uchar*>(row->data);
fixed_field_ptr += table_share->null_bytes;
const uchar* var_field_offset_ptr = NULL;
const uchar* var_field_data_ptr = NULL;
var_field_offset_ptr =
fixed_field_ptr +
share->kc_info.mcp_info[tokudb_active_index].fixed_field_size;
var_field_data_ptr =
var_field_offset_ptr +
share->kc_info.mcp_info[tokudb_active_index].len_of_offsets;
// first the null bytes
memcpy(curr_pos, row->data, table_share->null_bytes);
curr_pos += table_share->null_bytes;
// now the fixed fields
//
// first the fixed fields
//
for (uint32_t i = 0; i < num_fixed_cols_for_query; i++) {
uint field_index = fixed_cols_for_query[i];
memcpy(
curr_pos,
fixed_field_ptr + share->kc_info.cp_info[tokudb_active_index][field_index].col_pack_val,
share->kc_info.field_lengths[field_index]);
curr_pos += share->kc_info.field_lengths[field_index];
}
//
// now the var fields
//
for (uint32_t i = 0; i < num_var_cols_for_query; i++) {
uint field_index = var_cols_for_query[i];
uint32_t var_field_index =
share->kc_info.cp_info[tokudb_active_index][field_index].col_pack_val;
uint32_t data_start_offset;
uint32_t field_len;
get_var_field_info(
&field_len,
&data_start_offset,
var_field_index,
var_field_offset_ptr,
share->kc_info.num_offset_bytes);
memcpy(curr_pos, &field_len, sizeof(field_len));
curr_pos += sizeof(field_len);
memcpy(
curr_pos,
var_field_data_ptr + data_start_offset,
field_len);
curr_pos += field_len;
}
if (read_blobs) {
uint32_t blob_offset = 0;
uint32_t data_size = 0;
//
// now the blobs
//
get_blob_field_info(
&blob_offset,
share->kc_info.mcp_info[tokudb_active_index].len_of_offsets,
var_field_data_ptr,
share->kc_info.num_offset_bytes);
data_size =
row->size -
blob_offset -
static_cast<uint32_t>((var_field_data_ptr -
static_cast<const uchar*>(row->data)));
memcpy(curr_pos, &data_size, sizeof(data_size));
curr_pos += sizeof(data_size);
memcpy(curr_pos, var_field_data_ptr + blob_offset, data_size);
curr_pos += data_size;
}
}
}
bytes_used_in_range_query_buff = curr_pos - range_query_buff;
assert_always(bytes_used_in_range_query_buff <= size_range_query_buff);
//
// now determine if we should continue with the bulk fetch
// we want to stop under these conditions:
// - we overran the prelocked range
// - we are close to the end of the buffer
// - we have fetched an exponential amount of rows with
// respect to the bulk fetch iteration, which is initialized
// to 0 in index_init() and prelock_range().
rows_fetched_using_bulk_fetch++;
// if the iteration is less than the number of possible shifts on
// a 64 bit integer, check that we haven't exceeded this iterations
// row fetch upper bound.
if (bulk_fetch_iteration < HA_TOKU_BULK_FETCH_ITERATION_MAX) {
uint64_t row_fetch_upper_bound = 1LLU << bulk_fetch_iteration;
assert_always(row_fetch_upper_bound > 0);
if (rows_fetched_using_bulk_fetch >= row_fetch_upper_bound) {
error = 0;
goto cleanup;
}
}
if (bytes_used_in_range_query_buff +
table_share->rec_buff_length >
user_defined_size) {
error = 0;
goto cleanup;
}
if (direction > 0) {
// compare what we got to the right endpoint of prelocked range
// because we are searching keys in ascending order
if (prelocked_right_range_size == 0) {
error = TOKUDB_CURSOR_CONTINUE;
goto cleanup;
}
DBT right_range;
memset(&right_range, 0, sizeof(right_range));
right_range.size = prelocked_right_range_size;
right_range.data = prelocked_right_range;
int cmp = tokudb_cmp_dbt_key(
share->key_file[tokudb_active_index],
key,
&right_range);
error = (cmp > 0) ? 0 : TOKUDB_CURSOR_CONTINUE;
} else {
// compare what we got to the left endpoint of prelocked range
// because we are searching keys in descending order
if (prelocked_left_range_size == 0) {
error = TOKUDB_CURSOR_CONTINUE;
goto cleanup;
}
DBT left_range;
memset(&left_range, 0, sizeof(left_range));
left_range.size = prelocked_left_range_size;
left_range.data = prelocked_left_range;
int cmp = tokudb_cmp_dbt_key(
share->key_file[tokudb_active_index],
key,
&left_range);
error = (cmp < 0) ? 0 : TOKUDB_CURSOR_CONTINUE;
}
cleanup:
return error;
}
Bulk fetch buffer数据准备好了,我们就可以从read_data_from_range_query_buff读取数据了。 Curr_range_query_buff_offset表示当前读取的位置。 首先读key信息。如果need_value为true,还要读取data信息。可能读整行数据,也可能只需要读取函数set_query_columns设置的那些字段。 读取完成之后,调整curr_range_query_buff_offset指向下一次读取的位置。
int ha_tokudb::read_data_from_range_query_buff(uchar* buf, bool need_val, bool do_key_read) {
// buffer has the next row, get it from there
int error;
uchar* curr_pos = range_query_buff+curr_range_query_buff_offset;
DBT curr_key;
memset((void *) &curr_key, 0, sizeof(curr_key));
// get key info
uint32_t key_size = *(uint32_t *)curr_pos;
curr_pos += sizeof(key_size);
uchar* curr_key_buff = curr_pos;
curr_pos += key_size;
curr_key.data = curr_key_buff;
curr_key.size = key_size;
// if this is a covering index, this is all we need
if (do_key_read) {
assert_always(!need_val);
extract_hidden_primary_key(tokudb_active_index, &curr_key);
read_key_only(buf, tokudb_active_index, &curr_key);
error = 0;
}
// we need to get more data
else {
DBT curr_val;
memset((void *) &curr_val, 0, sizeof(curr_val));
uchar* curr_val_buff = NULL;
uint32_t val_size = 0;
// in this case, we don't have a val, we are simply extracting the pk
if (!need_val) {
curr_val.data = curr_val_buff;
curr_val.size = val_size;
extract_hidden_primary_key(tokudb_active_index, &curr_key);
error = read_primary_key( buf, tokudb_active_index, &curr_val, &curr_key);
}
else {
extract_hidden_primary_key(tokudb_active_index, &curr_key);
// need to extract a val and place it into buf
if (unpack_entire_row) {
// get val info
val_size = *(uint32_t *)curr_pos;
curr_pos += sizeof(val_size);
curr_val_buff = curr_pos;
curr_pos += val_size;
curr_val.data = curr_val_buff;
curr_val.size = val_size;
error = unpack_row(buf,&curr_val, &curr_key, tokudb_active_index);
}
else {
if (!(hidden_primary_key && tokudb_active_index == primary_key)) {
unpack_key(buf,&curr_key,tokudb_active_index);
}
// read rows we care about
// first the null bytes;
memcpy(buf, curr_pos, table_share->null_bytes);
curr_pos += table_share->null_bytes;
// now the fixed sized rows
for (uint32_t i = 0; i < num_fixed_cols_for_query; i++) {
uint field_index = fixed_cols_for_query[i];
Field* field = table->field[field_index];
unpack_fixed_field(
buf + field_offset(field, table),
curr_pos,
share->kc_info.field_lengths[field_index]
);
curr_pos += share->kc_info.field_lengths[field_index];
}
// now the variable sized rows
for (uint32_t i = 0; i < num_var_cols_for_query; i++) {
uint field_index = var_cols_for_query[i];
Field* field = table->field[field_index];
uint32_t field_len = *(uint32_t *)curr_pos;
curr_pos += sizeof(field_len);
unpack_var_field(
buf + field_offset(field, table),
curr_pos,
field_len,
share->kc_info.length_bytes[field_index]
);
curr_pos += field_len;
}
// now the blobs
if (read_blobs) {
uint32_t blob_size = *(uint32_t *)curr_pos;
curr_pos += sizeof(blob_size);
error = unpack_blobs(
buf,
curr_pos,
blob_size,
true
);
curr_pos += blob_size;
if (error) {
invalidate_bulk_fetch();
goto exit;
}
}
error = 0;
}
}
}
curr_range_query_buff_offset = curr_pos - range_query_buff;
exit:
return error;
}
所有数据都读完之后,handler框架会调用index_end关闭cursor,并重置一些状态变量。
int ha_tokudb::index_end() {
range_lock_grabbed = false;
range_lock_grabbed_null = false;
if (cursor) {
int r = cursor->c_close(cursor);
assert_always(r==0);
cursor = NULL;
remove_from_trx_handler_list();
last_cursor_error = 0;
}
active_index = tokudb_active_index = MAX_KEY;
//
// reset query variables
//
unpack_entire_row = true;
read_blobs = true;
read_key = true;
num_fixed_cols_for_query = 0;
num_var_cols_for_query = 0;
invalidate_bulk_fetch();
invalidate_icp();
doing_bulk_fetch = false;
close_dsmrr();
TOKUDB_HANDLER_DBUG_RETURN(0);
}
这就是一条query语句在tokudb引擎执行的大致过程。下个月见!