取消
顯示的結果
而不是尋找
你的意思是:

三角洲生活表:讀取輸出

knawara
貢獻者

我想實現一個增量攝入邏輯以下列方式:

  1. 數據庫表DbUpdatedDate列
  2. 在初始加載我執行一個完整的數據庫表的副本
  3. 在增量加載我:
    1. 掃描的數據已經DLT看到最近DbUpdatedDate,我們已經(我們稱之為high_watermark)
    2. 查詢數據庫表與DbUpdatedDate > high_watermark隻獲取數據
    3. 我執行unionByName historized數據和新增量

3.1是我的問題——當我試圖讀取輸出,我越來越錯誤。

複製它的最小的例子:

進口dlt INITIAL_RUN = True @dlt。表def test_table():如果INITIAL_RUN:返回火花。createDataFrame ([{" id ": 1、“val”: " 1 "}, {" id ": 2,“val”:“2”},])其他:dlt.read @dlt (“test_table”)。表def test_table_copy (): df = dlt.read (test_table)打印(df.collect())返回df

當INITIAL_RUN是真的,一切工作正常。但在我翻轉錯誤(事先在運行它,所以表存在)我得到以下錯誤:

pyspark.sql.utils。AnalysisException:未能讀數據集“test_table”。數據集定義的管道,但不可能得到解決。

當我試圖用同樣的事情發生spark.table (“LIVE.test_table”)

閱讀從輸出是一個支持場景嗎?如果不是我怎麼能在這個工作嗎?

4回複4

fecavalc08
新的貢獻者三世

嗨@Chris Nawara,

我有同樣的問題。我試圖避免apply_changes但我們最終我實現它,我很快樂,我預期的嗬嗬

如果你有任何額外的列需要實現標準化,您可以簡單地從apply_changes表讀取並生成最終的表。

我的邏輯是這樣的

readStream - > dlt。基於dataframe - > dlt視圖。create_streaming_live_table - > dlt。apply_changes (stored_as_scd_type = 2) - > dlt。表(我不得不創建一個額外的表,因為我有一些列calculatated基於__START_AT和__END_AT apply_changes提供的)

這就跟你問聲好!

@Felipe Cavalcante你直接查詢數據庫嗎?或者你有一個如卡夫卡疾控中心流?換句話說,第一readStream讀取數據來自哪裏?

最好的問候,

克裏斯

fecavalc08
新的貢獻者三世

從一個adls嗨@Chris Nawara,我們讀。

BR

嗨@Felipe Cavalcante !在usecase我想讀取數據庫表,所以我想如果你閱讀從ADLS位置是不同的情況

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map