取消
顯示的結果
而不是尋找
你的意思是:

使自適應查詢執行和結構化流foreachBatch基於成本的優化器

tlecomte
新的貢獻者三世

親愛的磚社區,

我用火花結構化流將數據從白銀和黃金在ETL時尚。源流是一個增量的變化數據提要表銀。流dataframe轉換和與幾個(non-streamed)三角洲表。

由於多個連接,任務顯著傾斜。所以我們認為,操作將大大受益於自適應查詢執行和基於成本的優化器。這兩個選項是默認禁用流數據集。我的理解的評論https://github.com/apache/spark/blob/87a235c2143449bd8da0acee4ec3cd99993155bb/sql/core/src/main/scal..。是,這些選項被禁用,因為不兼容的狀態流查詢(stream-stream連接或流聚合)。因為我流查詢是無狀態的,我已經測試了使兩個選擇。

代碼是這樣的:

val streamingDF = spark.readStream.format(“δ”)。選項(“readChangeFeed”,值= true) .load (“silver.sourceTable”) val joinedTableDF = spark.read.format .load(“δ”)(“silver.joinedTable”) streamingDF.as (“T”) . join (joinedTableDF.as (“TJoin”)、“TJoin美元。JoinKey T.JoinKey“= = = $) .select .writeStream (…)。foreachBatch (outputDf: DataFrame報價:長)= >{/ /啟用AQE和國會預算辦公室自流式查詢性能是無狀態outputDf.sparkSession.conf.set (SQLConf.ADAPTIVE_EXECUTION_ENABLED。關鍵,“真正的”)outputDf.sparkSession.conf.set (SQLConf.CBO_ENABLED。關鍵,“真正的”)val goldTable = DeltaTable.forName val mergeCondition = matchingColumns (“gold.targetTable”)。地圖(x = > s”黃金。$ x = change_data。$ x”)。mkString(”和“)goldTable.as(“黃金”).merge (latestChangeData.as (“change_data”),“黃金。關鍵= change_data。關鍵”).whenMatched (“change_data。_change_type =‘刪除’”)delete () .whenMatched (“change_data。_change_type = update_postimage”) .updateAll () .whenNotMatched (“change_data。_change_type ! =‘刪除’”).insertAll () . execute ()}) .start ()

當測試3.3香草火花在本地獨立集群,或在CI管道,查詢是否按預期的方式工作。然而,當運行相同的代碼在磚集群(11.3 LTS運行時),我得到以下錯誤:

org.apache.spark。SparkException:意想不到的構建計劃執行人端廣播加入:ReusedStaticSubplan交換SinglePartition EXECUTOR_BROADCAST, [plan_id = 1687939],交換SinglePartition EXECUTOR_BROADCAST, [plan_id = 1619980] org.apache.spark.sql.execution.joins.ExecutorBroadcast .getShuffleIdFromPlan美元(ExecutorBroadcast.scala: 160)美元org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.executorBroadcast lzycompute (BroadcastHashJoinExec.scala: 73) org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.executorBroadcast (BroadcastHashJoinExec.scala: 71) org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast (BroadcastHashJoinExec.scala: 301) org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareRelation (BroadcastHashJoinExec.scala: 321) org.apache.spark.sql.execution.joins.HashJoin.codegenInner (HashJoin.scala: 391)…

這是不正確的,使AQE和國會預算辦公室?

有什麼具體磚火花的需要調整,以避免“意想不到的構建計劃執行人廣播加入”錯誤呢?

非常感謝你的幫助!

2接受解決方案

接受的解決方案

Lingesh
新的貢獻者三世

不推薦有AQE流查詢你共享相同的原因的描述。它已經記錄在這裏

在原帖子查看解決方案

tlecomte
新的貢獻者三世

這篇文章是我的問題的答案:自適應查詢執行結構化流|磚的博客

總之:需要確認,磚13.1運行時使AQE默認情況下在non-Photon集群foreachBatch下沉。期待它使光子集群。

在原帖子查看解決方案

6個回答6

Debayan
尊敬的貢獻者三世
尊敬的貢獻者三世

嗨,搜索錯誤我得到了一個變更請求,請你確認是否這有助於:https://issues.apache.org/jira/browse/spark - 17556

tlecomte
新的貢獻者三世

謝謝debayan。火星- 17556看起來有點陳舊。有兩個鏈接拉請求都是封閉但不合並,我找不到上麵的文件從加在這些2中。看起來的代碼運行在磚火花實際上是不同的。

Hubert_Dudek1
尊敬的貢獻者三世

批有多大?不確定AQE將提供任何優勢有一些限製(斜上方連接256 MB,甚至激活傾斜連接)。廣播相反隻有當數據很小,數據沒有分區工人激流機製將會不習慣。在我看來,這沒有意義。

tlecomte
新的貢獻者三世

好問題。名義場景流來自變化數據捕獲來自數據源的記錄。這些批次通常小,峰值~ 10 k記錄批量操作時發生在數據源。然而,即使當批本身很小,它是與更大的表。

相同的流也用於(re)初始化表內容通過發送全表的快照內容從數據源。在這種情況下,批量可以上升到500米記錄,GB的數據。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map