OK. So I think I'm missing the obvious and have got myself all at sea here.
Here's the scenario: Auto Loader picks up JSON files from Azure storage; each file is a complete dataset with a few hundred rows (see the pipeline code below).

This all works nicely.
Now, I need to identify when records have been deleted (removed) and no longer appear in a batch. In the SCD table, this should change a record's status from "current" to "former".

I'm not managing to achieve this.
Here is the relevant pipeline code:
```python
# Auto Loader picks up the files
import dlt

@dlt.table
def currStudents_streamFiles():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("abfss://*****@********.dfs.core.windows.net/current-students/restAPI")
        .select("*", "_metadata", "_metadata.file_modification_time")
    )
```

```sql
-- extract the rows from the json (each file is a complete dataset with a few hundred rows)
CREATE OR REFRESH STREAMING LIVE TABLE currStudents_ingest
AS SELECT
    col.*,
    "current" AS status,
    file_modification_time
FROM (
    SELECT fi.file_modification_time, EXPLODE_OUTER(fi.students)
    FROM STREAM(LIVE.currStudents_streamFiles) AS fi
)
WHERE col.id IS NOT NULL;
```

```python
# run some de-duplication due to most records being identical each time
import dlt
from pyspark.sql.functions import lit

@dlt.table
def currStudents_dedup():
    df = spark.readStream.format("delta").table("live.currStudents_ingest")
    return (
        df.dropDuplicates([col for col in df.columns if col != "file_modification_time"])
        .select("*")
        .withColumn("status", lit("current"))
    )
```

```sql
-- capture SCD2 change history
CREATE OR REFRESH STREAMING LIVE TABLE students_SCD;

APPLY CHANGES INTO live.students_SCD
FROM STREAM(live.currStudents_dedup)
KEYS (id)
SEQUENCE BY file_modification_time
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (file_modification_time);
```

```sql
-- match the latest batch (from the json file) against the "current" version and identify missing records
-- attempt to push identified records back through SCD with a new status "former"
-- DLT pipeline doesn't like INSERT in the APPLY CHANGES INTO ....
CREATE TEMPORARY LIVE VIEW former_students_view AS
SELECT *, "former" AS status
-- all records from the last batch processed
FROM (
    SELECT *
    FROM STREAM(live.currStudents_ingest)
    WHERE file_modification_time = (
        SELECT MAX(file_modification_time)
        FROM STREAM(live.currStudents_streamFiles)
    )
) t1
WHERE NOT EXISTS (
    -- "current" version of the table held in Databricks
    SELECT 1
    FROM (
        SELECT schoolId
        FROM STREAM(live.students_SCD)
        WHERE `__END_AT` IS NULL AND status != "former"
    ) t2
    WHERE t1.schoolId = t2.schoolId
);

APPLY CHANGES INTO live.currStudents_dedup
FROM former_students_view
KEYS (schoolId)
INSERT ALL
```
All help gratefully received. I'm aware I may be going about this the wrong way.
@Kearon McNicol:
It sounds like you're trying to use the table that feeds the SCD table as the source for identifying changes in the data. If I understand correctly, this can lead to circular dependencies and conflicts.
To avoid these issues, you may want to consider using a separate source table or view for identifying changes in the data. That source table or view should be built from the business logic that identifies the changes, and it should have no circular dependencies on the SCD table or its source data.
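For illustration only, here is a minimal sketch of such a view, assuming the snapshot-per-file pattern from the question (each batch is a complete dataset keyed by `id`). The name `students_deletes` and the batch-comparison logic are assumptions, not tested pipeline code:

```sql
-- Sketch: derive deletions by comparing the two most recent snapshots of the
-- ingest table, with no reference back to the SCD table (no circular dependency).
-- "students_deletes" is a hypothetical name; adjust keys/columns to your schema.
CREATE OR REFRESH LIVE TABLE students_deletes AS
WITH latest AS (
    SELECT MAX(file_modification_time) AS ts
    FROM LIVE.currStudents_ingest
),
previous AS (
    SELECT MAX(file_modification_time) AS ts
    FROM LIVE.currStudents_ingest
    WHERE file_modification_time < (SELECT ts FROM latest)
)
SELECT
    p.* EXCEPT (status, file_modification_time),
    "former" AS status,
    (SELECT ts FROM latest) AS file_modification_time  -- sequence the delete at the latest batch
FROM LIVE.currStudents_ingest AS p
WHERE p.file_modification_time = (SELECT ts FROM previous)
  AND NOT EXISTS (
      -- the record was in the previous snapshot but is absent from the latest one
      SELECT 1
      FROM LIVE.currStudents_ingest AS l
      WHERE l.file_modification_time = (SELECT ts FROM latest)
        AND l.id = p.id
  );
```

Because the comparison is between two snapshots of the ingest table, the delete detection stays entirely upstream of the SCD table.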
Once you have the source table or view, you can use the MERGE statement in DBR 12.1 to efficiently update the SCD table based on the changes identified in the source table or view.
Keep in mind that the MERGE statement requires careful consideration of the join conditions and update logic to avoid conflicts and ensure correct results. It may help to test the MERGE statement on a small subset of the data before running it against the full dataset.
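As a rough sketch of that shape, assuming the hypothetical `students_deletes` view above and the `__END_AT` column that APPLY CHANGES maintains; note that DLT manages `students_SCD` itself, so a statement like this would realistically have to target a table maintained outside the pipeline:

```sql
-- Sketch only: close out the open SCD2 row for each detected deletion.
-- Assumes the hypothetical students_deletes view above; a DLT-managed target
-- would normally not be updated directly like this.
MERGE INTO students_SCD AS tgt
USING students_deletes AS src
ON tgt.id = src.id
   AND tgt.`__END_AT` IS NULL                      -- only the currently-open row per key
WHEN MATCHED AND tgt.status != "former" THEN
  UPDATE SET
    tgt.status = "former",
    tgt.`__END_AT` = src.file_modification_time    -- sequence the closure at the batch time
```

The join condition restricts the update to the single open row per key, which is exactly the kind of care suggested above; whether a deletion should only close the row or also insert a new "former" row depends on how you want the history to read.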