使用Delta Live表更改數據捕獲

請注意

本文描述如何根據源數據的更改更新Delta Live tables管道中的表。要了解如何記錄和查詢Delta表的行級更改信息,請參見在Databricks上使用Delta Lake更改數據提要

預覽

Delta Live Tables支持SCD類型2公共預覽

可以在Delta Live Tables中使用更改數據捕獲(CDC)根據源數據的更改更新表。Delta Live Tables SQL和Python接口支持CDC。Delta Live Tables支持更新慢變維(SCD)類型1和類型2的表:

  • 使用SCD類型1直接更新記錄。更新的記錄不保留曆史記錄。

  • 使用SCD類型2來保留對記錄的所有更新的曆史。在使用SCD類型2時,還可以保留對指定列的更新曆史記錄。看到僅跟蹤具有SCD類型2的指定列的曆史記錄

為了表示變更的有效期間,SCD Type 2將每個變更與生成的變更一起存儲__START_AT__END_AT列。的列指定序列通過使用SQL或sequence_by來生成__START_AT__END_AT列。

請注意

的數據類型__START_AT__END_AT列的值與指定的數據類型相同序列通過字段。

SQL

使用應用變化語句使用Delta Live Tables CDC功能:

將更改應用到活動中。表格_name FROM source KEYS (keys) [WHERE condition] [IGNORE NULL UPDATES] [APPLY AS DELETE WHEN condition] [APPLY AS TRUNCATE WHEN condition] SEQUENCE BY orderByColumn [COLUMNS {columnList | * EXCEPT (exceptColumnList)}] [STORED AS {SCD TYPE 1 | SCD TYPE 2}] [TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

條款

在源數據中唯一標識一行的列或列的組合。這用於確定哪些CDC事件應用於目標表中的特定記錄。

這一條款是必需的。

在哪裏

應用於源和目標的條件,以觸發優化,如分區修剪。此條件不能用於刪除源行;源中的所有CDC行必須滿足此條件,否則將拋出錯誤。使用WHERE子句是可選的,應該在處理需要特定優化時使用。

這個條款是可選的。

忽略空更新

允許攝取包含目標列子集的更新。當CDC事件匹配現有行並且指定了IGNORE NULL UPDATES時,帶有將其現有值保留在目標中。這也適用於值為的嵌套列

這個條款是可選的。

的默認值是覆蓋現有列值。

應用為delete時

指定CDC事件何時應作為事件處理刪除而不是上插。為了處理亂序數據,被刪除的行被臨時保留為底層Delta表中的一個墓碑,並在metastore中創建一個視圖以過濾掉這些墓碑。保留間隔可以使用pipelines.cdc.tombstoneGCThresholdInSeconds表屬性

這個條款是可選的。

應用為截斷時

指定CDC事件何時應作為滿表處理截斷。因為這個子句觸發了目標表的完整截斷,所以它應該隻用於需要此功能的特定用例。

應用作為截斷子句僅支持SCD類型1。SCD類型2不支持截斷。

這個條款是可選的。

序列由

指定源數據中CDC事件邏輯順序的列名。Delta Live Tables使用此排序來處理無序到達的更改事件。

這一條款是必需的。

指定要包含在目標表中的列的子集。你可以:

  • 指定要包含的列的完整列表:(用戶標識,名字市)

  • 指定要排除的列列表:*除了(操作,sequenceNum)

這個條款是可選的。

屬性時,默認值是在目標表中包含所有列子句未指定。

存儲為

是否將記錄存儲為SCD類型1或SCD類型2。

這個條款是可選的。

默認為SCD類型1。

航跡曆史記錄

pipelines.enableTrackHistory設置後,指定輸出列的一個子集,以便在對這些指定列進行任何更改時生成曆史記錄。你可以指定:

  • 指定要跟蹤的列的完整列表:(用戶標識,名字市)

  • 指定要排除跟蹤的列的列表:*除了(操作,sequenceNum)

這個條款是可選的。當有任何更改時,默認為所有輸出列的跟蹤曆史記錄,相當於跟蹤曆史*

要使用此子句,必須設置pipelines.enableTrackHistory在管道設置中。否則,拋出異常。當pipelines.enableTrackHistory未設置時,將為每個輸入行生成曆史記錄。

的默認行為插入更新事件是插入來自源的CDC事件:更新目標表中與指定鍵匹配的任何行,或者在目標表中不存在匹配記錄時插入新行。處理的刪除屬性可以指定事件應用作為刪除條件。

Python

使用apply_changes ()函數中使用Delta Live Tables CDC功能。Delta Live Tables Python CDC接口還提供create_streaming_live_table ()函數。類所需的目標表可以使用此函數創建apply_changes ()函數。看到示例查詢

應用更改函數

apply_changes目標=“<目標表>”=“<數據源>”=(“key1”“key2”“keyN”],sequence_by=“< sequence-column >”ignore_null_updates=apply_as_deletes=沒有一個apply_as_truncates=沒有一個column_list=沒有一個except_column_list=沒有一個stored_as_scd_type=<類型>track_history_column_list=沒有一個track_history_except_column_list=沒有一個

參數

目標

類型:str

要更新的表的名稱。您可以使用create_streaming_live_table ()函數在執行apply_changes ()函數。

必選參數。

類型:str

包含CDC記錄的數據源。

必選參數。

類型:列表

在源數據中唯一標識一行的列或列的組合。這用於確定哪些CDC事件應用於目標表中的特定記錄。

你可以指定:

  • 字符串列表:["標識",“orderId”)

  • Spark SQL的列表坳()功能:[坳(“標識”),坳(“orderId”)

參數坳()函數不能包含限定符。例如,你可以使用坳(標識),但你不能使用坳(source.userId)

必選參數。

sequence_by

類型:str坳()

指定源數據中CDC事件邏輯順序的列名。Delta Live Tables使用此排序來處理無序到達的更改事件。

你可以指定:

  • 一個字符串:“sequenceNum”

  • Spark SQL坳()功能:坳(“sequenceNum”)

參數坳()函數不能包含限定符。例如,你可以使用坳(標識),但你不能使用坳(source.userId)

必選參數。

ignore_null_updates

類型:保齡球

允許攝取包含目標列子集的更新。當CDC事件與現有行和ignore_null_updates真正的,列中將其現有值保留在目標中。這也適用於值為的嵌套列。當ignore_null_updates,現有值將被覆蓋值。

可選參數。

默認為

apply_as_deletes

類型:strexpr ()

指定CDC事件何時應作為事件處理刪除而不是上插。為了處理亂序數據,被刪除的行被臨時保留為底層Delta表中的一個墓碑,並在metastore中創建一個視圖以過濾掉這些墓碑。保留間隔可以使用pipelines.cdc.tombstoneGCThresholdInSeconds表屬性

你可以指定:

  • 一個字符串:”操作=“刪除”

  • Spark SQLexpr ()功能:expr(“操作=“刪除”)

可選參數。

apply_as_truncates

類型:strexpr ()

指定CDC事件何時應作為滿表處理截斷。因為這個子句觸發了目標表的完整截斷,所以它應該隻用於需要此功能的特定用例。

apply_as_truncates參數隻支持SCD類型1。SCD類型2不支持截斷。

你可以指定:

  • 一個字符串:”操作=“截斷”

  • Spark SQLexpr ()功能:expr(“操作=“截斷”)

可選參數。

column_listexcept_column_list

類型:列表

要包含在目標表中的列的子集。使用column_list指定要包含的列的完整列表。使用except_column_list指定要排除的列。您可以將值聲明為字符串列表或Spark SQL坳()功能:

  • column_list=["標識",“名稱”,“城市”)

  • column_list=[坳(“標識”),坳(“名字”),坳(“城市”)]

  • except_column_list=["操作",“sequenceNum”)

  • except_column_list=[坳(“操作”),坳(“sequenceNum”)

參數坳()函數不能包含限定符。例如,你可以使用坳(標識),但你不能使用坳(source.userId)

可選參數。

默認情況下是在目標表中包含所有列column_listexcept_column_list參數被傳遞給函數。

stored_as_scd_type

類型:strint

是否將記錄存儲為SCD類型1或SCD類型2。

設置為1適用於SCD類型1或2適用於SCD類型2。

這個條款是可選的。

默認為SCD類型1。

track_history_column_listtrack_history_except_column_list

類型:列表

要在目標表中跟蹤曆史記錄的輸出列的子集。當pipelines.enableTrackHistory已設置,使用track_history_column_list指定要跟蹤的列的完整列表。使用track_history_except_column_list指定要排除在跟蹤之外的列。您可以將值聲明為字符串列表或Spark SQL坳()功能:-track_history_column_list=["標識",“名稱”,“城市”)。-track_history_column_list=[坳(“標識”),坳(“名字”),坳(“城市”)]-track_history_except_column_list=["操作",“sequenceNum”)-track_history_except_column_list=[坳(“操作”),坳(“sequenceNum”)

參數坳()函數不能包含限定符。例如,你可以使用坳(標識),但你不能使用坳(source.userId)

可選參數。

默認情況下是在目標表中包含所有列track_history_column_listtrack_history_except_column_list參數被傳遞給函數。

要使用這些參數,必須設置pipelines.enableTrackHistory在管道設置中。否則,拋出異常。當pipelines.enableTrackHistory未設置時,將為每個輸入行生成曆史記錄。

的默認行為插入更新事件是插入來自源的CDC事件:更新目標表中與指定鍵匹配的任何行,或者在目標表中不存在匹配記錄時插入新行。處理的刪除屬性可以指定事件apply_as_deletes論點。

為輸出記錄創建一個目標表

使用create_streaming_live_table ()方法創建目標表apply_changes ()輸出記錄。

請注意

create_target_table ()函數已棄用。Databricks建議更新現有代碼以使用create_streaming_live_table ()函數。

create_streaming_live_table名字=“<表名稱>”評論=“< >評論”spark_conf={“< >鍵”“<價值”“<鍵”“< >價值”},table_properties={“< >鍵”“< >價值”“< >鍵”“< >價值”},partition_cols=(“<劃分字段>”“<劃分字段>”],路徑=“< storage-location-path >”模式=“模式定義”

參數

名字

類型:str

表名。

必選參數。

評論

類型:str

表的可選描述。

spark_conf

類型:dict

用於執行此查詢的可選Spark配置列表。

table_properties

類型:dict

可選的表屬性在桌子上。

partition_cols

類型:數組

用於對表進行分區的一個或多個列的可選列表。

路徑

類型:str

表數據的可選存儲位置。如果不設置,係統將默認為管道存儲位置。

模式

類型:strStructType

表的可選模式定義。模式可以定義為SQL DDL字符串,也可以定義為PythonStructType

類的模式時apply_changes目標表中,還必須包含__START_AT__END_AT類具有相同數據類型的列sequence_by字段。例如,如果目標表有列鍵,字符串值,字符串,測序,

create_streaming_live_table名字=“目標”評論="目標為疾病控製中心攝入。"partition_cols=(“價值”],路徑=“tablePath美元”模式=StructType(StructField“關鍵”StringType()),StructField“價值”StringType()),StructField“排序”LongType()),StructField“__START_AT”LongType()),StructField“__END_AT”LongType())

