開始
用戶指南
管理指南
參考指南
資源
2022年12月22日更新
給我們反饋
本文包含一組建議和解決方案,用於在Delta Live Tables管道中實現常見任務。
您希望將一組公共數據質量規則應用於多個表,或者開發和維護數據質量規則的團隊成員與管道開發人員分開。
將數據質量規則與管道實現分開維護。以可靠且易於訪問和更新的格式存儲規則,例如,存儲在DBFS或雲存儲或Delta表中的文本文件。下麵以CSV文件命名為rules.csv存儲在DBFS中以維護規則。每條規則rules.csv根據標記進行分類。在數據集定義中使用此標記來確定應用哪些規則:
rules.csv
name, constraint, tag website_not_null,"Website IS NOT NULL",validity location_not_null,"Location IS NOT NULL",validity state_not_null,"State IS NOT NULL",validity fresh_data,"to_date(updateTime,'M/d/yyyy h: M:s a') > '2010-01-01'",維護social_media_access,"NOT(Facebook IS NULL, Twitter IS NULL, Youtube IS NULL)",維護
對象中存儲的規則定義了數據質量期望rules.csv文件。的get_rules ()函數從中讀取規則rules.csv返回一個Python字典,其中包含匹配標簽參數傳遞給函數。這本詞典應用於@dlt.expect_all_ * ()裝飾器來加強數據質量約束。例如,任何未通過規則標記的記錄有效性會從raw_farmers_market表:
get_rules ()
標簽
@dlt.expect_all_ * ()
有效性
raw_farmers_market
進口dlt從pyspark.sql.functions進口expr,上校defget_rules(標簽):”“”從CSV文件中加載數據質量規則:param tag:要匹配的標簽:return:匹配標簽的規則字典”“”規則={}df=火花.讀.格式(“csv”).選項(“頭”,“真正的”).負載(“/道路/ / rules.csv”)為行在df.過濾器(上校(“標簽”)= =標簽).收集():規則[行[“名字”]]=行[“約束”]返回規則@dlt.表格(的名字=“raw_farmers_market”)@dlt.expect_all_or_drop(get_rules(“有效性”))defget_farmers_market_data():返回(火花.讀.格式(“csv”).選項(“頭”,“真正的”).負載(' / databricks-datasets /網站/ farmers_markets_geographic_data /數據- 001 / '))@dlt.表格(的名字=“organic_farmers_market”)@dlt.expect_all_or_drop(get_rules(“維護”))defget_organic_farmers_market():返回(dlt.讀(“raw_farmers_market”).過濾器(expr(“有機=‘Y’”)).選擇(“MarketName”,“網站”,“狀態”,“Facebook”,“推特”,“Youtube”,“有機”,“updateTime”))
您希望使用SQL的簡單性來定義Delta Live Tables數據集,但需要SQL不直接支持的轉換。
使用一個Python用戶定義函數(UDF)在您的SQL查詢。屬性的定義和注冊廣場()UDF返回輸入參數的平方,並調用廣場()SQL表達式中的UDF。
廣場()
定義和注冊UDF。
創建一個筆記本與默認的語言設置為Python並在單元格中添加以下內容:
def廣場(我:int)->int:”“”簡單的udf,用於對傳遞的參數進行平方。:param i:來自Pyspark或SQL的列:return:傳遞參數的平方值。”“”返回我*我火花.udf.注冊(“makeItSquared”,廣場)#為Spark SQL注冊方形udf
調用UDF。
創建一個SQL筆記本,並在單元格中添加以下查詢:
創建或刷新生活表格raw_squared作為選擇makeItSquared(2)作為numSquared;
創建管道
創建一個新的Delta Live Tables管道,將您創建的筆記本添加到其中筆記本庫.使用添加筆記本庫按鈕中添加其他筆記本創建管道對話或庫字段在Delta活動表設置配置筆記本。
庫
你想用anMLflow管道中的訓練模型。
要在Delta Live Tables管道中使用MLflow模型:
獲取MLflow模型的運行ID和模型名稱。運行ID和模型名稱用於構造MLflow模型的URI。
使用URI定義一個Spark UDF來加載MLflow模型。
在表定義中調用UDF以使用MLflow模型。
下麵的示例定義了一個名為loaded_model它加載了一個經過貸款風險數據訓練的MLflow模型。的loaded_modelUDF然後用於定義gtb_scoring_train_data而且gtb_scoring_valid_data表:
loaded_model
gtb_scoring_train_data
gtb_scoring_valid_data
%皮普安裝mlflow進口dlt進口mlflow從pyspark.sql.functions進口結構體run_id=“mlflow_run_id”model_name=“the_model_name_in_run”model_uri=”:/{run_id}/{model_name}".格式(run_id=run_id,model_name=model_name)loaded_model=mlflow.pyfunc.spark_udf(火花,model_uri=model_uri)分類=[“術語”,“home_ownership”,“目的”,“addr_state”,“verification_status”,“application_type”]數字組成=[“loan_amnt”,“emp_length”,“annual_inc”,“唯一”,“delinq_2yrs”,“revol_util”,“total_acc”,“credit_length_in_years”]特性=分類+數字組成@dlt.表格(評論=基於貸款風險的GBT ML評分訓練數據集,table_properties={“質量”:“黃金”})defgtb_scoring_train_data():返回dlt.讀(“train_data”).withColumn(“預測”,loaded_model(結構體(特性)))@dlt.表格(評論=“基於貸款風險的GBT ML評分有效數據集”,table_properties={“質量”:“黃金”})defgtb_scoring_valid_data():返回dlt.讀(“valid_data”).withColumn(“預測”,loaded_model(結構體(特性)))
您希望為開發或測試創建示例數據集,例如,包含數據子集或特定記錄類型的數據集。
在單個或共享的筆記本中實現轉換邏輯。然後創建單獨的筆記本,以根據環境定義多個數據集。例如,在生產中,創建一個筆記本,定義管道的完整數據集:
創建或刷新流媒體生活表格input_data作為選擇*從cloud_files(“/生產/數據”,“json”)
然後創建筆記本,根據需求定義數據示例。例如,要生成一個包含特定記錄的小數據集用於測試:
創建或刷新生活表格input_data作為選擇“2021/09/04”作為日期,22.4作為sensor_reading聯盟所有選擇“2021/09/05”作為日期,21.5作為sensor_reading
您還可以過濾數據以創建用於開發或測試的生產數據的子集:
創建或刷新生活表格input_data作為選擇*從刺激.input_data在哪裏日期>當前日期()-時間間隔1一天
要使用這些不同的數據集,請使用實現轉換邏輯的筆記本創建多個管道。每個管道都可以從LIVE.input_data數據集,但配置為包括創建特定於環境的數據集的筆記本。
LIVE.input_data
您擁有包含多個流或數據集定義的管道,這些流或數據集定義僅在少量參數上有所不同。這種冗餘導致管道容易出錯且難以維護。例如,下圖顯示了一個管道的圖形,該管道使用消防部門數據集查找對不同類別的緊急呼叫響應時間最快的社區。在本例中,並行流僅在幾個參數上有所不同。
您可以使用元編程模式來減少生成和維護冗餘流定義的開銷。Delta Live Tables中的元編程是使用Python內部函數完成的。因為這些函數是延遲求值的,所以您可以使用它們來創建除了輸入參數之外完全相同的流。每次調用都可以包含一組不同的參數,這些參數控製每個表應該如何生成,如下例所示:
進口dlt從pyspark.sql.functions進口*@dlt.表格(的名字=“raw_fire_department”,評論=“消防部門反應原始表”)@dlt.expect_or_drop(“valid_received”,“收到的不是空的”)@dlt.expect_or_drop(“valid_response”,“響應的不是NULL”)@dlt.expect_or_drop(“valid_neighborhood”,"鄰居!= 'None'")defget_raw_fire_department():返回(火花.讀.格式(“csv”).選項(“頭”,“真正的”).選項(“多行”,“真正的”).負載(' / databricks-datasets / timeseries /火災/ Fire_Department_Calls_for_Service.csv ').withColumnRenamed(“調用類型”,“call_type”).withColumnRenamed(“收到DtTm”,“收到”).withColumnRenamed(“響應DtTm”,“回應”).withColumnRenamed(“社區-分析邊界”,“社區”).選擇(“call_type”,“收到”,“回應”,“社區”))all_tables=[]defgenerate_tables(call_table,response_table,過濾器):@dlt.表格(的名字=call_table,評論=“按調用類型劃分的頂級表”)defcreate_call_table():返回(火花.sql(”“”選擇unix_timestamp(received,'M/d/yyyy h: M:s a') as ts_received,unix_timestamp(responses,'M/d/yyyy h: M:s a') as ts_responses,社區從LIVE.raw_fire_departmentWHERE call_type = '{過濾器}'”“”.格式(過濾器=過濾器)))@dlt.表格(的名字=response_table,評論=“響應速度最快的十大社區”)defcreate_response_table():返回(火花.sql(”“”選擇社區,AVG((ts_received - ts_responses))作為response_time從生活。{call_table}分組按1ORDER BY response_time限製10”“”.格式(call_table=call_table)))all_tables.附加(response_table)generate_tables(“alarms_table”,“alarms_response”,“警報”)generate_tables(“fire_table”,“fire_response”,“結構火”)generate_tables(“medical_table”,“medical_response”,“醫療事故”)@dlt.表格(的名字=“best_neighborhoods”,評論=“哪個鄰居在最佳響應時間列表中出現次數最多”)def總結():target_tables=[dlt.讀(t)為t在all_tables]聯合=functools.減少(λx,y:x.聯盟(y),target_tables)返回(聯合.groupBy(上校(“社區”)).gg(數(“*”).別名(“分數”)).orderBy(desc(“分數”)))
您已經定義了期望以過濾掉違反數據質量約束的記錄,但是還希望保存無效記錄以供分析。
創建與您所定義的期望相反的規則,並使用這些規則將無效記錄保存到單獨的表中。您可以通過編程方式創建這些反向規則。屬性valid_farmers_market表,其中包含通過valid_website而且valid_location數據質量的限製,也創建invalid_farmers_market該表包含了未能滿足這些數據質量約束的記錄:
valid_farmers_market
valid_website
valid_location
invalid_farmers_market
進口dlt規則={}quarantine_rules={}規則[“valid_website”]="(網站不是空的)"規則[“valid_location”]="(位置不是空的)"#串聯反向規則quarantine_rules[“invalid_record”]=“不是({0})”.格式(“而且”.加入(規則.值()))@dlt.表格(的名字=“raw_farmers_market”)defget_farmers_market_data():返回(火花.讀.格式(“csv”).選項(“頭”,“真正的”).負載(' / databricks-datasets /網站/ farmers_markets_geographic_data /數據- 001 / '))@dlt.表格(的名字=“valid_farmers_market”)@dlt.expect_all_or_drop(規則)defget_valid_farmers_market():返回(dlt.讀(“raw_farmers_market”).選擇(“MarketName”,“網站”,“位置”,“狀態”,“Facebook”,“推特”,“Youtube”,“有機”,“updateTime”))@dlt.表格(的名字=“invalid_farmers_market”)@dlt.expect_all_or_drop(quarantine_rules)defget_invalid_farmers_market():返回(dlt.讀(“raw_farmers_market”).選擇(“MarketName”,“網站”,“位置”,“狀態”,“Facebook”,“推特”,“Youtube”,“有機”,“updateTime”))
上述方法的一個缺點是,它通過兩次處理數據來生成隔離表。如果不想要這種性能開銷,可以直接在查詢中使用約束來生成指示記錄驗證狀態的列。然後可以根據該列對表進行分區,以便進一步優化。
這種方法不使用期望,因此數據質量度量不會出現在事件日誌或管道UI中。
進口dlt從pyspark.sql.functions進口expr規則={}quarantine_rules={}規則[“valid_website”]="(網站不是空的)"規則[“valid_location”]="(位置不是空的)"quarantine_rules=“不是({0})”.格式(“而且”.加入(規則.值()))@dlt.表格(的名字=“raw_farmers_market”)defget_farmers_market_data():返回(火花.讀.格式(“csv”).選項(“頭”,“真正的”).負載(' / databricks-datasets /網站/ farmers_markets_geographic_data /數據- 001 / '))@dlt.表格(的名字=“partitioned_farmers_market”,partition_cols=[“隔離”])defget_partitioned_farmers_market():返回(dlt.讀(“raw_farmers_market”).withColumn(“隔離”,expr(quarantine_rules)).選擇(“MarketName”,“網站”,“位置”,“狀態”,“Facebook”,“推特”,“Youtube”,“有機”,“updateTime”,“隔離”))
您需要比較兩個活動表之間的行數,可能是為了驗證在沒有刪除行的情況下成功處理了數據。
向管道中添加一個額外的表,該表定義執行比較的期望。此期望的結果顯示在事件日誌和Delta Live Tables UI中。實例之間驗證相等的行數tbla而且tblb表:
tbla
tblb
創建或刷新生活表格count_verification(約束no_rows_dropped預計(a_count= =b_count))作為選擇*從(選擇數(*)作為a_count從生活.tbla),(選擇數(*)作為b_count從生活.tblb)
您的應用程序需要任意刪除或更新表中的記錄,並重新計算所有下遊表。下圖展示了兩個流媒體直播表:
raw_user_table從源中攝取一組原始用戶數據。
raw_user_table
bmi_table使用體重和身高增量計算BMI分數raw_user_table.
bmi_table
中刪除或更新用戶記錄以符合隱私要求raw_user_table重新計算bmi_table.
您可以手動刪除或更新的記錄raw_user_table並執行刷新操作以重新計算下遊表。但是,您需要確保刪除的記錄不會從源數據中重新加載。
使用pipelines.reset.allowed表屬性,以禁用完全刷新raw_user_table因此,預期的更改會隨著時間的推移而保留:
pipelines.reset.allowed
創建或刷新流媒體生活表格raw_user_tableTBLPROPERTIES(管道.重置.允許=假)作為選擇*從cloud_files(“/ databricks-datasets / iot-stream /關鍵因素”,“csv”);創建或刷新流媒體生活表格bmi_table作為選擇用戶標識,(重量/2.2)/戰俘(高度*0.0254,2)作為身體質量指數從流(生活.raw_user_table);
設置pipelines.reset.allowed來假防止刷新到raw_user_table,但不會阻止對表的增量寫操作,也不會阻止新數據流入表。
假
您已經配置了目標設置為發布您的表,但有一些表您不想發布。
目標
定義一個臨時命令Delta Live Tables不為表持久化元數據:
臨時
創建臨時生活表格beplay体育app下载地址customers_raw作為選擇*從json.`/數據/beplay体育app下载地址/json/ '
@dlt.表格(評論=“原始客戶數據”,臨時=真正的)defbeplay体育app下载地址customers_raw():返回(“…”)
您需要從管道向數據源進行身份驗證,例如,雲數據存儲或數據庫,並且不希望在您的筆記本或配置中包含憑據。
使用磚秘密存儲訪問密鑰或密碼等憑證。屬性中的Spark屬性可配置管道中的秘密管道的設置集群配置。
下麵的示例使用一個秘密存儲從Azure數據湖存儲Gen2 (ADLS Gen2)存儲帳戶讀取輸入數據所需的訪問密鑰自動加載程序.您可以使用相同的方法配置管道所需的任何秘密,例如訪問S3的AWS密鑰,或Apache Hive metastore的密碼。
要了解有關使用Azure數據湖存儲Gen2的詳細信息,請參見訪問Azure數據湖存儲Gen2和Blob存儲.
請注意
您必須添加spark.hadoop。的前綴spark_conf設置秘密值的配置鍵。
spark.hadoop。
spark_conf
{“id”:“43246596 - a63f - 11 - ec - b909 - 0242 ac120002”,“集群”:[{“標簽”:“默認”,“spark_conf”:{“spark.hadoop.fs.azure.account.key。< storage-account-name > .dfs.core.windows.net”:“{{秘密/ < scope-name > / <秘密名字>}}”},“自動定量”:{“min_workers”:1,“max_workers”:5,“模式”:“增強”}},{“標簽”:“維護”,“spark_conf”:{“spark.hadoop.fs.azure.account.key。< storage-account-name > .dfs.core.windows.net”:“{{秘密/ < scope-name > / <秘密名字>}}”}}),“發展”:真正的,“連續”:假,“庫”:[{“筆記本”:{“路徑”:"/Users/user@www.eheci.com/DLT notebook /Delta Live Tables quickstart"}}),“名稱”:“使用ADLS2的DLT快速入門”}
取代
< storage-account-name >ADLS Gen2存儲帳戶名。
< storage-account-name >
< scope-name >使用Databricks秘密作用域名。
< scope-name >
<秘密名字>包含Azure存儲帳戶訪問密鑰的密鑰名稱。
<秘密名字>
進口dltjson_path=“abfss: / / <容器名稱> @ < storage-account-name > .dfs.core.windows.net/ < path-to-input-dataset >”@dlt.create_table(評論="從ADLS2存儲帳戶攝取的數據")defread_from_ADLS2():返回(火花.readStream.格式(“cloudFiles”).選項(“cloudFiles.format”,“json”).負載(json_path))
<容器名稱>使用存儲輸入數據的Azure存儲帳戶容器的名稱。
<容器名稱>
< path-to-input-dataset >輸入數據集的路徑。
< path-to-input-dataset >
您希望限製運行Delta Live Tables管道的集群的配置選項;例如,您希望通過限製集群大小來控製成本,或者通過提供預定義的集群模板來簡化集群配置。
集群政策允許您定義限製用戶訪問集群配置的模板。您可以定義一個或多個集群策略,以便在配置管道時使用。
方法定義集群策略,可為Delta Live Tables管道創建集群策略cluster_type字段設置為dlt.下麵的例子為Delta Live Tables集群創建了一個最小策略:
cluster_type
dlt
{“cluster_type”:{“類型”:“固定”,“價值”:“dlt”},“num_workers”:{“類型”:“無限”,“defaultValue”:3.,“isOptional”:真正的},“node_type_id”:{“類型”:“無限”,“isOptional”:真正的},“spark_version”:{“類型”:“無限”,“隱藏”:真正的}}
有關創建集群策略(包括示例策略)的詳細信息,請參見創建集群策略.
要在管道配置中使用集群策略,需要策略ID。查詢策略ID。
點擊計算在側欄中。
單擊集群政策選項卡。
單擊策略名稱。
中的策略IDID字段。
當您單擊策略名稱時,在URL的末尾也可以使用策略ID。
使用流水線集群策略:
當使用集群策略配置Delta Live Tables集群時,Databricks建議將一個策略應用於兩個默認的而且維護集群。
默認的
維護
點擊工作流,並單擊Delta活動表選項卡。的管道列表顯示。
單擊管道名稱。的管道的細節頁麵出現。
單擊設置按鈕。的編輯管道設置對話框出現了。
單擊JSON按鈕。
在集群設置,設置policy_id字段轉換為策略ID的值。配置默認的而且維護使用指定ID的集群策略創建集群C65B864F02000008:
集群
policy_id
C65B864F02000008
{“集群”:[{“標簽”:“默認”,“policy_id”:“C65B864F02000008”,“自動定量”:{“min_workers”:1,“max_workers”:5,“模式”:“增強”}},{“標簽”:“維護”,“policy_id”:“C65B864F02000008”}]}
點擊保存.
您需要執行複雜的數據質量檢查,例如,確保派生表包含來自源表的所有記錄,或者確保跨表的數字列相等。
可以使用聚合和連接查詢定義活動表,並將這些查詢的結果用作預期檢查的一部分。中的所有預期記錄都顯示在報告表:
報告
創建生活表格report_compare_tests(約束no_missing_records預計(r.關鍵是不零))作為選擇*從生活.validation_copyv左外加入生活.報告r在v.關鍵=r.關鍵
下麵的例子使用了一個聚合來確保主鍵的唯一性:
創建生活表格report_pk_tests(約束unique_pk預計(num_entries=1))作為選擇pk,數(*)作為num_entries從生活.報告集團通過pk
若要防止驗證中使用的表的持久性,請使用臨時關鍵字在表定義中。
您的管道需要處理來自Azure事件集線器的消息。但是,不能使用結構化流事件集線器連接器因為這個庫不是Databricks Runtime和Delta Live Tables的一部分不允許您使用第三方JVM庫.
Azure事件集中性提供了一個與Apache Kafka兼容的端點,您可以將其與結構化流Kafka連接器,在Databricks Runtime中可用,用於處理來自Azure事件集中性的消息。有關Azure Event hub和Apache Kafka兼容性的更多信息,請參見使用來自Apache Kafka應用程序的Azure事件中心.
以下步驟描述將Delta Live Tables管道連接到現有事件集線器實例,並使用來自主題的事件。要完成這些步驟,您需要以下事件集線器連接值:
事件集線器名稱空間的名稱。
事件集線器名稱空間中的事件集線器實例的名稱。
事件集線器的共享訪問策略名稱和策略鍵。默認為ARootManageSharedAccessKey策略為每個事件集線器名稱空間創建。這項政策管理,發送而且聽權限。如果管道隻從事件集線器讀取數據,Databricks建議創建隻具有監聽權限的新策略。
RootManageSharedAccessKey
管理
發送
聽
有關事件集線器連接字符串的詳細信息,請參見獲取事件集線器連接字符串.
Azure Event Hubs提供OAuth 2.0和共享訪問簽名(SAS)選項來授權對安全資源的訪問。這些指令使用基於sas的身份驗證。
如果從Azure門戶獲取事件集線器連接字符串,則該連接字符串可能不包含EntityPath價值。的EntityPath值僅在使用結構化流事件集線器連接器時才需要。使用結構化流Kafka連接器隻需要提供主題名。
EntityPath
由於策略鍵是敏感信息,Databricks建議不要在管道代碼中硬編碼該值。相反,使用Databricks秘密來存儲和管理對密鑰的訪問。
下麵的示例使用Databricks CLI創建一個秘密作用域,並將密鑰存儲在該秘密作用域中。在管道代碼中,使用dbutils.secrets.get ()函數與scope-name而且shared-policy-name檢索鍵值。
dbutils.secrets.get ()
scope-name
shared-policy-name
Databricks——profile secrets create-scope——scope Databricks——profile secrets put——scope ——key ——string-value
有關Databricks秘密的更多信息,請參見保密管理.有關使用Secrets CLI的更多信息,請參見CLI的秘密.
下麵的示例從主題中讀取IoT事件,但您可以根據應用程序的需求調整示例。作為最佳實踐,Databricks建議使用Delta Live Tables管道設置來配置應用程序變量。管道代碼然後使用spark.conf.get ()函數檢索值。有關使用管道設置參數化管道的詳細信息,請參見參數化管道.
spark.conf.get ()
進口dlt進口pyspark.sql.types作為T從pyspark.sql.functions進口*#事件集線器配置EH_NAMESPACE=火花.相依.得到(“iot.ingestion.eh.namespace”)EH_NAME=火花.相依.得到(“iot.ingestion.eh.name”)EH_CONN_SHARED_ACCESS_KEY_NAME=火花.相依.得到(“iot.ingestion.eh.accessKeyName”)SECRET_SCOPE=火花.相依.得到(“io.ingestion.eh.secretsScopeName”)EH_CONN_SHARED_ACCESS_KEY_VALUE=dbutils.秘密.得到(範圍=SECRET_SCOPE,關鍵=EH_CONN_SHARED_ACCESS_KEY_NAME)EH_CONN_STR=f“端點=某人:/ /{EH_NAMESPACE}.servicebus.windows.net/; SharedAccessKeyName ={EH_CONN_SHARED_ACCESS_KEY_NAME}; SharedAccessKey ={EH_CONN_SHARED_ACCESS_KEY_VALUE}"# Kafka消費者配置KAFKA_OPTIONS={“kafka.bootstrap.servers”:f"{EH_NAMESPACE}.servicebus.windows.net: 9093”,“訂閱”:EH_NAME,“kafka.sasl.mechanism”:“普通”,“kafka.security.protocol”:“SASL_SSL”,“kafka.sasl.jaas.config”:f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\”ConnectionString美元\”密碼=\”{EH_CONN_STR}\”;“,“kafka.request.timeout.ms”:火花.相依.得到(“iot.ingestion.kafka.requestTimeout”),“kafka.session.timeout.ms”:火花.相依.得到(“iot.ingestion.kafka.sessionTimeout”),“maxOffsetsPerTrigger”:火花.相依.得到(“iot.ingestion.spark.maxOffsetsPerTrigger”),“failOnDataLoss”:火花.相依.得到(“iot.ingestion.spark.failOnDataLoss”),“startingOffsets”:火花.相依.得到(“iot.ingestion.spark.startingOffsets”)}#負載模式payload_ddl="""battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING,濕度BIGINT, ip STRING,緯度DOUBLE, lcd STRING,經度DOUBLE,刻度STRING, temp BIGINT,時間戳BIGINT"""payload_schema=T._parse_datatype_string(payload_ddl)#基本記錄解析和添加ETL審計列def解析(df):返回(df.withColumn(“記錄”,上校(“價值”).投(“字符串”)).withColumn(“parsed_records”,from_json(上校(“記錄”),payload_schema)).withColumn(“iot_event_timestamp”,expr(”(from_unixtime (parsed_records。Timestamp / 1000)作為Timestamp)")).withColumn(“eh_enqueued_timestamp”,expr(“時間戳”)).withColumn(“eh_enqueued_date”,expr(“to_date(時間戳)”)).withColumn(“etl_processed_timestamp”,上校(“current_timestamp”)).withColumn(“etl_rec_uuid”,expr(“uuid()”)).下降(“記錄”,“價值”,“關鍵”))@dlt.create_table(評論=“原始物聯網事件”,table_properties={“質量”:“青銅”,“pipelines.reset.allowed”:“假”如果完全刷新,則保留增量表中的數據},partition_cols=[“eh_enqueued_date”])@dlt.預計(“valid_topic”,“主題不是空的”)@dlt.預計(“有效的記錄”,“parsed_records不是空的”)defiot_raw():返回(火花.readStream.格式(“卡夫卡”).選項(**KAFKA_OPTIONS).負載().變換(解析))
使用以下設置創建一個新管道,將占位符值替換為適合您的環境的值。
{“集群”:[{“標簽”:“默認”,“spark_conf”:{“spark.hadoop.fs.azure.account.key。< storage-account-name > .dfs.core.windows.net”:“{{秘密/ < scope-name > / <秘密名字>}}”},“num_workers”:4}),“發展”:真正的,“連續”:假,“通道”:“當前”,“版”:“高級”,“光子”:假,“庫”:[{“筆記本”:{“路徑”:“< path-to-notebook >”}}),“名稱”:“dlt_eventhub_ingestion_using_kafka”,“存儲”:“abfss: / / <容器名稱> @ < storage-account-name > .dfs.core.windows.net/iot/”,“配置”:{“iot.ingestion.eh.namespace”:“< eh-namespace >”,“iot.ingestion.eh.accessKeyName”:“< eh-policy-name >”,“iot.ingestion.eh.name”:“< eventhub >”,“io.ingestion.eh.secretsScopeName”:“< secret-scope-name >”,“iot.ingestion.spark.maxOffsetsPerTrigger”:“50000”,“iot.ingestion.spark.startingOffsets”:“最新”,“iot.ingestion.spark.failOnDataLoss”:“假”,“iot.ingestion.kafka.requestTimeout”:“60000”,“iot.ingestion.kafka.sessionTimeout”:“30000”},“目標”:“< target-database-name >”}
<容器名稱>使用Azure存儲帳戶容器的名稱。
< storage-account-name >使用ADLS Gen2存儲帳戶的名稱。
< eh-namespace >使用Event Hubs名稱空間的名稱。
< eh-namespace >
< eh-policy-name >使用事件集線器策略鍵的秘密範圍鍵。
< eh-policy-name >
< eventhub >使用事件集線器實例的名稱。
< eventhub >
< secret-scope-name >使用包含事件集線器策略鍵的Databricks秘密作用域的名稱。
< secret-scope-name >
作為最佳實踐,該管道不使用默認的DBFS存儲路徑,而是使用Azure數據湖存儲Gen2 (ADLS Gen2)存儲帳戶。有關為ADLS Gen2存儲帳戶配置身份驗證的詳細信息,請參見在管道中使用秘密.
您希望在公共位置存儲和維護Python代碼,例如,訪問存儲在源代碼控製中的代碼或希望跨管道共享的代碼。
使用磚回購來存儲Python代碼。然後可以將Python代碼作為模塊導入管道筆記本。有關在Databricks回購中管理文件的詳細信息,請參見使用Python和R模塊.
下麵的示例改編自Delta Live Tables中的示例快速入門通過從repo導入數據集查詢。運行此示例,請執行以下步驟:
要為Python代碼創建回購,請單擊回購在側欄中,單擊添加回購.
取消選擇通過克隆Git存儲庫創建repo並在。中輸入回購的名稱庫名稱,例如,dlt-quickstart-repo.
dlt-quickstart-repo
創建一個模塊,將源數據讀入表:單擊repo名稱旁邊的向下箭頭,選擇創建>文件,並輸入文件名,例如:clickstream_raw_module.py.打開文件編輯器。在編輯器窗口輸入以下內容:
clickstream_raw_module.py
從dlt進口*json_path=“/ databricks-datasets / wikipedia-datasets /數據2015 - 001 /點擊流/ raw-uncompressed-json / _2_clickstream.json”defcreate_clickstream_raw_table(火花):@ tabledefclickstream_raw():返回(火花.讀.json(json_path))
創建一個模塊來創建一個包含準備好的數據的新表:select創建>文件再次輸入文件名稱,例如,clickstream_prepared_module.py.在新的編輯器窗口中輸入以下內容:
clickstream_prepared_module.py
從clickstream_raw_module進口*從dlt進口讀從pyspark.sql.functions進口*從pyspark.sql.types進口*defcreate_clickstream_prepared_table(火花):create_clickstream_raw_table(火花)@ table@expect(“valid_current_page_title”,"current_page_title不是NULL")@expect_or_fail(“valid_count”,"點擊計數> 0")defclickstream_prepared():返回(讀(“clickstream_raw”).withColumn(“click_count”,expr(“CAST(n AS INT)”)).withColumnRenamed(“curr_title”,“current_page_title”).withColumnRenamed(“prev_title”,“previous_page_title”).選擇(“current_page_title”,“click_count”,“previous_page_title”))
創建一個流水線筆記本:轉到你的Databricks登陸頁麵並選擇創建一個筆記本,或按新在側欄中選擇筆記本.的創建筆記本對話框出現了。您還可以通過單擊回購名稱旁邊的向下箭頭並選擇,在回購中創建筆記本創建>筆記本.
在創建筆記本對話,給你的筆記本起個名字,然後選擇Python從默認的語言下拉菜單。你可以離開了集群設置為默認值。
點擊創建.
在筆記本中輸入示例代碼。
如果你在工作區中創建了notebook,或者在一個不同於Python模塊路徑的repo路徑中創建了notebook,請在notebook的第一個單元格中輸入以下代碼:
進口sys,操作係統sys.路徑.附加(操作係統.路徑.abspath(' < repo-path >”))進口dlt從clickstream_prepared_module進口*從pyspark.sql.functions進口*從pyspark.sql.types進口*create_clickstream_prepared_table(火花)@dlt.表格(評論=包含鏈接到Apache Spark頁麵的頂部頁麵的表。)deftop_spark_referrers():返回(dlt.讀(“clickstream_prepared”).過濾器(expr("current_page_title == 'Apache_Spark'")).withColumnRenamed(“previous_page_title”,“referrer”).排序(desc(“click_count”)).選擇(“referrer”,“click_count”).限製(10))
取代< repo-path >包含要導入的Python模塊的Databricks repo的路徑。
< repo-path >
如果您在與正在導入的模塊相同的repo中創建了管道筆記本,則不需要指定repo路徑sys.path.append.在筆記本的第一個單元格中輸入以下代碼:
sys.path.append
進口sys,操作係統進口dlt從clickstream_prepared_module進口*從pyspark.sql.functions進口*從pyspark.sql.types進口*create_clickstream_prepared_table(火花)@dlt.表格(評論=包含鏈接到Apache Spark頁麵的頂部頁麵的表。)deftop_spark_referrers():返回(dlt.讀(“clickstream_prepared”).過濾器(expr("current_page_title == 'Apache_Spark'")).withColumnRenamed(“previous_page_title”,“referrer”).排序(desc(“click_count”)).選擇(“referrer”,“click_count”).限製(10))
創建管道使用新筆記本。
運行管道,在管道的細節頁麵,點擊開始.