取消
顯示的結果
而不是尋找
你的意思是:

跑進delta.exceptions。ConcurrentAppendException即使設置S3多集群環境通過S3發電機DB LogStore寫道

KiranKondamadug
新的貢獻者二世

我的用例是過程的分區數據集價值100年代並發性。數據分區,他們是脫節的。我正麵臨ConcurrentAppendException由於S3不支持“put-if-absent”保證一致性。從三角洲湖1.2,S3DynamoDBLogStore API的幫助下,所有作家跨多個集群和/或火花司機可以同時寫三角洲湖S3同時確保隻有一個作家與每筆交易成功。我的三角洲湖的版本是2.1。我創建了一個發電機DB表啟用了自動伸縮的讀/寫的數量和配置傳遞到三角洲工作。請找到下麵的配置(省略了一些火花相關配置)。

火花= SparkSession \

.builder \

.appName \(“三角洲作業”)

. config (“spark.driver。記憶”,arg遊戲\ [“spark_driver_memory”])

. config (“spark.executor。記憶”,arg遊戲\ [“spark_executor_memory”])

. config (“spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName args [" log_table_name "]) \

. config (“spark.io.delta.storage.S3DynamoDBLogStore.ddb.region args [" log_region "]) \

.getOrCreate ()

spark.sparkContext.setLogLevel(“警告”)

請找到下麵的實際邏輯:

delta_table.alias(“舊”).merge (

input_df.alias(“新”),

“老。{primary_key} = new。\ {primary_key}”)

.whenMatchedDelete(條件=坳(f“老。{primary_key}”) .isin (deletes_df)) \

.whenMatchedUpdateAll () \

.whenNotMatchedInsertAll () \

. execute ()

delta_table三角洲湖的目標表。

input_df是聯合所有插入的數據幀,和更新。

deletes_df dataframe剛剛刪除的。

我還跑到delta.exceptions.ConcurrentAppendException無論這些設置。我做錯了什麼嗎?

1接受解決方案

接受的解決方案

Debayan
尊敬的貢獻者三世
尊敬的貢獻者三世

嗨,您可以參考https://docs.www.eheci.com/optimizations/isolation-level.html conflict-exceptions重新檢查,如果一切都好了。

請讓我們知道如果這可以幫助,也請與你的下一個標記@Debayan響應將通知我,謝謝!

在原帖子查看解決方案

1回複1

Debayan
尊敬的貢獻者三世
尊敬的貢獻者三世

嗨,您可以參考https://docs.www.eheci.com/optimizations/isolation-level.html conflict-exceptions重新檢查,如果一切都好了。

請讓我們知道如果這可以幫助,也請與你的下一個標記@Debayan響應將通知我,謝謝!

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map