使用Delta Live表更改數據捕獲
請注意
本文描述如何根據源數據的更改更新Delta Live tables管道中的表。要了解如何記錄和查詢Delta表的行級更改信息,請參見在Databricks上使用Delta Lake更改數據提要。
預覽
Delta Live Tables支持SCD類型2公共預覽。
可以在Delta Live Tables中使用更改數據捕獲(CDC)根據源數據的更改更新表。Delta Live Tables SQL和Python接口支持CDC。Delta Live Tables支持更新慢變維(SCD)類型1和類型2的表:
使用SCD類型1直接更新記錄。更新的記錄不保留曆史記錄。
使用SCD類型2來保留對記錄的所有更新的曆史。在使用SCD類型2時,還可以保留對指定列的更新曆史記錄。看到僅跟蹤具有SCD類型2的指定列的曆史記錄
為了表示變更的有效期間,SCD Type 2將每個變更與生成的變更一起存儲__START_AT
和__END_AT
列。的列指定序列通過
使用SQL或sequence_by
來生成__START_AT
和__END_AT
列。
請注意
的數據類型__START_AT
和__END_AT
列的值與指定的數據類型相同序列通過
字段。
SQL
使用應用變化成
語句使用Delta Live Tables CDC功能:
將更改應用到活動中。表格_name FROM source KEYS (keys) [WHERE condition] [IGNORE NULL UPDATES] [APPLY AS DELETE WHEN condition] [APPLY AS TRUNCATE WHEN condition] SEQUENCE BY orderByColumn [COLUMNS {columnList | * EXCEPT (exceptColumnList)}] [STORED AS {SCD TYPE 1 | SCD TYPE 2}] [TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
條款 |
---|
鍵 在源數據中唯一標識一行的列或列的組合。這用於確定哪些CDC事件應用於目標表中的特定記錄。 這一條款是必需的。 |
在哪裏 應用於源和目標的條件,以觸發優化,如分區修剪。此條件不能用於刪除源行;源中的所有CDC行必須滿足此條件,否則將拋出錯誤。使用WHERE子句是可選的,應該在處理需要特定優化時使用。 這個條款是可選的。 |
忽略空更新 允許攝取包含目標列子集的更新。當CDC事件匹配現有行並且指定了IGNORE NULL UPDATES時,帶有 這個條款是可選的。 的默認值是覆蓋現有列 |
應用為delete時 指定CDC事件何時應作為事件處理 這個條款是可選的。 |
應用為截斷時 指定CDC事件何時應作為滿表處理 的 這個條款是可選的。 |
序列由 指定源數據中CDC事件邏輯順序的列名。Delta Live Tables使用此排序來處理無序到達的更改事件。 這一條款是必需的。 |
列 指定要包含在目標表中的列的子集。你可以:
這個條款是可選的。 屬性時,默認值是在目標表中包含所有列 |
存儲為 是否將記錄存儲為SCD類型1或SCD類型2。 這個條款是可選的。 默認為SCD類型1。 |
航跡曆史記錄 當
這個條款是可選的。當有任何更改時,默認為所有輸出列的跟蹤曆史記錄,相當於 要使用此子句,必須設置 |
的默認行為插入
和更新
事件是插入來自源的CDC事件:更新目標表中與指定鍵匹配的任何行,或者在目標表中不存在匹配記錄時插入新行。處理的刪除
屬性可以指定事件應用作為刪除當
條件。
Python
使用apply_changes ()
函數中使用Delta Live Tables CDC功能。Delta Live Tables Python CDC接口還提供create_streaming_live_table ()函數。類所需的目標表可以使用此函數創建apply_changes ()
函數。看到示例查詢。
應用更改函數
apply_changes(目標=“<目標表>”,源=“<數據源>”,鍵=(“key1”,“key2”,“keyN”],sequence_by=“< sequence-column >”,ignore_null_updates=假,apply_as_deletes=沒有一個,apply_as_truncates=沒有一個,column_list=沒有一個,except_column_list=沒有一個,stored_as_scd_type=<類型>,track_history_column_list=沒有一個,track_history_except_column_list=沒有一個)
參數 |
---|
目標 類型: 要更新的表的名稱。您可以使用create_streaming_live_table ()函數在執行 必選參數。 |
源 類型: 包含CDC記錄的數據源。 必選參數。 |
鍵 類型: 在源數據中唯一標識一行的列或列的組合。這用於確定哪些CDC事件應用於目標表中的特定記錄。 你可以指定:
參數 必選參數。 |
sequence_by 類型: 指定源數據中CDC事件邏輯順序的列名。Delta Live Tables使用此排序來處理無序到達的更改事件。 你可以指定:
參數 必選參數。 |
ignore_null_updates 類型: 允許攝取包含目標列子集的更新。當CDC事件與現有行和 可選參數。 默認為 |
apply_as_deletes 類型: 指定CDC事件何時應作為事件處理 你可以指定:
可選參數。 |
apply_as_truncates 類型: 指定CDC事件何時應作為滿表處理 的 你可以指定:
可選參數。 |
column_listexcept_column_list 類型: 要包含在目標表中的列的子集。使用
參數 可選參數。 默認情況下是在目標表中包含所有列 |
stored_as_scd_type 類型: 是否將記錄存儲為SCD類型1或SCD類型2。 設置為 這個條款是可選的。 默認為SCD類型1。 |
track_history_column_listtrack_history_except_column_list 類型: 要在目標表中跟蹤曆史記錄的輸出列的子集。當 參數 可選參數。 默認情況下是在目標表中包含所有列 要使用這些參數,必須設置 |
的默認行為插入
和更新
事件是插入來自源的CDC事件:更新目標表中與指定鍵匹配的任何行,或者在目標表中不存在匹配記錄時插入新行。處理的刪除
屬性可以指定事件apply_as_deletes
論點。
為輸出記錄創建一個目標表
使用create_streaming_live_table ()
方法創建目標表apply_changes ()
輸出記錄。
請注意
的create_target_table ()
函數已棄用。Databricks建議更新現有代碼以使用create_streaming_live_table ()
函數。
create_streaming_live_table(名字=“<表名稱>”,評論=“< >評論”spark_conf={“< >鍵”:“<價值”,“<鍵”:“< >價值”},table_properties={“< >鍵”:“< >價值”,“< >鍵”:“< >價值”},partition_cols=(“<劃分字段>”,“<劃分字段>”],路徑=“< storage-location-path >”,模式=“模式定義”)
參數 |
---|
名字 類型: 表名。 必選參數。 |
評論 類型: 表的可選描述。 |
spark_conf 類型: 用於執行此查詢的可選Spark配置列表。 |
table_properties 類型: 可選的表屬性在桌子上。 |
partition_cols 類型: 用於對表進行分區的一個或多個列的可選列表。 |
路徑 類型: 表數據的可選存儲位置。如果不設置,係統將默認為管道存儲位置。 |
模式 類型: 表的可選模式定義。模式可以定義為SQL DDL字符串,也可以定義為Python |
類的模式時apply_changes
目標表中,還必須包含__START_AT
和__END_AT
類具有相同數據類型的列sequence_by
字段。例如,如果目標表有列鍵,字符串
,值,字符串
,測序,長
:
create_streaming_live_table(名字=“目標”,評論="目標為疾病控製中心攝入。",partition_cols=(“價值”],路徑=“tablePath美元”,模式=StructType((StructField(“關鍵”,StringType()),StructField(“價值”,StringType()),StructField(“排序”,LongType()),StructField(“__START_AT”,LongType()),StructField(“__END_AT”,LongType())]))
請注意
方法之前,必須確保已創建目標表
應用變化成
查詢或apply_changes
函數。看到示例查詢。目標表的指標(如輸出行數)不可用。
SCD類型2更新將為每個輸入行添加曆史記錄行,即使沒有列發生更改。
的目標
應用變化成
查詢或apply_changes
函數不能用作流式直播表的源。對象的目標中讀取的表應用變化成
查詢或apply_changes
函數必須是活動表。類中不支持期望
應用變化成
查詢或apply_changes ()
函數。要對源數據集或目標數據集使用期望:通過定義具有所需期望的中間表,在源數據上添加期望,並使用此數據集作為目標表的源。
使用從目標表讀取輸入數據的下遊表在目標數據上添加期望。
表屬性
添加以下表屬性以控製墓碑管理的行為刪除
事件:
表屬性 |
---|
pipelines.cdc.tombstoneGCThresholdInSeconds 將此值設置為匹配無序數據之間的最高預期間隔。 缺省值:5分鍾 |
pipelines.cdc.tombstoneGCFrequencyInSeconds 控製墓碑清理檢查的頻率。 缺省值:60秒 |
僅跟蹤具有SCD類型2的指定列的曆史記錄
SCD類型2支持指定輸出列的子集,僅在這些列上生成曆史記錄;對其他列的更改將就地更新,而不是生成新的曆史記錄。
要在Delta Live Tables SCD type 2中使用跟蹤曆史,必須通過將以下配置添加到Delta Live Tables管道設置中來顯式啟用管道中的功能:
{“配置”:{“pipelines.enableTrackHistory”:“真正的”}}
如果pipelines.enableTrackHistory
是未設置還是設置為假
, SCD類型2查詢使用為每個輸入行生成曆史記錄的默認行為。
數據庫上的SCD類型1和SCD類型2
這些示例演示了Delta Live Tables SCD類型1和類型2查詢,這些查詢基於以下源事件更新目標表:
創建新的用戶記錄。
刪除用戶記錄。
更新用戶記錄。在SCD類型1的示例中,最後一個
更新
操作延遲到達並從目標表中刪除,演示了無序事件的處理。
以下是這些例子的輸入記錄:
用戶標識 |
名字 |
城市 |
操作 |
sequenceNum |
---|---|---|---|---|
124 |
勞爾 |
瓦哈卡 |
插入 |
1 |
123 |
伊莎貝爾 |
蒙特雷 |
插入 |
1 |
125 |
梅塞德斯 |
提華納 |
插入 |
2 |
126 |
莉莉 |
坎昆 |
插入 |
2 |
123 |
零 |
零 |
刪除 |
6 |
125 |
梅塞德斯 |
瓜達拉哈拉 |
更新 |
6 |
125 |
梅塞德斯 |
墨西卡利 |
更新 |
5 |
123 |
伊莎貝爾 |
吉娃娃 |
更新 |
5 |
運行SCD type 1示例後,目標表包含以下記錄:
用戶標識 |
名字 |
城市 |
---|---|---|
124 |
勞爾 |
瓦哈卡 |
125 |
梅塞德斯 |
瓜達拉哈拉 |
126 |
莉莉 |
坎昆 |
屬性的附加記錄包含以下輸入記錄截斷
操作,可與SCD類型1示例代碼一起使用:
用戶標識 |
名字 |
城市 |
操作 |
sequenceNum |
---|---|---|---|---|
124 |
勞爾 |
瓦哈卡 |
插入 |
1 |
123 |
伊莎貝爾 |
蒙特雷 |
插入 |
1 |
125 |
梅塞德斯 |
提華納 |
插入 |
2 |
126 |
莉莉 |
坎昆 |
插入 |
2 |
123 |
零 |
零 |
刪除 |
6 |
125 |
梅塞德斯 |
瓜達拉哈拉 |
更新 |
6 |
125 |
梅塞德斯 |
墨西卡利 |
更新 |
5 |
123 |
伊莎貝爾 |
吉娃娃 |
更新 |
5 |
零 |
零 |
零 |
截斷 |
3. |
在運行帶有附加的SCD類型1示例之後截斷
記錄,記錄124
和126
會被截斷,因為截斷
操作在sequenceNum = 3
,目標表包含以下記錄:
用戶標識 |
名字 |
城市 |
---|---|---|
125 |
梅塞德斯 |
瓜達拉哈拉 |
在沒有附加的情況下運行SCD類型2的示例之後截斷
記錄,目標表包含以下記錄:
用戶標識 |
名字 |
城市 |
__START_AT |
__END_AT |
---|---|---|---|---|
123 |
伊莎貝爾 |
蒙特雷 |
1 |
5 |
123 |
伊莎貝爾 |
吉娃娃 |
5 |
6 |
124 |
勞爾 |
瓦哈卡 |
1 |
零 |
125 |
梅塞德斯 |
提華納 |
2 |
5 |
125 |
梅塞德斯 |
墨西卡利 |
5 |
6 |
125 |
梅塞德斯 |
瓜達拉哈拉 |
6 |
零 |
126 |
莉莉 |
坎昆 |
2 |
零 |
運行後的SCD類型2跟蹤曆史的例子沒有額外的截斷
記錄,目標表包含以下記錄:
用戶標識 |
名字 |
城市 |
__START_AT |
__END_AT |
---|---|---|---|---|
123 |
伊莎貝爾 |
吉娃娃 |
1 |
6 |
124 |
勞爾 |
瓦哈卡 |
1 |
零 |
125 |
梅塞德斯 |
瓜達拉哈拉 |
2 |
零 |
126 |
莉莉 |
坎昆 |
2 |
零 |
生成測試數據
為這個例子創建測試記錄:
轉到Databricks登錄頁並選擇創建一個筆記本,或按新在側欄中選擇筆記本。的創建筆記本對話框出現了。
在創建筆記本對話,給你的筆記本起個名字;例如,生成測試CDC記錄。選擇SQL從默認的語言下拉菜單。
如果有正在運行的集群,則集群下拉顯示。選擇要將筆記本附加到的集群。你也可以創建創建筆記本後要附加到的新集群。
點擊創建。
複製以下查詢並將其粘貼到新筆記本的第一個單元格中:
創建模式如果不存在cdc_data;創建表格cdc_data。用戶作為選擇col1作為用戶標識,col2作為名字,col3作為城市,col4作為操作,col5作為sequenceNum從(值——初始負載。(124,“勞爾”,“瓦哈卡”,“插入”,1),(123,“伊莎貝爾”,“蒙特雷”,“插入”,1),——新用戶。(125,“梅賽德斯”,“提華納”,“插入”,2),(126,“莉莉”,“坎昆”,“插入”,2),伊莎貝爾被從係統中移除,梅賽德斯被轉移到瓜達拉哈拉。(123,零,零,“刪除”,6),(125,“梅賽德斯”,“瓜達拉哈拉”,“更新”,6),—這批更新是不按順序來的。上述位於sequenceNum 5的批處理將是最終狀態。(125,“梅賽德斯”,“墨西卡利”,“更新”,5),(123,“伊莎貝爾”,“吉娃娃”,“更新”,5)—取消注釋以測試TRUNCATE。——,(null, null, null, "TRUNCATE", 3));
要運行筆記本並填充測試記錄,請在單元格操作菜單中在最右側,單擊並選擇運行單元,或按shift + enter。
示例查詢
進口dlt從pyspark.sql.functions進口上校,expr@dlt。視圖def用戶():返回火花。readStream。格式(“δ”)。表格(“cdc_data.users”)dlt。create_streaming_live_table(“目標”)dlt。apply_changes(目標=“目標”,源=“用戶”,鍵=(“標識”],sequence_by=上校(“sequenceNum”),apply_as_deletes=expr("operation = 'DELETE'"),apply_as_truncates=expr("操作= 'TRUNCATE'"),except_column_list=(“操作”,“sequenceNum”],stored_as_scd_type=1)
——創建並填充目標表。創建或刷新流媒體生活表格目標;應用變化成生活。目標從流(cdc_data。用戶)鍵(用戶標識)應用作為刪除當操作=“刪除”應用作為截斷當操作=“截斷”序列通過sequenceNum列*除了(操作,sequenceNum)存儲作為鏡頭分割類型1;
示例查詢
進口dlt從pyspark.sql.functions進口上校,expr@dlt。視圖def用戶():返回火花。readStream。格式(“δ”)。表格(“cdc_data.users”)dlt。create_streaming_live_table(“目標”)dlt。apply_changes(目標=“目標”,源=“用戶”,鍵=(“標識”],sequence_by=上校(“sequenceNum”),apply_as_deletes=expr("operation = 'DELETE'"),except_column_list=(“操作”,“sequenceNum”],stored_as_scd_type=“2”)
——創建並填充目標表。創建或刷新流媒體生活表格目標;應用變化成生活。目標從流(cdc_data。用戶)鍵(用戶標識)應用作為刪除當操作=“刪除”序列通過sequenceNum列*除了(操作,sequenceNum)存儲作為鏡頭分割類型2;
帶有跟蹤曆史的示例查詢
進口dlt從pyspark.sql.functions進口上校,expr@dlt。視圖def用戶():返回火花。readStream。格式(“δ”)。表格(“cdc_data.users”)dlt。create_streaming_live_table(“目標”)dlt。apply_changes(目標=“目標”,源=“用戶”,鍵=(“標識”],sequence_by=上校(“sequenceNum”),apply_as_deletes=expr("operation = 'DELETE'"),except_column_list=(“操作”,“sequenceNum”],stored_as_scd_type=“2”,track_history_except_column_list=(“城市”])
——創建並填充目標表。創建或刷新流媒體生活表格目標;應用變化成生活。目標從流(cdc_data。用戶)鍵(用戶標識)應用作為刪除當操作=“刪除”序列通過sequenceNum列*除了(操作,sequenceNum)存儲作為鏡頭分割類型2;跟蹤曆史在*除了(城市)