桶式存儲通常用於Hive和Spark SQL中,通過消除Join或group-by-aggregate場景中的Shuffle來提高性能。這對於字節跳動的各種寫一次和讀多次的數據集來說是理想的。
然而,Spark SQL bucket有各種限製:
這些努力的直接結果是,我們看到字節跳動整個數據倉庫中利用桶存儲的查詢增長了90%以上。在本次演講中,我們將介紹如何設計和實現一種新的桶機製,以解決上述所有限製,並顯著提高連接和按聚合分組的性能。
-大家好,感謝大家參加這次會議。主題是“通過移除Shuffle改進SPARK SQL性能的下一代bucket”。我叫郭軍,我的英文名字叫Jason。我是字節跳動公司數據引擎團隊的負責人。讓我介紹一下我們是誰以及我們的工作。我們是字節跳動的數據引擎團隊。我們為OLAP構建了一Beplay体育安卓版本站式體驗平台,用戶可以在平台上構建PB級數據倉庫,並通過編寫SQL來分析PB級數據,而無需關心底層的執行引擎。我們提供開放的API和自助服務平台。Beplay体育安卓版本此外,我們還優化了Spark SQL, Presto和Hive數據引擎。
同時我們為字節跳動的許多業務線設計數據架構。我的演講將由四個部分組成,第一部分是字節跳動的Spark SQL。下一個問題是,桶是什麼?然後我將介紹Spark bucket的限製,最後我將說明優化Spark bucket的方法。讓我們進入第一部分,Spark SQL如何在字節跳動工作。我們在2015年將Spark引入字節跳動。
在2016年之前,我們隻是運行Spark SQL進行小規模的實驗,在2017年底,Spark SQL支持字節跳動的部分ad-hoc查詢工作負載。然後在2018年底,Spark SQL支持大多數臨時查詢和生產中的一些ETL管道。在2019年底,Spark SQL將支持大部分臨時查詢和大部分生產中的ETL管道。現在Spark SQL是字節跳動公司數據倉庫領域的主要引擎。
讓我簡單介紹一下什麼是桶裝。現在,首先我們應該在Spark SQL中以兩種方式創建bucket表。在左邊,我們可以用堆棧的方式創建一個桶狀表。使用parquet創建一個表的順序,由user_id按user_id排序並分成1024個桶。
在這個例子中,我們可以說我們需要指定一個用戶,我們需要用CLUSTERED BY子句指定打包鍵集,我們還需要指定按鍵集排序,我們按子句排序。我們需要知道的是按鍵集排序的鍵集可能與CLUSTERED by鍵集不同。
在右邊我們可以創建一個。多麼兼容的桶形表。創建一個表order CLUSTERED BY user_id,按user_id分成1024個bucket,存儲為parquet。
被存儲為拚花而不是使用拚花,所以用這種方法我們創建了一個與Hive引擎兼容的表。當我們得到一個桶形表時,我們可以像這樣向桶形表中插入一些數據集。INSERT INTO order select order_id, user_id, product, amount FROM order_staging我們可以說,我們隻是將一些數據插入到bucket表中。比如如何將數據插入到非bucket表中?換句話說,當用戶向bucket中插入某個數據集時,她不需要知道表是否被bucket。當我們得到一個桶形表並填充該表時,我們如何從桶形表中獲益?
讓我們以ShuffledHashJoin為例。我們知道ShuffledHashJoin是Spark SQL中常用的shuffle機製之一。當選擇打亂哈希表時,Spark SQL需要確保兩個表是共分區的。也就是說,如果我們想要在user_id上聯接訂單表和表,Spark SQL需要確保它們在user_id上是共分區的。如果它們不是共分區,Spark SQL將添加一些shuffle。在Spark SQL中,我們使用交換操作符進行shuffle,因此在這個圖中,我們可以說交換節點。它們中的大多數都在user_id上,這樣交換後,它們中的大多數都在user_id上進行了共同分區,這樣就可以使用ShuffledHashJoin了。但如果它們都是桶形表,這意味著我們在填充它們之前將表設置為桶形表。然後,我們不需要在連接之前添加一些交換節點,因為大多數交換節點都是根據user_id進行預洗牌的。這就是為什麼我們要使用桶形桌子的原因。 Let’s go to the SortMergeJoin. SortMergeJoin is another commonly used join mechanism.
對於SortMergeJoin, SortMergeJoin要求子節點或(聽不清)應該是共分區的,而且它們應該在連接鍵集上排序。例如,我們是否希望連接訂單和user_id?Spark SQL將確保它們中的大多數根據user_id進行共分區,並且每個分區都按照user_id進行排序。否則,Spark SQL將添加交換操作符和排序操作符來確保這一點。但如果它們都是bucket表,並且在填充表之前也根據user_id進行排序。那麼我們不需要……
我們不需要再次添加exchange和sort節點,因為它們大部分都是預洗牌和預排序的。這就是為什麼我們要將表設置為桶形表。讓我來介紹一下當前Spark SQL bucket機製的局限性。最……
最重要的限製之一是小文件。以下麵的SQL為例,我們可以用INSERT INTO order SELECT order_id, user_id, product, amount FROM order_staging填充表。當SQL運行時,我們對單個任務文件夾中有多少文件進行快照。我們通過hdfs dfs -ls來計數,然後我們來計數文件。我們發現在一個任務文件夾下有988個文件。每個任務將生成最多1024個小文件。1024是桶號。請記住,我們創建了另一個有1024桶號的表,總共將有1024乘以M個小文件。M是任務編號。當M為1024時,將有多達100萬個小文件,而實際上M可能比1024大得多。 For example 10,000 ,then there will be up to 10 million small files.
我們知道對於hadoop echo係統來說,小文件可能會招致一些災難,因為當小文件太多的時候,Spark SQL需要頻繁地與HDFS NameNode進行通信,而HDFS NameNode是單點的,會成為瓶頸。所以我們不會等它這樣做,SQL會運行得非常慢。
即使我們完成了數據填充,下遊sql也會運行得非常慢,因為它們需要打開太多的小文件,這是非常慢的。所以我認為這應該是最大的限製之一。我們也可以用一些其他的機製來解決這個問題。例如,我們可以添加DISTRIBUTE BY子句,還需要將spark.sql.shuffle.partition配置設置為某個值。當1024是M的倍數時,最多有1024個文件,當M是1024的倍數時,最多有M個文件。
否則,仍然會有高達1024乘以M的小文件。M等於值spark.sql.shuffle.partitions。
是的,我們可以通過這個配置和DISTRIBUTE by子句來減少文件數量,但是任何用戶都很難設置這個配置,所以我們需要一種機製來自動減少文件數量。下一個限製是Spark SQL bucket在SQL引擎之間是不兼容的。例如,Spark SQL桶與Hive桶不同,Spark桶與Presto桶也不同。Presto和Hive bucket兼容。
它們不相容有兩個原因。將數據寫入Hive bucket表中。
一個額外的shuffle將被引入,以確保它應該減少任務,將寫入一個桶文件。但是對於Spark SQL來說,不會有額外的shuffle,所以每個任務都會寫入多達M個bucket文件。所有的文件,每個都很小。其次,它們使用不同的哈希機製。對於Hive, Hive將使用HiveHash,而對於Spark SQL將使用Murmur3,因此數據分布將非常不同。這就是他們不相容的原因。由於不兼容,所以在Spark SQL中用Hive bucket連接表或在Hive中用Spark bucket連接表時需要exchange和Sort。
2019年,我們將成千上萬的Hive SQL遷移到Spark SQL,但其中許多都使用桶形表。
所以當你必須讓它們兼容時,我們就可以自動遷移sql。
此外,在Spark SQL中,由於Spark SQL中的bucket將由多個文件組成,因此需要額外的排序,因此Spark必須對單個bucket內的文件進行排序,以確保整個bucket在join上排序,這樣才能使用SortMergeJoin。
Spark SQL的另一個限製是,如果用戶想要連接兩個表,如果他們想要使用Spark和join,他們中的大多數應該聚集在鍵上,並且桶號應該相同。例如,如果另一個表左邊有4096個桶,而用戶表右邊有1024個桶,那麼Spark會將用戶表交換為4096個桶,這樣它就可以與另一個表合並。所以在這張圖中,我們可以說,盡管它們都是桶表,但由於它們的桶號不同,所以引入了交換。另一個限製是,Spark SQL要求大多數表都放在與連接鍵集相同的鍵集上。例如,如果兩個表都在user_id上,但我們想通過user_id和location_id來連接它們,那麼exchange將為它們兩個都引入。下一個是當我們使用Union All條款時,將需要交換。例如,也許另一個將來自web和移動,我們想要聯合另一個web和另一個移動,然後用user_id加入他們。
在這個例子中,將引入exchange,因為在Union之後,outputPartitioning和outputOrdering將被設置為unknown,而Spark SQL無法知道底層表是bucket表,因此將引入exchange。讓我來介紹一下字節跳動如何優化bucket。
首先,我們將Spark bucket與Hive對齊。
之前我介紹過,兩者的區別在於文件號不同。對於Hive來說,每個桶隻有一個文件,但對於Spark來說,每個桶有多個文件。因此,我們需要確保在Spark中每個bucket都隻包含一個文件。還有一件事我們需要知道,Spark使用Murmur3Hash, Hive使用HiveHash,所以我們改變了Spark SQL
使用HiveHash當桶和當我們
讀或寫回表。
因此,我們需要做以下事情來確保Spark將,
Spark會像Hive一樣將數據寫入bucket表。首先,我們改變InsertIntoHiveTable計劃所需的分布,我們設置,
並且我們在桶形鍵上用HiveHash將值設置為HashClusteredDistribution,並且我們在桶形鍵上用升序覆蓋所需的排序為SortOrder。這樣,我們就能確保,
將引入一個額外的shuffle,以確保每個任務隻寫入一個bucket表,並且任務號將與bucket號相同。這樣就有M個桶文件,每個桶對應一個桶。
那麼我們如何以與Hive相同的方式讀取這些數據呢?
下一步是我們需要…Spark需要識別Hive桶表,所以我們覆蓋HiveTableScanExec的其他預分區到hashparpartitioning with HiveHash在桶鍵上,然後我們覆蓋outputOrdering into SortOrder在桶鍵上與acending。讓我用這張圖進一步說明。在左邊,我們看到沒有我們的改變,HiveTableScan的outputPartitioning是UnknownPartitioning, outputOrdering是Nil。這就是為什麼Spark仍然需要為SortMergeJoin添加exchange和SortNode,即使它們有bucket表。因為SortMergeJoin的requireChildDistribution是HashClusteredDistribution,而requireChildOrdering是SortOrder。
HiveTableScan的outputPartitioning是UnknownPartitioning,它不滿足SortMergeJoin的requireChildDistribution。好的。更改之後,我們將outputPartitioning設置為右側的HashPartitioning,這滿足要求
SortMergeJoin的requireChildDistribution,它是HashClusteredDistribution。並且HiveTableScan的outputPartitioning被更改為SortOrder,這可以滿足SortMergeJoin的requireChildOrdering,也就是SortOrder。所以交換和排序不再需要了。
這樣Spark SQL就可以從Hive bucket table中讀取數據,並且Spark SQL可以在沒有shuffle和sort的情況下連接兩個bucket table。
現在Spark SQL和Hive bucket表是兼容的。
我們要做的下一件事是支持一對多的桶連接。讓我們以這個為例。對於表A,我們看到有三個桶,對於表B,有六個桶。對於Spark,如果我們在Spark中加入它們,就需要額外的交換,因為它們有不同的存儲桶。但是在我們公司我們需要支持這個案子,不需要交換。第一種方法是將表B中的0號桶和3號桶合並,將表B中的1號桶和4號桶合並,將表B中的2號桶和5號桶合並,這樣合並後表B就有3個桶,可以與表A合並而不shuffle。在右邊我們可以看到,我們可以把它們和排序結合起來。所以物理的,就像它說的,我們掃描了表B並對表B排序,然後,你知道,它可以與表A連接而不需要洗牌。此外,我們還提供了另一種機製來支持一對多的桶連接,因為前麵的方法有一些局限性。並行度將是3,這可能太小了,我們提供了另一種機製,以便我們可以使用6作為並行度,將有6個任務使用桶連接。 But how can we do that? We can clone the Table A, so that Table A will have six buckets and mostly we clone them by a new mechanism named
桶聯盟。我們知道Spark支持一個名為Union的機製,但如果我們簡單地使用Union, Union操作符,outputPartitioning將是未知的,所以我們創建了另一個,我們提供了另一個新的(聽不清楚)Bucket Union,這樣outputPartitioning和outputOrdering就被保留了。因此在Bucket Union之後,它可以與表B連接,而不需要洗牌和排序。
是的,即使我們移除了洗牌和排序,還有一些其他問題。例如,如果B左連接A是B左半連接A或B反連接A,或B內連接A,這種機製工作得很好,但如果我們用B右連接A連接B和A,或B完全外部連接A,或B交叉連接A,將會有一些重複的記錄,因為我們隻是掃描了表A兩次,對嗎?
我們動態地添加一個過濾器來解決這個問題。例如,過濾器是這樣的,哈希,連接ID,然後是桶的數量
我們需要確保結果與桶id相同。讓我們以桶0為例。0號桶有6條記錄0,3,6,9,12和15,
我們需要的是0 6和12。所以我們對ID進行哈希然後用6和4 3 9和
15,這不是目標數字。當我們(喃喃)輸入6時,結果將是3,這與桶id 0不同。然後6 3 9 15會被取出來。有了這種機製,就不再有重複的記錄了。我們支持的下一件事是,我們不僅支持bucket key,還支持join。在左手邊,我們可以看到表X被桶裝在A上,A是列名,在右手邊,表Y被桶裝在A上,A也是桶列,列名。但是SQL查詢要求我們同時連接A和B上的表X和表Y,這樣交換和排序就減少了。
在A和B上交換,在A和B上排序。
這種限製要求用戶自行設計表格
桶式存儲非常小心,有時用戶很難找到最好的桶式存儲機製。例如,因為我們經常查詢如下所示,從表1組中按A、B、C、D進行選擇,然後從表2組中按B、C、D進行選擇,從表3組中按B、D、e進行選擇,有時用戶可能想要連接表1和表2
A, B或,在B, C上連接2和表3,在B, d上連接表1和表3,如何設置bucket key ?如果我們用A, B, C和D來代替表1,
所以當我們連接表1和表2時,我們可以使用桶連接。為了解決這些問題,我們支持一種機製,它支持多個鍵的連接。讓我們回到這個例子,表X被桶裝在A上,表Y被桶裝在B上,這樣我們就不需要引入交換,我們需要做的是我們可以在A和B上對表X和表Y進行排序。然後我們可以用SortMergeJoin連接表X和Y,而不交換節點。通過這種方法,我們可以很容易地設計桶鍵集。讓我們回到前一頁。在這個例子中,我們可以說,我們可以將列B設置為存儲機製。因為B是bucket鍵集,
當我們用新的桶連接表1和表2時,其中一個查詢可以從桶機製中受益,因為B是桶鍵集,而連接鍵集是B的超集。現在我們也可以用桶連接表2和表3,因為連接鍵集是B和C,它是桶鍵集的超集。最後一點是我們支持桶式進化。讓我們以這兩種情況為例。案例1,一個非桶狀表是按日期劃分的,用戶希望在沒有開銷的情況下將其轉換為桶狀表。
默認情況下,如果要更改桶表,則將非桶表作為桶表。
我們要麼將現有的詳細信息轉換為bucket分布,要麼在查詢現有數據時,查詢將失敗,因為現有數據還沒有被bucket覆蓋。案例2,桶號可能是X,由於數據量增加,用戶需要將桶號放大到2X。為了解決這些問題,我們提供了一種叫做桶式進化的機製。
為了實現這一點,我們將桶信息放入分區參數中。我們知道在Spark SQL中,桶信息在表屬性中,而不是在分區參數中。通過將桶信息放入分區參數中,我們可以知道每個分區的桶信息,並且隻有當所有目標分區都具有相同的桶信息時,該表才會被讀為桶表?否則,它將被讀取為非bucket表。例如,表是無桶的,今天我們將其轉換為桶表,今天的數據分區將按桶鍵集分布和排序。明天當我們查詢這個表時,如果唯一的目標分區是今天,那麼Spark將知道它是一個桶表,可以使用桶連接。但是如果查詢需要同時讀取今天和昨天的數據,Spark會發現目標分區具有不同的bucket信息。Spark會把這個表讀成一個非bucket表
將鬥型表讀取為非鬥型表隻會影響性能而不會影響正確性,因此在實踐中效果良好。
Bytedance
郭軍,字節跳動數據引擎團隊負責人。他的團隊專注於EB級數據平台的數據倉庫架構開發和優化。Beplay体育安卓版本Spark SQL是這個團隊中最重要的引擎之一,Spark SQL每天處理數百PB的數據。在加入字節跳動之前,他曾在思科和eBay工作,在那裏他專注於數據平台和數據倉庫基礎設施優化。Beplay体育安卓版本