数据库内核月报

数据库内核月报 - 2025 / 09

MySQL 8.0 SQL Iterator

Author: lichang

概述

MySQL 8.0 使用 Iterator 作为查询执行框架的核心。Iterator 模式提供了统一的接口来处理各种数据访问和加工操作,使得查询执行引擎具有更好的可扩展性、可维护性。本文档基于8.0.43 详细分析 MySQL 中各种 Iterator 的实现、使用场景和执行流程。

Iterator 概览

Iterator 使用场景对照表

Iterator 类型 主要功能 典型 SQL 场景 性能特点 MySQL 特性
基础访问类        
TableScanIterator 全表扫描 SELECT * FROM table 顺序I/O,适合小表或全表处理 支持记录缓冲区优化
IndexScanIterator 索引全扫描 SELECT * FROM table ORDER BY indexed_col 有序读取,支持覆盖扫描 模板化支持正反向扫描
SortBufferIterator 内存排序结果 ORDER BY (内存够用时) 快速内存访问 支持压缩addon fields
SortFileIterator 外部排序结果 ORDER BY (大数据量) 磁盘排序,适合大数据 文件缓冲优化
复合操作类        
FilterIterator 条件过滤 WHERE condition 减少数据传输 简单高效的条件评估
LimitOffsetIterator 结果限制 LIMIT 10 OFFSET 20 早期终止,节省资源 支持计数所有行模式
SortingIterator 数据排序 ORDER BY col1, col2 外部排序,支持大数据 Filesort集成
HashJoinIterator 哈希连接 t1 JOIN t2 ON t1.id = t2.id 适合大表连接 混合哈希连接,内存溢出保护
NestedLoopIterator 嵌套循环连接 t1 JOIN t2 (小表场景) 简单实现,适合小数据 支持各种JOIN类型
AggregateIterator 聚合操作 GROUP BY, COUNT, SUM 流式聚合,无需物化 支持ROLLUP
MaterializeIterator 结果物化 子查询、CTE 避免重复计算 支持递归CTE
特别场景优化        
WindowIterator 窗口函数 ROW_NUMBER() OVER (...) 复杂分析计算 多窗口优化
UnqualifiedCountIterator COUNT(*)优化 SELECT COUNT(*) FROM table 存储引擎级别优化 快速计数路径
WeedoutIterator 半连接去重 半连接优化 流式去重 使用临时表记录row ID
FollowTailIterator 递归查询 WITH RECURSIVE ... 迭代计算 支持迭代次数限制

SQL 到 Iterator 映射示例

-- 示例1: 简单查询
SELECT * FROM orders WHERE order_date > '2023-01-01' LIMIT 100;

Iterator 链: LimitOffsetIteratorFilterIteratorTableScanIterator

-- 示例2: 连接查询
SELECT o.*, c.name 
FROM orders o JOIN customers c ON o.customer_id = c.id 
WHERE c.region = 'Asia';

Iterator 链: NestedLoopIteratorFilterIterator + TableScanIterator (orders) + FilterIteratorIndexScanIterator (customers)

-- 示例3: 聚合查询
SELECT customer_id, SUM(amount) as total
FROM orders 
GROUP BY customer_id 
HAVING SUM(amount) > 1000
ORDER BY total DESC;

Iterator 链: SortingIteratorFilterIterator (HAVING) → AggregateIteratorTableScanIterator

-- 示例4: 窗口函数查询
SELECT *, ROW_NUMBER() OVER (PARTITION BY region ORDER BY amount DESC) as rn
FROM sales;

Iterator 链: WindowIteratorSortingIteratorTableScanIterator

核心 Iterator 类型详细分析

1. 基础访问 Iterator

1.1 RowIterator - 基类接口

位置: /sql/iterators/row_iterator.h

核心接口定义:

class RowIterator {
public:
  explicit RowIterator(THD *thd) : m_thd(thd) {}
  virtual ~RowIterator() = default;

  // 核心接口方法
  virtual bool Init() = 0;           // 初始化或重新初始化
  virtual int Read() = 0;            // 读取下一行,返回0成功,-1EOF,1错误
  virtual void SetNullRowFlag(bool is_null_row) = 0;  // 设置NULL行标志
  virtual void UnlockRow() = 0;      // 解锁当前行

  // 性能监控接口
  virtual void StartPSIBatchMode() {}
  virtual void EndPSIBatchModeIfStarted() {}
  virtual const IteratorProfiler *GetProfiler() const { return nullptr; }

protected:
  THD *thd() const { return m_thd; }
private:
  THD *const m_thd;
};

设计特点:

  1. 统一接口: 所有迭代器都继承自RowIterator,提供一致的操作接口
  2. 状态管理: 通过Init()重置状态,Read()执行迭代
  3. 错误处理: 标准化的返回值约定
  4. 性能监控: 内置Performance Schema批处理模式支持

1.2 TableScanIterator - 全表扫描

位置: /sql/iterators/basic_row_iterators.h

主要结构变量:

