我的用例是過程的分區數據集價值100年代並發性。數據分區,他們是脫節的。我正麵臨ConcurrentAppendException由於S3不支持“put-if-absent”保證一致性。從三角洲湖1.2,S3DynamoDBLogStore API的幫助下,所有作家跨多個集群和/或火花司機可以同時寫三角洲湖S3同時確保隻有一個作家與每筆交易成功。我的三角洲湖的版本是2.1。我創建了一個發電機DB表啟用了自動伸縮的讀/寫的數量和配置傳遞到三角洲工作。請找到下麵的配置(省略了一些火花相關配置)。
火花= SparkSession \
.builder \
.appName \(“三角洲作業”)
. config (“spark.driver。記憶”,arg遊戲\ [“spark_driver_memory”])
. config (“spark.executor。記憶”,arg遊戲\ [“spark_executor_memory”])
. config (“spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName args [" log_table_name "]) \
. config (“spark.io.delta.storage.S3DynamoDBLogStore.ddb.region args [" log_region "]) \
.getOrCreate ()
spark.sparkContext.setLogLevel(“警告”)
請找到下麵的實際邏輯:
delta_table.alias(“舊”).merge (
input_df.alias(“新”),
“老。{primary_key} = new。\ {primary_key}”)
.whenMatchedDelete(條件=坳(f“老。{primary_key}”) .isin (deletes_df)) \
.whenMatchedUpdateAll () \
.whenNotMatchedInsertAll () \
. execute ()
delta_table三角洲湖的目標表。
input_df是聯合所有插入的數據幀,和更新。
deletes_df dataframe剛剛刪除的。
我還跑到delta.exceptions.ConcurrentAppendException無論這些設置。我做錯了什麼嗎?