Author: $马腾
window function 是在满足某种条件的记录集合上执行一个特殊的函数。这一句话,记录集合就是窗口,特殊的函数就是在这个窗口上执行的函数。
SELECT
function_name OVER ( window_definition )
1. FROM (...)
window_definition
: 定义要计算的记录的集合。
function_name
: 指定对于集合要使用的函数。
接下来我们通过一个简单的例子看一个窗口函数:
SELECT
f.id, f.release_year,
f.category_id, f.rating,
AVG(rating) OVER
(PARTITION BY release_year) AS year_avg
FROM films f
这个语句是使用了partition by 作为了集合的定义,即把release year 相同的放在了一起,然后对集合进行 avg() 函数运算。 可以通过这样一个图来较为清晰的看到 .
我个人的理解是窗口函数的分类可以通过两个基准分,一个是窗口大小是否固定,一个是按照函数功能分类
窗口大小固定:静态窗口函数 窗口大小不固定,不同的记录对应着不同的窗口:动态(滑动)窗口函数
序号函数:row_number(); rank(); dense_rank(); 分布函数:percent_rank(); cume_dist(); 前后函数:lag(expr, n); // 返回当前行的前 n 行expr 的值; lead(expr, n); // 返回当前行的后n 行expr 的值 头尾函数:first(expr); // 返回第一个 expr 的值; last_value(expr); //返回最后一个expr 的值 其他函数:nth_value(expr, n); // 返回第 n 个expr 的值; ntile(n); // 将有序数据分为n 个桶,记录等级数
窗口函数的基本用法是
函数名 over 子句
over 是定义窗口集合的关键字; 一般有四种定义集合的方法
partition
字句: 窗口按照哪些字段进行分组。order by
字句: 按照哪些字段进行排序。frame
字句: frame 是当前分区的一个子集,子句用来定义子集的规则通常用来作为滑动窗口使用。滑动窗口有两种指定范围的方式,一种是基于行,一种是基于范围。
通常使用BETWEEN frame_start AND frame_end语法来表示行范围,frame_start和frame_end可以支持如下关键字,来确定不同的动态行记录, 也可以自己定义行来表示范围
CURRENT ROW 边界是当前行,一般和其他范围关键字一起使用 UNBOUNDED PRECEDING 边界是分区中的第一行 UNBOUNDED FOLLOWING 边界是分区中的最后一行 expr PRECEDING 边界是当前行减去expr的值 expr FOLLOWING 边界是当前行加上expr的值
select avg(amount) over(partition by user_no ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as row_num from order_info;
有一些范围不是可以直接使用行数表示,这种情况就要用范围,比如窗口范围是一周前的订单开始,截止到当前行; INTERVAL 7 DAY PRECEDING
Hint: 有些函数不管有没有用frame 子句,它的窗口都是固定的,使用这种函数一定是静态窗口函数
cume_dist() dense_rank() lag() lead() ntile() percent_rank() rank() row_number()
以下内容通过源码角度分析执行过程
Window::setup_windows
;
这个方法里面的核心接口是 Window::check_window_functions(THD *thd, SELECT_LEX *select)
a. 这个方法中首先判断的是当前是静态窗口还是动态窗口; 静态窗口即判断了 frame 的定义是否有定义上下边界。m_static_aggregates
为 true, 意味着是静态窗口,同时对每一个分区都可以进行一次评估。
如果 ma_static_aggregates
为 false, 则进一步判断其滑动窗口使用的是基于范围还是基于行。
m_row_optimizable
基于行
m_range_optimizable
基于范围
b. 获取聚合函数作为窗口函数时候窗口的特殊规格要求 wfs->check_wf_semantics(thd, select, &reqs)
这个方法其实就是判断是不是需要 row_buffer 作为评估,什么时候需要使用 row_buffer 呢:如果我们只看当前分区的行无法进行正确的计算,我们需要看之后的或者之前的行,就需要使用row_buffer。
bool Window::check_window_functions(THD *thd, SELECT_LEX *select) {
List_iterator<Item_sum> li(m_functions);
Item *wf;
m_static_aggregates =
(m_frame->m_from->m_border_type == WBT_UNBOUNDED_PRECEDING &&
m_frame->m_to->m_border_type == WBT_UNBOUNDED_FOLLOWING);
// If static aggregates, inversion isn't necessary
m_row_optimizable = (m_frame->m_unit == WFU_ROWS) && !m_static_aggregates;
m_range_optimizable = (m_frame->m_unit == WFU_RANGE) && !m_static_aggregates;
. ..
while ((wf = li++)) {
Evaluation_requirements reqs;
Item_sum *wfs = down_cast<Item_sum *>(wf);
if (wfs->check_wf_semantics(thd, select, &reqs)) return true;
m_needs_frame_buffering |= reqs.needs_buffer;
....
}
Window::check_window_functions(THD *thd, SELECT_LEX *select)
接口中 求得的 row_buffer 决定的,如果row_buffer 为 true 则需要创建一个 temp table....
if (m_windows[wno]->needs_buffering()) {
/*
Create the window frame buffer tmp table. We create a
temporary table with same contents as the output tmp table
in the windowing pipeline (columns defined by
curr_all_fields), but used for intermediate storage, saving
the window's frame buffer now that we know the window needs
buffering.
*/
Temp_table_param *par =
new (thd->mem_root) Temp_table_param(tmp_table_param);
par->m_window = nullptr; // Only OUT table needs access to Window
List<Item> tmplist(*curr_all_fields, thd->mem_root);
TABLE *table =
create_tmp_table(thd, par, tmplist, nullptr, false, false,
select_lex->active_options(), HA_POS_ERROR, "");
if (table == nullptr) DBUG_RETURN(true);
if (alloc_ref_item_slice(thd, fbidx)) DBUG_RETURN(true);
if (change_to_use_tmp_fields(
thd, ref_items[fbidx], tmp_fields_list[fbidx],
tmp_all_fields[fbidx], curr_fields_list->elements,
*curr_all_fields))
DBUG_RETURN(true);
m_windows[wno]->set_frame_buffer_param(par);
m_windows[wno]->set_frame_buffer(table);
}
...
执行调用栈
unit->first_select()->join->exec()->evaluate_join_record()->sub_select_op()->QEP_tmp_table::put_record()->end_write_wf()
重点在 end_write_wf() 这个接口上面 .
整个window function 的计算是要有两个或者三个temp table 参与的
分别为:
Input table: 对应于 qep_tab-1
这个表中是准备进行计算的window 窗口记录
output table: 对用于 qep_tab
这个表是用来写入窗口函数计算结果的
buffer_tmp_table: 如果之前setup 时候判断出需要使用 row_buffer, 那么在优化阶段 make_tmp_tables_info
也会创建一个临时表。
下面简述整个计算过程入下图所示:
在这个过程中比较难理解的是第三个临时表作为 frame buffer 的使用和 process_buffered_windowing_record
从代码中注释给出的一个例子简述。
SUM(A+FLOOR(B)) OVER (ROWS 2 FOLLOWING)
首先要做的事情是先把和 window frame 无关的函数计算都做完,然后把结果放入到 frame buffer 中,在 frame buffer 中判断 是否满足window frame 的集合 row 都已经计算了,(这个就是 process_buffered_windowing_record
做的事情)如果当前的结果不满足 window frame 的定义,我们把结果拿出来,再继续计算。整个过程是一个循环处理,直到最后 窗口函数确实计算完毕了,把结果也放回 frame buffer 中。进而继续计算一些非window function 算子。
process_buffered_windowing_record
这个方法中涉及了两种 move 滑动窗口的策略
分别是 native strategy 和 optimizable strategy; 而具体选择哪一种策略,正是在优化阶段 对 m_row_optimizable 和 m_row_optimizable 的赋值。 if (m_row_optimizable || m_row_optimizable)== true
就选择使用optimizable strategy.
两种策略的滑动方式如代码注释所述。
Moving (sliding) frames can be executed using a naive or optimized strategy
for aggregate window functions, like SUM or AVG (but not MAX, or MIN).
In the naive approach, for each row considered for processing from the buffer,
we visit all the rows defined in the frame for that row, essentially leading
to N*M complexity, where N is the number of rows in the result set, and M is
the number for rows in the frame. This can be slow for large frames,
obviously, so we can choose an optimized evaluation strategy using inversion.
This means that when rows leave the frame as we move it forward, we re-use
the previous aggregate state, but compute the *inverse* function to eliminate
the contribution to the aggregate by the row(s) leaving the frame, and then
use the normal aggregate function to add the contribution of the rows moving
into the frame