請注意

  • 方法之前,必須確保已創建目標表應用變化查詢或apply_changes函數。看到示例查詢

  • 目標表的指標(如輸出行數)不可用。

  • SCD類型2更新將為每個輸入行添加曆史記錄行,即使沒有列發生更改。

  • 的目標應用變化查詢或apply_changes函數不能用作流式直播表的源。對象的目標中讀取的表應用變化查詢或apply_changes函數必須是活動表。

  • 類中不支持期望應用變化查詢或apply_changes ()函數。要對源數據集或目標數據集使用期望:

    • 通過定義具有所需期望的中間表,在源數據上添加期望,並使用此數據集作為目標表的源。

    • 使用從目標表讀取輸入數據的下遊表在目標數據上添加期望。

表屬性

添加以下表屬性以控製墓碑管理的行為刪除事件:

表屬性

pipelines.cdc.tombstoneGCThresholdInSeconds

將此值設置為匹配無序數據之間的最高預期間隔。

缺省值:5分鍾

pipelines.cdc.tombstoneGCFrequencyInSeconds

控製墓碑清理檢查的頻率。

缺省值:60秒

僅跟蹤具有SCD類型2的指定列的曆史記錄

SCD類型2支持指定輸出列的子集,僅在這些列上生成曆史記錄;對其他列的更改將就地更新,而不是生成新的曆史記錄。

