技術指南
開始使用Delta Live表
簡介
本指南將演示Delta Live Tables如何使您能夠開發可伸縮的、可靠的數據管道,這些管道符合Lakehouse架構的數據質量標準。
讓我們從描述一個常見場景開始。我們在雲對象存儲(如S3、ADLS或GCS)中擁有來自各種OLTP係統的數據。有些數據集定期更新,有些是源係統的曆史快照。我們對數據的消費者和轉換有了大致的了解,我們將遵循Lakehouse架構將數據質量劃分為原始、精煉和聚合層:
這些Gold表中的每一個都可能服務於不同的消費者,從BI報告到訓練機器學習模型,因此這些數據從源到Gold層的旅程將有不同的需求,我們作為數據工程師關心:
- 延遲:當我們攝取新數據時,它必須在5秒內在銀表中可見。
- 成本:“我們無法讓一個固定容量的集群全天候運行來支持這些更新”
- 精度:“我應該在實時數據源中對延遲到達的數據承擔多少責任?”
乍一看,這些需求中的許多似乎很容易在上麵的參考管道中滿足。然而,盡管湖屋管道故意設計得優雅而簡單,但在現實中,我們通常不是在處理直接的線性流。在現實中,它通常是這樣的:
隨著我們開始擴大規模,用額外的數據源來豐富我們的分析環境,以支持新的見解,ETL複雜性呈指數級增長,以下挑戰導致這些管道變得極其脆弱:
- 由於表之間沒有明確的依賴關係,錯誤處理和恢複非常費力
- 數據質量很差,因為強製執行和監視約束是一個手動過程
- 無法跟蹤數據沿襲,或者充其量需要大量的實現
- 顆粒級、單個批處理/流級別的可觀察性是不可能的
- 很難在統一的管道中處理批處理和流處理
注意:批處理和流式?
Spark提供了使用單個API的批處理和流處理範式的能力,Delta Lake支持在單個數據集上並發批處理和流操作,從而消除了在兩層或兩層數據中所需的權衡或再處理λ架構,在實現和監控流方麵仍有很多工作要做,特別是在ETL過程中,它將流和批作業作為數據集之間的單獨跳點組合在一起。
聲明式ETL
在編寫ETL管道時,數據轉換通常是“程序性”執行的。這意味著將對數據執行的操作表示為ETL引擎要執行的一係列計算步驟。在許多情況下,即使您使用的是編排工具(如風流或Azure Data Factory),啟動的作業也包含過程邏輯。盡管編排器可能必須知道作業之間的依賴關係,但它們對ETL轉換和業務邏輯是不透明的。
另一方麵,聲明式ETL要求用戶描述管道的預期結果,而不顯式列出得到結果必須執行的有序步驟。聲明性意味著關注我們期望的目標是什麼,並利用像DLT這樣的智能引擎來確定計算框架應該“如何”執行這些過程。
你可能會想到程序性和聲明性的ETL定義,就像給某人一步一步的駕駛方向,給他們提供一個包括城市地圖和交通流信息的GPS。
駕駛指南會提供司機到達目的地的步驟,但不能提供他們的預計到達時間,他們也不知道他們會在路上經過哪些社區。此外,如果需要繞行路線,分步指示現在是無用的,但GPS地圖將能夠繞行重新規劃路線。
在這個比喻中,地圖就是DLT管道。DLT引擎是GPS,可以解釋地圖並確定最佳路線,並為您提供諸如ETA等指標。關於在路由中遍曆的鄰居的詳細信息就像數據沿襲一樣,在事故(或錯誤)周圍找到彎路的能力是依賴項解析和模塊化的結果,這是DLT的聲明性性質所提供的。
你的第一條管道
在本指南中,我們將實現一個遭受這些挑戰的管道,並將此作為一個機會,教你DLT的聲明式開發範式如何簡化ETL開發,並提高整個湖屋的質量、沿襲和可觀察性。
為了快速開始,我們將管道的完成結果托管在Delta Live Tables筆記本回購.你可以複製SQL筆記本到您的Databricks部署中作為參考,或者您可以按照指南進行操作。
本指南將重點介紹SQL管道,但如果您更願意在Python中運行相同的管道,請使用這個筆記本.
先決條件
為了充分利用本指南,你應該基本熟悉以下內容:
- SQL
- 開發ETL管道和/或與大數據係統合作
- 數據庫交互式筆記本和集群
- 您必須能夠訪問Databricks工作區,並具有創建新集群、運行作業和將數據保存到外部雲對象存儲或數據存儲上的位置的權限DBFS.
數據集
在第一個管道中,我們將使用retail-org數據集databricks-datasets每個工作區都有。Delta Live Tables提供了處理Lakehouse中Bronze表(即原始數據)的細微差別的技術。您將使用自動加載程序從雲對象存儲增量加載數據的特性。
青銅數據集:使用雲文件攝取數據集
青銅數據集代表最原始的質量。我們通常會從源頭進行最小限度的調整,利用雲存儲的成本效益來創建一個原始源,我們可以在此基礎上驗證改進的數據,訪問我們通常不報告的字段,或者創建新的管道。此階段的常見模式是不斷從雲存儲中的某個位置攝取新數據。
雖然其中一些術語在通用說法中可以互換使用,但它們在DLT中具有不同的含義。有Spark結構化流經驗的讀者可能還會注意到一些重載的術語。在這裏,我們試圖消除這些術語的歧義:
- 流媒體數據集被視為無界的處理範式嗎
- 增量更新模式是否隻對目標數據進行最小的更改
- 連續指始終運行直到在任意時間停止的管道,而不是在管道啟動時基於源數據狀態的時間停止
你可能會注意到無界流處理框架(如Spark Structured Streaming)和DLT中的流數據集之間有一些重疊。事實上,DLT的流數據集利用了Spark結構化流和Delta事務日誌的基礎,但抽象了大部分複雜性,允許開發人員專注於滿足處理需求,而不是係統級的繁重工作。
我們將在本指南的黃金部分討論DLT的流數據集和DLT的連續模式是如何相互作用的。
作為一個例子,讓我們看一下我們將要攝取的一個Bronze表。
CREATE STREAMING LIVE TABLE sales_orders_raw COMMENT“原始銷售訂單,從/ databicks -datasets中攝取。”TBLPROPERTIES ("quality" = "bronze") AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json", map("cloudFiles. properties ")inferColumnTypes”、“真正的”);
SQL語句使用Auto Loader從json文件創建一個名為sales_orders_raw的流表。
cloud_files:調用Auto Loader,並以雲存儲路徑和格式作為參數。(注意,該API與DLT之外的cloudFiles調用略有不同)。
現在,讓我們創建一個Pipeline來從雲對象存儲中攝取數據。
打開你的工作區
- 創建你的第一個DLT管道筆記本
- 為你的DLT管道創建一個新的筆記本,如“dlt_retail_sales_pipeline”
- 將以下代碼複製到第一個單元格中:
“客戶購買成品,從/ databicks -datasebeplay体育app下载地址t中攝取。”TBLPROPERTIES ("quality" = "mapping") AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/beplay体育app下载地址customers/", "csv");CREATE STREAMING LIVE TABLE sales_orders_raw COMMENT“原始銷售訂單,從/ databicks -datasets中攝取。”TBLPROPERTIES ("quality" = "bronze") AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json", map("cloudFiles. properties ")inferColumnTypes”、“真正的”);
注意:流水線筆記本
DLT流水線筆記本是特殊的,即使他們使用標準的Databricks筆記本。雖然我們目前不阻止您將集群附加到Pipeline Notebook,但DLT絕不會使用附加的集群來運行管道。作為最佳實踐,我們建議您將管道筆記本保持在分離狀態,並在開發過程中使用二級草稿筆記本運行任意命令。如果您在附加的集群上運行管道筆記本,您將看到類似這樣的內容……
- 在新選項卡或窗口中打開作業,並選擇“Delta Live Tables”
- 選擇“Create Pipeline”創建一個新的管道
- 指定一個名稱,例如“銷售訂單管道”
- 將Notebook Path指定為步驟2中創建的Notebook。這是必需的步驟,但將來可能會修改為引用非筆記本庫。
- Target是可選的,但推薦使用,因為目標是目標數據庫,其他授權成員可以從管道訪問結果數據。
- 存儲位置是可選的,但推薦使用。如果已配置外部blob存儲位置,則可以指定外部blob存儲位置。DLT將在這裏為管道生成數據集和元數據日誌。提示:如果沒有指定storage,那麼DLT Pipeline生成的所有數據和日誌都將存儲在DLT創建的DBFS根存儲中的路徑中。稍後您可以在Edit Setting JSON文件中找到該路徑。要將數據和日誌存儲在外部(即非dbfs根目錄)位置,必須為DLT管道指定存儲位置。
- “管道模式”為觸發
- 設置用於的最小和最大工人數集群規模
- 選擇“開始”
- 您已經創建了您的第一個管道!
管道日誌
現在,您將在圖的下方看到包含管道運行日誌的部分。下麵是這個部分的樣子。
第一個攝取代碼解釋
圖標表示DLT數據集,在本例中是表。這兩張桌子我們認為是青銅桌。具體來說,它們是增量活動表,我們使用Auto Loader特性使用cloud_files函數
在DLT中,視圖類似於SQL中的臨時視圖,是某些計算的別名。視圖允許您將複雜的查詢分解為更小或更容易理解的查詢。視圖還允許您將給定的轉換重用為多個表的源。視圖隻能從管道中獲得,不能以交互方式查詢。
在DLT中,表類似於傳統的物化視圖。Delta Live Tables運行時自動創建Delta格式的表,並確保使用創建表的查詢的最新結果更新這些表。
消費者可以像使用標準Delta表一樣從Data Lakehouse讀取這些表和視圖(例如用於SQL報告或Python數據科學),但它們是由DLT引擎更新和管理的。有關更多詳細信息,請參見目標在下麵。
銀色數據集:期望和高質量數據
在本節中,我們將交給您開發端到端管道的控製權,如下麵的DAG所示。我們已經創建了青銅數據集,現在為銀,然後是金,概述在Lakehouse架構在2020年CIDR數據庫會議上發表的論文,並使用每一層教你一個新的DLT概念。
銀色層是關於高質量、多樣化和可訪問的數據集。這些表可能不會服務於特定的用例,例如以低延遲為生產報告提供服務,但它們已經被清理、轉換和管理,因此數據科學家和分析師可以輕鬆、自信地使用這些表來快速執行預處理、探索性分析和特征工程,以便他們可以將剩餘的時間用於機器學習和洞察收集。
對這些消費者來說,最大的生產力殺手不僅僅是數據訪問和預處理,而是對他們所使用的數據質量的信心。因此,我們將使用DLT來確保這些數據集符合特定的質量水平,並清楚地注釋數據集。數據消費者和決策者都可以使用通過正確使用約束和注釋得到的結果編目和質量監控。
- 打開管道筆記本並創建一個新單元格。
- 複製以下代碼到一個新單元格:
CREATE STREAMING LIVE TABLE sales_orders_cleaned(CONSTRAINT valid_order_number EXPECT (order_number IS NOT NULL) ON VIOLATION DROP ROW) PARTITIONED BY (order_date) COMMENT“使用有效的order_number(s)清洗的銷售訂單,並使用order_datetime進行分區。”TBLPROPERTIES ("quality" = "silver") AS SELECT f.customer_id, f.customer_name, f.number_of_line_items, TIMESTAMP(from_unixtime((cast(f。order_datetime as long)))) as order_datetime, DATE(from_unixtime((cast(f。order_datetime as long)))) as order_date, f.order_number, f.ordered_products, c.state, c.c city, c.lon, c.lat, c.units_purchased, c.loyalty_segment FROM STREAM(LIVE.sales_orders_raw) f LEFT JOIN LIVE。beplay体育app下载地址c ON c.customer_id = f.customer_id AND c.customer_name = f.customer_name
- 通過導航到左側導航欄中的Jobs,選擇“Delta Live Tables”並選擇在上一步中創建的管道,返回到管道“Sales Order Pipeline”
- 選擇開始/停止切換按鈕旁邊的下拉菜單,然後選擇“完整的刷新”
約束:約束允許您定義數據質量期望。它們采用一個解析為任何Spark篩選器謂詞的語句,以及在失敗時要采取的操作。操作可以是保留、刪除、失敗或隔離。欲知詳情在這裏看到的.所有約束都被記錄下來,以支持簡化的質量監控。
Tblproperties:鍵值對列表,可以是Delta Lake屬性、DLT管道屬性或任意屬性。任意tblproperties類似於可用於數據編目的標記。在本例中," quality ": " silver "是一個作為標記的任意屬性。
評論:簡要描述表用途的字符串,將來用於數據編目
黃金數據集:完整vs流/連續vs觸發
許多聚合不能增量地執行,必須作為完整的再處理執行,即使新數據可以在聚合的上遊青銅層和銀層增量地處理。然而,訪問尚未聚合的實時或“快速”數據具有重要價值。與傳統Lambda架構需要複雜的兩層基礎設施來處理快數據和慢數據不同,Lakehouse架構支持單個管道,包括實時增量“快速”青銅層和銀層,以及批量更新的黃金層(由Delta Lake存儲的強一致性保證實現)。
在實踐中,這種模式在過程式ETL中可能具有挑戰性,因為它需要部署單獨的流作業和批處理作業,並分別維護每個作業。為了解決這個問題,DLT允許您選擇管道中的每個數據集是完整的還是增量的,對管道的其餘部分進行最小的更改。這使得它很容易擴展管道,涉及青銅和白銀實時數據與黃金聚集層的組合。
事實氣泡:部分Spark聚合可以增量執行,如count、min、max、sum等。在一些簡單的情況下,將gold數據集聲明為增量可能是有意義的。然而,即使是簡單的計數和和,這可能會變得效率低下,如果您正在使用多個分組(例如GROUP BY col1, col2, col3),則不建議這樣做。
在本例中,我們通過按城市聚合銀表中的數據來創建完整的金表:
- 打開管道筆記本並創建一個新單元格。
- 複製以下代碼到一個新單元格:
創建活動表sales_order_in_la COMMENT“銷售訂單在LA。”TBLPROPERTIES ("quality" = "gold") AS SELECT city, order_date, customer_id, customer_name, ordered_products_explosion。curr, SUM(ordered_products_explosion .price)作為sales, SUM(ordered_products_explosion .qty)作為quantity, COUNT(ordered_products_explosion .id)作為product_count FROM (SELECT city, DATE(order_datetime)作為order_date, customer_id, customer_name, explosion (ordered_products)作為ordered_products_explosion FROM LIVE。sales_orders_cleaned WHERE city = 'Los Angeles') GROUP BY order_date, city, customer_id, customer_name, ordered_products_explosion .curr;創建活動表sales_order_in_chicago COMMENT“芝加哥的銷售訂單”。TBLPROPERTIES ("quality" = "gold") AS SELECT city, order_date, customer_id, customer_name, ordered_products_explosion。curr, SUM(ordered_products_explosion .price)作為sales, SUM(ordered_products_explosion .qty)作為quantity, COUNT(ordered_products_explosion .id)作為product_count FROM (SELECT city, DATE(order_datetime)作為order_date, customer_id, customer_name, explosion (ordered_products)作為ordered_products_explosion FROM LIVE。sales_orders_cleaned WHERE city = 'Chicago') GROUP BY order_date, city, customer_id, customer_name, ordered_products_explosion .curr;
- 通過導航到左側導航欄中的Jobs,選擇“Delta Live Tables”並選擇在上一步中創建的管道,返回到管道“Sales Order Pipeline”
- 選擇啟動/停止開關旁邊的下拉菜單,然後選擇“完全刷新”
連續vs觸發管道模式
在DLT中,雖然單個數據集可能是增量的或完整的,但整個管道可能是觸發的或連續的。當連續管道啟動時,它將啟動基礎設施並繼續吸收新數據,直到手動或通過API停止管道。被觸發的管道將一次性消耗源中的所有新數據,並自動關閉基礎設施。被觸發的管道通常會在生產中使用協調器或Databricks多任務作業的調度上運行。
要在觸發模式和連續模式之間切換,打開管道並選擇“編輯設置”。Continuous將是JSON中的布爾值。設置" continuous ": false "相當於將管道設置為觸發模式。
這使您可以靈活地慢慢成熟到連續處理範式,而無需對代碼進行重大重構。對於那些開始意識到實時洞察的價值而不需要持續運行雲基礎設施的更高成本的組織來說,這是一種常見的模式。有經驗的Spark工程師可以使用下麵的矩陣來理解DLT的功能:
讀: | 寫: | 連續模式 | 觸發模式 |
---|---|---|---|
完整的 | 完整的 | 在預定義的時間間隔內重新處理 | 單次再處理(刪除和替換) |
完整的 | 增量 | 不可能的 | 不可能的 |
增量 | 完整的 | 在預定義的時間間隔內重新處理 | 重新處理物化流結果 |
增量 | 增量 | 使用默認觸發器的流 | Trigger.once()流 |
Productionization
現在我們已經定義了管道。我們可以通過以下步驟來總結:
管道可觀測性與數據質量監測
事件日誌
DLT將所有管道日誌發送到管道存儲位置中的預定義Delta Lake表,該表可用於監控、沿襲和數據質量報告。您可以導入這個通用日誌分析筆記本來檢查事件日誌,或者使用dbutils訪問Delta表{{您的存儲位置}}/system/events。
最有用的信息在日誌表的“details”列中。下麵是導致DLT發出日誌的不同類型的操作,以及您可以在“詳細信息”中找到該事件的一些相關字段:
- user_action:在執行諸如創建管道之類的操作時發生事件
- flow_definition:當管道部署或更新並具有沿襲、模式和執行計劃信息時發生事件
- output_dataset而且input_datasets-輸出表/視圖及其上遊表/視圖
- flow_type—這是一個完整流還是附加流
- explain_text- Spark解釋計劃
- flow_progress:當數據流開始運行或完成對一批數據的處理時發生事件
- 指標-目前包含num_output_rows
- Data_quality——包含這個特定數據集的數據質量規則的結果數組
- dropped_records
- 預期
- 名稱,數據集,passed_records, failed_records
數據質量監控(需要Databricks SQL)
因為DLT日誌是作為Delta表公開的,並且日誌包含數據期望指標,所以很容易生成報告,用您選擇的BI工具監視數據質量。我們建議使用磚的SQL因為它與Delta和Databricks平台緊密集成,並通過易於管理的計算端點提供極快的查詢速度。Beplay体育安卓版本
使用Databricks SQL創建數據質量報告,請遵循以下步驟:
- 注意管道的“存儲位置”,方法是導航到管道,選擇Edit Settings,並複製的值“storage_location”
- 使用下麵的例子和步驟1中的存儲位置在metastore中注冊日誌表:
CREATE TABLE {{my_pipeline_logs}} AS SELECT * FROM delta。'{{管道存儲位置}}/係統/事件'
- 在左上角的下拉菜單中,切換到“SQL”工作區(在開發DLT管道時,您應該在“數據科學與工程”工作區)
- 在左側導航欄中,選擇“查詢”
- 選擇“創建查詢”
- 複製以下SQL查詢,替換{{my_pipeline_logs}}用你在步驟2中創建的表的名字:
WITH all_expectations AS (SELECT爆炸(from_json(details:flow_progress:data_quality:expectations, schema_of_json("[{'name':'str', 'dataset':'str', 'passed_records':'int', 'failed_records':'int'}]"))) AS expectation FROM {{my_pipeline_logs}} WHERE details:flow_progress. log . FROM {{my_pipeline_logs}}SELECT expectation_name, X_Axis, SUM(Y_Axis) AS Y_Axis FROM (SELECT expect .name AS expectation_name, 'Passed' AS X_Axis, expectations .name)passsed_records AS Y_Axis FROM all_expectations UNION ALL SELECT expect .name AS expectation_name, 'Failed' AS X_Axis, expectation.namefailed_records AS FROM all_expectations) GROUP BY expectation_name, X_Axis
- 運行查詢,你應該看到一個類似於下麵的響應:
- 選擇“添加可視化”
- 選擇可視化類型為“圖表”,圖表類型為“餅”。設置X列和Y列,並設置grouping為expectation_name:
你現在可以嚐試在Redash中使用不同的圖表和/或可視化類型。通常,對於圖表,您可以使用X_axis和Y_axis,並根據expectation_name進行分組,以創建用於不同質量監控目的的儀表板
結論
現在您已經完成了第一個Delta Live Tables管道,並在此過程中學習了一些關鍵概念,我們迫不及待地想要看到您創建的管道!有關Delta Live Tables的更多信息,請參閱我們的DLT文檔,觀看演示,或下載筆記本!