當查詢計劃有BroadcastNestedLoopJoin時禁用廣播

當查詢計劃有BroadcastNestedLoopJoin時,如何禁用廣播。

寫的亞當Pavlacka

最後發布時間:2022年5月23日

本文解釋如何在查詢計劃具有BroadcastNestedLoopJoin在物理計劃中。

您希望廣播在禁用廣播閾值後停止,通過設置spark.sql.autoBroadcastJoinThreshold-1,但Apache Spark試圖廣播更大的表,但廣播錯誤失敗。

這種行為不是bug,但是它可能是出乎意料的。我們將審查預期的行為,並為該問題提供一個緩解選項。

創建表

首先創建兩個表,其中一個表的值為空table_withNull另一個沒有空值tblA_NoNull

%sql sql("SELECT id FROM RANGE(10)").write.mode("overwrite").saveAsTable("tblA_NoNull") sql("SELECT id FROM RANGE(50) UNION SELECT NULL").write.mode("overwrite").saveAsTable("table_withNull")

試圖禁用廣播

我們試圖通過設置禁用廣播spark.sql.autoBroadcastJoinThreshold的子查詢條款。

% sql spark.conf.set(“spark.sql。autoBroadcastJoinThreshold", -1) sql("select * from table_withNull where id not in(select id from tblA_NoNull)").explain(true)

如果您檢查查詢計劃,BroadcastNestedLoopJoin是這種情況下最後的退路了。即使在試圖關閉廣播後,它仍然出現。

== Physical Plan == *(2) BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#2482L = id#2483L) || isnull((id#2482L = id#2483L)):- *(2)文件可以parquet默認。table_withnull[id#2482L] Batched: true, DataFilters:[],格式:Parquet,位置:InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct +- BroadcastExchange IdentityBroadcastMode, [id=#2586] +- *(1) FileScan Parquet默認。tbla_nonull[id#2483L] Batched: true, DataFilters:[],格式:Parquet,位置:InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct . dbafilters: []

如果正在處理的數據足夠大,當Spark試圖廣播表時,會導致廣播錯誤。

重寫查詢使用不存在而不是

可以通過重寫查詢來解決此問題不存在而不是

%sql //它可以重寫為NOT EXISTS,這將成為一個常規的連接:sql(“select * from table_withNull where NOT EXISTS (select 1 from tblA_NoNull where table_withNull.”id = tblA_NoNull.id)”).explain(真正的)

通過使用不存在,查詢運行與SortMergeJoin

== Physical Plan == SortMergeJoin [id#2482L], [id#2483L], LeftAnti:- Sort [id#2482L ASC NULLS FIRST], false, 0: +- Exchange hashpartitioning(id#2482L, 200), [id=#2653]: +- *(1) FileScan parquet default. [id#2482L], [id#2483L], LeftAnti:-table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct +- Sort [id#2483L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(id#2483L, 200), [id=#2656] +- *(2) Project [id#2483L] +- *(2) Filter isnotnull(id#2483L) +- *(2) FileScan Parquet default。tbla_nonull[id#2483L] Batched: true, DataFilters: [isnotnull(id#2483L)],格式:Parquet,位置:InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [isnotnull(id)], ReadSchema: struct

解釋

Spark不會自動執行此操作,因為Spark和SQL對空處理的語義略有不同。

在SQL中,不是在對象中有任何空值不是在值,結果為空。這就是為什麼它隻能用BroadcastNestedLoopJoin.所有不是在值必須是已知的,以確保集合中沒有空值。

例如筆記本電腦

這本筆記本有一個完整的例子,說明了為什麼Spark不能自動切換BroadcastNestedLoopJoinSortMergeJoin

檢查BroadcastNestedLoopJoin例如筆記本電腦