Author: lichang
MySQL 8.0 使用 Iterator 作为查询执行框架的核心。Iterator 模式提供了统一的接口来处理各种数据访问和加工操作,使得查询执行引擎具有更好的可扩展性、可维护性。本文档基于8.0.43 详细分析 MySQL 中各种 Iterator 的实现、使用场景和执行流程。
| Iterator 类型 | 主要功能 | 典型 SQL 场景 | 性能特点 | MySQL 特性 |
|---|---|---|---|---|
| 基础访问类 | ||||
| TableScanIterator | 全表扫描 | SELECT * FROM table |
顺序I/O,适合小表或全表处理 | 支持记录缓冲区优化 |
| IndexScanIterator | 索引全扫描 | SELECT * FROM table ORDER BY indexed_col |
有序读取,支持覆盖扫描 | 模板化支持正反向扫描 |
| SortBufferIterator | 内存排序结果 | ORDER BY (内存够用时) |
快速内存访问 | 支持压缩addon fields |
| SortFileIterator | 外部排序结果 | ORDER BY (大数据量) |
磁盘排序,适合大数据 | 文件缓冲优化 |
| 复合操作类 | ||||
| FilterIterator | 条件过滤 | WHERE condition |
减少数据传输 | 简单高效的条件评估 |
| LimitOffsetIterator | 结果限制 | LIMIT 10 OFFSET 20 |
早期终止,节省资源 | 支持计数所有行模式 |
| SortingIterator | 数据排序 | ORDER BY col1, col2 |
外部排序,支持大数据 | Filesort集成 |
| HashJoinIterator | 哈希连接 | t1 JOIN t2 ON t1.id = t2.id |
适合大表连接 | 混合哈希连接,内存溢出保护 |
| NestedLoopIterator | 嵌套循环连接 | t1 JOIN t2 (小表场景) |
简单实现,适合小数据 | 支持各种JOIN类型 |
| AggregateIterator | 聚合操作 | GROUP BY, COUNT, SUM |
流式聚合,无需物化 | 支持ROLLUP |
| MaterializeIterator | 结果物化 | 子查询、CTE | 避免重复计算 | 支持递归CTE |
| 特别场景优化 | ||||
| WindowIterator | 窗口函数 | ROW_NUMBER() OVER (...) |
复杂分析计算 | 多窗口优化 |
| UnqualifiedCountIterator | COUNT(*)优化 | SELECT COUNT(*) FROM table |
存储引擎级别优化 | 快速计数路径 |
| WeedoutIterator | 半连接去重 | 半连接优化 | 流式去重 | 使用临时表记录row ID |
| FollowTailIterator | 递归查询 | WITH RECURSIVE ... |
迭代计算 | 支持迭代次数限制 |
-- 示例1: 简单查询
SELECT * FROM orders WHERE order_date > '2023-01-01' LIMIT 100;
Iterator 链: LimitOffsetIterator → FilterIterator → TableScanIterator
-- 示例2: 连接查询
SELECT o.*, c.name
FROM orders o JOIN customers c ON o.customer_id = c.id
WHERE c.region = 'Asia';
Iterator 链: NestedLoopIterator → FilterIterator + TableScanIterator (orders) + FilterIterator → IndexScanIterator (customers)
-- 示例3: 聚合查询
SELECT customer_id, SUM(amount) as total
FROM orders
GROUP BY customer_id
HAVING SUM(amount) > 1000
ORDER BY total DESC;
Iterator 链: SortingIterator → FilterIterator (HAVING) → AggregateIterator → TableScanIterator
-- 示例4: 窗口函数查询
SELECT *, ROW_NUMBER() OVER (PARTITION BY region ORDER BY amount DESC) as rn
FROM sales;
Iterator 链: WindowIterator → SortingIterator → TableScanIterator
位置: /sql/iterators/row_iterator.h
核心接口定义:
class RowIterator {
public:
explicit RowIterator(THD *thd) : m_thd(thd) {}
virtual ~RowIterator() = default;
// 核心接口方法
virtual bool Init() = 0; // 初始化或重新初始化
virtual int Read() = 0; // 读取下一行,返回0成功,-1EOF,1错误
virtual void SetNullRowFlag(bool is_null_row) = 0; // 设置NULL行标志
virtual void UnlockRow() = 0; // 解锁当前行
// 性能监控接口
virtual void StartPSIBatchMode() {}
virtual void EndPSIBatchModeIfStarted() {}
virtual const IteratorProfiler *GetProfiler() const { return nullptr; }
protected:
THD *thd() const { return m_thd; }
private:
THD *const m_thd;
};
设计特点:
位置: /sql/iterators/basic_row_iterators.h
主要结构变量:
class TableScanIterator final : public TableRowIterator {
private:
uchar *const m_record; // 记录缓冲区
const double m_expected_rows; // 预期行数
ha_rows *const m_examined_rows; // 检查行数统计
ulonglong m_remaining_dups{0}; // 剩余重复行数(INTERSECT/EXCEPT)
const ha_rows m_limit_rows; // 行数限制
ha_rows m_stored_rows{0}; // 已存储行数
};
核心代码分析:
初始化方法 (basic_row_iterators.cc):
bool TableScanIterator::Init() {
// 初始化随机访问(全表扫描)
int error = table()->file->ha_rnd_init(1);
if (error) {
PrintError(error);
return true;
}
// 设置记录缓冲区(性能优化)
if (m_expected_rows > 0) {
table()->file->extra(HA_EXTRA_CACHE);
}
return false;
}
读取方法:
int TableScanIterator::Read() {
int tmp;
while ((tmp = table()->file->ha_rnd_next(m_record))) {
// MyISAM 特殊处理:处理并发删除情况
if (tmp == HA_ERR_RECORD_DELETED && !thd()->killed) continue;
return HandleError(tmp);
}
// 统计检查行数
if (m_examined_rows != nullptr) {
++*m_examined_rows;
}
// 处理INTERSECT/EXCEPT的重复行逻辑
if (m_remaining_dups > 0) {
--m_remaining_dups;
return 0; // 返回相同行的副本
}
return 0; // 成功读取一行
}
操作解释:
ha_rnd_init()和ha_rnd_next()与存储引擎交互使用场景:
位置: /sql/iterators/basic_row_iterators.h
主要结构变量:
template <bool Reverse>
class IndexScanIterator final : public TableRowIterator {
private:
uchar *const m_record; // 记录缓冲区
const int m_idx; // 索引号
const bool m_use_order; // 是否需要有序
const double m_expected_rows; // 预期行数
ha_rows *const m_examined_rows; // 检查行数
bool m_first = true; // 是否第一次读取
};
核心代码分析:
初始化方法:
bool IndexScanIterator<Reverse>::Init() {
// 检查索引是否覆盖所需列,启用covering index scan
if (table()->covering_keys.is_set(m_idx) && !table()->no_keyread) {
table()->set_keyread(true);
}
// 初始化索引扫描
int error = table()->file->ha_index_init(m_idx, m_use_order);
if (error) {
PrintError(error);
return true;
}
m_first = true;
return false;
}
正向读取方法:
template <>
int IndexScanIterator<false>::Read() { // Forward scan
int error;
if (m_first) {
error = table()->file->ha_index_first(m_record);
m_first = false;
} else {
error = table()->file->ha_index_next(m_record);
}
if (error) return HandleError(error);
if (m_examined_rows != nullptr) ++*m_examined_rows;
return 0;
}
反向读取方法:
template <>
int IndexScanIterator<true>::Read() { // Backward scan
int error;
if (m_first) {
error = table()->file->ha_index_last(m_record);
m_first = false;
} else {
error = table()->file->ha_index_prev(m_record);
}
if (error) return HandleError(error);
if (m_examined_rows != nullptr) ++*m_examined_rows;
return 0;
}
操作解释:
use_order参数控制是否需要保持索引顺序使用场景:
位置: /sql/iterators/composite_iterators.h
主要结构变量:
class FilterIterator final : public RowIterator {
private:
unique_ptr_destroy_only<RowIterator> m_source; // 源Iterator
Item *m_condition; // 过滤条件
};
核心代码分析:
读取方法 (composite_iterators.cc):
int FilterIterator::Read() {
for (;;) {
// 从源Iterator读取下一行
int err = m_source->Read();
if (err != 0) return err; // 错误或EOF
// 评估过滤条件
bool matched = m_condition->val_int();
// 检查是否被用户中断
if (thd()->killed) {
thd()->send_kill_message();
return 1;
}
// 检查条件评估是否出错
if (thd()->is_error()) return 1;
// 条件不匹配,释放行锁并继续读取下一行
if (!matched) {
m_source->UnlockRow();
continue;
}
// 条件匹配,返回该行
return 0;
}
}
操作解释:
UnlockRow()释放锁使用场景:
位置: /sql/iterators/hash_join_iterator.h
主要结构变量:
class HashJoinIterator final : public RowIterator {
private:
// Build 和 Probe 侧的迭代器
unique_ptr_destroy_only<RowIterator> m_build_iterator;
unique_ptr_destroy_only<RowIterator> m_probe_iterator;
// 哈希表和缓冲区管理
hash_join_buffer::HashJoinBuffer m_hash_map;
std::vector<HashJoinChunk> m_build_chunks;
std::vector<HashJoinChunk> m_probe_chunks;
// 连接条件和状态
std::vector<Item_eq_base *> m_join_conditions;
std::vector<Item *> m_extra_conditions;
JoinType m_join_type;
bool m_allow_spill_to_disk;
// 状态机管理
enum class State {
LOADING_NEXT_CHUNK_PAIR, // 加载下一个chunk对
READING_ROW_FROM_PROBE_ITERATOR, // 从probe iterator读取
READING_ROW_FROM_PROBE_CHUNK_FILE, // 从probe chunk文件读取
READING_FIRST_ROW_FROM_HASH_TABLE, // 从哈希表读取第一行
READING_FROM_HASH_TABLE // 从哈希表读取后续行
} m_state;
};
操作解释:
使用场景:
位置: /sql/iterators/composite_iterators.h
主要结构变量:
class NestedLoopIterator final : public RowIterator {
private:
enum {
NEEDS_OUTER_ROW, // 需要外表行
READING_FIRST_INNER_ROW, // 读取内表第一行
READING_INNER_ROWS, // 读取内表后续行
END_OF_ROWS // 行结束
} m_state;
unique_ptr_destroy_only<RowIterator> const m_source_outer;
unique_ptr_destroy_only<RowIterator> const m_source_inner;
const JoinType m_join_type;
const bool m_pfs_batch_mode; // 是否启用性能监控批模式
};
操作解释:
位置: /sql/iterators/sorting_iterator.h
主要结构变量:
class SortingIterator final : public RowIterator {
private:
Filesort *m_filesort; // 文件排序对象
unique_ptr_destroy_only<RowIterator> m_source_iterator; // 源迭代器
unique_ptr_destroy_only<RowIterator> m_result_iterator; // 结果迭代器
Filesort_info m_fs_info; // 文件排序信息
Sort_result m_sort_result; // 排序结果
const ha_rows m_num_rows_estimate; // 预估行数
const table_map m_tables_to_get_rowid_for; // 需要获取rowid的表
ha_rows *m_examined_rows; // 检查的行数
// 联合体存储不同类型的结果迭代器
union IteratorHolder {
SortBufferIterator<true> sort_buffer_packed_addons; // 压缩addon字段
SortBufferIterator<false> sort_buffer; // 普通内存缓冲
SortBufferIndirectIterator sort_buffer_indirect; // 间接内存缓冲
SortFileIterator<true> sort_file_packed_addons; // 压缩文件排序
SortFileIterator<false> sort_file; // 普通文件排序
SortFileIndirectIterator sort_file_indirect; // 间接文件排序
} m_result_iterator_holder;
};
核心代码分析:
初始化方法:
bool SortingIterator::Init() {
// 初始化源迭代器
if (m_source_iterator->Init()) {
return true;
}
// 执行实际排序
return DoSort();
}
排序执行方法:
int SortingIterator::DoSort() {
// 从源迭代器收集所有数据进行排序
if (filesort(thd(), table, m_filesort, /*sort_positions=*/true,
examined_rows, &found_rows, &sort_result)) {
return true;
}
// 根据排序结果创建相应的结果迭代器
if (m_sort_result.has_filesort_result_in_memory()) {
if (m_sort_result.using_addon_fields()) {
// 使用addon字段的内存排序
m_result_iterator = &m_result_iterator_holder.sort_buffer_packed_addons;
} else {
// 普通内存排序
m_result_iterator = &m_result_iterator_holder.sort_buffer;
}
} else {
// 文件排序
if (m_sort_result.using_addon_fields()) {
m_result_iterator = &m_result_iterator_holder.sort_file_packed_addons;
} else {
m_result_iterator = &m_result_iterator_holder.sort_file;
}
}
return m_result_iterator->Init();
}
操作解释:
使用场景:
位置: /sql/iterators/composite_iterators.h
主要结构变量:
class AggregateIterator final : public RowIterator {
private:
enum {
READING_FIRST_ROW, // 读取第一行
LAST_ROW_STARTED_NEW_GROUP, // 最后一行开始新组
OUTPUTTING_ROLLUP_ROWS, // 输出ROLLUP行
DONE_OUTPUTTING_ROWS // 完成输出
} m_state;
unique_ptr_destroy_only<RowIterator> m_source; // 源迭代器
JOIN *m_join = nullptr; // 关联的JOIN对象
bool m_seen_eof; // 是否已到EOF
table_map m_save_nullinfo; // 保存的NULL信息
const bool m_rollup; // 是否是ROLLUP查询
// ROLLUP相关变量
int m_last_unchanged_group_item_idx; // 最后未变化的分组项索引
int m_current_rollup_position; // 当前ROLLUP位置
pack_rows::TableCollection m_tables; // 表集合
String m_first_row_this_group; // 当前组的第一行
String m_first_row_next_group; // 下一组的第一行
int m_output_slice = -1; // 输出切片
};
核心代码分析:
读取方法 (composite_iterators.cc):
int AggregateIterator::Read() {
switch (m_state) {
case READING_FIRST_ROW:
// 读取并处理第一行
if (m_source->Read() != 0) {
return -1; // 空结果集
}
// 保存第一行并初始化聚合函数
StoreFromTableBuffers(m_tables, &m_first_row_this_group);
init_tmptable_sum_functions(m_join->sum_funcs);
m_state = LAST_ROW_STARTED_NEW_GROUP;
return Read(); // 递归调用处理
case LAST_ROW_STARTED_NEW_GROUP:
// 处理分组变化
while (m_source->Read() == 0) {
if (compare_group_fields()) {
// 分组发生变化
StoreFromTableBuffers(m_tables, &m_first_row_next_group);
break;
}
// 在同一组内,更新聚合函数
update_tmptable_sum_func(m_join->sum_funcs);
}
// 恢复当前组的第一行并输出聚合结果
LoadIntoTableBuffers(m_tables, m_first_row_this_group);
if (m_rollup) {
m_state = OUTPUTTING_ROLLUP_ROWS;
SetRollupLevel(m_join->send_group_parts);
} else {
m_state = READING_FIRST_ROW;
// 准备处理下一组
m_first_row_this_group = std::move(m_first_row_next_group);
}
return 0;
case OUTPUTTING_ROLLUP_ROWS:
// 输出ROLLUP的各级聚合
if (--m_current_rollup_position >= m_last_unchanged_group_item_idx) {
SetRollupLevel(m_current_rollup_position);
return 0;
}
m_state = READING_FIRST_ROW;
return Read();
case DONE_OUTPUTTING_ROWS:
return -1;
}
}
ROLLUP处理:
void AggregateIterator::SetRollupLevel(int level) {
// 设置GROUP BY项为NULL(从level开始)
for (int i = level; i < m_join->send_group_parts; ++i) {
m_join->group_fields[i]->set_null();
}
// 更新聚合函数的ROLLUP状态
for (Item_sum **func = m_join->sum_funcs; *func; ++func) {
(*func)->set_aggregator_level(level);
}
}
操作解释:
关键特性:
使用场景:
位置: /sql/iterators/composite_iterators.cc
主要结构变量:
template <typename Profiler>
class MaterializeIterator final : public TableRowIterator {
private:
// 需要物化的查询块列表
Mem_root_array<materialize_iterator::QueryBlock> m_query_blocks_to_materialize;
// 用于扫描物化后临时表的迭代器
unique_ptr_destroy_only<RowIterator> m_table_iterator;
Common_table_expr *m_cte; // CTE指针(如果是CTE物化)
Query_expression *m_query_expression; // 查询表达式
JOIN *const m_join; // 关联的JOIN对象
const int m_ref_slice; // 引用切片
const bool m_rematerialize; // 是否需要重新物化
const bool m_reject_multiple_rows; // 是否拒绝多行(标量子查询检查)
const ha_rows m_limit_rows; // 行数限制
// 缓存失效检测器列表
struct Invalidator {
const CacheInvalidatorIterator *iterator;
int64_t generation_at_last_materialize;
};
Mem_root_array<Invalidator> m_invalidators;
Profiler m_profiler; // 性能分析器
};
查询块结构:
struct QueryBlock {
unique_ptr_destroy_only<RowIterator> subquery_iterator; // 子查询迭代器
int select_number; // SELECT编号(用于跟踪)
JOIN *join; // 关联的JOIN
bool disable_deduplication_by_hash_field = false; // 是否禁用哈希去重
bool copy_items; // 是否复制字段
// 集合操作相关
ulonglong m_total_operands{0}; // 操作数总数
ulonglong m_operand_idx{0}; // 当前操作数索引
uint m_first_distinct{0}; // 第一个DISTINCT操作数
Temp_table_param *temp_table_param; // 临时表参数
// 递归CTE相关
bool is_recursive_reference = false; // 是否为递归引用
FollowTailIterator *recursive_reader = nullptr; // 递归读取器
};
核心代码分析:
初始化方法:
bool MaterializeIterator<Profiler>::Init() {
// 检查是否需要重新物化
if (!m_rematerialize && table()->materialized) {
return m_table_iterator->Init();
}
// 检查CTE是否已被其他地方物化
if (m_cte != nullptr && m_cte->tmp_result != nullptr) {
table()->m_materialize_info = m_cte->tmp_result;
return m_table_iterator->Init();
}
// 检查缓存失效器
for (const Invalidator &invalidator : m_invalidators) {
if (invalidator.iterator->generation() !=
invalidator.generation_at_last_materialize) {
// 缓存已失效,需要重新物化
need_rematerialize = true;
break;
}
}
if (!need_rematerialize) {
return m_table_iterator->Init();
}
// 执行物化
if (MaterializeQueryBlocks()) {
return true;
}
return m_table_iterator->Init();
}
物化执行方法:
bool MaterializeIterator<Profiler>::MaterializeQueryBlocks() {
// 清空临时表
empty_record(table());
table()->file->ha_delete_all_rows();
for (const auto &query_block : m_query_blocks_to_materialize) {
if (MaterializeQueryBlock(query_block)) {
return true;
}
}
table()->materialized = true;
return false;
}
单个查询块物化:
bool MaterializeIterator<Profiler>::MaterializeQueryBlock(
const materialize_iterator::QueryBlock &query_block) {
// 初始化子查询迭代器
if (query_block.subquery_iterator->Init()) {
return true;
}
ha_rows stored_rows = 0;
// 逐行读取并存储到临时表
PFSBatchMode batch_mode(query_block.subquery_iterator.get());
for (;;) {
int error = query_block.subquery_iterator->Read();
if (error > 0) return true; // 错误
if (error < 0) break; // EOF
// 复制字段值
if (query_block.copy_items) {
if (copy_fields(query_block.temp_table_param, thd())) {
return true;
}
}
// 处理集合操作(UNION/INTERSECT/EXCEPT)
if (query_block.m_total_operands > 1) {
if (HandleSetOperation(query_block)) {
return true;
}
}
// 检查唯一性约束(UNION DISTINCT)
if (!query_block.disable_deduplication_by_hash_field) {
if (check_unique_constraint(table())) {
continue; // 重复行,跳过
}
}
// 写入临时表
if ((error = table()->file->ha_write_row(table()->record[0]))) {
return report_handler_error(table(), error);
}
++stored_rows;
// 检查行数限制
if (m_limit_rows != HA_POS_ERROR && stored_rows >= m_limit_rows) {
break;
}
}
return false;
}
操作解释:
使用场景:
位置: /sql/iterators/window_iterators.h
主要结构变量:
class WindowIterator final : public RowIterator {
private:
/// 源迭代器
unique_ptr_destroy_only<RowIterator> const m_source;
/// 临时表参数(包含窗口信息)
Temp_table_param *m_temp_table_param;
/// 窗口函数对象
Window *m_window;
/// 关联的JOIN对象
JOIN *m_join;
/// 读取行时使用的切片
int m_input_slice;
/// 输出行时使用的切片
int m_output_slice;
};
BufferingWindowIterator结构:
class BufferingWindowIterator final : public RowIterator {
private:
unique_ptr_destroy_only<RowIterator> const m_source;
Temp_table_param *m_temp_table_param;
Window *m_window;
JOIN *m_join;
int m_input_slice;
int m_output_slice;
// 缓冲区相关
bool m_possibly_buffered_rows = false; // 是否有缓冲行
bool m_last_input_row_started_new_partition; // 最后输入行是否开始新分区
bool m_eof = false; // 是否到达EOF
};
核心代码分析:
WindowIterator读取方法:
int WindowIterator::Read() {
// 从源迭代器读取行
const int result = m_source->Read();
if (result != 0) {
return result;
}
// 切换到输入切片进行字段复制
Switch_ref_item_slice slice_switch(m_join, m_input_slice);
// 复制字段到临时表
if (copy_fields(m_temp_table_param, thd())) {
return 1;
}
// 复制函数结果(包括窗口函数)
if (copy_funcs(m_temp_table_param, thd(), CFT_HAS_WF)) {
return 1;
}
// 在窗口边界重置窗口函数
if (m_window->needs_card() && m_window->check_partition_boundary()) {
m_window->reset_all_wf_state();
}
// 切换到输出切片
Switch_ref_item_slice output_slice_switch(m_join, m_output_slice);
return 0;
}
BufferingWindowIterator读取方法:
int BufferingWindowIterator::Read() {
for (;;) {
// 如果有缓冲行需要处理
if (m_possibly_buffered_rows) {
// 处理缓冲的窗口记录
bool output_row_ready = false;
if (process_buffered_windowing_record(
thd(), m_temp_table_param, &output_row_ready)) {
return 1;
}
if (output_row_ready) {
// 行已准备好输出
return 0;
}
if (m_eof) {
return -1; // 所有行都已处理完毕
}
}
// 读取下一行进行缓冲
int result = m_source->Read();
if (result != 0) {
if (result < 0) {
m_eof = true;
// 继续处理剩余的缓冲行
continue;
}
return result; // 错误
}
// 复制字段和函数到临时表
Switch_ref_item_slice slice_switch(m_join, m_input_slice);
if (copy_fields(m_temp_table_param, thd())) {
return 1;
}
if (copy_funcs(m_temp_table_param, thd(), CFT_HAS_NO_WF)) {
return 1;
}
// 缓冲当前行
if (buffer_windowing_record(thd(), m_temp_table_param,
&m_last_input_row_started_new_partition)) {
return 1;
}
m_possibly_buffered_rows = true;
}
}
窗口函数执行流程:
操作解释:
使用场景:
位置: /sql/iterators/basic_row_iterators.h
主要结构变量:
class UnqualifiedCountIterator final : public RowIterator {
private:
bool m_has_row; // 是否有行要返回
JOIN *const m_join; // 关联的JOIN对象
};
核心代码分析:
初始化方法:
bool UnqualifiedCountIterator::Init() {
m_has_row = true; // 标记有一行结果要返回
return false;
}
读取方法 (basic_row_iterators.cc):
int UnqualifiedCountIterator::Read() {
if (m_has_row) {
m_has_row = false;
// 获取第一个表(通常是要计数的主表)
TABLE *table = m_join->qep_tab[m_join->const_tables].table();
// 调用存储引擎的快速计数接口
ha_rows count;
int error = table->file->ha_records(&count);
if (error) {
// 快速计数失败,报告错误
return HandleError(error);
}
// 将计数结果存储到相应的字段中
// 通常是COUNT(*)函数的result_field
for (Item_sum **sum_func = m_join->sum_funcs; *sum_func; ++sum_func) {
if ((*sum_func)->sum_func() == Item_sum::COUNT_FUNC) {
(*sum_func)->make_const(static_cast<longlong>(count));
break;
}
}
return 0; // 成功返回一行
} else {
return -1; // EOF,没有更多行
}
}
操作解释:
ha_records()接口获取行数统计使用场景:
SELECT COUNT(*) FROM table (无WHERE条件)位置: /sql/iterators/composite_iterators.h
主要结构变量:
class WeedoutIterator final : public RowIterator {
private:
unique_ptr_destroy_only<RowIterator> m_source; // 源迭代器
SJ_TMP_TABLE *m_sj; // 半连接临时表
const table_map m_tables_to_get_rowid_for; // 需要获取row ID的表映射
};
SJ_TMP_TABLE结构:
struct SJ_TMP_TABLE {
TABLE *tmp_table; // 临时表用于存储row ID
SJ_TMP_TABLE_TAB *tabs; // 表信息数组
SJ_TMP_TABLE_TAB *tabs_end; // 表信息数组结束
uint null_bits; // NULL位数
uint null_bytes; // NULL字节数
uint rowid_len; // Row ID长度
ha_rows row_count; // 行计数
};
核心代码分析:
读取方法 (composite_iterators.cc):
int WeedoutIterator::Read() {
for (;;) {
// 从源迭代器读取下一行
int ret = m_source->Read();
if (ret != 0) {
return ret; // 错误或EOF
}
// 获取相关表的row ID
for (SJ_TMP_TABLE_TAB *tab = m_sj->tabs; tab != m_sj->tabs_end; ++tab) {
TABLE *table = tab->qep_tab->table();
if ((m_tables_to_get_rowid_for & table->pos_in_table_list->map()) &&
can_call_position(table)) {
// 获取当前行的position(row ID)
table->file->position(table->record[0]);
}
}
// 执行去重检查
ret = do_sj_dups_weedout(thd(), m_sj);
if (ret == -1) {
return 1; // 错误
}
if (ret == 0) {
// 不是重复行,返回该行
return 0;
}
// 是重复行,跳过并读取下一行
}
}
去重检查方法 (sql_executor.cc):
int do_sj_dups_weedout(THD *thd, SJ_TMP_TABLE *sjtbl) {
// 构建包含所有相关row ID的键
uchar *ptr = sjtbl->tmp_table->record[0] + 1; // 跳过NULL字节
// 复制每个表的row ID到临时表记录中
for (SJ_TMP_TABLE_TAB *tab = sjtbl->tabs; tab != sjtbl->tabs_end; ++tab) {
if (tab->rowid_offset) {
memcpy(ptr + tab->rowid_offset, tab->qep_tab->table()->file->ref,
tab->qep_tab->table()->file->ref_length);
}
}
// 尝试在临时表中插入这个row ID组合
int error = sjtbl->tmp_table->file->ha_write_row(sjtbl->tmp_table->record[0]);
if (error == 0) {
return 0; // 插入成功,是新行
} else if (error == HA_ERR_FOUND_DUPP_KEY) {
return 1; // 重复键,是重复行
} else {
return -1; // 其他错误
}
}
操作解释:
使用场景:
SELECT * FROM t1 WHERE a IN (SELECT b FROM t2) 的半连接优化位置: /sql/iterators/basic_row_iterators.h
主要结构变量:
class FollowTailIterator final : public TableRowIterator {
private:
bool m_inited = false; // 是否已初始化
uchar *const m_record; // 记录缓冲区
const double m_expected_rows; // 预期行数
ha_rows *const m_examined_rows; // 检查行数
ha_rows m_read_rows; // 已读取行数
ha_rows m_end_of_current_iteration; // 当前迭代结束行数
unsigned m_recursive_iteration_count; // 递归迭代计数
ha_rows *m_stored_rows = nullptr; // 指向MaterializeIterator的存储行数
};
核心代码分析:
初始化方法 (basic_row_iterators.cc):
bool FollowTailIterator::Init() {
if (!m_inited) {
// 第一次初始化,开始表扫描
if (table()->file->ha_rnd_init(1)) {
return true;
}
m_inited = true;
m_read_rows = 0;
m_end_of_current_iteration = *m_stored_rows;
m_recursive_iteration_count = 0;
}
// 后续调用不重置游标位置,继续从当前位置读取
return false;
}
读取方法 (basic_row_iterators.cc):
int FollowTailIterator::Read() {
if (m_read_rows == *m_stored_rows) {
// 已读取所有当前存储的行,返回EOF等待更多行
return -1;
}
// 检查递归迭代限制
if (m_read_rows >= m_end_of_current_iteration) {
// 开始新的迭代
m_end_of_current_iteration = *m_stored_rows;
++m_recursive_iteration_count;
// 检查迭代次数限制(防止无限递归)
if (m_recursive_iteration_count > thd()->variables.cte_max_recursion_depth) {
my_error(ER_CTE_MAX_RECURSION_DEPTH, MYF(0),
thd()->variables.cte_max_recursion_depth);
return 1;
}
}
// 读取下一行
int error = table()->file->ha_rnd_next(m_record);
if (error) {
if (error == HA_ERR_END_OF_FILE) {
// 暂时到达文件末尾,等待更多数据
return -1;
}
return HandleError(error);
}
++m_read_rows;
if (m_examined_rows != nullptr) {
++*m_examined_rows;
}
return 0;
}
溢出到磁盘处理:
bool FollowTailIterator::RepositionCursorAfterSpillToDisk() {
// 表从MEMORY转换为InnoDB后重新定位游标
if (table()->file->ha_rnd_init(1)) {
return true;
}
// 跳过已读取的行
for (ha_rows i = 0; i < m_read_rows; ++i) {
int error = table()->file->ha_rnd_next(m_record);
if (error) {
return true;
}
}
return false;
}
操作解释:
WITH RECURSIVE执行流程:
使用场景:
WITH RECURSIVE cte AS (...) 递归公用表表达式// 在最内层表启用批处理模式减少性能监控开销
virtual void StartPSIBatchMode() {}
virtual void EndPSIBatchModeIfStarted() {}
virtual void UnlockRow() = 0; // 释放不需要的行锁
考虑以下 SQL 查询:
SELECT o.order_id, c.customer_name, SUM(oi.amount)
FROM orders o
INNER JOIN customers c ON o.customer_id = c.customer_id
INNER JOIN order_items oi ON o.order_id = oi.order_id
WHERE o.order_date >= '2023-01-01'
AND c.region = 'Asia'
GROUP BY o.order_id, c.customer_name
HAVING SUM(oi.amount) > 1000
ORDER BY SUM(oi.amount) DESC
LIMIT 100;
Iterator 执行树结构:
执行过程详解:
customers表通过region索引扫描,应用region过滤orders表全表扫描,应用日期过滤order_items表通过order_id索引进行连接扫描AggregateIterator按GROUP BY字段分组聚合FilterIterator应用HAVING条件SortingIterator按SUM(amount)降序排序LimitOffsetIterator限制输出100行MySQL 的 Iterator 具有以下特点:
这种设计使得 MySQL 能够高效执行各种复杂查询,同时保持良好的代码可维护性和扩展性。