跳轉到主要內容
工程的博客

簡化變化數據獲取與數據磚三角洲生活表

分享這篇文章

本指南將演示如何利用變化數據捕獲三角洲住表中管道識別新記錄和捕捉更改數據集的數據。三角洲生活表管道使您能夠開發可伸縮、可靠和低延遲數據管道,在執行中變化數據捕獲數據湖與最低要求計算資源和無縫的無序的數據處理。

注意:我們建議後開始與達美住表這解釋了創建可伸縮的、可靠的管道使用三角洲生活表(DLT)及其聲明ETL定義。

背景變化數據捕獲

變化數據捕獲(疾病預防控製中心)是一個過程,識別和捕捉增量變化(數據刪除、插入和更新)數據庫,跟蹤客戶,訂單或產品狀態為近實時數據的應用程序。進化的疾控中心提供實時數據處理數據連續漸進的方式作為新事件發生。
超過80%的公司計劃在2025年實施多重雲戰略,選擇合適的方法為你的業務,允許無縫實時集中所有數據ETL管道跨多個環境的變化是至關重要的。

通過捕獲中心事件,磚用戶可以re-materialize源表如表在三角洲Lakehouse並運行他們的分析之上,同時能夠結合數據與外部係統。並入命令在三角洲湖磚使客戶能夠高效地插入和刪除記錄的數據湖泊——你可以查看我們之前的深俯衝的話題beplay体育app下载地址在這裏。這是一個常見的用例,我們觀察磚的許多客戶利用湖泊三角洲執行,並保持他們的數據湖泊最新的實時業務數據。beplay体育app下载地址

而三角洲湖為實時疾病預防控製中心提供了一個完整的解決方案在數據同步湖,我們現在興奮地宣布三角洲生活中的變化數據捕獲功能表,使結構更簡單,更高效和可伸縮的。DLT允許用戶攝取CDC數據無縫地使用SQL和Python。

早期疾病預防控製中心與三角洲表解決方案是使用合並成操作需要手動排序數據,以避免失敗當源數據集的多行匹配而試圖更新相同的目標三角洲表行。處理無序的數據,有一個額外的步驟需要進行預處理源表使用foreachBatch實現消除多個匹配的可能性,隻保留最新的改變對於每個關鍵(見變化數據捕獲的例子)。新申請更改生效DLT管道自動和無縫地處理無序的數據沒有任何數據工程人工幹預的必要性。

疾病預防控製中心與磚三角洲的生活表

在這個博客中,我們將演示如何使用變化到命令在三角洲住表中管道申請一個常見的疾病預防控製中心疾病控製和預防中心的數據使用情況是來自外部係統。各種疾病預防控製中心的工具,如Debezium Fivetran, Qlik複製,Talend, StreamSets。雖然具體的實現不同,這些工具通常捕獲和記錄的曆史數據變化日誌;下遊應用程序使用這些疾病預防控製中心日誌。在我們的示例數據落在雲從CDC Debezium等工具,對象存儲Fivetran等等。

我們有來自各種疾病預防控製中心的數據工具降落在雲對象存儲或Apache卡夫卡等消息隊列。通常我們看到疾病預防控製中心用於攝入我們所說的大獎章的架構。大獎章的體係結構是一種數據設計模式用於Lakehouse邏輯上組織數據,逐步逐步的目標和改進的結構和質量數據流經的每一層的體係結構。三角洲生活無縫應用更改表允許您從美國疾病控製與預防中心Lakehouse提要表;這個功能結合圖案架構允許增量變化輕易流過大規模分析工作量。使用疾病預防控製中心的大獎章體係結構提供了許多好處因為隻有用戶更改或添加數據需要處理。因此,它使用戶能夠有效保持黃金表最新的商業數據。

注:例子同時適用於SQL和Python版本的疾病預防控製中心和一個特定的方式使用操作,評估變化,請參閱官方文檔在這裏

先決條件

