Author: kunyu
POLARIS是微软Azure Synapse的分布式查询引擎,由于近来数据的数量与多样性都在极快的增长,相较于传统数仓的ETL再进行分析的步骤,直接面向DataLake的数据分析服务在使用体验上较前者会更好,因此微软开发了POLARIS用于取代Azure的SQL DW,该系统主要一个比较有趣的点是系统的设计大量复用了已有的组件,整个引擎几乎到处都可以看到SQL Server的影子。
POLARIS 的架构和snowflake 的架构似乎相似,都是为了彻底分离存储层与计算层,如图所示:
总体分三部分组成:
data cell是一块数据的抽象,对cell的抽象细节在论文中并没有多讲,但是我猜测可能是一个类似与ToSQLServerTable()
这样的接口,将若干data cell转换为SQL Server中的表放入compute server的local disk中,这样的好处是对上层的执行节点(SQL Server)改动比较小,比较符合整个系统的设计考量。
POLARIS对输入的数据做了双重分区,第一重是哈希分区,哈希分区的主要是用于拆分query到不同的分区执行;另一重则是通过用户自定义的partition进行分区(如果用户提供了partition算法),partition算法主要用于做pruning,减少不必要的数据访问,为每个compute server上的执行加速。
分布式查询与单机查询的一个区别是分布式查询需要将query分发到多个机器上执行(实际上单机上的并行查询也有类似的性质),这要求输入的数据必须按照某种方式切分,为此POLARIS引入了distribution properties,用来描述数据集的分区状况:
在这之后POLARIS引入了required distribution properties的概念,因为部分算子为了并行的执行,会对输入的数据分布有一定要求(比如hash join要求数据按key分区),这个词就是用来形容这些要求的,比如hash join的要求,用论文中这一套形容一下就是 \(P{\bowtie}^{a=b}Q: \{\{P^{[a]}\and Q^{[b]}\}\or P^{1}\or Q^{1}\}\) 这个性质要求join的两个表必须按照join key做分区,或其中一张表小到可以被广播到各个计算节点上,POLARIS中的很多算子都有对应的required distribution properties,为了使数据重新分布以满足这些要求,我们需要算子来对数据做re-partition,POLARIS为执行计划中添加了两个算子:
上面的Op是用来改变数据分布以符合算子的equired distribution properties的,现在我们通过在cascade optimizer通过为每个算子添加enforcer(即算子的required distribution properties),再给予HashOp与BroadcastOp合适的cost,单机数据库系统的优化器就可以用于处理分布式查询了,以下是优化器处理Join的一个例子,在枚举过程中,不满足required distribution properties的计划将会被丢弃,之后从可用的plan中按照cost择优选择即可:
关于各个算子的required distribution properties文中附录都列举了出来,如果想要了解可以直接查看原论文的附录。cascade optimizer相关的概念可以参看Greenplum的优化器Orca,比较细致的讲解了优化器相关的概念
通过上文的分布式优化器,我们可以得到一个合适的物理计划,实际上,SQL中的查询计划可以视作一个有向无环图(DAG),无论是单机执行器还是分布式执行器,本质上都是自底向上执行查询计划中对应节点,POLARIS的执行流程很简单:自底向上执行每个节点,每个节点的执行会被拆分为多个task进行分布式执行,直到整个计划执行完毕,如图所示:
图中的红框部分是要执行的节点,每个节点会被拆分为若干task,每个task由3部分组成:
我们现在可以基于task实现对query的分发,以下是一个Hash Join的例子:
文中还提到了可以根据计算节点的缓存状况以及节点的负载来选择适合的计算节点执行查询,在POLARIS中,计算节点的execution service会提供提供相关信息给DQP,DQP将结合这些信息决定task分发到哪些计算节点执行
上面两节已经描述了一个POLARIS集群在理想状况(没有节点故障,资源足够)下如何处理用户的查询,因此我们还需要处理一些现实世界中的分布式系统的常见问题:
这一节主要讲讲POLARIS如何解决这两个问题
POLARIS的容错机制实现基于其计算存储分离的特性,其计算节点只负责两件事:
因此一旦某个任务超时或失败,我们可以在任意一个正常的节点重试部分任务,为了实现这一点,POLARIS将任务分为5种状态:
查询的处理过程现在实际上已经变成了各个节点的状态转移过程,在这个过程种query DAG中各个节点的状态不断转移,直到根节点的状态变为success意味着query执行完成。通过在DQP层添加一个状态机,我们就可以优雅的实现任务的重试,从而实现我们的容错机制。从另一个角度看,状态机结合状态转移时的日志能够很容易复现调度与执行时的BUG,即使query很复杂也可以通过调度日志很好的发现问题。
下图左半部分是DQP节query task DAG对应的状态机:如果叶子节点的任务失败,就直接重试(failed$\rightarrow$run),如果是非叶子节点的任务失败,则分为两种情况:
下图右半部分则是一个执行query的例子,展示了POLARIS如何重试task以从失败中恢复
至于扩缩容,由于POLARIS的计算节点是无状态的,我们可以随时为集群增加计算节点,增加的节点不会影响增加时集群中正在运行的任务,新的查询将会基于扩容后的集群进行调度执行,缩容也类似。
这一章讲讲POLARIS是如何实现基于资源的任务调度的,回想单个任务的调度情况:所有ready状态的任务以一定顺序执行完毕,随后他们的父任务状态将会改为ready并参与调度,现在由于资源有限,在调度执行之前需要检查一下任务需要的资源情况,POLARIS利用WorkLoad Graph来处理这个问题:
其实没有什么特别的地方,只是给每个查询计划对应的节点添加了一个所需资源数目的属性而已。在这里我们调度的实际上不是这两个查询,而是图中的10个task,我们需要决定哪些task允许被执行(ready$\rightarrow$run),因此我们的调度对象是所有状态为ready的task,考虑到这些,POLARIS给出了一个短小的调度算法:
这个调度算法做的事情非常简单:每次被唤醒就将所有当前ready状态的task加入队列,然后依照调度策略取出下一个应该被执行的任务,如果我们能够满足资源要求,就执行该任务。
这个算法提供了一个调度策略的配置(图中的SchedulingPolicy
),文中提到了三种调度策略:
这个问题实际上和内存分配的策略是相似的,所以不同的策略实质上是资源利用率/响应时间的取舍
POLARIS和snowflake同为云原生数仓,整体的架构实际具有相似性,但是除此之外,论文清晰的向我们阐述了如何从单机数据库(SQL Server)演进到云原生数仓,对于从单机分析型数据库出发转向分布式数据库的产品有一定参考意义