数据库内核月报

数据库内核月报 - 2018 / 02

PgSQL · 应用案例 · 自定义并行聚合函数的原理与实践

Author: digoal

背景

PostgreSQL 9.6开始就支持并行计算了,意味着聚合、扫描、排序、JOIN等都开始支持并行计算。对于聚合操作来说,并行计算与非并行计算是有差异的。

例如avg聚合,对一张表进行计算时,一个任务中操作和多个并行任务操作,算法是不一样的。

PostgreSQL提供了一套标准的接口,可以支持聚合函数的并行操作。

自定义并行聚合的原理和例子

创建聚合函数的语法如下:

CREATE AGGREGATE name ( [ argmode ] [ argname ] arg_data_type [ , ... ] ) (  
    SFUNC = sfunc,  
    STYPE = state_data_type  
    [ , SSPACE = state_data_size ]  
    [ , FINALFUNC = ffunc ]  
    [ , FINALFUNC_EXTRA ]  
    [ , COMBINEFUNC = combinefunc ]  
    [ , SERIALFUNC = serialfunc ]  
    [ , DESERIALFUNC = deserialfunc ]  
    [ , INITCOND = initial_condition ]  
    [ , MSFUNC = msfunc ]  
    [ , MINVFUNC = minvfunc ]  
    [ , MSTYPE = mstate_data_type ]  
    [ , MSSPACE = mstate_data_size ]  
    [ , MFINALFUNC = mffunc ]  
    [ , MFINALFUNC_EXTRA ]  
    [ , MINITCOND = minitial_condition ]  
    [ , SORTOP = sort_operator ]  
    [ , PARALLEL = { SAFE | RESTRICTED | UNSAFE } ]  
)  

相比非并行,多了一个过程,那就是combinefunc的过程(也叫partial agg)。

非并行模式的聚合流程大致如下:

循环  
sfunc( internal-state, next-data-values ) ---> next-internal-state  
  
最后调用一次(可选)  
ffunc( internal-state ) ---> aggregate-value  

pic

并行模式的聚合流程大致如下,如果没有写combinefunc,那么实际上聚合过程并没有实现并行而只是扫描并行:

pic

下面这个例子,我们可以观察到一个COUNT操作的并行聚合。

postgres=# set max_parallel_workers=4;  
SET  
postgres=# set max_parallel_workers_per_gather =4;  
SET  
postgres=# set parallel_setup_cost =0;  
SET  
postgres=# set parallel_tuple_cost =0;  
SET  
postgres=# alter table test set (parallel_workers =4);  
ALTER TABLE  
postgres=# explain (analyze,verbose,timing,costs,buffers) select count(*) from test;  
                                                                  QUERY PLAN                                                                     
-----------------------------------------------------------------------------------------------------------------------------------------------  
 -- final并行,可有可无,看具体的聚合算法  
 Finalize Aggregate  (cost=15837.02..15837.03 rows=1 width=8) (actual time=57.296..57.296 rows=1 loops=1)  
   Output: count(*)  
   Buffers: shared hit=3060  
   ->  Gather  (cost=15837.00..15837.01 rows=4 width=8) (actual time=57.287..57.292 rows=5 loops=1)  
         Output: (PARTIAL count(*))  
         Workers Planned: 4  
         Workers Launched: 4  
         Buffers: shared hit=3060  
           
	 -- 一下就是combinefunc完成的聚合并行(显示为PARTIAL agg)  
	 ->  Partial Aggregate  (cost=15837.00..15837.01 rows=1 width=8) (actual time=52.333..52.333 rows=1 loops=5)  
               Output: PARTIAL count(*)  
               Buffers: shared hit=12712  
               Worker 0: actual time=50.917..50.918 rows=1 loops=1  
                 Buffers: shared hit=2397  
               Worker 1: actual time=51.293..51.294 rows=1 loops=1  
                 Buffers: shared hit=2423  
               Worker 2: actual time=51.062..51.063 rows=1 loops=1  
                 Buffers: shared hit=2400  
               Worker 3: actual time=51.436..51.436 rows=1 loops=1  
                 Buffers: shared hit=2432  
               ->  Parallel Seq Scan on public.test  (cost=0.00..15212.00 rows=250000 width=0) (actual time=0.010..30.499 rows=200000 loops=5)  
                     Buffers: shared hit=12712  
                     Worker 0: actual time=0.013..30.343 rows=190269 loops=1  
                       Buffers: shared hit=2397  
                     Worker 1: actual time=0.010..30.401 rows=192268 loops=1  
                       Buffers: shared hit=2423  
                     Worker 2: actual time=0.013..30.467 rows=190350 loops=1  
                       Buffers: shared hit=2400  
                     Worker 3: actual time=0.009..30.221 rows=192861 loops=1  
                       Buffers: shared hit=2432  
 Planning time: 0.074 ms  
 Execution time: 60.169 ms  
