Dear Databricks community,
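I'm using Spark Structured Streaming to move data from silver to gold in an ETL fashion. The source stream is the change data feed of a Delta table in silver. The streaming DataFrame is transformed and joined with several (non-streamed) Delta tables.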
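Because of the multiple joins, the tasks are significantly skewed, so we think the operation would benefit greatly from Adaptive Query Execution (AQE) and the Cost-Based Optimizer (CBO). Both options are disabled by default for streaming Datasets. My understanding from the comments in https://github.com/apache/spark/blob/87a235c2143449bd8da0acee4ec3cd99993155bb/sql/core/src/main/scal... is that these options are disabled because they are incompatible with stateful streaming queries (stream-stream joins or streaming aggregations). Since my streaming query is stateless, I have tested enabling both options.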
The code looks like this:
    import io.delta.tables.DeltaTable
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.internal.SQLConf
    import spark.implicits._  // for the $"..." column syntax

    // Streaming source: change data feed of the silver Delta table
    val streamingDF = spark.readStream
      .format("delta")
      .option("readChangeFeed", value = true)
      .load("silver.sourceTable")

    // Static (non-streamed) Delta table used in the join
    val joinedTableDF = spark.read
      .format("delta")
      .load("silver.joinedTable")

    streamingDF.as("T")
      .join(joinedTableDF.as("TJoin"), $"TJoin.JoinKey" === $"T.JoinKey")
      .select(...)
      .writeStream
      .foreachBatch((outputDf: DataFrame, bid: Long) => {
        // Enable AQE and CBO, since the streaming query is stateless
        outputDf.sparkSession.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
        outputDf.sparkSession.conf.set(SQLConf.CBO_ENABLED.key, "true")

        // Note: matchingColumns and latestChangeData are not defined in this excerpt
        val goldTable = DeltaTable.forName("gold.targetTable")
        val mergeCondition = matchingColumns.map(x => s"gold.$x = change_data.$x").mkString(" AND ")

        goldTable.as("gold")
          .merge(latestChangeData.as("change_data"), "gold.Key = change_data.Key")
          .whenMatched("change_data._change_type = 'delete'")
          .delete()
          .whenMatched("change_data._change_type = 'update_postimage'")
          .updateAll()
          .whenNotMatched("change_data._change_type != 'delete'")
          .insertAll()
          .execute()
      })
      .start()
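When tested on vanilla Spark 3.3, either in a local standalone cluster or in the CI pipeline, the query works as expected. However, when I run the same code on a Databricks cluster (11.3 LTS runtime), I get the following error: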
    org.apache.spark.SparkException: Unexpected build plan for Executor Side Broadcast Join: ReusedStaticSubplan Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=1687939], Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=1619980]
        at org.apache.spark.sql.execution.joins.ExecutorBroadcast$.getShuffleIdFromPlan(ExecutorBroadcast.scala:160)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.executorBroadcast$lzycompute(BroadcastHashJoinExec.scala:73)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.executorBroadcast(BroadcastHashJoinExec.scala:71)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:301)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareRelation(BroadcastHashJoinExec.scala:321)
        at org.apache.spark.sql.execution.joins.HashJoin.codegenInner(HashJoin.scala:391)
        …
Is it incorrect to enable AQE and CBO in this case?
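Is there anything specific to Databricks' Spark that needs to be tuned to avoid the "Unexpected build plan for Executor Broadcast Join" error?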
Thank you very much for your help!