class TableScanIterator final : public TableRowIterator {
private:
  uchar *const m_record;              // 记录缓冲区
  const double m_expected_rows;       // 预期行数
  ha_rows *const m_examined_rows;     // 检查行数统计
  ulonglong m_remaining_dups{0};      // 剩余重复行数(INTERSECT/EXCEPT)
  const ha_rows m_limit_rows;         // 行数限制
  ha_rows m_stored_rows{0};           // 已存储行数
};

核心代码分析:

初始化方法 (basic_row_iterators.cc):

bool TableScanIterator::Init() {
  // 初始化随机访问(全表扫描)
  int error = table()->file->ha_rnd_init(1);
  if (error) {
    PrintError(error);
    return true;
  }
  
  // 设置记录缓冲区(性能优化)
  if (m_expected_rows > 0) {
    table()->file->extra(HA_EXTRA_CACHE);
  }
  
  return false;
}

读取方法:

int TableScanIterator::Read() {
  int tmp;
  while ((tmp = table()->file->ha_rnd_next(m_record))) {
    // MyISAM 特殊处理:处理并发删除情况
    if (tmp == HA_ERR_RECORD_DELETED && !thd()->killed) continue;
    return HandleError(tmp);
  }
  
  // 统计检查行数
  if (m_examined_rows != nullptr) {
    ++*m_examined_rows;
  }
  
  // 处理INTERSECT/EXCEPT的重复行逻辑
  if (m_remaining_dups > 0) {
    --m_remaining_dups;
    return 0;  // 返回相同行的副本
  }
  
  return 0;  // 成功读取一行
}

操作解释:

  1. 存储引擎接口: 通过ha_rnd_init()ha_rnd_next()与存储引擎交互
  2. 缓冲优化: 根据预期行数启用缓存机制
  3. 并发处理: 处理MyISAM等存储引擎的并发删除情况
  4. 集合操作支持: 专门处理INTERSECT和EXCEPT的重复行语义

使用场景:

1.3 IndexScanIterator - 索引扫描

位置: /sql/iterators/basic_row_iterators.h

主要结构变量:

template <bool Reverse>
class IndexScanIterator final : public TableRowIterator {
private:
  uchar *const m_record;              // 记录缓冲区
  const int m_idx;                    // 索引号
  const bool m_use_order;             // 是否需要有序
  const double m_expected_rows;       // 预期行数
  ha_rows *const m_examined_rows;     // 检查行数
  bool m_first = true;                // 是否第一次读取
};

核心代码分析:

初始化方法:

bool IndexScanIterator<Reverse>::Init() {
  // 检查索引是否覆盖所需列,启用covering index scan
  if (table()->covering_keys.is_set(m_idx) && !table()->no_keyread) {
    table()->set_keyread(true);
  }
  
  // 初始化索引扫描
  int error = table()->file->ha_index_init(m_idx, m_use_order);
  if (error) {
    PrintError(error);
    return true;
  }
  
  m_first = true;
  return false;
}

正向读取方法:

template <>
int IndexScanIterator<false>::Read() {  // Forward scan
  int error;
  if (m_first) {
    error = table()->file->ha_index_first(m_record);
    m_first = false;
  } else {
    error = table()->file->ha_index_next(m_record);
  }
  
  if (error) return HandleError(error);
  if (m_examined_rows != nullptr) ++*m_examined_rows;
  return 0;
}

反向读取方法:

template <>
int IndexScanIterator<true>::Read() {  // Backward scan
  int error;
  if (m_first) {
    error = table()->file->ha_index_last(m_record);
    m_first = false;
  } else {
    error = table()->file->ha_index_prev(m_record);
  }
  
  if (error) return HandleError(error);
  if (m_examined_rows != nullptr) ++*m_examined_rows;
  return 0;
}

操作解释:

  1. 模板化设计: 通过模板参数支持正向和反向扫描
  2. 覆盖索引优化: 自动检测并启用covering index扫描,避免回表
  3. 有序性控制: use_order参数控制是否需要保持索引顺序
  4. 分区表支持: 在分区表场景下可提供更高效的扫描策略

使用场景:

2. 复合操作 Iterator

2.1 FilterIterator - 过滤条件

位置: /sql/iterators/composite_iterators.h

主要结构变量:

class FilterIterator final : public RowIterator {
private:
  unique_ptr_destroy_only<RowIterator> m_source;  // 源Iterator
  Item *m_condition;                              // 过滤条件
};

核心代码分析:

读取方法 (composite_iterators.cc):

int FilterIterator::Read() {
  for (;;) {
    // 从源Iterator读取下一行
    int err = m_source->Read();
    if (err != 0) return err;  // 错误或EOF
    
    // 评估过滤条件
    bool matched = m_condition->val_int();
    
    // 检查是否被用户中断
    if (thd()->killed) {
      thd()->send_kill_message();
      return 1;
    }
    
    // 检查条件评估是否出错
    if (thd()->is_error()) return 1;
    
    // 条件不匹配,释放行锁并继续读取下一行
    if (!matched) {
      m_source->UnlockRow();
      continue;
    }
    
    // 条件匹配,返回该行
    return 0;
  }
}

操作解释:

  1. 条件评估循环: 持续从源Iterator读取数据直到找到满足条件的行
  2. 错误处理: 检查用户中断和条件评估错误
  3. 行锁管理: 对不匹配的行调用UnlockRow()释放锁
  4. 高效过滤: 在数据流中尽早进行过滤,减少上层处理开销

