取消
顯示的結果
而不是尋找
你的意思是:

特林創建增量管道,但失敗了,當我試圖用outputMode“更新”

BorislavBlagoev
重視貢獻三世
def upsertToDelta (microBatchOutputDF batchId): microBatchOutputDF.createOrReplaceTempView(“更新”)microBatchOutputDF._jdf.sparkSession ()。sql(阿”“合並成老使用更新u在u。id = o。id時,匹配不匹配時更新設置*然後插入*”“”)stream_new_df = spark.readStream.format(“δ”).load (new_data_frame_path) stream_old_df = spark.readStream.format(“δ”).load (old_data_frame_path) stream_old_df.createOrReplaceTempView stream_new_df.writeStream.format(“舊”)(“δ”)\ .option (“checkpointLocation”、" ") \ .option (“mergeSchema”,“真正的”)\ .option(“路徑”、“)\ .foreachBatch (upsertToDelta) \ .trigger(一次= true) \ .outputMode(“更新”)\ .table (" ")

我要執行該代碼但我得到以下錯誤:

數據源com.databricks.sql.transaction.tahoe.sources。DeltaDataSource不支持更新輸出模式

9回複9

Hubert_Dudek1
尊敬的貢獻者三世

它工作了嗎?磚運行時也進口舊的技術(如一個工廠所使用的數據)

我認為你也可以重構代碼有點使用.start()在最後一行沒有.table()和變化有點def upsertToDelta隻使用這樣的(這是在scala中但類似python)邏輯https://docs.www.eheci.com/_static/notebooks/merge-in-streaming.html

lts @Hubert杜德克運行時版本是9.1。我想使用“.table() '因為我想有一個桌子在我metastore /目錄

@Hubert杜德克我也嚐試與10.2運行時和toTable()但它是相同的

Hubert_Dudek1
尊敬的貢獻者三世

在metastore表就注冊您的三角洲位置使用單獨的sql腳本(它足以做一次):

如果不存在your_db % sql創建表。your_table (id NOT NULL評論,……)用δ分區(partition_column)位置“path_to_your_delta”

@Hubert杜德克那樣工作。我有一個問題。我怎麼能包括查詢和刪除嗎?

microBatchOutputDF._jdf.sparkSession ()。sql(阿”“合並成老使用更新u在u。id = o。id時,匹配不匹配時更新設置*然後插入*”“”)

或者我可以從這個管道添加和刪除行。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map