要在Delta Live Tables SCD type 2中使用跟蹤曆史,必須通過將以下配置添加到Delta Live Tables管道設置中來顯式啟用管道中的功能:

{“配置”{“pipelines.enableTrackHistory”“真正的”}}

如果pipelines.enableTrackHistory是未設置還是設置為, SCD類型2查詢使用為每個輸入行生成曆史記錄的默認行為。

數據庫上的SCD類型1和SCD類型2

這些示例演示了Delta Live Tables SCD類型1和類型2查詢,這些查詢基於以下源事件更新目標表:

  1. 創建新的用戶記錄。

  2. 刪除用戶記錄。

  3. 更新用戶記錄。在SCD類型1的示例中,最後一個更新操作延遲到達並從目標表中刪除,演示了無序事件的處理。

以下是這些例子的輸入記錄:

用戶標識

名字

城市

操作

sequenceNum

124

勞爾

瓦哈卡

插入

1

123

伊莎貝爾

蒙特雷

插入

1

125

梅塞德斯

提華納

插入

2

126

莉莉

坎昆

插入

2

123

刪除

6

125

梅塞德斯

瓜達拉哈拉

更新

6

125

梅塞德斯

墨西卡利

更新

5

123

伊莎貝爾

吉娃娃

更新

5

運行SCD type 1示例後,目標表包含以下記錄:

