嘿社區! !我用dlt。apply_changes DLT的工作如下:
dlt.apply_changes (
目標= " employee_silver ",
源= " employee_bronze_clean_v ",
鍵= (“EMPLOYEE_ID”),
sequence_by =坳(“last_updated”),
apply_as_deletes = expr (Op = ' D '),
except_column_list =[“人事處”、“_rescued_data”])
apply_as_deletes行代碼不工作。我使用AWS數據庫遷移服務,在“正在進行的複製模式”,它創建csv文件在S3中列Op和I / U / D的值是一個(分別插入、更新、刪除)。我不知道為什麼我刪除事務不是正在處理的管道。
非常感謝您的見解!謝謝!
好的....會給它一個嚐試暫時和報告嗎
@Alex Barreto我給一槍,它說不列刪除錯誤的存在。這是疾病預防控製中心文件,我加載(當然假的數據):
Op, EMPLOYEE_ID FIRST_NAME、LAST_NAME、電子郵件、PHONE_NUMBER, HIRE_DATE, JOB_ID,薪水,COMMISSION_PCT, MANAGER_ID, DEPARTMENT_ID last_updated
威廉D, 206年,Gietz、WGIETZ 515.123.8181, 2002-06-07, AC_ACCOUNT, 8300,, 205110, 2021-10-06 21:26:04
所以我必須使用expr (“Op = ' D '”),這就是不工作。當我使用磚SQL查詢青銅表,我看到2行這個主鍵(206)…有一個“我”(對於這個初始插入)和一個“D”(刪除事務從源(MySQL極光)。但這是D行不流入銀表
@Alex Barreto現在就想我了! !我最初使用sequence_by =坳(“createDate”)在我apply_changes代碼,它來自源表。然而,我意識到我有一個中間dlt。視圖函數做一些基本的數據質量檢查(dlt.expect)在這個函數,我也這樣做:
def employee_bronze_clean_v ():
返回dlt.read_stream \ (“employee_bronze”)
.withColumn (inputFileName, F.input_file_name ()) \
.withColumn (LoadDate, F.lit (datetime.now ()))
我在sequence_by =然後用LoadDate坳dlt (“LoadDate”)。apply_changes,轉眼間....行從我的銀表! ! !
但這是消失了嗎?它還可以在時間旅行嗎?鑲木地板文件中仍然存在嗎?我擔心這個隱私法像GDPR CCPA等等……
注:再次感謝幫助!