取消
顯示的結果
而不是尋找
你的意思是:

過程批次流管道,確認刪除

Kearon
新的貢獻者三世

好的。所以我想我可能錯過了顯而易見的,把自己在海裏。

這是場景:

  1. 批處理數據集到json格式的Azure數據湖
  2. 每一批是一套完整的“當前”記錄(完整的表)
  3. 這些處理中使用自動裝卸機流管道(因為我們有其他相關流程運行流數據管道,因為自動裝卸機使生活簡單)
  4. 重複數據刪除從提取行
  5. 變化捕獲通過內置的DLT SCD2處理

這都很好地工作。

現在,我需要確定當記錄已被刪除(刪除),不再出現在批次。

SCD表,這將導致記錄的狀態改變從“當前”到“前”

我不管理為實現這一目標。

這是有關管道代碼:

#自動裝卸機拿起文件導入dlt @dlt。表def currStudents_streamFiles():返回(spark.readStream.format .option (“cloudFiles (“cloudFiles”)。格式”、“json”) .option (“cloudFiles。在ferColumnTypes", "true") .load("abfss://*****@********.dfs.core.windows.net/current-students/restAPI") .select("*","_metadata", "_metadata.file_modification_time") ) -- extract the rows from the json (each file is a complete dataset with a few hundred rows) CREATE OR REFRESH STREAMING LIVE TABLE currStudents_ingest AS SELECT col.* ,"current" AS status ,file_modification_time FROM ( SELECT fi.file_modification_time, EXPLODE_OUTER (fi.students) FROM STREAM(LIVE.currStudents_streamFiles) AS fi ) WHERE col.id IS NOT NULL ; #run some de-duplication due to most records being identical each time import dlt from pyspark.sql.functions import lit @dlt.table def currStudents_dedup(): df = spark.readStream.format("delta").table("live.currStudents_ingest") return ( df.dropDuplicates([col for col in df.columns if col != "file_modification_time"]) .select('*') .withColumn('status', lit('current')) ) -- capture SCD2 change history CREATE OR REFRESH STREAMING LIVE TABLE students_SCD; APPLY CHANGES INTO live.currStudents_SCD FROM STREAM(live.currStudents_dedup) KEYS (id) SEQUENCE BY file_modification_time STORED AS SCD TYPE 2 TRACK HISTORY ON * EXCEPT (file_modification_time) -- match the latest batch (from json file) against "current" version and identify missing records -- attempt to push identified records back through SCD with a new status "former" -- DLT pipeline doesn't like INSERT in the apply changes into .... CREATE TEMPORARY LIVE VIEW former_students_view AS SELECT *, "former" AS status -- all records from the last batch processed FROM ( SELECT * FROM STREAM(live.currStudents_ingest) WHERE file_modification_time = ( SELECT MAX(file_modification_time) FROM STREAM(live.currstudents_streamfiles) ) ) t1 WHERE NOT EXISTS ( -- "current" version of the table held in Databricks SELECT 1 FROM ( SELECT schoolId FROM STREAM(live.students_SCD) WHERE `__END_AT` IS NULL AND status != "former" ) t2 WHERE t1.schoolId = t2.schoolId ); APPLY CHANGES INTO live.currStudents_dedup FROM former_students_view KEYS (schoolId) INSERT ALL

所有的幫助感激地接受。

我知道我可能會用錯誤的方式。

11日回複11

匿名
不適用

@Kearon McNicol:

這聽起來像你試圖使用提要SCD表的表作為識別的源數據的變化。如果我理解正確的話,這可能會導致循環依賴和衝突。

為了避免這些問題,你可能要考慮使用一個單獨的源表或視圖識別數據的變化。這應該創建源表或視圖基於識別變化的業務邏輯,不應該有任何循環依賴SCD表或源數據。

一旦你源表或視圖,您可以使用MERGE語句DBR 12.1有效更新SCD表基於源表或視圖中確定的變化。

記住,MERGE語句需要仔細考慮加入的條件和更新邏輯來避免衝突,確保正確的結果。這可能有助於測試MERGE語句的一個小子集的數據在運行完整的數據集。

Vidula_Khanna
主持人
主持人

嗨@Kearon McNicol

謝謝你發布你的問題在我們的社區!我們很高興幫助你。

幫助我們為您提供最準確的信息,請您花一些時間來回顧反應和選擇一個最好的回答了你的問題嗎?

這也將有助於其他社區成員可能也有類似的問題在未來。謝謝你的參與,讓我們知道如果你需要任何進一步的援助!

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map