改變數據獲取與三角洲生活表
預覽
這個特性是在公共預覽。
您可以使用變化數據捕獲(CDC)三角洲生活表更新表根據源數據的變化。CDC在三角洲地區的生活表支持SQL和Python接口。三角洲生活與緩慢變化維度表支持更新表(SCD) 1型和2型:
直接使用的化合物1型更新記錄。不保留曆史記錄更新。
使用SCD 2型保留的曆史記錄,在所有更新或更新一套指定的列。看到追蹤曆史隻指定列SCD 2型
語法和其他參考資料,請參閱:
請注意
本文描述了如何更新表在三角洲住表管道基於源數據的變化。學習如何記錄和查詢為三角洲表行級變化信息,明白了使用三角洲湖變化數據以磚。
疾病預防控製中心是如何實現與達美住表嗎?
您必須指定的源數據序列中的一列記錄,三角洲生活表解釋的單調遞增表示適當的源數據的排序。三角洲生活表自動處理數據到達的順序。化合物2型變化,δ住表傳播適當的排序值__START_AT
和__END_AT
目標表的列。應該有一個不同的更新在每個排序值,每個鍵和零排序值是不支持的。
執行中心處理三角洲生活表,您首先創建一個流表,然後使用一個應用變化成
聲明中指定源,鑰匙,和測序改變飼料。創建目標流表,可以使用創建或刷新流媒體表
在SQL或聲明create_streaming_table ()
在Python中,函數。創建語句定義疾病預防控製中心處理,使用應用變化
在SQL或聲明apply_changes ()
在Python中,函數。語法細節,請參閱改變δ生活表中數據獲取與SQL或改變δ生活表中數據獲取與Python。
δ住表數據對象是用於什麼疾病預防控製中心處理?
當你聲明的目標表,在蜂巢metastore創建了兩個數據結構:
一個視圖使用分配給目標表的名稱。
內部支持表用δ住表表管理中心處理。這張桌子是由將命名的
__apply_changes_storage_
到目標表名。
例如,如果你聲明一個目標表命名dlt_cdc_target
,你會看到一個視圖命名dlt_cdc_target
和一個表命名__apply_changes_storage_dlt_cdc_target
metastore。創建一個視圖允許三角洲生活表過濾掉多餘的信息(例如,墓碑和版本)需要處理無序的數據。將處理過的數據,查詢目標視圖。你也可以查詢的原始數據__apply_changes_storage_
表刪除記錄和額外的版本列。如果你手動添加數據表,記錄被認為是其他更改,因為之前的版本列失蹤。
限製
度量目標表,如輸出的行數,並不是可用的。
化合物2型更新將添加一個曆史行對於每一個輸入行,即使沒有列已經改變了。
的目標
應用變化成
查詢或apply_changes
不能使用函數作為一個流的源表。表讀取的目標應用變化成
查詢或apply_changes
函數必須是一個生活表。不支持在一個預期
應用變化成
查詢或apply_changes ()
函數。使用預期的源或目標數據集:源數據上添加預期通過定義一個中間表所需的期望和使用這個數據集作為目標表的源。
添加預期與下遊目標數據表,從目標表中讀取輸入數據。
化合物1型和SCD 2型磚
以下部分提供的例子,演示三角洲生活表SCD 1型和2型查詢更新目標表基於源的事件:
創建新用戶記錄。
刪除一個用戶記錄。
更新用戶記錄。在化合物1型的例子中,最後一次
更新
操作遲到,從目標表,展示了事件的處理。
所有下麵的例子假設熟悉配置和更新三角洲住表管道。看到教程:管道運行第一個三角洲住表。
為了運行這些示例,您必須首先創建一個示例數據集。看到生成測試數據。
以下是輸入記錄這些例子:
用戶標識 |
的名字 |
城市 |
操作 |
sequenceNum |
---|---|---|---|---|
124年 |
勞爾 |
瓦哈卡 |
插入 |
1 |
123年 |
伊莎貝爾 |
蒙特雷 |
插入 |
1 |
125年 |
梅塞德斯 |
提華納 |
插入 |
2 |
126年 |
莉莉 |
坎昆 |
插入 |
2 |
123年 |
零 |
零 |
刪除 |
6 |
125年 |
梅塞德斯 |
瓜達拉哈拉 |
更新 |
6 |
125年 |
梅塞德斯 |
墨西卡利 |
更新 |
5 |
123年 |
伊莎貝爾 |
吉娃娃 |
更新 |
5 |
如果你取消最後一行數據的示例中,將插入以下記錄指定記錄應該被截斷的地方:
用戶標識 |
的名字 |
城市 |
操作 |
sequenceNum |
---|---|---|---|---|
零 |
零 |
零 |
截斷 |
3 |
請注意
下麵的例子包括所有選項來指定刪除
和截斷
業務,但每一個都是可選的。
化合物1型更新過程
下麵的代碼示例演示了處理SCD 1型更新:
進口dlt從pyspark.sql.functions進口上校,expr@dlt。視圖def用戶():返回火花。readStream。格式(“δ”)。表(“cdc_data.users”)dlt。create_streaming_table(“目標”)dlt。apply_changes(目標=“目標”,源=“用戶”,鍵=(“標識”),sequence_by=上校(“sequenceNum”),apply_as_deletes=expr(“=”刪除“行動”),apply_as_truncates=expr(“=“截斷”行動”),except_column_list=(“操作”,“sequenceNum”),stored_as_scd_type=1)
——創建和填充目標表。創建或刷新流媒體表目標;應用變化成生活。目標從流(cdc_data。用戶)鍵(用戶標識)應用作為刪除當操作=“刪除”應用作為截斷當操作=“截斷”序列通過sequenceNum列*除了(操作,sequenceNum)存儲作為鏡頭分割類型1;
跑後SCD 1型的例子中,目標表包含以下記錄:
用戶標識 |
的名字 |
城市 |
---|---|---|
124年 |
勞爾 |
瓦哈卡 |
125年 |
梅塞德斯 |
瓜達拉哈拉 |
126年 |
莉莉 |
坎昆 |
後運行SCD 1型有額外的例子截斷
記錄,記錄124年
和126年
截斷的嗎截斷
操作在sequenceNum = 3
,目標表包含以下記錄:
用戶標識 |
的名字 |
城市 |
---|---|---|
125年 |
梅塞德斯 |
瓜達拉哈拉 |
2型更新過程的化合物
下麵的代碼示例演示了處理SCD 2型更新:
進口dlt從pyspark.sql.functions進口上校,expr@dlt。視圖def用戶():返回火花。readStream。格式(“δ”)。表(“cdc_data.users”)dlt。create_streaming_table(“目標”)dlt。apply_changes(目標=“目標”,源=“用戶”,鍵=(“標識”),sequence_by=上校(“sequenceNum”),apply_as_deletes=expr(“=”刪除“行動”),except_column_list=(“操作”,“sequenceNum”),stored_as_scd_type=“2”)
——創建和填充目標表。創建或刷新流媒體表目標;應用變化成生活。目標從流(cdc_data。用戶)鍵(用戶標識)應用作為刪除當操作=“刪除”序列通過sequenceNum列*除了(操作,sequenceNum)存儲作為鏡頭分割類型2;
跑後SCD 2型的例子中,目標表包含以下記錄:
用戶標識 |
的名字 |
城市 |
__START_AT |
__END_AT |
---|---|---|---|---|
123年 |
伊莎貝爾 |
蒙特雷 |
1 |
5 |
123年 |
伊莎貝爾 |
吉娃娃 |
5 |
6 |
124年 |
勞爾 |
瓦哈卡 |
1 |
零 |
125年 |
梅塞德斯 |
提華納 |
2 |
5 |
125年 |
梅塞德斯 |
墨西卡利 |
5 |
6 |
125年 |
梅塞德斯 |
瓜達拉哈拉 |
6 |
零 |
126年 |
莉莉 |
坎昆 |
2 |
零 |
追蹤曆史隻指定列SCD 2型
SCD 2型支持指定輸出列的一個子集生成曆史上那些列;更改其他列就地更新,而不是產生新的曆史記錄。
使用跟蹤曆史SCD 2型三角洲生活表,您必須顯式地啟用這個特性在管道通過添加以下配置三角洲生活表管道設置:
{“配置”:{“pipelines.enableTrackHistory”:“真正的”}}
如果pipelines.enableTrackHistory
沒有設置或設置假
談到2型查詢使用的默認行為為每個輸入行生成一個曆史記錄。
下麵的例子演示了使用跟蹤曆史SCD 2型:
進口dlt從pyspark.sql.functions進口上校,expr@dlt。視圖def用戶():返回火花。readStream。格式(“δ”)。表(“cdc_data.users”)dlt。create_streaming_table(“目標”)dlt。apply_changes(目標=“目標”,源=“用戶”,鍵=(“標識”),sequence_by=上校(“sequenceNum”),apply_as_deletes=expr(“=”刪除“行動”),except_column_list=(“操作”,“sequenceNum”),stored_as_scd_type=“2”,track_history_except_column_list=(“城市”])
——創建和填充目標表。創建或刷新流媒體表目標;應用變化成生活。目標從流(cdc_data。用戶)鍵(用戶標識)應用作為刪除當操作=“刪除”序列通過sequenceNum列*除了(操作,sequenceNum)存儲作為鏡頭分割類型2;跟蹤曆史在*除了(城市)
跑後的SCD 2型跟蹤曆史有額外的例子截斷
記錄,目標表包含以下記錄:
用戶標識 |
的名字 |
城市 |
__START_AT |
__END_AT |
---|---|---|---|---|
123年 |
伊莎貝爾 |
吉娃娃 |
1 |
6 |
124年 |
勞爾 |
瓦哈卡 |
1 |
零 |
125年 |
梅塞德斯 |
瓜達拉哈拉 |
2 |
零 |
126年 |
莉莉 |
坎昆 |
2 |
零 |
生成測試數據
下麵的代碼生成提供了一個示例數據集用於本教程中的示例查詢現在。假設你有適當的憑證來創建一個新的模式和創建一個新表,你可以用一個筆記本或執行這些語句磚SQL。下麵的代碼是不為了運行管道作為三角洲的一部分生活表:
創建模式如果不存在cdc_data;創建表cdc_data。用戶作為選擇col1作為用戶標識,col2作為的名字,col3作為城市,col4作為操作,col5作為sequenceNum從(值——初始載荷。(124年,“勞爾”,“瓦哈卡”,“插入”,1),(123年,“伊莎貝爾”,“蒙特雷”,“插入”,1),——新用戶。(125年,“梅賽德斯”,“提華納”,“插入”,2),(126年,“莉莉”,“坎昆”,“插入”,2),——伊莎貝爾從係統中刪除和奔馳搬到瓜達拉哈拉。(123年,零,零,“刪除”,6),(125年,“梅賽德斯”,“瓜達拉哈拉”,“更新”,6),——這批更新到達的順序。上麵批sequenceNum 5將是最終的狀態。(125年,“梅賽德斯”,“墨西卡利”,“更新”,5),(123年,“伊莎貝爾”,“吉娃娃”,“更新”,5)——取消測試截斷。———(空,空,空,“截斷”,3));