(31 rows)  

了解了并行聚合的原理后,我们就可以写自定义聚合函数的并行计算了。

例子

例如我们要支持一个数组的聚合,并且在聚合过程中我们要实现对元素去重。

1、创建测试表

create table test(id int, col int[]);  

2、生成测试数据

CREATE OR REPLACE FUNCTION public.gen_arr(integer, integer)  
 RETURNS integer[]  
 LANGUAGE sql  
 STRICT  
AS $function$  
  select array(select ($1*random())::int from generate_series(1,$2));  
$function$;  
  
insert into test select random()*1000, gen_arr(500,10) from generate_series(1,10000);  

3、创建聚合函数

例子1,没有combinefunc,只支持扫描并行。

数组去重函数

postgres=# create or replace function uniq(int[]) returns int[] as $$  
  select array( select unnest($1) group by 1);  
$$ language sql strict parallel safe;  
CREATE FUNCTION  

数组合并与去重函数

postgres=# create or replace function array_uniq_cat(anyarray,anyarray) returns anyarray as $$  
  select uniq(array_cat($1,$2));   
$$ language sql strict parallel safe;  
CREATE FUNCTION  

聚合函数(不带COMBINEFUNC)

create aggregate arragg (anyarray) (sfunc = array_uniq_cat, stype=anyarray, PARALLEL=safe);  

并行查询例子:

postgres=# set max_parallel_workers=4;  
SET  
postgres=# set max_parallel_workers_per_gather =4;  
SET  
postgres=# set parallel_setup_cost =0;  
SET  
postgres=# set parallel_tuple_cost =0;  
SET  
postgres=# alter table test set (parallel_workers =4);  
ALTER TABLE  
postgres=# explain (analyze,verbose,timing,costs,buffers) select id, arragg(col) from test group by id ;  

很明显没有设置COMBINEFUNC时,未使用并行聚合。

postgres=# explain (analyze,verbose,timing,costs,buffers) select id, arragg(col) from test group by id ;  
                                                            QUERY PLAN                                                               
-----------------------------------------------------------------------------------------------------------------------------------  
 HashAggregate  (cost=4139.74..4141.74 rows=200 width=36) (actual time=602.957..603.195 rows=1001 loops=1)  
   Output: id, arragg(col)  
   Group Key: test.id  
   Buffers: shared hit=6  
   ->  Gather  (cost=0.00..163.37 rows=15748 width=36) (actual time=0.328..43.734 rows=10000 loops=1)  
         Output: id, col  
         Workers Planned: 4  
         Workers Launched: 4  
         Buffers: shared hit=6  
         -- 只有并行扫描,没有并行聚合。  
	 ->  Parallel Seq Scan on public.test  (cost=0.00..163.37 rows=3937 width=36) (actual time=0.017..0.891 rows=2000 loops=5)  
               Output: id, col  
               Buffers: shared hit=124  
               Worker 0: actual time=0.019..0.177 rows=648 loops=1  
                 Buffers: shared hit=8  
               Worker 1: actual time=0.022..0.180 rows=648 loops=1  
                 Buffers: shared hit=8  
               Worker 2: actual time=0.017..3.772 rows=7570 loops=1  
                 Buffers: shared hit=94  
               Worker 3: actual time=0.015..0.189 rows=648 loops=1  
                 Buffers: shared hit=8  
 Planning time: 0.084 ms  
 Execution time: 603.450 ms  
(22 rows)  

例子2,有combinefunc,支持并行聚合。

drop aggregate arragg(anyarray);  
  
create aggregate arragg (anyarray) (sfunc = array_uniq_cat, stype=anyarray, COMBINEFUNC = array_uniq_cat, PARALLEL=safe);   

使用了并行聚合。