使用场景:

2.2 HashJoinIterator - 哈希连接

位置: /sql/iterators/hash_join_iterator.h

主要结构变量:

class HashJoinIterator final : public RowIterator {
private:
  // Build 和 Probe 侧的迭代器
  unique_ptr_destroy_only<RowIterator> m_build_iterator;
  unique_ptr_destroy_only<RowIterator> m_probe_iterator;
  
  // 哈希表和缓冲区管理
  hash_join_buffer::HashJoinBuffer m_hash_map;
  std::vector<HashJoinChunk> m_build_chunks;
  std::vector<HashJoinChunk> m_probe_chunks;
  
  // 连接条件和状态
  std::vector<Item_eq_base *> m_join_conditions;
  std::vector<Item *> m_extra_conditions;
  JoinType m_join_type;
  bool m_allow_spill_to_disk;
  
  // 状态机管理
  enum class State {
    LOADING_NEXT_CHUNK_PAIR,              // 加载下一个chunk对
    READING_ROW_FROM_PROBE_ITERATOR,      // 从probe iterator读取
    READING_ROW_FROM_PROBE_CHUNK_FILE,    // 从probe chunk文件读取
    READING_FIRST_ROW_FROM_HASH_TABLE,    // 从哈希表读取第一行
    READING_FROM_HASH_TABLE               // 从哈希表读取后续行
  } m_state;
};

操作解释:

  1. 三阶段执行:
    • Build阶段: 读取较小的表构建哈希表
    • Probe阶段: 读取较大的表在哈希表中查找匹配
    • Chunk处理阶段: 内存不足时处理溢出的chunk文件
  2. 混合哈希连接:
    • 内存充足时进行内存哈希连接
    • 内存不足时自动降级为磁盘哈希连接
    • 使用分区技术将数据分割到多个chunk文件
  3. 连接类型支持:
    • INNER JOIN: 只返回匹配的行
    • LEFT JOIN: 返回左表所有行,右表无匹配时补NULL
    • SEMI JOIN: 左表行有匹配即返回(不重复)
    • ANTI JOIN: 返回左表中无匹配的行

使用场景:

2.3 NestedLoopIterator - 嵌套循环连接

位置: /sql/iterators/composite_iterators.h

主要结构变量:

class NestedLoopIterator final : public RowIterator {
private:
  enum {
    NEEDS_OUTER_ROW,           // 需要外表行
    READING_FIRST_INNER_ROW,   // 读取内表第一行
    READING_INNER_ROWS,        // 读取内表后续行
    END_OF_ROWS                // 行结束
  } m_state;

  unique_ptr_destroy_only<RowIterator> const m_source_outer;
  unique_ptr_destroy_only<RowIterator> const m_source_inner;
  const JoinType m_join_type;
  const bool m_pfs_batch_mode;  // 是否启用性能监控批模式
};

操作解释:

  1. 状态机驱动: 使用状态机管理复杂的连接逻辑
  2. JOIN类型支持: 支持INNER、LEFT、RIGHT、SEMI、ANTI JOIN
  3. 批处理模式: 内表扫描时可启用Performance Schema批处理模式
  4. 简单高效: 适合小表连接或有良好索引的场景

3. 排序和聚合 Iterator

3.1 SortingIterator - 排序

位置: /sql/iterators/sorting_iterator.h

主要结构变量:

class SortingIterator final : public RowIterator {
private:
  Filesort *m_filesort;                              // 文件排序对象
  unique_ptr_destroy_only<RowIterator> m_source_iterator;  // 源迭代器
  unique_ptr_destroy_only<RowIterator> m_result_iterator;  // 结果迭代器
  
  Filesort_info m_fs_info;                           // 文件排序信息
  Sort_result m_sort_result;                         // 排序结果
  const ha_rows m_num_rows_estimate;                 // 预估行数
  const table_map m_tables_to_get_rowid_for;        // 需要获取rowid的表
  ha_rows *m_examined_rows;                          // 检查的行数
  
  // 联合体存储不同类型的结果迭代器
  union IteratorHolder {
    SortBufferIterator<true> sort_buffer_packed_addons;   // 压缩addon字段
    SortBufferIterator<false> sort_buffer;                // 普通内存缓冲
    SortBufferIndirectIterator sort_buffer_indirect;      // 间接内存缓冲
    SortFileIterator<true> sort_file_packed_addons;       // 压缩文件排序
    SortFileIterator<false> sort_file;                    // 普通文件排序
    SortFileIndirectIterator sort_file_indirect;          // 间接文件排序
  } m_result_iterator_holder;
};

核心代码分析:

初始化方法:

bool SortingIterator::Init() {
  // 初始化源迭代器
  if (m_source_iterator->Init()) {
    return true;
  }
  
  // 执行实际排序
  return DoSort();
}

排序执行方法:

int SortingIterator::DoSort() {
  // 从源迭代器收集所有数据进行排序
  if (filesort(thd(), table, m_filesort, /*sort_positions=*/true,
               examined_rows, &found_rows, &sort_result)) {
    return true;
  }
  
  // 根据排序结果创建相应的结果迭代器
  if (m_sort_result.has_filesort_result_in_memory()) {
    if (m_sort_result.using_addon_fields()) {
      // 使用addon字段的内存排序
      m_result_iterator = &m_result_iterator_holder.sort_buffer_packed_addons;
    } else {
      // 普通内存排序
      m_result_iterator = &m_result_iterator_holder.sort_buffer;
    }
  } else {
    // 文件排序
    if (m_sort_result.using_addon_fields()) {
      m_result_iterator = &m_result_iterator_holder.sort_file_packed_addons;
    } else {
      m_result_iterator = &m_result_iterator_holder.sort_file;
    }
  }
  
  return m_result_iterator->Init();
}

操作解释:

  1. 两阶段处理: Init()阶段收集并排序所有数据,Read()阶段返回排序结果
  2. 自适应策略: 根据数据量和内存情况选择内存排序或文件排序
  3. 多种结果类型: 支持直接排序、间接排序、压缩字段等多种优化方式
  4. Filesort集成: 深度集成MySQL的Filesort排序引擎

使用场景:

3.2 AggregateIterator - 聚合操作

位置: /sql/iterators/composite_iterators.h

主要结构变量:

class AggregateIterator final : public RowIterator {
private:
  enum {
    READING_FIRST_ROW,           // 读取第一行
    LAST_ROW_STARTED_NEW_GROUP,  // 最后一行开始新组
    OUTPUTTING_ROLLUP_ROWS,      // 输出ROLLUP行
    DONE_OUTPUTTING_ROWS         // 完成输出
  } m_state;

  unique_ptr_destroy_only<RowIterator> m_source;  // 源迭代器
  JOIN *m_join = nullptr;                         // 关联的JOIN对象
  bool m_seen_eof;                                // 是否已到EOF
  table_map m_save_nullinfo;                     // 保存的NULL信息
  const bool m_rollup;                            // 是否是ROLLUP查询
  
  // ROLLUP相关变量
  int m_last_unchanged_group_item_idx;            // 最后未变化的分组项索引
  int m_current_rollup_position;                 // 当前ROLLUP位置
  
  pack_rows::TableCollection m_tables;           // 表集合
  String m_first_row_this_group;                 // 当前组的第一行
  String m_first_row_next_group;                 // 下一组的第一行
  int m_output_slice = -1;                       // 输出切片
};

核心代码分析:

读取方法 (composite_iterators.cc):

int AggregateIterator::Read() {
  switch (m_state) {
    case READING_FIRST_ROW:
      // 读取并处理第一行
      if (m_source->Read() != 0) {
        return -1;  // 空结果集
      }
      
      // 保存第一行并初始化聚合函数
      StoreFromTableBuffers(m_tables, &m_first_row_this_group);
      init_tmptable_sum_functions(m_join->sum_funcs);
      
      m_state = LAST_ROW_STARTED_NEW_GROUP;
      return Read();  // 递归调用处理
      
    case LAST_ROW_STARTED_NEW_GROUP:
      // 处理分组变化
      while (m_source->Read() == 0) {
        if (compare_group_fields()) {
          // 分组发生变化
          StoreFromTableBuffers(m_tables, &m_first_row_next_group);
          break;
        }
        // 在同一组内,更新聚合函数
        update_tmptable_sum_func(m_join->sum_funcs);
      }
      
      // 恢复当前组的第一行并输出聚合结果
      LoadIntoTableBuffers(m_tables, m_first_row_this_group);
      
      if (m_rollup) {
        m_state = OUTPUTTING_ROLLUP_ROWS;
        SetRollupLevel(m_join->send_group_parts);
      } else {
        m_state = READING_FIRST_ROW;
        // 准备处理下一组
        m_first_row_this_group = std::move(m_first_row_next_group);
      }
      return 0;
      
    case OUTPUTTING_ROLLUP_ROWS:
      // 输出ROLLUP的各级聚合
      if (--m_current_rollup_position >= m_last_unchanged_group_item_idx) {
        SetRollupLevel(m_current_rollup_position);
        return 0;
      }
      
      m_state = READING_FIRST_ROW;
      return Read();
      
    case DONE_OUTPUTTING_ROWS:
      return -1;
  }
}

ROLLUP处理:

void AggregateIterator::SetRollupLevel(int level) {
  // 设置GROUP BY项为NULL(从level开始)
  for (int i = level; i < m_join->send_group_parts; ++i) {
    m_join->group_fields[i]->set_null();
  }
  
  // 更新聚合函数的ROLLUP状态
  for (Item_sum **func = m_join->sum_funcs; *func; ++func) {
    (*func)->set_aggregator_level(level);
  }
}

操作解释:

  1. 流式聚合: 无需预先物化,边读边聚合
  2. 行保存机制: 使用pack_rows保存分组边界的行数据
  3. 状态机驱动: 通过状态机管理复杂的聚合逻辑
  4. ROLLUP支持: 为每个分组级别生成聚合结果
  5. 内存高效: 只保存必要的边界行,不缓存所有数据