用戶標識

名字

城市

124

勞爾

瓦哈卡

125

梅塞德斯

瓜達拉哈拉

126

莉莉

坎昆

屬性的附加記錄包含以下輸入記錄截斷操作,可與SCD類型1示例代碼一起使用:

用戶標識

名字

城市

操作

sequenceNum

124

勞爾

瓦哈卡

插入

1

123

伊莎貝爾

蒙特雷

插入

1

125

梅塞德斯

提華納

插入

2

126

莉莉

坎昆

插入

2

123

刪除

6

125

梅塞德斯

瓜達拉哈拉

更新

6

125

梅塞德斯

墨西卡利

更新

5

123

伊莎貝爾

吉娃娃

更新

5

截斷

3.

在運行帶有附加的SCD類型1示例之後截斷記錄,記錄124126會被截斷,因為截斷操作在sequenceNum = 3,目標表包含以下記錄:

用戶標識

名字

城市

125

梅塞德斯

瓜達拉哈拉

在沒有附加的情況下運行SCD類型2的示例之後截斷記錄,目標表包含以下記錄:

用戶標識

名字

城市

__START_AT

__END_AT

123

伊莎貝爾

蒙特雷

1

5

123

伊莎貝爾

吉娃娃

5

6

124

勞爾

瓦哈卡

1

125

梅塞德斯

提華納

2

5

125

梅塞德斯

墨西卡利

5

6

125

梅塞德斯

瓜達拉哈拉

6

126

莉莉

坎昆

2

運行後的SCD類型2跟蹤曆史的例子沒有額外的截斷記錄,目標表包含以下記錄:

用戶標識

名字

城市

__START_AT

__END_AT

123

伊莎貝爾

吉娃娃

1

6

124

勞爾

瓦哈卡

1

125

梅塞德斯

瓜達拉哈拉

2

126

莉莉

坎昆

2

生成測試數據

為這個例子創建測試記錄:

  1. 轉到Databricks登錄頁並選擇創建一個筆記本,或按新圖標在側欄中選擇筆記本。的創建筆記本對話框出現了。

  2. 創建筆記本對話,給你的筆記本起個名字;例如,生成測試CDC記錄。選擇SQL默認的語言下拉菜單。

  3. 如果有正在運行的集群,則集群下拉顯示。選擇要將筆記本附加到的集群。你也可以創建創建筆記本後要附加到的新集群。

  4. 點擊創建

  5. 複製以下查詢並將其粘貼到新筆記本的第一個單元格中:

    創建模式如果存在cdc_data創建表格cdc_data用戶作為選擇col1作為用戶標識col2作為名字col3作為城市col4作為操作col5作為sequenceNum——初始負載。124“勞爾”“瓦哈卡”“插入”1),123“伊莎貝爾”“蒙特雷”“插入”1),——新用戶。125“梅賽德斯”“提華納”“插入”2),126“莉莉”“坎昆”“插入”2),伊莎貝爾被從係統中移除,梅賽德斯被轉移到瓜達拉哈拉。123“刪除”6),125“梅賽德斯”“瓜達拉哈拉”“更新”6),—這批更新是不按順序來的。上述位於sequenceNum 5的批處理將是最終狀態。125“梅賽德斯”“墨西卡利”“更新”5),123“伊莎貝爾”“吉娃娃”“更新”5—取消注釋以測試TRUNCATE。——,(null, null, null, "TRUNCATE", 3));
  6. 要運行筆記本並填充測試記錄,請在單元格操作菜單中細胞的行為在最右側,單擊運行圖標並選擇運行單元,或按shift + enter

