取消
顯示的結果
而不是尋找
你的意思是:

如何處理在DLT源目錄中刪除文件?

BenLambert
新的貢獻者三世

我們有一個DLT管道使用autoloader檢測文件添加到源存儲桶。它讀取這些更新文件並添加新記錄到青銅流表。但是我們也想從青銅表自動刪除記錄源文件時刪除。有辦法完成這個沒有一個完整的刷新?我讀過關於DLT的新的變化數據捕獲功能,但我不清楚如何自動生成所需的CDC表。

1回複1

匿名
不適用

@Bennett蘭伯特:

是的,它是可以自動從青銅表刪除記錄源文件被刪除時,沒有做一個完整的刷新。實現這一目標的一個方法是利用變化數據捕獲(CDC)功能在磚三角洲。

δ疾控中心跟蹤更改表和生成一個變化表,記錄插入、更新和刪除操作在桌子上。您可以使用CDC表來跟蹤變化時銅表和刪除相應的記錄源文件被刪除。

使CDC三角洲表,您需要生成一個疾控中心使用三角洲表。日誌API。這裏有一個例子如何生成一個疾控中心表:

從pyspark.sql。進口current_timestampδ函數。表導入DeltaTable bronze_table_path =“/ mnt / s3 / /銅源”cdc_table_path =“/ mnt / s3 / /美國疾病控製與預防中心源”#為青銅表DeltaTable啟用疾控中心。forPath(火花,bronze_table_path) .generate (deltaPath = cdc_table_path) #得到表cdc_table = DeltaTable疾控中心。forPath(火花,cdc_table_path) #刪除記錄從青銅表刪除源文件時bronze_table = DeltaTable。forPath(火花,bronze_table_path) bronze_table.alias (b)。合並(cdc_table.alias (“c”),“b。<主鍵列> = c。< >主鍵列和c。= ' D '行動”).whenMatchedDelete () . execute () # CDC表的最新更新時間戳cdc_table.alias (“c”)。合並(bronze_table.alias (b))。whenMatchedUpdate(設置={“時間戳”:current_timestamp ()})。whenNotMatchedInsert(值={“時間戳”:current_timestamp ()}) . execute ()

在這個例子中,我們首先使CDC的青銅表通過生成一個疾控中心表。然後我們得到CDC表和使用它來從青銅表刪除相應的記錄刪除操作時檢測到。最後,我們更新CDC表最新的時間戳。

您可以運行這段代碼作為磚工作或管道自動化生成的過程的一部分CDC表從青銅表和刪除記錄。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map