管理數據質量與達美住表
你使用的期望來定義數據質量約束的數據集的內容。期望讓你保證數據到達表滿足數據質量要求,並提供洞察為每個管道更新數據質量。你期望適用於使用Python修飾符或SQL查詢約束條款。
δ住表期望是什麼?
期望是可選條款添加到三角洲生活表數據集聲明應用數據質量檢查每個記錄通過一個查詢。
一個期望包括三個方麵:
一個描述,它充當一個惟一的標識符,並允許您跟蹤指標的約束。
一個布爾值聲明,總是返回真或假基於一些條件。
一個動作記錄失敗時期望,即布爾返回false。
下麵的矩陣顯示了三個動作你可以適用於無效的記錄:
行動 |
結果 |
---|---|
警告(默認) |
無效記錄寫入到目標;失敗是報道作為數據集的度量。 |
無效的記錄數據寫入到目標前下降;失敗是報告的指標數據集。 |
|
無效記錄防止更新成功。加工之前需要手動幹預。 |
您可以查看數據質量標準的記錄數等違反一個期望通過查詢三角洲生活事件日誌表。看到管道監控三角洲生活表。
三角洲的完整參考生活表數據集聲明語法,看三角洲生活表Python語言參考或三角洲生活表SQL語言參考。
請注意
雖然您可以包括多個條款在任何期望,隻有基於多個Python支持定義操作的期望。看到多個預期。
保留無效記錄
使用預計
運營商當你想記錄違反的期望。記錄違反期望被添加到目標數據集以及有效的記錄:
@dlt。預計(“有效時間戳”,“上校(“時間戳”)> 2012-01-01”)
約束valid_timestamp預計(時間戳>“2012-01-01”)
減少無效的記錄
使用預計或下降
操作符,以防止進一步的處理無效記錄。記錄違反預期下降的目標數據集:
@dlt。expect_or_drop(“valid_current_page”,“current_page_id不是零和current_page_title不是零”)
約束valid_current_page預計(current_page_id是不零和current_page_title是不零)在違反下降行
無效記錄失敗
當無效的記錄是不可接受的,使用預計或失敗
操作員記錄驗證失敗時立即停止執行。如果操作是一個表的更新,係統自動回滾事務:
@dlt。expect_or_fail(“valid_count”,“數> 0 ")
約束valid_count預計(數>0)在違反失敗更新
當管道失敗因為違反一個期望,你必須解決管道代碼來處理無效數據正確地重新運行前管道。
失敗的期望修改的火花查詢計劃所需轉換跟蹤信息檢測和報告違規行為。對於許多查詢,您可以使用這些信息來確定哪些輸入記錄導致侵犯。下麵是一個例子例外:
預期違反:{“flowName”:“a - b”," verboseInfo ": {“expectationsViolated”:(“x1是負的)," inputData ": {“一”:{“x1”: 1、“日元”:"}," b ": {“x2”: 1、“日元”:“aa”}}," outputRecord ": {“x1”: 1、“日元”:“一”,“x2”: 1、“日元”:“aa”},“missingInputData”:假的}}
多個預期
您可以定義期望與一個或多個數據質量約束在Python中管道。這些修飾符接受Python字典作為參數,其中的關鍵是期望名稱和值是期望約束。
使用expect_all
指定多個數據質量約束當記錄失敗驗證應包含在目標數據集:
@dlt。expect_all({“valid_count”:“數> 0 ",“valid_current_page”:“current_page_id不是零和current_page_title不是零”})
使用expect_all_or_drop
指定多個數據質量約束當記錄失敗驗證應該從目標數據集:
@dlt。expect_all_or_drop({“valid_count”:“數> 0 ",“valid_current_page”:“current_page_id不是零和current_page_title不是零”})
使用expect_all_or_fail
指定多個數據質量約束當記錄失敗驗證應該停止管道執行:
@dlt。expect_all_or_fail({“valid_count”:“數> 0 ",“valid_current_page”:“current_page_id不是零和current_page_title不是零”})
您還可以定義期望的集合作為一個變量並將其傳遞到一個或多個查詢你的管道:
valid_pages={“valid_count”:“數> 0 ",“valid_current_page”:“current_page_id不是零和current_page_title不是零”}@dlt。表@dlt。expect_all(valid_pages)defraw_data():#創建原始數據集@dlt。表@dlt。expect_all_or_drop(valid_pages)defprepared_data():#創建清潔和準備數據集
檢疫無效數據
下麵的示例使用期望結合臨時表和視圖。這種模式為您提供指標通過期望調查的記錄,並在管道更新,並提供了一種方法來處理有效和無效記錄通過不同的下遊路徑。
請注意
這個例子中讀取包含在示例數據磚的數據集。因為磚與管道發布數據集不支持統一目錄,這個例子隻能與一個蜂巢metastore管道配置發布。然而,這種模式也適用於統一目錄啟用管道,但你必須讀取數據外部位置。了解更多關於使用統一目錄與達美住表,看看使用統一的目錄與三角洲住表管道。
進口dlt從pyspark.sql.functions進口expr規則={}規則(“valid_website”]=“(網站不是NULL)”規則(“valid_location”]=”(位置不是零)”quarantine_rules=“不是({0})”。格式(”和“。加入(規則。值()))@dlt。表(的名字=“raw_farmers_market”)defget_farmers_market_data():返回(火花。讀。格式(“csv”)。選項(“頭”,“真正的”)。負載(' / databricks-datasets /網站/ farmers_markets_geographic_data /數據- 001 / '))@dlt。表(的名字=“farmers_market_quarantine”,臨時=真正的,partition_cols=(“is_quarantined”])@dlt。expect_all(規則)deffarmers_market_quarantine():返回(dlt。讀(“raw_farmers_market”)。選擇(“MarketName”,“網站”,“位置”,“狀態”,“Facebook”,“推特”,“Youtube”,“有機”,“updateTime”)。withColumn(“is_quarantined”,expr(quarantine_rules)))@dlt。視圖(的名字=“valid_farmers_market”)defget_valid_farmers_market():返回(dlt。讀(“farmers_market_quarantine”)。過濾器(“is_quarantined = false”))@dlt。視圖(的名字=“invalid_farmers_market”)defget_invalid_farmers_market():返回(dlt。讀(“farmers_market_quarantine”)。過濾器(“is_quarantined = true”))
驗證跨表行數
您可以添加一個額外的表定義一個期望你的管道來比較兩個生活表之間的行數。期望的結果出現在UI事件日誌和三角洲住表。下麵這個例子驗證之間的平等的行數tbla
和tblb
表:
創建或刷新生活表count_verification(約束no_rows_dropped預計(a_count= =b_count))作為選擇*從(選擇數(*)作為a_count從生活。tbla),(選擇數(*)作為b_count從生活。tblb)
執行先進的驗證與達美住表期望
您可以使用聚合和定義生活表連接查詢和使用這些查詢的結果作為期望檢查的一部分。這是有用的,如果你想執行複雜的數據質量檢查,例如,確保一個派生表包含源表的所有記錄或保證平等的數字列在表中。您可以使用臨時
關鍵字來防止這些表發布到目標模式。
下麵的例子驗證所有預期的記錄中報告
表:
創建臨時生活表report_compare_tests(約束no_missing_records預計(r。關鍵是不零))作為選擇*從生活。validation_copyv左外加入生活。報告r在v。關鍵=r。關鍵
下麵的示例使用一個聚合,以確保一個主鍵的唯一性:
創建臨時生活表report_pk_tests(約束unique_pk預計(num_entries=1))作為選擇pk,數(*)作為num_entries從生活。報告集團通過pk
使預期便攜和可重用
您可以維護數據質量規則分別從管道實現。
磚建議將規則存儲在三角洲表每個規則分類的標簽。你使用這個標簽數據集定義來確定哪些規則適用。
下麵的示例創建一個表命名規則
維護規則:
創建或取代表規則作為選擇col1作為的名字,col2作為約束,col3作為標簽從(值(“website_not_null”,”網站不是零”,“有效性”),(“location_not_null”,“位置是不空”,“有效性”),(“state_not_null”,“不空”,“有效性”),(“fresh_data”,“to_date (updateTime, M / d / yyyy h:男:“)> 2010-01-01”,“維護”),(“social_media_access”,“不是(Facebook是NULL和Twitter是零和Youtube是NULL)”,“維護”))
以下Python示例定義數據質量的期望基於存儲在規則規則
表。的get_rules ()
從函數讀取規則規則
表並返回一個Python字典包含匹配的規則標簽
參數傳遞給函數。字典上的應用@dlt.expect_all_ * ()
decorator執行數據質量約束。例如,任何記錄未標記的規則有效性
將從嗎raw_farmers_market
表:
請注意
這個例子中讀取包含在示例數據磚的數據集。因為磚與管道發布數據集不支持統一目錄,這個例子隻能與一個蜂巢metastore管道配置發布。然而,這種模式也適用於統一目錄啟用管道,但你必須讀取數據外部位置。了解更多關於使用統一目錄與達美住表,看看使用統一的目錄與三角洲住表管道。
進口dlt從pyspark.sql.functions進口expr,上校defget_rules(標簽):”“”加載數據質量規則從一個表:param標簽:標簽返回:字典的規則匹配的標記”“”規則={}df=火花。讀。表(“規則”)為行在df。過濾器(上校(“標簽”)= =標簽)。收集():規則(行(“名字”]]=行(“約束”]返回規則@dlt。表(的名字=“raw_farmers_market”)@dlt。expect_all_or_drop(get_rules(“有效性”))defget_farmers_market_data():返回(火花。讀。格式(“csv”)。選項(“頭”,“真正的”)。負載(' / databricks-datasets /網站/ farmers_markets_geographic_data /數據- 001 / '))@dlt。表(的名字=“organic_farmers_market”)@dlt。expect_all_or_drop(get_rules(“維護”))defget_organic_farmers_market():返回(dlt。讀(“raw_farmers_market”)。過濾器(expr(“有機= Y”))。選擇(“MarketName”,“網站”,“狀態”,“Facebook”,“推特”,“Youtube”,“有機”,“updateTime”))