我想實現一個增量攝入邏輯以下列方式:
3.1是我的問題——當我試圖讀取輸出,我越來越錯誤。
複製它的最小的例子:
進口dlt INITIAL_RUN = True @dlt。表def test_table():如果INITIAL_RUN:返回火花。createDataFrame ([{" id ": 1、“val”: " 1 "}, {" id ": 2,“val”:“2”},])其他:dlt.read @dlt (“test_table”)。表def test_table_copy (): df = dlt.read (test_table)打印(df.collect())返回df
當INITIAL_RUN是真的,一切工作正常。但在我翻轉錯誤(事先在運行它,所以表存在)我得到以下錯誤:
pyspark.sql.utils。AnalysisException:未能讀數據集“test_table”。數據集定義的管道,但不可能得到解決。
當我試圖用同樣的事情發生spark.table (“LIVE.test_table”)
閱讀從輸出是一個支持場景嗎?如果不是我怎麼能在這個工作嗎?
嗨@Chris Nawara,
我有同樣的問題。我試圖避免apply_changes但我們最終我實現它,我很快樂,我預期的嗬嗬
如果你有任何額外的列需要實現標準化,您可以簡單地從apply_changes表讀取並生成最終的表。
我的邏輯是這樣的
readStream - > dlt。基於dataframe - > dlt視圖。create_streaming_live_table - > dlt。apply_changes (stored_as_scd_type = 2) - > dlt。表(我不得不創建一個額外的表,因為我有一些列calculatated基於__START_AT和__END_AT apply_changes提供的)
從一個adls嗨@Chris Nawara,我們讀。
BR