postgres=# explain (analyze,verbose,timing,costs,buffers) select id, arragg(col) from test group by id ;  
                                                               QUERY PLAN                                                                  
-----------------------------------------------------------------------------------------------------------------------------------------  
 Finalize HashAggregate  (cost=1361.46..1363.46 rows=200 width=36) (actual time=285.489..285.732 rows=1001 loops=1)  
   Output: id, arragg(col)  
   Group Key: test.id  
   Buffers: shared hit=36  
   ->  Gather  (cost=1157.46..1159.46 rows=800 width=36) (actual time=63.654..74.163 rows=4297 loops=1)  
         Output: id, (PARTIAL arragg(col))  
         Workers Planned: 4  
         Workers Launched: 4  
         Buffers: shared hit=36  
         -- 并行聚合  
	 ->  Partial HashAggregate  (cost=1157.46..1159.46 rows=200 width=36) (actual time=57.367..57.727 rows=859 loops=5)  
               Output: id, PARTIAL arragg(col)  
               Group Key: test.id  
               Buffers: shared hit=886  
               Worker 0: actual time=54.788..54.997 rows=857 loops=1  
                 Buffers: shared hit=213  
               Worker 1: actual time=56.881..57.255 rows=861 loops=1  
                 Buffers: shared hit=213  
               Worker 2: actual time=55.415..55.813 rows=856 loops=1  
                 Buffers: shared hit=212  
               Worker 3: actual time=56.453..56.854 rows=838 loops=1  
                 Buffers: shared hit=212  
               ->  Parallel Seq Scan on public.test  (cost=0.00..163.37 rows=3937 width=36) (actual time=0.011..0.736 rows=2000 loops=5)  
                     Output: id, col  
                     Buffers: shared hit=124  
                     Worker 0: actual time=0.009..0.730 rows=1981 loops=1  
                       Buffers: shared hit=25  
                     Worker 1: actual time=0.012..0.773 rows=2025 loops=1  
                       Buffers: shared hit=25  
                     Worker 2: actual time=0.015..0.741 rows=1944 loops=1  
                       Buffers: shared hit=24  
                     Worker 3: actual time=0.012..0.751 rows=1944 loops=1  
                       Buffers: shared hit=24  
 Planning time: 0.073 ms  
 Execution time: 285.949 ms  
(34 rows)  

实际上并行聚合与分布式数据库聚合阶段原理是一样的,分布式数据库自定义聚合可以参考末尾的文章。

例子3,将多个一元数组聚合为一个一元数组

PostgreSQL内置的array_agg会将数组聚合为多元数组,有些场景无法满足需求。

                                    List of functions
   Schema   |          Name           | Result data type |  Argument data types  |  Type  
------------+-------------------------+------------------+-----------------------+--------
 pg_catalog | array_agg               | anyarray         | anyarray              | agg
 pg_catalog | array_agg               | anyarray         | anynonarray           | agg
postgres=# \set VERBOSITY verbose
postgres=# select array_agg(info) from (values(array[1,2,3]),(array[2,3,4,5])) t(info);
ERROR:  2202E: cannot accumulate arrays of different dimensionality
LOCATION:  accumArrayResultArr, arrayfuncs.c:5270
postgres=# select array_agg(info) from (values(array[1,2,3]),(array[3,4,5])) t(info);
     array_agg     
-------------------
 {\{1,2,3\},\{3,4,5\}}
(1 row)

如果要将数组合并为一元数组,可以自定义一个聚合函数如下:

postgres=# create aggregate arragg (anyarray) (sfunc = array_cat, stype=anyarray, PARALLEL=safe);  
CREATE AGGREGATE

postgres=# select arragg(info) from (values(array[1,2,3]),(array[3,4,5])) t(info);
    arragg     
---------------
 {1,2,3,3,4,5}
(1 row)

postgres=# select arragg(info) from (values(array[1,2,3]),(array[2,3,4,5])) t(info);
     arragg      
-----------------
 {1,2,3,2,3,4,5}
(1 row)

参考

https://www.postgresql.org/docs/10/static/sql-createaggregate.html

https://www.postgresql.org/docs/10/static/xaggr.html#XAGGR-PARTIAL-AGGREGATES

《PostgreSQL aggregate function customize》

《Greenplum 最佳实践 - 估值插件hll的使用(以及hll分式聚合函数优化)》

《Postgres-XC customized aggregate introduction》