关键特性:

  1. 行保存和恢复: 使用pack_rows机制保存分组边界的行
  2. 状态管理: 跟踪当前组和下一组的状态
  3. ROLLUP支持: 为每个ROLLUP级别生成额外的聚合行

使用场景:

4. 物化和窗口 Iterator

4.1 MaterializeIterator - 物化

位置: /sql/iterators/composite_iterators.cc

主要结构变量:

template <typename Profiler>
class MaterializeIterator final : public TableRowIterator {
private:
  // 需要物化的查询块列表
  Mem_root_array<materialize_iterator::QueryBlock> m_query_blocks_to_materialize;
  
  // 用于扫描物化后临时表的迭代器
  unique_ptr_destroy_only<RowIterator> m_table_iterator;
  
  Common_table_expr *m_cte;                    // CTE指针(如果是CTE物化)
  Query_expression *m_query_expression;       // 查询表达式
  JOIN *const m_join;                          // 关联的JOIN对象
  const int m_ref_slice;                       // 引用切片
  const bool m_rematerialize;                 // 是否需要重新物化
  const bool m_reject_multiple_rows;          // 是否拒绝多行(标量子查询检查)
  const ha_rows m_limit_rows;                 // 行数限制
  
  // 缓存失效检测器列表
  struct Invalidator {
    const CacheInvalidatorIterator *iterator;
    int64_t generation_at_last_materialize;
  };
  Mem_root_array<Invalidator> m_invalidators;
  
  Profiler m_profiler;                         // 性能分析器
};

查询块结构:

struct QueryBlock {
  unique_ptr_destroy_only<RowIterator> subquery_iterator;  // 子查询迭代器
  int select_number;                                       // SELECT编号(用于跟踪)
  JOIN *join;                                              // 关联的JOIN
  bool disable_deduplication_by_hash_field = false;       // 是否禁用哈希去重
  bool copy_items;                                         // 是否复制字段
  
  // 集合操作相关
  ulonglong m_total_operands{0};                          // 操作数总数
  ulonglong m_operand_idx{0};                             // 当前操作数索引
  uint m_first_distinct{0};                               // 第一个DISTINCT操作数
  
  Temp_table_param *temp_table_param;                    // 临时表参数
  
  // 递归CTE相关
  bool is_recursive_reference = false;                    // 是否为递归引用
  FollowTailIterator *recursive_reader = nullptr;        // 递归读取器
};

核心代码分析:

初始化方法:

bool MaterializeIterator<Profiler>::Init() {
  // 检查是否需要重新物化
  if (!m_rematerialize && table()->materialized) {
    return m_table_iterator->Init();
  }
  
  // 检查CTE是否已被其他地方物化
  if (m_cte != nullptr && m_cte->tmp_result != nullptr) {
    table()->m_materialize_info = m_cte->tmp_result;
    return m_table_iterator->Init();
  }
  
  // 检查缓存失效器
  for (const Invalidator &invalidator : m_invalidators) {
    if (invalidator.iterator->generation() != 
        invalidator.generation_at_last_materialize) {
      // 缓存已失效,需要重新物化
      need_rematerialize = true;
      break;
    }
  }
  
  if (!need_rematerialize) {
    return m_table_iterator->Init();
  }
  
  // 执行物化
  if (MaterializeQueryBlocks()) {
    return true;
  }
  
  return m_table_iterator->Init();
}

物化执行方法:

bool MaterializeIterator<Profiler>::MaterializeQueryBlocks() {
  // 清空临时表
  empty_record(table());
  table()->file->ha_delete_all_rows();
  
  for (const auto &query_block : m_query_blocks_to_materialize) {
    if (MaterializeQueryBlock(query_block)) {
      return true;
    }
  }
  
  table()->materialized = true;
  return false;
}

单个查询块物化:

bool MaterializeIterator<Profiler>::MaterializeQueryBlock(
    const materialize_iterator::QueryBlock &query_block) {
  
  // 初始化子查询迭代器
  if (query_block.subquery_iterator->Init()) {
    return true;
  }
  
  ha_rows stored_rows = 0;
  
  // 逐行读取并存储到临时表
  PFSBatchMode batch_mode(query_block.subquery_iterator.get());
  
  for (;;) {
    int error = query_block.subquery_iterator->Read();
    if (error > 0) return true;  // 错误
    if (error < 0) break;        // EOF
    
    // 复制字段值
    if (query_block.copy_items) {
      if (copy_fields(query_block.temp_table_param, thd())) {
        return true;
      }
    }
    
    // 处理集合操作(UNION/INTERSECT/EXCEPT)
    if (query_block.m_total_operands > 1) {
      if (HandleSetOperation(query_block)) {
        return true;
      }
    }
    
    // 检查唯一性约束(UNION DISTINCT)
    if (!query_block.disable_deduplication_by_hash_field) {
      if (check_unique_constraint(table())) {
        continue;  // 重复行,跳过
      }
    }
    
    // 写入临时表
    if ((error = table()->file->ha_write_row(table()->record[0]))) {
      return report_handler_error(table(), error);
    }
    
    ++stored_rows;
    
    // 检查行数限制
    if (m_limit_rows != HA_POS_ERROR && stored_rows >= m_limit_rows) {
      break;
    }
  }
  
  return false;
}