為了最有效的指導,你應該有一個基本的熟悉:

  • SQL或Python
  • 三角洲生活表
  • ETL開發管道和/或處理大數據係統
  • 磚互動的筆記本和集群
  • 你必須能夠訪問一個磚工作區與權限創建新集群,運行工作,並將數據保存到一個外部雲對象存儲或位置DBFS
  • 在這個博客我們創建的管道,“高級”產品版本支持的執行數據質量約束,需要選擇。

數據集

我們在這裏消費現實CDC數據從外部數據庫。在這個管道中,我們將使用騙子庫生成的數據集疾病預防控製中心工具Debezium可以產生和納入雲存儲初始攝取的磚。使用自動加載程序我們從雲對象存儲增量加載信息,並將它們存儲在銅表存儲原始消息。銅表僅供數據攝入使真理的快速訪問單一來源。接下來我們從清潔銅層表執行應用更改為傳播下遊銀表的更新。數據流銀表,通常它變得更加精煉和優化(“足夠”)提供一個企業的關鍵業務實體。請參見下圖。

樣本疾病預防控製中心流與疾病預防控製中心的工具,自動裝卸機和δ生活表管道

這個博客專注於一個簡單的例子,需要一個JSON消息與四個領域的客戶名稱、電子郵件、地址和id以及兩個字段:操作(商店操作代碼(刪除、添加、更新、創建),和operation_beplay体育app下载地址date(存儲記錄的日期和時間戳來為每個操作動作)來描述更改的數據。

與上麵的字段生成一個樣本數據集,我們使用一個Python包產生假數據,騙子。你可以找到筆記本相關數據生成部分在這裏。在這個筆記本我們提供的名稱和存儲位置寫生成的數據。我們使用的是磚的DBFS功能,請參閱DBFS文檔更多地了解它是如何工作的。然後,我們使用一個PySpark用戶定義函數為每個字段,生成合成數據集和寫數據定義的存儲位置,我們將請參考其他筆記本電腦合成數據集訪問。

攝入的原始數據集使用自動加載程序

根據圖案架構範式,青銅層擁有最原始數據質量。在這個階段我們可以逐步使用自動裝卸機讀取新的數據雲存儲的一個位置。這裏我們添加路徑下生成的數據集配置部分管道設置,它允許我們加載源路徑作為一個變量。所以現在我們的配置下管道設置看起來像下圖:

“配置”:{“源”:“/ tmp /演示/ cdc_raw”}

然後我們在筆記本中加載這個配置屬性。

讓我們看一看銅表我們會攝取,SQL。,和b。使用Python

一個SQL。

設置spark.source;創建流表customer_bronze生活(地址字符串,電子郵件字符串,id字符串,firstname字符串,字符串,操作字符串,operation_date字符串,_rescued_data字符串)TBLPROPERTIES (“質量”=“青銅”)評論“新客戶數據逐步吸收從雲對象存儲著陸區”作為SELECT *cloud_files (" ${來源}/客戶”beplay体育app下载地址,“json”地圖(“cloudFiles.inferColumnTypes”,“真正的”));

Python b。

進口dltpyspark.sql.functions進口*pyspark.sql.types進口*源= spark.conf.get (“源”)@dlt.table (name =“customer_bronze”,評論=“新客戶數據逐步吸收從雲對象存儲著陸區”,table_properties = {“質量”:“青銅”})defcustomer_bronze():返回(spark.readStream。格式(“cloudFiles”)\.option (“cloudFiles.format”,“json”)\.option (“cloudFiles.inferColumnTypes”,“真正的”)\.load (f”{來源}/beplay体育app下载地址客戶”))

上麵的語句使用自動加載程序從json文件中創建一個名為customer_bronze的流媒體直播表。當使用自動裝卸機在三角洲生活表,您不需要提供任何模式或檢查點位置,這些位置將被自動管理你的DLT管道。

自動加載器提供了一個結構化流源cloud_files在SQL和cloudFiles在Python中,采用雲存儲路徑和的形式作為參數。
降低計算成本,我們建議的DLT管道運行觸發模式作為micro-batch假設你沒有非常低的延遲需求。

預期和高質量的數據

在下一步創建高質量、多樣化和可訪問的數據集,我們期望實施質量檢查標準使用約束。目前,一個約束可以保留,下降,或失敗。為更多的細節在這裏看到的。所有的約束都記錄到啟用流線型的質量監控。

一個SQL。

創建臨時現場直播customer_bronze_clean_v (約束valid_id期望(id)違反下降,約束valid_address期望(地址),約束valid_operation期望(操作)違反下降)TBLPROPERTIES(“質量”=“銀”)評論”潔淨青銅客戶視圖(即什麼將成為銀)”作為選擇*流(LIVE.customer_bronze);

Python b。

@dlt.view (name =“customer_bronze_clean_v”,評論=“潔淨青銅客戶視圖(即什麼將成為銀)”)@dlt.expect_or_drop (“valid_id”,“id”不是零)@dlt.expect (“valid_address”,“地址不空”)@dlt.expect_or_drop (“valid_operation”,“不是空行動”)defcustomer_bronze_clean_v():返回dlt.read_stream (“customer_bronze”)\.select (“地址”,“電子郵件”,“id”,“firstname”,“姓”,“操作”,“operation_date”,“_rescued_data”)

使用應用變化成語句更改傳播到下遊目標表

在執行之前申請變更成查詢,我們必須確保目標流表,我們要保存最新的數據存在。如果它不存在,我們需要創建一個。下麵的細胞是創建一個目標流表的例子。注意,發布這篇博客的時候,目標要求隨流表創建語句申請變更成查詢,都需要在管道,否則你的表創建查詢將失敗。

一個SQL。

創建流媒體直播customer_silverTBLPROPERTIES(“質量”=“銀”)評論“幹淨、合並客戶”;beplay体育app下载地址

Python b。

dlt.create_target_table (name =“customer_silver”,評論=“幹淨、合並客戶”beplay体育app下载地址,table_properties = {“質量”:“銀”})

現在我們有一個目標流表,我們可以更改傳播到下遊目標表使用申請變更成查詢。而CDC飼料有插入、更新和刪除事件,DLT的默認行為是應用插入和更新事件從源數據集匹配的任何記錄主鍵,和排序的字段標識事件的順序。更確切地說它更新現有的任何行匹配的目標表主鍵(s)或插入一個新行,當一個匹配的目標流表中記錄不存在。我們可以使用使用時刪除在SQL或其等價的apply_as_deletes參數在Python中來處理刪除事件。

在這個示例中,我們使用“id”作為我的主鍵,唯一地標識客戶和允許疾控中心事件申請確定目標流表的客戶記錄。beplay体育app下载地址因為“operation_date“疾控中心的邏輯順序事件源數據集,我們使用“operation_date序列”在SQL或其等價的“sequence_by =坳(“operation_date”)”在Python中處理變更的事件到達的順序。請記住,我們使用的字段值(或sequence_by序列)應該是唯一的在所有相同的密鑰更新。在大多數情況下,順序與時間戳列將列信息。

最後我們使用“列*除了(操作、operation_date _rescued_data)”在SQL或其等價的“except_column_list”=(“操作”、“operation_date”,“_rescued_data”)在Python中排除三列“操作”、“operation_date”、“_rescued_data”從目標流表。默認情況下所有的列都包含在目標流表,當我們不指定“列”條款。

一個SQL。

申請更改LIVE.customer_silver流(LIVE.customer_bronze_clean_v)鍵(id)應用作為刪除操作=“刪除”序列通過operation_date*除了(操作、operation_date_rescued_data);

Python b。

dlt.apply_changes (目標=“customer_silver”,源=“customer_bronze_clean_v”,鍵= [“id”),sequence_by =坳(“operation_date”),apply_as_deletes = expr (“=”刪除“行動”),except_column_list = [“操作”,“operation_date”,“_rescued_data”])

檢查可用條款的完整列表在這裏
請注意,在發布這篇博客的時候,表讀取的目標申請變更成查詢或apply_changes函數必須是一個生活表,不能流表。

一個SQLpython這一部分筆記本可供參考。現在我們已經準備好了所有的細胞,讓我們創建一個管道從雲攝取數據對象存儲。在新選項卡中打開工作或工作區窗口,並選擇“三角洲住表”。

管道與此相關的博客,有以下DLT管道設置:

{“集群”:【{“標簽”:“默認”,“num_workers”:1}),“發展”:真正的,“連續”:,“版”:“高級”,“光子”:,“庫”:【{“筆記本”:{“路徑”:“/回購/(電子郵件保護)/ Delta-Live-Tables /筆記本電腦/ 1-CDC_DataGenerator”}},{“筆記本”:{“路徑”:“/回購/(電子郵件保護)/ Delta-Live-Tables /筆記本電腦/ 2-Retail_DLT_CDC_sql”}}),“名稱”:“CDC_blog”,“存儲”:“dbfs: / home / mydir / myDB / dlt_storage”,“配置”:{“源”:“/ tmp /演示/ cdc_raw”,“pipelines.applyChangesPreviewEnabled”:“真正的”},“目標”:“my_database”}
  1. 選擇“創建管道”來創建一個新的管道
  2. 指定一個名稱,如“零售中心管道”
  3. 指定你的筆記本路徑之前已經創建,一個用於生成的數據集使用偽造者的包,另一個路徑在DLT攝入生成的數據。第二個筆記本路徑可以參考筆記本用SQL編寫的,或者Python取決於你的語言的選擇。
  4. 訪問數據生成的第一個筆記本,添加數據路徑的配置。這裏我們將數據存儲在“/ tmp /演示/ cdc_raw /客戶”,我們將“源”“beplay体育app下载地址/ tmp /演示/ cdc_raw /”引用“源/客戶”在第二個筆記本。
  5. 指定目標(這是可選的,他指的是目標數據庫),在那裏你可以查詢結果表從你的管道。
  6. 指定對象存儲的存儲位置(這是可選的),訪問你的DLT生產數據和元數據日誌的管道。
  7. 管道模式設置為觸發。在觸發模式下,DLT管道將使用新數據的來源,一旦處理它將自動終止計算資源。你可以觸發和連續模式之間切換時編輯管道設置。設置“連續”:假的JSON相當於設置管道觸發模式。
  8. 對於這個工作負載可以禁用自動定量自動駕駛儀選項下,集群和隻使用1工人。生產工作負載,我們建議啟用自動定量和設置最大的集群大小所需的工人數量。
  9. 選擇“開始”
  10. 現在您的管道創建並運行!

