我有一個從一個數據流管道吸入json文件使用自動裝卸機湖。這些文件被定期。主要是文件包含重複數據,但偶爾會有一些變化。我想處理這些文件到一個數據倉庫使用存儲為SCD 2選項的三角洲住表管道。一切都似乎工作好,但我得到分割表中重複的行——即使沒有數據變化。
在管道集群配置,我補充道
管道。enableTrackHistory真實
這是SQL SCD表:
創建或更新直播表currStudents_SCD;應用到生活變化。currStudents_SCD從流(live.currStudents_ingest)鍵(id)序列file_modification_time存儲為SCD 2型跟蹤曆史*除了(file_modification_time);
任何想法我為什麼得到這個和我怎麼能阻止這些副本嗎?
我找不到任何信息在文檔來幫助。除了可能模糊的暗示,化合物2,將新記錄時沒有列變化。但跟蹤曆史文檔似乎表明,隻有改變監控列將導致一個新記錄....
@Kearon McNicol:
你是正確的,如果你使用de-duped_stream函數,它會選擇最新版本的JSON文件,即使它與以前的版本相同。這意味著如果你每晚運行管道和檢索JSON文件,將被添加到新行分割表。
然而,如果你在流,使重複數據刪除de-duped_stream函數隻會返回唯一的記錄基於一組指定的列。這意味著,如果新的JSON文件與前一個相同,它會過濾掉,而不是添加到SCD表。
使重複數據刪除的流,您可以添加的選項(“streaming.deduplicate”,“真正的”)
流程定義參數。您還可以指定使用的列德重複使用的選項(“streaming.deduplicateKeys”, < column_list >)參數。
@Kearon McNicol:
SCD 2型,創建新記錄有修改監控列以及當沒有變化監測列但當前記錄已經過期。這是因為當前的記錄被認為是過時的和新記錄關鍵但需要創建更新的值。有可能你看到重複的記錄,因為當前的記錄已經過期,新的記錄被創建相同的關鍵,但相同的值作為過期的記錄。
為了避免這種情況,你可以試著增加記錄的有效期設置更大的值存儲為SCD的valid_time_column 2選項。這將允許記錄保持有效期更長一段時間,減少創建新記錄的數量。
或者,你可以嚐試在管道中使用重複數據刪除步驟刪除重複的記錄之前處理的SCD 2型邏輯。可以通過分組記錄的鍵並選擇最近的記錄每組根據file_modification_time列。這可以通過使用引發的窗口函數。
例如:
進口pyspark.sql。函數從pyspark.sql F。窗口導入窗口deduped_stream =(流(live.currStudents_ingest) .groupBy (“id”) .agg (F.max (file_modification_time) .alias (“latest_file_modification_time”), F。第一個(json,真的).alias (json)) .select (“id”,“latest_file_modification_time”, json))應用到生活變化。currStudents_SCD從latest_file_modification_time deduped_stream鍵(id)序列存儲為SCD 2型跟蹤曆史除了(latest_file_modification_time) *
在這裏,我們組的記錄id列,選擇最近的記錄每組根據file_modification_time列。然後我們通過刪除處理流進了SCD 2型邏輯進行進一步處理。
希望這可以幫助!
@Suteja卡努裏人,謝謝你。被拉到其他項目,回到現在,所以將你的解決方案。
我認為重複數據刪除選項是我們想要的,但是我不確定我完全理解。
我有一個調用的api檢索一個json文件。當這個數據很少變化,記錄通常是相同的。
如果我理解你的de-duped_stream正確的話,這將選擇json文件的最新版本。
不會這隻是改變每天晚上當我檢索json文件,因此創建一個新的SCD此時重複的條目嗎?
我覺得我缺少如何重複數據刪除生效,這可能是由於我在這方麵缺乏經驗。
謝謝你的幫助。
@Kearon McNicol:
你是正確的,如果你使用de-duped_stream函數,它會選擇最新版本的JSON文件,即使它與以前的版本相同。這意味著如果你每晚運行管道和檢索JSON文件,將被添加到新行分割表。
然而,如果你在流,使重複數據刪除de-duped_stream函數隻會返回唯一的記錄基於一組指定的列。這意味著,如果新的JSON文件與前一個相同,它會過濾掉,而不是添加到SCD表。
使重複數據刪除的流,您可以添加的選項(“streaming.deduplicate”,“真正的”)
流程定義參數。您還可以指定使用的列德重複使用的選項(“streaming.deduplicateKeys”, < column_list >)參數。