創建並運行SCD類型1示例管道

  1. 轉到Databricks登錄頁並選擇創建一個筆記本,或按新圖標在側欄中選擇筆記本。的創建筆記本對話框出現了。

  2. 創建筆記本對話,給你的筆記本起個名字;例如,DLT CDC示例。選擇PythonSQL默認的語言下拉菜單,根據您的首選語言。你可以離開了集群設置為默認值。Delta Live Tables運行時在運行管道之前創建一個集群。

  3. 點擊創建

  4. 複製Python或SQL查詢然後粘貼到筆記本的第一個單元格中。

  5. 創建一個新的管道並將筆記本添加到筆記本庫字段。要發布管道處理的輸出,可以在目標字段。

  6. 啟動管道。如果您配置了目標價值,你可以查看並驗證結果查詢的。

示例查詢

進口dltpyspark.sql.functions進口上校expr@dlt視圖def用戶():返回火花readStream格式“δ”表格“cdc_data.users”dltcreate_streaming_live_table“目標”dltapply_changes目標=“目標”=“用戶”=(“標識”],sequence_by=上校“sequenceNum”),apply_as_deletes=expr"operation = 'DELETE'"),apply_as_truncates=expr"操作= 'TRUNCATE'"),except_column_list=(“操作”“sequenceNum”],stored_as_scd_type=1
——創建並填充目標表。創建刷新流媒體生活表格目標應用變化生活目標cdc_data用戶用戶標識應用作為刪除操作=“刪除”應用作為截斷操作=“截斷”序列通過sequenceNum*除了操作sequenceNum存儲作為鏡頭分割類型1

創建並運行SCD類型2示例管道

  1. 轉到Databricks登錄頁並選擇創建一個筆記本,或按新圖標在側欄中選擇筆記本。的創建筆記本對話框出現了。

  2. 創建筆記本對話,給你的筆記本起個名字;例如,DLT CDC示例。選擇PythonSQL默認的語言下拉菜單,根據您的首選語言。你可以離開了集群設置為默認值。Delta Live Tables運行時在運行管道之前創建一個集群。

  3. 點擊創建

  4. 複製Python或SQL查詢然後粘貼到筆記本的第一個單元格中。

  5. 創建一個新的管道並將筆記本添加到筆記本庫字段。要發布管道處理的輸出,可以在目標字段。

  6. 啟動管道。如果您配置了目標價值,你可以查看並驗證結果查詢的。

示例查詢

進口dltpyspark.sql.functions進口上校expr@dlt視圖def用戶():返回火花readStream格式“δ”表格“cdc_data.users”dltcreate_streaming_live_table“目標”dltapply_changes目標=“目標”=“用戶”=(“標識”],sequence_by=上校“sequenceNum”),apply_as_deletes=expr"operation = 'DELETE'"),except_column_list=(“操作”“sequenceNum”],stored_as_scd_type=“2”
——創建並填充目標表。創建刷新流媒體生活表格目標應用變化生活目標cdc_data用戶用戶標識應用作為刪除操作=“刪除”序列通過sequenceNum*除了操作sequenceNum存儲作為鏡頭分割類型2

帶有跟蹤曆史的示例查詢

進口dltpyspark.sql.functions進口上校expr@dlt視圖def用戶():返回火花readStream格式“δ”表格“cdc_data.users”dltcreate_streaming_live_table“目標”dltapply_changes目標=“目標”=“用戶”=(“標識”],sequence_by=上校“sequenceNum”),apply_as_deletes=expr"operation = 'DELETE'"),except_column_list=(“操作”“sequenceNum”],stored_as_scd_type=“2”track_history_except_column_list=(“城市”
——創建並填充目標表。創建刷新流媒體生活表格目標應用變化生活目標cdc_data用戶用戶標識應用作為刪除操作=“刪除”序列通過sequenceNum*除了操作sequenceNum存儲作為鏡頭分割類型2跟蹤曆史*除了城市