轉換數據與達美住表
本文描述了如何使用達美住表聲明轉換數據並通過查詢邏輯指定如何處理記錄。它還包含一些常見轉換模式的例子可以是有用的,以構建出三角洲住表管道。
您可以定義一個數據集對任何查詢,返回一個DataFrame。可以使用Apache火花內置操作、udf定製邏輯,MLflowδ住表中管道模型轉換。一旦數據被吸收到三角洲住表管道,您可以定義新的數據集對上遊資源創建新的流表、物化視圖和視圖。
何時使用視圖、物化視圖和流表
確保你的管道是有效的和可維護的,選擇最好的數據集類型實現管道時查詢。
考慮使用一個視圖:
你有一個大型或複雜的查詢您想要闖入更易管理查詢。
你想驗證使用中間結果的期望。
你想減少存儲和計算成本,不需要查詢結果的實體化。因為表是物化,他們需要額外的計算和存儲資源。
考慮使用物化視圖時:
多個下遊消費表的查詢。因為視圖計算需求,視圖重新計算每次查詢視圖。
其他管道,工作,或消耗表的查詢。因為不物化視圖,您隻能使用它們在同一管道。
你想在開發過程中查看查詢的結果。因為表是物化的,可以查看和查詢外的管道,在開發過程中使用表格可以幫助驗證計算的正確性。驗證後,不需要具體化的查詢轉換成視圖。
考慮使用一個流表時:
查詢定義數據源不斷或者逐步增長。
查詢結果應逐步計算的。
需要高吞吐量和低延遲的管道。
請注意
流表總是定義針對流媒體資源。您還可以使用流媒體資源應用變化成
從疾病預防控製中心應用更新提要。看到改變數據獲取與三角洲生活表。
結合流表和物化視圖在一個單一的管道
流表繼承的處理保證配置Apache火花結構化流和處理查詢擴展的數據來源,在新行總是插入源表而不是修改。
請注意
雖然默認情況下,流表需要擴展的數據源,當流源是另一個流表需要更新或者刪除,您可以覆蓋這個行為的skipChangeCommits國旗。
常見的流模式包括攝入源數據來創建初始數據集在一個管道。這些初始數據集通常被稱為青銅表和經常執行簡單的轉換。
相比之下,最後一個表在一個管道,通常被稱為黃金表,通常需要複雜的聚合或閱讀來源的目標應用變化成
操作。因為這些操作本質上創建更新而不是附加,他們不支持輸入流表。這些轉換更適合物化視圖。
通過混合流表和物化視圖到一個管道,可以簡化你的管道,避免昂貴的re-ingestion或加工的原始數據,並有完整的SQL來計算複雜的力量聚合在一個有效地編碼和過濾數據集。下麵的例子說明了這種類型的混合處理:
請注意
這些例子從雲存儲使用自動加載程序加載文件。加載文件自動加載程序在統一目錄啟用管道,必須使用外部位置。了解更多關於使用統一目錄與達美住表,看看使用統一的目錄與三角洲住表管道。
@dlt。表defstreaming_bronze():返回(#由於這是一個流源,這個表是增量。火花。readStream。格式(“cloudFiles”)。選項(“cloudFiles.format”,“json”)。負載(“s3: / /道路/ /生/數據”))@dlt。表defstreaming_silver():#因為我們讀青銅表流,這銀表也#增量更新。返回dlt。read_stream(“streaming_bronze”)。在哪裏(…)@dlt。表deflive_gold():#這個表將被重新計算完全閱讀整個銀表#時更新。返回dlt。讀(“streaming_silver”)。groupBy(“user_id”)。數()
創建或刷新流媒體表streaming_bronze作為選擇*從cloud_files(“s3: / /道路/ /生/數據”,“json”)創建或刷新流媒體表streaming_silver作為選擇*從流(生活。streaming_bronze)在哪裏…創建或刷新生活表live_gold作為選擇數(*)從生活。streaming_silver集團通過user_id
了解更多關於使用自動加載程序有效地讀取JSON文件從S3增量處理。
Stream-static連接
Stream-static連接是一個好的選擇當denormalizing擴展數據的連續流主要是靜態維度表。
每個管道更新,新記錄從流與最新的靜態表的快照。如果記錄被添加或更新後的靜態表對應的數據流表處理,合成記錄並非重新計算,除非完全執行刷新。
在管道配置為觸發執行靜態表返回結果的時間更新開始。在管道配置連續執行,每次表流程更新,最新版本的靜態表查詢。
下麵是一個示例stream-static加入:
@dlt。表defcustomer_sales():返回dlt。read_stream(“銷售”)。加入(讀(“beplay体育app下载地址顧客”),(“customer_id”),“左”)
創建或刷新流媒體表customer_sales作為選擇*從流(生活。銷售)內心的加入左生活。beplay体育app下载地址使用(customer_id)
計算聚合效率
您可以使用流表增量計算簡單的分配總量統計,min,馬克斯,或總和,代數總量平均和標準偏差。磚建議增量的聚合查詢與數量有限的組織,例如,一個查詢的集團通過國家
條款。隻有新的輸入數據讀取與每個更新。
在三角洲住表管道使用MLflow模型
您可以使用MLflow-trained三角洲住表中管道模型。在磚MLflow模型被視為轉換,這意味著他們行動火花DataFrame作為火花DataFrame輸入,並返回結果。因為δ生活對DataFrames表定義數據集,您可以將Apache火花工作負載,利用MLflow三角洲住表隻有幾行代碼。更多關於MLflow,看到MLflow指南。
如果你已經有一個Python筆記本調用一個MLflow模型,可以適應這段代碼三角洲生活表使用@dlt.table
裝飾和確保函數定義返回轉換結果。默認不安裝MLflow三角洲生活表,所以確保你%皮普安裝mlflow
和導入mlflow
和dlt
你的筆記本的頂部。介紹三角洲住表的語法,看與Python教程:聲明一個數據管道三角洲生活表。
在三角洲住表使用MLflow模型,完成以下步驟:
獲得的運行ID和模型名稱MLflow模型。運行ID和模型名稱是用於構造MLflow的URI模式。
使用URI來定義一個火花UDF加載MLflow模型。
在你的表定義調用UDF使用MLflow模型。
下麵的例子顯示了這一模式的基本語法:
%皮普安裝mlflow進口dlt進口mlflowrun_id=“< mlflow-run-id >”model_name=“< the-model-name-in-run >”model_uri=f”:/{run_id}/{model_name}”loaded_model_udf=mlflow。pyfunc。spark_udf(火花,model_uri=model_uri)@dlt。表defmodel_predictions():返回dlt。讀(<輸入- - - - - -數據>)。withColumn(“預測”,loaded_model_udf(<模型- - - - - -特性>))
作為一個完整的例子,下麵的代碼定義了一個火花UDF命名loaded_model_udf
裝載一個MLflow模型對貸款風險數據訓練。預測的數據列用於製造作為一個參數傳遞給UDF。表loan_risk_predictions
計算預測為每一行loan_risk_input_data
。
%皮普安裝mlflow進口dlt進口mlflow從pyspark.sql.functions進口結構體run_id=“mlflow_run_id”model_name=“the_model_name_in_run”model_uri=f”:/{run_id}/{model_name}”loaded_model_udf=mlflow。pyfunc。spark_udf(火花,model_uri=model_uri)分類=(“術語”,“home_ownership”,“目的”,“addr_state”,“verification_status”,“application_type”]數字組成=(“loan_amnt”,“emp_length”,“annual_inc”,“唯一”,“delinq_2yrs”,“revol_util”,“total_acc”,“credit_length_in_years”]特性=分類+數字組成@dlt。表(評論=“貸款風險GBT毫升預測”,table_properties={“質量”:“黃金”})defloan_risk_predictions():返回dlt。讀(“loan_risk_input_data”)。withColumn(“預測”,loaded_model_udf(結構體(特性)))
保留手動刪除或更新
三角洲生活表允許您手動刪除或更新一個表的記錄和下遊驗算表進行刷新操作。
默認情況下,三角洲生活表驗算表結果基於輸入數據每次更新一條管道,所以你必須確保刪除記錄不是從源數據加載。設置pipelines.reset.allowed
表屬性假
防止刷新表但並不妨礙增量寫入表或防止新流入的數據表。
下圖展示了一個示例使用兩個流表:
raw_user_table
接受原始用戶數據從源。bmi_table
使用體重和身高的增量計算體重指數得分raw_user_table
。
你想要手動刪除或更新用戶的記錄raw_user_table
和驗算bmi_table
。
下麵的代碼演示了設置pipelines.reset.allowed
表屬性假
禁用全部刷新raw_user_table
這有意保留隨時間的變化,但下遊表重新計算管道更新時,運行:
創建或刷新流媒體表raw_user_tableTBLPROPERTIES(管道。重置。允許=假)作為選擇*從cloud_files(“/ databricks-datasets / iot-stream /關鍵因素”,“csv”);創建或刷新流媒體表bmi_table作為選擇用戶標識,(重量/2。2)/戰俘(高度*0。0254年,2)作為身體質量指數從流(生活。raw_user_table);