数据库内核月报

数据库内核月报 - 2020 / 04

MySQL · 引擎特性 · 8.0 Window Functions 剖析

Author: $马腾

什么是window function

window function 是在满足某种条件的记录集合上执行一个特殊的函数。这一句话,记录集合就是窗口,特殊的函数就是在这个窗口上执行的函数。

SELECT
function_name OVER ( window_definition )
1. FROM (...)

window_definition : 定义要计算的记录的集合。 function_name : 指定对于集合要使用的函数。 接下来我们通过一个简单的例子看一个窗口函数:

SELECT
  f.id, f.release_year, 
  f.category_id, f.rating,
  AVG(rating) OVER 
  (PARTITION BY release_year) AS year_avg
FROM films f

这个语句是使用了partition by 作为了集合的定义,即把release year 相同的放在了一起,然后对集合进行 avg() 函数运算。 可以通过这样一个图来较为清晰的看到 undefined.

窗口函数的分类

我个人的理解是窗口函数的分类可以通过两个基准分,一个是窗口大小是否固定,一个是按照函数功能分类

按照窗口大小是否固定划分可以分为:

窗口大小固定:静态窗口函数 窗口大小不固定,不同的记录对应着不同的窗口:动态(滑动)窗口函数

按照功能划分

序号函数:row_number(); rank(); dense_rank(); 分布函数:percent_rank(); cume_dist(); 前后函数:lag(expr, n); // 返回当前行的前 n 行expr 的值; lead(expr, n); // 返回当前行的后n 行expr 的值 头尾函数:first(expr); // 返回第一个 expr 的值; last_value(expr); //返回最后一个expr 的值 其他函数:nth_value(expr, n); // 返回第 n 个expr 的值; ntile(n); // 将有序数据分为n 个桶,记录等级数

窗口集合的定义

窗口函数的基本用法是

函数名 over 子句

over 是定义窗口集合的关键字; 一般有四种定义集合的方法

  1. 什么也不写; 这样意味着窗口包含满足where 所有的行,窗口是基于所有的行进行计算。
  2. partition 字句: 窗口按照哪些字段进行分组。
  3. order by 字句: 按照哪些字段进行排序。
  4. frame 字句: frame 是当前分区的一个子集,子句用来定义子集的规则通常用来作为滑动窗口使用。

滑动窗口

滑动窗口有两种指定范围的方式,一种是基于行,一种是基于范围。

基于行

通常使用BETWEEN frame_start AND frame_end语法来表示行范围,frame_start和frame_end可以支持如下关键字,来确定不同的动态行记录, 也可以自己定义行来表示范围

CURRENT ROW 边界是当前行,一般和其他范围关键字一起使用 UNBOUNDED PRECEDING 边界是分区中的第一行 UNBOUNDED FOLLOWING 边界是分区中的最后一行 expr PRECEDING 边界是当前行减去expr的值 expr FOLLOWING 边界是当前行加上expr的值

