PySpark流管道,從卡夫卡的話題,讀取數據時數據經曆通過各種變換,最後被合並成一個磚三角洲的表。一開始我們的數據加載到三角洲表通過合並函數如下考慮。
這傳入dataframe inc_df所有分區的數據。
合並成main_db。main_delta_table main_dt main_dt.continent上使用inc_df df = df。大陸和main_dt.str_id = df。str_id main_.rule_date = df.rule_date和main_.rule_id = df.rule_id main_.rule_read_start = df.rule_read_start main_.company = df.company當匹配不匹配時更新設置*然後插入*
我們在表級別上執行上麵的查詢。
我給一個非常基本的圖下圖的過程。
但是我的三角洲表分區在大陸和年。例如,這是我的表分區的三角洲的樣子。
所以我試著實現合並分區級別,並試圖運行合並活動在多個分區平行。即我創建了獨立的管道與過濾器在查詢分區的水平。下麵的圖片可以看到。
合並成main_db。main_delta_table main_dt main_dt上使用inc_df df。大陸(非洲)和main_dt。年(“202301”)和main_dt.continent = df。大陸和main_dt.str_id = df。str_id main_.rule_date = df.rule_date和main_.rule_id = df.rule_id main_.rule_read_start = df.rule_read_start main_.company = df.company當匹配不匹配時更新設置*然後插入*
但是我看到一個錯誤和並發性。
com.databricks.sql.transaction.tahoe。ConcurrentAppendException:文件添加到分區(非洲大陸=,= 2021年)並發更新。請再次嚐試操作。
我知道錯誤告訴我,不能同時更新文件。但是我有大量的數據在生產和我不想執行合並表級別上有近10億條記錄沒有適當的過濾器。
Trial2:作為另一種方法,
但我麵臨著同樣的異常/錯誤。
Trial3:
我還做了另一個嚐試在不同的方法如前所述鏈接和“ConcurrentAppendException”部分的頁麵。
base_delta = DeltaTable.forPath(火花,s3: / / PATH_OF_BASE_DELTA_TABLE) base_delta.alias (“main_dt”)。合並(源= final_incremental_df.alias (df)、條件= " main_dt.continent = df。大陸和main_dt.str_id = df。str_id main_.rule_date = df.rule_date和main_.rule_id = df.rule_id main_.rule_read_start = df.rule_read_start main_.company = df.company大陸=非洲”).whenMatchedUpdateAll () .whenNotMatchedInsertAll () . execute ()
和
base_delta = DeltaTable.forPath(火花,s3: / / PATH_OF_BASE_DELTA_TABLE) base_delta.alias (“main_dt”)。合並(源= final_incremental_df.alias (df)、條件= " main_dt.continent = df。大陸和main_dt.str_id = df。str_id main_.rule_date = df.rule_date和main_.rule_id = df.rule_id main_.rule_read_start = df.rule_read_start main_.company = df.company大陸=亞洲”).whenMatchedUpdateAll () .whenNotMatchedInsertAll () . execute ()
我跑上麵的合並操作在兩個單獨的管道。但我仍然麵臨著同樣的問題。
有人能讓我知道如何設計和優化我的流管道將數據合並到三角洲表分區級別通過多個作業平行(單個分區上運行的工作)
@bobbysidhartha:
當數據合並到一個分區並行三角洲表,重要的是要確保每個工作隻訪問和修改文件的分區,以避免並發問題。實現這一目標的一個方法是使用分區級鎖來防止多個用戶的並行更新相同的分區。
這裏是一個例子如何修改您PySpark流管道將數據合並到一個分區的三角洲表並行:
下麵是一個示例代碼片段來幫助你開始:
#得到δ表的分區鍵partition_keys =[“大陸”、“年”]#遍曆每個分區並創建一個火花工作為大陸更新(“非洲”、“亞洲”、“歐洲”,…):年(“2021”、“2022”、“2023”,…):#創建一個過濾器為當前分區partition_filter = f“大陸={大陸}和年={一}”#創建一個增量表對象當前分區delta_table = DeltaTable。”forPath(火花,f /道路/ / delta_table /{大陸}/{一}”)#獲得一個鎖在當前分區delta_table \ .toDF() \其中(partition_filter) \ .write \ .option (“partitionBy”、“,”. join (partition_keys)) \ .option (“replaceWhere”, partition_filter) \ .format(“δ”)\ .mode(“追加”)\ .save (delta_table._data_path) #當前分區上執行合並操作delta_table \ .alias (main_dt) \ .merge(源= inc_df.alias (df)、條件= " main_dt.continent = df。大陸和main_dt.str_id = df。str_id main_.rule_date = df.rule_date和main_.rule_id = df.rule_id main_.rule_read_start = df.rule_read_start main_.company = df.company”) \ .whenMatchedUpdateAll () \ .whenNotMatchedInsertAll () \ . execute() #釋放鎖定當前分區delta_table \ .toDF() \其中(partition_filter) \ .write \ .option (“partitionBy”、“,”. join (partition_keys)) \ .option (“replaceWhere”、“1 = 1”) \ .format(“δ”)\ .mode(“追加”)\ .save (delta_table._data_path)
請注意,上麵的代碼僅僅是一個例子,可能需要修改以適合您的特定用例。另外,一定要測試這種方法在運行前的一小部分數據在整個數據集,以確保它能夠正常工作。