摘要:若聚合函數(shù)不可以下推,則保持不變。目前當且僅當兩種情況下不可以并行執(zhí)行存在某個聚合函數(shù)參數(shù)為時。
作者:徐懷宇聚合算法執(zhí)行原理
在 SQL 中,聚合操作對一組值執(zhí)行計算,并返回單個值。TiDB 實現(xiàn)了 2 種聚合算法:Hash Aggregation 和 Stream Aggregation。
我們首先以 AVG 函數(shù)為例(案例參考 Stack Overflow),簡述這兩種算法的執(zhí)行原理。
假設表 t 如下:
列 a | 列 b |
---|---|
1 | 9 |
1 | -8 |
2 | -7 |
2 | 6 |
1 | 5 |
2 | 4 |
SQL: select avg(b) from t group by a, 要求將表 t 的數(shù)據(jù)按照 a 的值分組,對每一組的 b 值計算平均值。不管 Hash 還是 Stream 聚合,在 AVG 函數(shù)的計算過程中,我們都需要維護 2 個中間結果變量 sum 和 count。Hash 和 Stream 聚合算法的執(zhí)行原理如下。
Hash Aggregate 的執(zhí)行原理在 Hash Aggregate 的計算過程中,我們需要維護一個 Hash 表,Hash 表的鍵為聚合計算的 Group-By 列,值為聚合函數(shù)的中間結果 sum 和 count。在本例中,鍵為 列 a 的值,值為 sum(b) 和 count(b)。
計算過程中,只需要根據(jù)每行輸入數(shù)據(jù)計算出鍵,在 Hash 表中找到對應值進行更新即可。對本例的執(zhí)行過程模擬如下。
輸入數(shù)據(jù) a b | Hash 表 [key] (sum, count) |
---|---|
1 9 | [1] (9, 1) |
1 -8 | [1] (1, 2) |
2 -7 | [1] (1, 2) ?[2] (-7, 1) |
2 6 | [1] (1, 2) ?[2] (-1, 2) |
1 5 | [1] (6, 3) ?[2] (-1, 2) |
2 4 | [1] (6, 3) ?[2] (3, 3) |
輸入數(shù)據(jù)輸入完后,掃描 Hash 表并計算,便可以得到最終結果:
Hash 表 | avg(b) |
---|---|
[1] (6, 3) | 2 |
[2] (3, 3) | 1 |
Stream Aggregate 的計算需要保證輸入數(shù)據(jù)按照 Group-By 列有序。在計算過程中,每當讀到一個新的 Group 的值或所有數(shù)據(jù)輸入完成時,便對前一個 Group 的聚合最終結果進行計算。
對于本例,我們首先對輸入數(shù)據(jù)按照 a 列進行排序。排序后,本例執(zhí)行過程模擬如下。
輸入數(shù)據(jù) | 是否為新 Group 或所有數(shù)據(jù)輸入完成 | (sum, count) | avg(b) |
---|---|---|---|
1 9 | 是 | (1, 9) | 前一個 Group 為空,不進行計算 |
1 -8 | 否 | (2, 1) | |
1 5 | 否 | (3, 6) | |
2 -7 | 是 | (1, -7) | 2 |
2 6 | 否 | (2, -1) | |
2 4 | 否 | (3, 3) | |
是 | 1 |
因為 Stream Aggregate 的輸入數(shù)據(jù)需要保證同一個 Group 的數(shù)據(jù)連續(xù)輸入,所以 Stream Aggregate 處理完一個 Group 的數(shù)據(jù)后可以立刻向上返回結果,不用像 Hash Aggregate 一樣需要處理完所有數(shù)據(jù)后才能正確的對外返回結果。當上層算子只需要計算部分結果時,比如 Limit,當獲取到需要的行數(shù)后,可以提前中斷 Stream Aggregate 后續(xù)的無用計算。
當 Group-By 列上存在索引時,由索引讀入數(shù)據(jù)可以保證輸入數(shù)據(jù)按照 Group-By 列有序,此時同一個 Group 的數(shù)據(jù)連續(xù)輸入 Stream Aggregate 算子,可以避免額外的排序操作。
TiDB 聚合函數(shù)的計算模式由于分布式計算的需要,TiDB 對于聚合函數(shù)的計算階段進行劃分,相應定義了 5 種計算模式:CompleteMode,F(xiàn)inalMode,Partial1Mode,Partial2Mode,DedupMode。不同的計算模式下,所處理的輸入值和輸出值會有所差異,如下表所示:
AggFunctionMode | 輸入值 | 輸出值 |
---|---|---|
CompleteMode | 原始數(shù)據(jù) | 最終結果 |
FinalMode | 中間結果 | 最終結果 |
Partial1Mode | 原始數(shù)據(jù) | 中間結果 |
Partial2Mode | 中間結果 | 進一步聚合的中間結果 |
DedupMode | 原始數(shù)據(jù) | 去重后的原始數(shù)據(jù) |
以上文提到的 select avg(b) from t group by a 為例,通過對計算階段進行劃分,可以有多種不同的計算模式的組合,如:
CompleteMode
此時 AVG 函數(shù)的整個計算過程只有一個階段,如圖所示:
Partial1Mode --> FinalMode
此時我們將 AVG 函數(shù)的計算過程拆成兩個階段進行,如圖所示:
除了上面的兩個例子外,還可能有如下的幾種計算方式:
聚合被下推到 TiKV 上進行計算(Partial1Mode),并返回經(jīng)過預聚合的中間結果。為了充分利用 TiDB server 所在機器的 CPU 和內存資源,加快 TiDB 層的聚合計算,TiDB 層的聚合函數(shù)計算可以這樣進行:Partial2Mode --> FinalMode。
當聚合函數(shù)需要對參數(shù)進行去重,也就是包含 DISTINCT 屬性,且聚合算子因為一些原因不能下推到 TiKV 時,TiDB 層的聚合函數(shù)計算可以這樣進行:DedupMode --> Partial1Mode --> FinalMode。
聚合函數(shù)分為幾個階段執(zhí)行, 每個階段對應的模式是什么,是否要下推到 TiKV,使用 Hash 還是 Stream 聚合算子等都由優(yōu)化器根據(jù)數(shù)據(jù)分布、估算的計算代價等來決定。
TiDB 并行 Hash Aggregation 的實現(xiàn) 如何構建 Hash Aggregation 執(zhí)行器構建邏輯執(zhí)行計劃 時,會調用 NewAggFuncDesc 將聚合函數(shù)的元信息封裝為一個 AggFuncDesc。 其中 AggFuncDesc.RetTp 由 AggFuncDesc.typeInfer 根據(jù)聚合函數(shù)類型及參數(shù)類型推導而來;AggFuncDesc.Mode 統(tǒng)一初始化為 CompleteMode。
構建物理執(zhí)行計劃時,PhysicalHashAgg 和 PhysicalStreamAgg 的 attach2Task 方法會根據(jù)當前 task 的類型嘗試進行下推聚合計算,如果 task 類型滿足下推的基本要求,比如 copTask,接著會調用 newPartialAggregate 嘗試將聚合算子拆成 TiKV 上執(zhí)行的 Partial 算子和 TiDB 上執(zhí)行的 Final 算子,其中 AggFuncToPBExpr 函數(shù)用來判斷某個聚合函數(shù)是否可以下推。若聚合函數(shù)可以下推,則會在 TiKV 中進行預聚合并返回中間結果,因此需要將 TiDB 層執(zhí)行的 Final 聚合算子的 AggFuncDesc.Mode 修改為 FinalMode,并將其 AggFuncDesc.Args 修改為 TiKV 預聚合后返回的中間結果,TiKV 層的 Partial 聚合算子的 AggFuncDesc 也需要作出對應的修改,這里不再詳述。若聚合函數(shù)不可以下推,則 AggFuncDesc.Mode 保持不變。
構建 HashAgg 執(zhí)行器時,首先檢查當前 HashAgg 算子是否可以并行執(zhí)行。目前當且僅當兩種情況下 HashAgg 不可以并行執(zhí)行:
存在某個聚合函數(shù)參數(shù)為 DISTINCT 時。TiDB 暫未實現(xiàn)對 DedupMode 的支持,因此對于含有 DISTINCT 的情況目前僅能單線程執(zhí)行。
系統(tǒng)變量 tidb_hashagg_partial_concurrency 和 tidb_hashagg_final_concurrency 被同時設置為 1 時。這兩個系統(tǒng)變量分別用來控制 Hash Aggregation 并行計算時候,TiDB 層聚合計算 partial 和 final 階段 worker 的并發(fā)數(shù)。當它們都被設置為 1 時,選擇單線程執(zhí)行。
若 HashAgg 算子可以并行執(zhí)行,使用 AggFuncDesc.Split 根據(jù) AggFuncDesc.Mode 將 TiDB 層的聚合算子的計算拆分為 partial 和 final 兩個階段,并分別生成對應的 AggFuncDesc,設為 partialAggDesc 和 finalAggDesc。若 AggFuncDesc.Mode == CompleteMode,則將 TiDB 層的計算階段拆分為 Partial1Mode --> FinalMode;若 AggFuncDesc.Mode == FinalMode,則將 TiDB 層的計算階段拆分為 Partial2Mode --> FinalMode。進一步的,我們可以根據(jù) partialAggDesc 和 finalAggDesc 分別 構造出對應的執(zhí)行函數(shù)。
并行 Hash Aggregation 執(zhí)行過程詳述TiDB 的并行 Hash Aggregation 算子執(zhí)行過程中的主要線程有:Main Thead,Data Fetcher,Partial Worker,和 Final Worker:
Main Thread 一個:
啟動 Input Reader,Partial Workers 及 Final Workers
等待 Final Worker 的執(zhí)行結果并返回
Data Fetcher 一個:
按 batch 讀取子節(jié)點數(shù)據(jù)并分發(fā)給 Partial Worker
Partial Worker 多個:
讀取 Data Fetcher 發(fā)送來的數(shù)據(jù),并做預聚合
將預聚合結果根據(jù) Group 值 shuffle 給對應的 Final Worker
Final Worker 多個:
讀取 PartialWorker 發(fā)送來的數(shù)據(jù),計算最終結果,發(fā)送給 Main Thread
Hash Aggregation 的執(zhí)行階段可分為如下圖所示的 5 步:
啟動 Data Fetcher,Partial Workers 及 Final Workers。
這部分工作由 prepare4Parallel 函數(shù)完成。該函數(shù)會啟動一個 Data Fetcher,多個 Partial Worker 以及 多個 Final Worker。Partial Worker 和 Final Worker 的數(shù)量可以分別通過 tidb_hashgg_partial_concurrency 和 tidb_hashagg_final_concurrency 系統(tǒng)變量進行控制,這兩個系統(tǒng)變量的默認值都為 4。
DataFetcher 讀取子節(jié)點的數(shù)據(jù)并分發(fā)給 Partial Workers。
這部分工作由 fetchChildData 函數(shù)完成。
Partial Workers 預聚合計算,及根據(jù) Group Key shuffle 給對應的 Final Workers。
這部分工作由 HashAggPartialWorker.run 函數(shù)完成。該函數(shù)調用 updatePartialResult 函數(shù)對 DataFetcher 發(fā)來數(shù)據(jù)執(zhí)行 預聚合計算,并將預聚合結果存儲到 partialResultMap 中。其中 partialResultMap 的 key 為根據(jù) Group-By 的值 encode 的結果,value 為 PartialResult 類型的數(shù)組,數(shù)組中的每個元素表示該下標處的聚合函數(shù)在對應 Group 中的預聚合結果。shuffleIntermData 函數(shù)完成根據(jù) Group 值 shuffle 給對應的 Final Worker。
Final Worker 計算最終結果,發(fā)送給 Main Thread。
這部分工作由 HashAggFinalWorker.run 函數(shù)完成。該函數(shù)調用 consumeIntermData 函數(shù) 接收 PartialWorkers 發(fā)送來的預聚合結果,進而 合并 得到最終結果。getFinalResult 函數(shù)完成發(fā)送最終結果給 Main Thread。
Main Thread 接收最終結果并返回。
TiDB 并行 Hash Aggregation 的性能提升此處以 TPC-H query-17 為例,測試并行 Hash Aggregation 相較于單線程計算時的性能提升。引入并行 Hash Aggregation 前,它的計算瓶頸在 HashAgg_35。
該查詢執(zhí)行計劃如下:
在 TiDB 中,使用 EXPLAIN ANALYZE 可以獲取 SQL 的執(zhí)行統(tǒng)計信息。因篇幅原因此處僅貼出 TPC-H query-17 部分算子的 EXPLAIN ANALYZE 結果。
HashAgg 單線程計算時:
查詢總執(zhí)行時間 23 分 24 秒,其中 HashAgg 執(zhí)行時間約 17 分 9 秒。
HashAgg 并行計算時(此時 TiDB 層 Partial 和 Final 階段的 worker 數(shù)量都設置為 16):
總查詢時間 8 分 37 秒,其中 HashAgg 執(zhí)行時間約 1 分 4 秒。
并行計算時,Hash Aggregation 的計算速度提升約 16 倍。
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://m.hztianpu.com/yun/17856.html
摘要:部分主要流程如下把上文提到語法解析階段會把語句中相關信息轉換成然后負責把結構轉換即的元信息。最后把的元信息追加到的元信息中,具體實現(xiàn)在這里。會把要刪除的分區(qū)從元信息刪除掉,刪除前會做的檢查。 作者:肖亮亮 Table Partition 什么是 Table Partition Table Partition 是指根據(jù)一定規(guī)則,將數(shù)據(jù)庫中的一張表分解成多個更小的容易管理的部分。從邏輯上看...
閱讀 991·2021-09-07 09:58
閱讀 1559·2021-09-07 09:58
閱讀 2942·2021-09-04 16:40
閱讀 2552·2019-08-30 15:55
閱讀 2485·2019-08-30 15:54
閱讀 1414·2019-08-30 15:52
閱讀 483·2019-08-30 10:49
閱讀 2651·2019-08-29 13:21