select avg(amount) over(partition by user_no  ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as row_num from order_info;

基于范围

有一些范围不是可以直接使用行数表示,这种情况就要用范围,比如窗口范围是一周前的订单开始,截止到当前行; INTERVAL 7 DAY PRECEDING

Hint: 有些函数不管有没有用frame 子句,它的窗口都是固定的,使用这种函数一定是静态窗口函数

cume_dist() dense_rank() lag() lead() ntile() percent_rank() rank() row_number()

mysql8.0 源码分析窗口函数的执行过程

以下内容通过源码角度分析执行过程

优化阶段:

  1. setup windows : 在优化阶段判断出select_lex->m_windows 不为空,就首先调用 Window::setup_windows; 这个方法里面的核心接口是 Window::check_window_functions(THD *thd, SELECT_LEX *select)

a. 这个方法中首先判断的是当前是静态窗口还是动态窗口; 静态窗口即判断了 frame 的定义是否有定义上下边界。m_static_aggregates 为 true, 意味着是静态窗口,同时对每一个分区都可以进行一次评估。

如果 ma_static_aggregates 为 false, 则进一步判断其滑动窗口使用的是基于范围还是基于行。 m_row_optimizable 基于行 m_range_optimizable 基于范围 b. 获取聚合函数作为窗口函数时候窗口的特殊规格要求 wfs->check_wf_semantics(thd, select, &reqs) 这个方法其实就是判断是不是需要 row_buffer 作为评估,什么时候需要使用 row_buffer 呢:如果我们只看当前分区的行无法进行正确的计算,我们需要看之后的或者之前的行,就需要使用row_buffer。

bool Window::check_window_functions(THD *thd, SELECT_LEX *select) {
  List_iterator<Item_sum> li(m_functions);
  Item *wf;

  m_static_aggregates =
      (m_frame->m_from->m_border_type == WBT_UNBOUNDED_PRECEDING &&
       m_frame->m_to->m_border_type == WBT_UNBOUNDED_FOLLOWING);

  // If static aggregates, inversion isn't necessary
  m_row_optimizable = (m_frame->m_unit == WFU_ROWS) && !m_static_aggregates;
  m_range_optimizable = (m_frame->m_unit == WFU_RANGE) && !m_static_aggregates;
  . ..
  while ((wf = li++)) {
    Evaluation_requirements reqs;

    Item_sum *wfs = down_cast<Item_sum *>(wf);
    if (wfs->check_wf_semantics(thd, select, &reqs)) return true;

    m_needs_frame_buffering |= reqs.needs_buffer;
	....
}
  1. Optimize-> make_tmp_tables_info: 这里是看是否需要创建一个临时表作为 window frame buffer. 而是否创建的判断条件就是之前 Window::check_window_functions(THD *thd, SELECT_LEX *select) 接口中 求得的 row_buffer 决定的,如果row_buffer 为 true 则需要创建一个 temp table.
...
      if (m_windows[wno]->needs_buffering()) {
        /*
          Create the window frame buffer tmp table.  We create a
          temporary table with same contents as the output tmp table
          in the windowing pipeline (columns defined by
          curr_all_fields), but used for intermediate storage, saving
          the window's frame buffer now that we know the window needs
          buffering.
        */
        Temp_table_param *par =
            new (thd->mem_root) Temp_table_param(tmp_table_param);
        par->m_window = nullptr;  // Only OUT table needs access to Window

        List<Item> tmplist(*curr_all_fields, thd->mem_root);
        TABLE *table =
            create_tmp_table(thd, par, tmplist, nullptr, false, false,
                             select_lex->active_options(), HA_POS_ERROR, "");
        if (table == nullptr) DBUG_RETURN(true);

        if (alloc_ref_item_slice(thd, fbidx)) DBUG_RETURN(true);

        if (change_to_use_tmp_fields(
                thd, ref_items[fbidx], tmp_fields_list[fbidx],
                tmp_all_fields[fbidx], curr_fields_list->elements,
                *curr_all_fields))
          DBUG_RETURN(true);

        m_windows[wno]->set_frame_buffer_param(par);
        m_windows[wno]->set_frame_buffer(table);
      }
...

执行阶段

执行调用栈

unit->first_select()->join->exec()->evaluate_join_record()->sub_select_op()->QEP_tmp_table::put_record()->end_write_wf()

重点在 end_write_wf() 这个接口上面 undefined.

整个window function 的计算是要有两个或者三个temp table 参与的 分别为: Input table: 对应于 qep_tab-1 这个表中是准备进行计算的window 窗口记录 output table: 对用于 qep_tab 这个表是用来写入窗口函数计算结果的 buffer_tmp_table: 如果之前setup 时候判断出需要使用 row_buffer, 那么在优化阶段 make_tmp_tables_info 也会创建一个临时表。

下面简述整个计算过程入下图所示: undefined

在这个过程中比较难理解的是第三个临时表作为 frame buffer 的使用和 process_buffered_windowing_record 从代码中注释给出的一个例子简述。

 SUM(A+FLOOR(B)) OVER (ROWS 2 FOLLOWING)

首先要做的事情是先把和 window frame 无关的函数计算都做完,然后把结果放入到 frame buffer 中,在 frame buffer 中判断 是否满足window frame 的集合 row 都已经计算了,(这个就是 process_buffered_windowing_record 做的事情)如果当前的结果不满足 window frame 的定义,我们把结果拿出来,再继续计算。整个过程是一个循环处理,直到最后 窗口函数确实计算完毕了,把结果也放回 frame buffer 中。进而继续计算一些非window function 算子。