操作解释:

  1. 多查询块支持: 可同时物化多个查询块(用于UNION等集合操作)
  2. 递归CTE支持: 与FollowTailIterator协作实现递归查询
  3. 缓存机制: 支持CTE缓存和失效检测,避免重复物化
  4. 集合操作: 内置支持UNION、INTERSECT、EXCEPT的语义
  5. 去重机制: 支持哈希去重和唯一约束检查

使用场景:

4.2 WindowIterator - 窗口函数

位置: /sql/iterators/window_iterators.h

主要结构变量:

class WindowIterator final : public RowIterator {
private:
  /// 源迭代器
  unique_ptr_destroy_only<RowIterator> const m_source;
  
  /// 临时表参数(包含窗口信息)
  Temp_table_param *m_temp_table_param;
  
  /// 窗口函数对象
  Window *m_window;
  
  /// 关联的JOIN对象
  JOIN *m_join;
  
  /// 读取行时使用的切片
  int m_input_slice;
  
  /// 输出行时使用的切片
  int m_output_slice;
};

BufferingWindowIterator结构:

class BufferingWindowIterator final : public RowIterator {
private:
  unique_ptr_destroy_only<RowIterator> const m_source;
  Temp_table_param *m_temp_table_param;
  Window *m_window;
  JOIN *m_join;
  int m_input_slice;
  int m_output_slice;
  
  // 缓冲区相关
  bool m_possibly_buffered_rows = false;        // 是否有缓冲行
  bool m_last_input_row_started_new_partition;  // 最后输入行是否开始新分区
  bool m_eof = false;                            // 是否到达EOF
};

核心代码分析:

WindowIterator读取方法:

int WindowIterator::Read() {
  // 从源迭代器读取行
  const int result = m_source->Read();
  if (result != 0) {
    return result;
  }
  
  // 切换到输入切片进行字段复制
  Switch_ref_item_slice slice_switch(m_join, m_input_slice);
  
  // 复制字段到临时表
  if (copy_fields(m_temp_table_param, thd())) {
    return 1;
  }
  
  // 复制函数结果(包括窗口函数)
  if (copy_funcs(m_temp_table_param, thd(), CFT_HAS_WF)) {
    return 1;
  }
  
  // 在窗口边界重置窗口函数
  if (m_window->needs_card() && m_window->check_partition_boundary()) {
    m_window->reset_all_wf_state();
  }
  
  // 切换到输出切片
  Switch_ref_item_slice output_slice_switch(m_join, m_output_slice);
  
  return 0;
}

BufferingWindowIterator读取方法:

int BufferingWindowIterator::Read() {
  for (;;) {
    // 如果有缓冲行需要处理
    if (m_possibly_buffered_rows) {
      // 处理缓冲的窗口记录
      bool output_row_ready = false;
      if (process_buffered_windowing_record(
              thd(), m_temp_table_param, &output_row_ready)) {
        return 1;
      }
      
      if (output_row_ready) {
        // 行已准备好输出
        return 0;
      }
      
      if (m_eof) {
        return -1;  // 所有行都已处理完毕
      }
    }
    
    // 读取下一行进行缓冲
    int result = m_source->Read();
    if (result != 0) {
      if (result < 0) {
        m_eof = true;
        // 继续处理剩余的缓冲行
        continue;
      }
      return result;  // 错误
    }
    
    // 复制字段和函数到临时表
    Switch_ref_item_slice slice_switch(m_join, m_input_slice);
    
    if (copy_fields(m_temp_table_param, thd())) {
      return 1;
    }
    
    if (copy_funcs(m_temp_table_param, thd(), CFT_HAS_NO_WF)) {
      return 1;
    }
    
    // 缓冲当前行
    if (buffer_windowing_record(thd(), m_temp_table_param,
                               &m_last_input_row_started_new_partition)) {
      return 1;
    }
    
    m_possibly_buffered_rows = true;
  }
}

窗口函数执行流程:

操作解释:

  1. 多阶段处理: 分为字段复制、函数评估、窗口处理等阶段
  2. 切片管理: 通过input_slice和output_slice管理不同阶段的字段引用
  3. 缓冲策略: BufferingWindowIterator支持需要前瞻或回顾的窗口函数
  4. 分区边界: 自动检测分区边界并重置窗口函数状态
  5. 临时表集成: 与MaterializeIterator紧密配合实现复杂窗口查询

使用场景:

5. 特别场景优化 Iterator

5.1 UnqualifiedCountIterator - COUNT(*)优化

位置: /sql/iterators/basic_row_iterators.h

主要结构变量:

class UnqualifiedCountIterator final : public RowIterator {
private:
  bool m_has_row;              // 是否有行要返回
  JOIN *const m_join;          // 关联的JOIN对象
};

核心代码分析:

初始化方法:

bool UnqualifiedCountIterator::Init() {
  m_has_row = true;    // 标记有一行结果要返回
  return false;
}

读取方法 (basic_row_iterators.cc):

