取消
顯示的結果
而不是尋找
你的意思是:

過程批次流管道,確認刪除

Kearon
新的貢獻者三世

好的。所以我想我可能錯過了顯而易見的,把自己在海裏。

這是場景:

  1. 批處理數據集到json格式的Azure數據湖
  2. 每一批是一套完整的“當前”記錄(完整的表)
  3. 這些處理中使用自動裝卸機流管道(因為我們有其他相關流程運行流數據管道,因為自動裝卸機使生活簡單)
  4. 重複數據刪除從提取行
  5. 變化捕獲通過內置的DLT SCD2處理

這都很好地工作。

現在,我需要確定當記錄已被刪除(刪除),不再出現在批次。

SCD表,這將導致記錄的狀態改變從“當前”到“前”

我不管理為實現這一目標。

這是有關管道代碼:

#自動裝卸機拿起文件導入dlt @dlt。表def currStudents_streamFiles():返回(spark.readStream.format .option (“cloudFiles (“cloudFiles”)。格式”、“json”) .option (“cloudFiles。在ferColumnTypes", "true") .load("abfss://*****@********.dfs.core.windows.net/current-students/restAPI") .select("*","_metadata", "_metadata.file_modification_time") ) -- extract the rows from the json (each file is a complete dataset with a few hundred rows) CREATE OR REFRESH STREAMING LIVE TABLE currStudents_ingest AS SELECT col.* ,"current" AS status ,file_modification_time FROM ( SELECT fi.file_modification_time, EXPLODE_OUTER (fi.students) FROM STREAM(LIVE.currStudents_streamFiles) AS fi ) WHERE col.id IS NOT NULL ; #run some de-duplication due to most records being identical each time import dlt from pyspark.sql.functions import lit @dlt.table def currStudents_dedup(): df = spark.readStream.format("delta").table("live.currStudents_ingest") return ( df.dropDuplicates([col for col in df.columns if col != "file_modification_time"]) .select('*') .withColumn('status', lit('current')) ) -- capture SCD2 change history CREATE OR REFRESH STREAMING LIVE TABLE students_SCD; APPLY CHANGES INTO live.currStudents_SCD FROM STREAM(live.currStudents_dedup) KEYS (id) SEQUENCE BY file_modification_time STORED AS SCD TYPE 2 TRACK HISTORY ON * EXCEPT (file_modification_time) -- match the latest batch (from json file) against "current" version and identify missing records -- attempt to push identified records back through SCD with a new status "former" -- DLT pipeline doesn't like INSERT in the apply changes into .... CREATE TEMPORARY LIVE VIEW former_students_view AS SELECT *, "former" AS status -- all records from the last batch processed FROM ( SELECT * FROM STREAM(live.currStudents_ingest) WHERE file_modification_time = ( SELECT MAX(file_modification_time) FROM STREAM(live.currstudents_streamfiles) ) ) t1 WHERE NOT EXISTS ( -- "current" version of the table held in Databricks SELECT 1 FROM ( SELECT schoolId FROM STREAM(live.students_SCD) WHERE `__END_AT` IS NULL AND status != "former" ) t2 WHERE t1.schoolId = t2.schoolId ); APPLY CHANGES INTO live.currStudents_dedup FROM former_students_view KEYS (schoolId) INSERT ALL

所有的幫助感激地接受。

我知道我可能會用錯誤的方式。

11日回複11

Kearon
新的貢獻者三世

看來我的DLT管道不支持合並操作。我需要改變一個運行時設置或更新嗎?

匿名
不適用

@Kearon McNicol:

合並。操作隻能在磚運行時的7.0及以後版本。如果您正在運行一個早期版本,您需要升級您的運行時使用合並操作。

Kearon
新的貢獻者三世

嗨@Suteja卡努裏人

我的管道運行DBR 12.1。

是因為我們試圖做一個合並表流住表嗎?

我丟失的東西,這是我的管道配置:

{" id ":“* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *”,“集群”:[{“標簽”:“默認”,“自動定量”:{“min_workers”: 1、“max_workers”: 2,“模式”:“增強”}}],“發展”:真的,“連續”:真的,“渠道”:“預覽”,“版”:“先進”、“光子”:假的,“庫”:[{“筆記本”:{“路徑”:“/回購/ * * * * * * * * * * * * / * * * * * * * * * * * * / DLT /出勤/ regPeriods / dlt_reg_periods_python}},。}),“名字”:“CABS-Streaming-Processes_Dev”,“存儲”:“dbfs: /管道/ * * * * * * * * * * * * * * * * * * * * * * * *”、“配置”:{”管道。enableTrackHistory”:“真正的”、“spark.databricks.secureVariableSubstitute。啟用”:“假”、“火花。databaseUser”:“{{秘密/ * * * * * * * * * * * / * * * * * * * * * * * * * *}}”、“火花。databasePassword”:“{{秘密/ * * * * * * * * * * / * * * * * * * * * * * * * * * * * * *}}”、“spark.databricks.delta.schema.autoMerge.enabled”:“真正的”},“目標”:“CABS_dataProcessing_Dev "}

匿名
不適用

@Kearon McNicol:

根據你提供的配置,看來你的管道使用三角洲湖(如所示

“spark.databricks.delta.schema.autoMerge.enabled”:“真正的”

配置參數),支持合並操作。然而,可能還有其他因素在起作用,導致合並操作失敗。

一種可能性是,合並操作遇到衝突是由於並發更新流來源。三角洲湖允許並發流和批處理來源寫道,但當多個作家試圖更新相同的記錄同時,可能出現衝突。三角洲湖提供不同的衝突解決策略來處理這些場景。您可能需要考慮使用三角洲湖的時間旅行功能執行合並表的快照,而不是生活表。這確保了合並操作並不衝突持續到表中寫道。

另一個可能性是三角洲湖DBR 12.1的版本不支持某些功能合並操作所必需的。您可能希望檢查版本注釋的三角洲湖是否有任何已知問題包含在DBR 12.1版本,可以導致合並操作失敗。如果是這種情況,升級到新版本的DBR可能解決這個問題。

Kearon
新的貢獻者三世

嗨@Suteja卡努裏人,

DBR 12.1支持,提高合並功能。

我的猜測是,問題在於我試圖利用表喂SCD表來確定變化——這有點圓和可能產生的衝突。

試圖讓我的頭圓在辦公室現在我回來了(走了很多)

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map