Author: 程训焘
Hash join 是利用 hash 函数来实现和加速数据库中 join 操作的一类算法。其主要优势来源于 hash 函数可以只通过一次运算就将任意键值映射到固定大小、固定值域的 hash 值。在实践中,针对等值 join 所需的等值比较,一般数据库系统会仔细选择和优化 hash 函数或函数簇,使其能够快速缩小需要和一个键值进行等值比较的其它键值的数量或范围,从而实现了通过减少计算量、内外存访问量等手段来降低 join 算法的执行开销。
经典的 hash join 算法(又称 Simple Hash Join,SHJ)包括两步:
Partitioned Hash Join (分区 hash join,PHJ)算法在多核架构上逐渐取得了性能上的优势。和 SHJ 算法相比,PHJ 算法共用了完全相同的 build 和 probe 两个步骤,但是 PHJ 在 build 之前会先对输入 relation 进行 partition,然后针对每个 partition 应用 SHJ 算法。在这个算法过程中,我们有很多不同的选择:
Hash join 算法的常见性能问题包括同步所需的锁开销、cache miss、TLB miss、多线程间负载不均衡、NUMA 等等,下文简述一二。
上面我们简述了基本的算法概念和常见性能问题,现在我们举一个具体的性能优化的代码实现例子:如何使用 SIMD 指令对 hash 冲突的处理进行向量化。 Hash 冲突是 hash 表上的常见现象,即多个记录 hash 到了相同的 hash bucket 中。Hash 冲突对执行开销和内存空间开销的影响在不同的 hash 表实现中是不同的,但常见共性问题包括需要额外的 overflow space 来存储 hash 表本身装不下的有冲突的记录,以及冲突造成的互斥锁等待等。 向量化是常见的 hash join 实现优化技术,根据 Amdahl’s law,其加速效果取决于 hash join 算法的全过程中有多少部分能够被完完全全的向量化,数据尽可能多地存储在向量寄存器中,从而得到更高的加速比。如果对 hash 冲突的处理不能实现向量化,那么向量化的加速效果就会打折扣了。向量化指令一般属于 SIMD 类型(Single Instruction Multiple Data),如果一个向量中的多个 key 值内出现了 hash conflict,理论上就出现了至少两个 branch(无冲突 key 的处理和有冲突 key 的处理),而 SIMD 一定永远在向量上使用相同的指令,此处如何处理理论上的 branch 呢? 解决办法是使用 SIMD 的 gather 和 scatter 命令,以下是 AVX512 中处理 32位整形数据的 SIMD gether 和 scatter 的相应伪代码。
__m512i _mm512_i32gather_epi32 (__m512i vindex, void const* base_addr, int scale)
{
FOR j := 0 to 15
i := j*32
m := j*32
addr := base_addr + SignExtend64(vindex[m+31:m]) * ZeroExtend64(scale) * 8
dst[i+31:i] := MEM[addr+31:addr]
ENDFOR
dst[MAX:512] := 0
}
void _mm512_mask_i32scatter_epi32 (void* base_addr, __mmask16 k, __m512i vindex, __m512i a, int scale)
{
FOR j := 0 to 15
i := j*32
m := j*32
IF k[j]
addr := base_addr + SignExtend64(vindex[m+31:m]) * ZeroExtend64(scale) * 8
MEM[addr+31:addr] := a[i+31:i]
FI
ENDFOR
}
从中可以看出,对于一个装有 16 个 32 位整形数的 512 位向量来说,虽然表面上看一条读指令在这 16 个数(可以理解为地址 offset)应该同时并行执行,但指令内部其实是存在一个 for loop 的。也就是说,假如这个 for loop 的第 5 个和第 10 个 iteration 都往相同的 offset 写入数据,那么靠后的第 10 个 iteration 会覆盖前面的第 5 个 iteration 写入的结果。 利用这个特性,对于一个存储了 16 个 hash 值(每一个 hash 值对应 array 中的一个 offset 位置)的向量 V0 来说,我们只需要把它按照 scatter 的方式先写进内存中,再通过 gather 读回来获得 V1,然后比较 V0 和 V1 的内容,凡是不相等的 SIMD lane 即为因 hash 冲突而被覆盖的地方。我们只需利用 mask 将其标记出来,放在待处理的向量中留到下个 iteration 再插入 hash 表即可。具体的实现举例如下:
if (size >= 16) do {
// replace keys & payloads processed in the previous iteration with new values from the memory
key = _mm512_mask_expandloadu_epi32 (key, k, &keys[i]);
val = _mm512_mask_expandloadu_epi32 (val, k, &vals[i]);
off = _mm512_mask_xor_epi32(off, k, off, off);
i += _mm_countbits_64(_mm512_kconcatlo_64(blend_0000, k));
// hash keys
__m512i factors = _mm512_mask_blend_epi32(k, mask_factor_2, mask_factor_1);
__m512i buckets = _mm512_mask_blend_epi32(k, mask_buckets_minus_1, mask_buckets);
__m512i hash = simd_hash(key, factors, buckets);
// combine with old offset and fix overflows
off = _mm512_add_epi32(off, hash);
k = _mm512_cmpge_epu32_mask(off, mask_buckets);
off = _mm512_mask_sub_epi32(off, k, off, mask_buckets);
// load keys from table and detect conflicts
__m512i tab = _mm512_i32gather_epi32(off, table, 8);
k = _mm512_cmpeq_epi32_mask(tab, mask_empty);
_mm512_mask_i32scatter_epi32(table, k, off, mask_pack, 8);
tab = _mm512_mask_i32gather_epi32(tab, k, off, table, 8);
k = _mm512_mask_cmpeq_epi32_mask(k, tab, mask_pack);
// mix keys and payloads in pairs
__m512i key_tmp = _mm512_permutevar_epi32(mask_pack, key);
__m512i val_tmp = _mm512_permutevar_epi32(mask_pack, val);
__m512i lo = _mm512_mask_blend_epi32(blend_AAAA, key_tmp, _mm512_swizzle_epi32(val_tmp, _MM_SWIZ_REG_CDAB));
__m512i hi = _mm512_mask_blend_epi32(blend_5555, val_tmp, _mm512_swizzle_epi32(key_tmp, _MM_SWIZ_REG_CDAB));
// store valid pairs
_mm512_mask_i32loscatter_epi64(table, k, off, lo, 8);
__mmask16 rev_k = _mm512_kunpackb (k,k>>8);
__m512i rev_off = _mm512_permute4f128_epi32(off, _MM_PERM_BADC);
_mm512_mask_i32loscatter_epi64(table, rev_k, rev_off, hi, 8);
off = _mm512_add_epi32(off, mask_1);
} while (i <= size_minus_16);