int UnqualifiedCountIterator::Read() {
  if (m_has_row) {
    m_has_row = false;
    
    // 获取第一个表(通常是要计数的主表)
    TABLE *table = m_join->qep_tab[m_join->const_tables].table();
    
    // 调用存储引擎的快速计数接口
    ha_rows count;
    int error = table->file->ha_records(&count);
    
    if (error) {
      // 快速计数失败,报告错误
      return HandleError(error);
    }
    
    // 将计数结果存储到相应的字段中
    // 通常是COUNT(*)函数的result_field
    for (Item_sum **sum_func = m_join->sum_funcs; *sum_func; ++sum_func) {
      if ((*sum_func)->sum_func() == Item_sum::COUNT_FUNC) {
        (*sum_func)->make_const(static_cast<longlong>(count));
        break;
      }
    }
    
    return 0;  // 成功返回一行
  } else {
    return -1; // EOF,没有更多行
  }
}

操作解释:

  1. 单行输出: 只返回一行结果,包含COUNT(*)的值
  2. 存储引擎优化: 直接调用ha_records()接口获取行数统计
  3. 避免扫描: 无需逐行读取,极大提升性能
  4. 适用条件: 仅适用于无WHERE条件、无JOIN的简单COUNT(*)查询

使用场景:

5.2 WeedoutIterator - 半连接去重

位置: /sql/iterators/composite_iterators.h

主要结构变量:

class WeedoutIterator final : public RowIterator {
private:
  unique_ptr_destroy_only<RowIterator> m_source;     // 源迭代器
  SJ_TMP_TABLE *m_sj;                                // 半连接临时表
  const table_map m_tables_to_get_rowid_for;        // 需要获取row ID的表映射
};

SJ_TMP_TABLE结构:

struct SJ_TMP_TABLE {
  TABLE *tmp_table;                    // 临时表用于存储row ID
  SJ_TMP_TABLE_TAB *tabs;             // 表信息数组
  SJ_TMP_TABLE_TAB *tabs_end;         // 表信息数组结束
  uint null_bits;                      // NULL位数
  uint null_bytes;                     // NULL字节数
  uint rowid_len;                      // Row ID长度
  ha_rows row_count;                   // 行计数
};

核心代码分析:

读取方法 (composite_iterators.cc):

int WeedoutIterator::Read() {
  for (;;) {
    // 从源迭代器读取下一行
    int ret = m_source->Read();
    if (ret != 0) {
      return ret;  // 错误或EOF
    }

    // 获取相关表的row ID
    for (SJ_TMP_TABLE_TAB *tab = m_sj->tabs; tab != m_sj->tabs_end; ++tab) {
      TABLE *table = tab->qep_tab->table();
      if ((m_tables_to_get_rowid_for & table->pos_in_table_list->map()) &&
          can_call_position(table)) {
        // 获取当前行的position(row ID)
        table->file->position(table->record[0]);
      }
    }

    // 执行去重检查
    ret = do_sj_dups_weedout(thd(), m_sj);
    if (ret == -1) {
      return 1;  // 错误
    }

    if (ret == 0) {
      // 不是重复行,返回该行
      return 0;
    }

    // 是重复行,跳过并读取下一行
  }
}

去重检查方法 (sql_executor.cc):

int do_sj_dups_weedout(THD *thd, SJ_TMP_TABLE *sjtbl) {
  // 构建包含所有相关row ID的键
  uchar *ptr = sjtbl->tmp_table->record[0] + 1; // 跳过NULL字节
  
  // 复制每个表的row ID到临时表记录中
  for (SJ_TMP_TABLE_TAB *tab = sjtbl->tabs; tab != sjtbl->tabs_end; ++tab) {
    if (tab->rowid_offset) {
      memcpy(ptr + tab->rowid_offset, tab->qep_tab->table()->file->ref,
             tab->qep_tab->table()->file->ref_length);
    }
  }
  
  // 尝试在临时表中插入这个row ID组合
  int error = sjtbl->tmp_table->file->ha_write_row(sjtbl->tmp_table->record[0]);
  
  if (error == 0) {
    return 0;  // 插入成功,是新行
  } else if (error == HA_ERR_FOUND_DUPP_KEY) {
    return 1;  // 重复键,是重复行
  } else {
    return -1; // 其他错误
  }
}

操作解释:

  1. 流式去重: 边读边检查,无需预先物化整个结果集
  2. Row ID去重: 基于行的物理位置而非内容进行去重
  3. 临时表记录: 使用临时表记录已见过的row ID组合
  4. 半连接优化: 将半连接转换为内连接+去重,便于优化器重排序

使用场景:

5.3 FollowTailIterator - 递归CTE

位置: /sql/iterators/basic_row_iterators.h

主要结构变量:

class FollowTailIterator final : public TableRowIterator {
private:
  bool m_inited = false;                          // 是否已初始化
  uchar *const m_record;                          // 记录缓冲区
  const double m_expected_rows;                   // 预期行数
  ha_rows *const m_examined_rows;                 // 检查行数
  ha_rows m_read_rows;                            // 已读取行数
  ha_rows m_end_of_current_iteration;             // 当前迭代结束行数
  unsigned m_recursive_iteration_count;          // 递归迭代计数
  ha_rows *m_stored_rows = nullptr;               // 指向MaterializeIterator的存储行数
};

