def upsertToDelta (microBatchOutputDF batchId): microBatchOutputDF.createOrReplaceTempView(“更新”)microBatchOutputDF._jdf.sparkSession ()。sql(阿”“合並成老使用更新u在u。id = o。id時,匹配不匹配時更新設置*然後插入*”“”)stream_new_df = spark.readStream.format(“δ”).load (new_data_frame_path) stream_old_df = spark.readStream.format(“δ”).load (old_data_frame_path) stream_old_df.createOrReplaceTempView stream_new_df.writeStream.format(“舊”)(“δ”)\ .option (“checkpointLocation”、" ") \ .option (“mergeSchema”,“真正的”)\ .option(“路徑”、“)\ .foreachBatch (upsertToDelta) \ .trigger(一次= true) \ .outputMode(“更新”)\ .table (" ")
我要執行該代碼但我得到以下錯誤:
數據源com.databricks.sql.transaction.tahoe.sources。DeltaDataSource不支持更新輸出模式
它工作了嗎?磚運行時也進口舊的技術(如一個工廠所使用的數據)
我認為你也可以重構代碼有點使用.start()在最後一行沒有.table()和變化有點def upsertToDelta隻使用這樣的(這是在scala中但類似python)邏輯https://docs.www.eheci.com/_static/notebooks/merge-in-streaming.html
lts @Hubert杜德克運行時版本是9.1。我想使用“.table() '因為我想有一個桌子在我metastore /目錄
@Hubert杜德克我也嚐試與10.2運行時和toTable()但它是相同的
在metastore表就注冊您的三角洲位置使用單獨的sql腳本(它足以做一次):
如果不存在your_db % sql創建表。your_table (id NOT NULL評論,……)用δ分區(partition_column)位置“path_to_your_delta”
@Hubert杜德克那樣工作。我有一個問題。我怎麼能包括查詢和刪除嗎?
microBatchOutputDF._jdf.sparkSession ()。sql(阿”“合並成老使用更新u在u。id = o。id時,匹配不匹配時更新設置*然後插入*”“”)
或者我可以從這個管道添加和刪除行。