樣本三角洲住表下遊管道傳播更改表

DLT管道血統可觀測性,數據質量監控

所有DLT管道日誌存儲在管道的存儲位置。你可以指定存儲位置隻有當你創建管道。注意一旦創建了管道可以不再修改存儲位置。

你可以查看我們之前的深俯衝的話題在這裏。試一試這個筆記本看到管道可觀測性和數據質量監控的例子DLT管道與此相關的博客。

結論

在這個博客中,我們展示了我們無縫用戶高效地實現變化數據捕獲(CDC)到Lakehouse平台與達美住表(DLT)。Beplay体育安卓版本DLT為內置的質量控製提供了深可見性管道操作,觀察管道血統,監控模式,在管道中的每個步驟和質量檢查。DLT支持自動錯誤處理和最好的伸縮能力流工作負載,使得用戶擁有質量數據和優化他們的工作負載所需的資源。

數據工程師現在可以輕鬆實現疾病預防控製中心和一個新的聲明申請變更為API在SQL或與DLT Python。這個新功能允許你ETL管道容易識別的變化和應用這些更改在成千上萬的表與低延遲的支持。

準備開始試用中心在三角洲表為自己生活嗎?
請注意這個網絡研討會學習如何三角洲生活表簡化了數據轉換的複雜性和ETL,看看我們改變數據獲取與三角洲生活表官方文檔,github並遵循的步驟視頻創建你的管道!
免費試著磚
看到所有工程的博客的帖子
Baidu
map