核心代码分析:

初始化方法 (basic_row_iterators.cc):

bool FollowTailIterator::Init() {
  if (!m_inited) {
    // 第一次初始化,开始表扫描
    if (table()->file->ha_rnd_init(1)) {
      return true;
    }
    m_inited = true;
    m_read_rows = 0;
    m_end_of_current_iteration = *m_stored_rows;
    m_recursive_iteration_count = 0;
  }
  // 后续调用不重置游标位置,继续从当前位置读取
  return false;
}

读取方法 (basic_row_iterators.cc):

int FollowTailIterator::Read() {
  if (m_read_rows == *m_stored_rows) {
    // 已读取所有当前存储的行,返回EOF等待更多行
    return -1;
  }
  
  // 检查递归迭代限制
  if (m_read_rows >= m_end_of_current_iteration) {
    // 开始新的迭代
    m_end_of_current_iteration = *m_stored_rows;
    ++m_recursive_iteration_count;
    
    // 检查迭代次数限制(防止无限递归)
    if (m_recursive_iteration_count > thd()->variables.cte_max_recursion_depth) {
      my_error(ER_CTE_MAX_RECURSION_DEPTH, MYF(0),
               thd()->variables.cte_max_recursion_depth);
      return 1;
    }
  }
  
  // 读取下一行
  int error = table()->file->ha_rnd_next(m_record);
  if (error) {
    if (error == HA_ERR_END_OF_FILE) {
      // 暂时到达文件末尾,等待更多数据
      return -1;
    }
    return HandleError(error);
  }
  
  ++m_read_rows;
  if (m_examined_rows != nullptr) {
    ++*m_examined_rows;
  }
  
  return 0;
}

溢出到磁盘处理:

bool FollowTailIterator::RepositionCursorAfterSpillToDisk() {
  // 表从MEMORY转换为InnoDB后重新定位游标
  if (table()->file->ha_rnd_init(1)) {
    return true;
  }
  
  // 跳过已读取的行
  for (ha_rows i = 0; i < m_read_rows; ++i) {
    int error = table()->file->ha_rnd_next(m_record);
    if (error) {
      return true;
    }
  }
  
  return false;
}

操作解释:

  1. 尾随读取: 在MaterializeIterator写入的同时读取新增的行
  2. 迭代控制: 跟踪递归迭代次数,防止无限递归
  3. 状态保持: 多次Init()调用不重置读取位置
  4. 溢出处理: 支持从MEMORY表溢出到InnoDB的场景
  5. EOF处理: 临时EOF允许后续继续读取新数据

WITH RECURSIVE执行流程:

使用场景:

Iterator 执行模式和优化

1. 生命周期管理

2. 性能优化技术

2.1 Performance Schema 批处理模式

// 在最内层表启用批处理模式减少性能监控开销
virtual void StartPSIBatchMode() {}
virtual void EndPSIBatchModeIfStarted() {}

2.2 记录缓冲区优化

2.3 内存管理

3. 错误处理和事务支持

3.1 标准化错误码

3.2 行锁管理

virtual void UnlockRow() = 0;  // 释放不需要的行锁

实际使用例子

复杂查询的 Iterator 组合示例

考虑以下 SQL 查询:

SELECT o.order_id, c.customer_name, SUM(oi.amount)
FROM orders o
INNER JOIN customers c ON o.customer_id = c.customer_id  
INNER JOIN order_items oi ON o.order_id = oi.order_id
WHERE o.order_date >= '2023-01-01' 
  AND c.region = 'Asia'
GROUP BY o.order_id, c.customer_name
HAVING SUM(oi.amount) > 1000
ORDER BY SUM(oi.amount) DESC
LIMIT 100;

Iterator 执行树结构:

执行过程详解:

  1. 底层扫描阶段:
    • customers表通过region索引扫描,应用region过滤
    • orders表全表扫描,应用日期过滤
    • order_items表通过order_id索引进行连接扫描
  2. 连接处理阶段:
    • 第一层: orders和customers的嵌套循环连接
    • 第二层: 结果与order_items的嵌套循环连接
  3. 聚合和过滤阶段:
    • AggregateIterator按GROUP BY字段分组聚合
    • FilterIterator应用HAVING条件
  4. 排序和限制阶段:
    • SortingIterator按SUM(amount)降序排序
    • LimitOffsetIterator限制输出100行

总结

MySQL 的 Iterator 具有以下特点:

  1. 统一的接口设计: 所有 Iterator 都继承自 RowIterator,提供一致的 Init()/Read() 接口
  2. 可组合性: 复合 Iterator 可以将多个 Iterator 组合成复杂的执行计划
  3. 高性能: 集成了covering index、批处理模式、内存缓冲等优化技术
  4. 可扩展性: 易于添加新的 Iterator 类型来支持新功能
  5. 鲁棒性: 支持大数据量处理、内存溢出保护、错误恢复
  6. 丰富的连接算法: 支持嵌套循环和哈希连接,适应不同场景
  7. 完善的聚合支持: 流式聚合、窗口函数、递归CTE等高级功能

这种设计使得 MySQL 能够高效执行各种复杂查询,同时保持良好的代码可维护性和扩展性。