教程:用Python在Delta Live Tables中聲明一個數據管道
預覽
此功能已在公共預覽。
本教程向您展示如何使用Python語法在Delta Live Tables中聲明數據管道。熟悉PySpark或Pandas for Spark的用戶可以在Delta Live表中使用dataframe。對於不熟悉Spark DataFrames的用戶,Databricks建議使用SQL For Delta Live Tables。看到教程:在Delta Live Tables中使用SQL聲明一個數據管道。
Delta Live Tables的Python語法擴展了標準PySpark,使用一組通過dlt
模塊。
請注意
不能在Delta Live Tables源代碼文件中混合語言。您可以在一個管道中使用多個具有不同語言的筆記本或文件。
在哪裏運行Delta Live Tables Python查詢?
您可以使用筆記本電腦或Python文件來編寫Delta Live Tables Python查詢,但Delta Live Tables不是為在筆記本電腦單元格中交互式運行而設計的。
Delta Live Tables與許多Python腳本有一個關鍵的區別:您不調用執行數據攝取和轉換的函數來創建Delta Live Tables數據集。類中的裝飾器函數進行解釋dlt
模塊中的所有文件加載到管道中,並構建一個數據流圖。
重要的
在為Delta Live table編寫Python時,不能依賴於筆記本的逐個單元格執行順序。Delta Live Tables計算並運行筆記本中定義的所有代碼,但它的執行模型與筆記本完全不同運行所有命令。
在Databricks筆記本中執行包含Delta Live Tables語法的單元格將導致錯誤消息。要了解如何使用Delta活動表配置管道,請參見教程:運行您的第一個Delta Live Tables管道。
用Python聲明一個Delta Live Tables數據管道
本教程演示使用Python語法在包含維基百科點擊流數據的數據集上聲明Delta Live Tables管道:
將原始JSON點擊流數據讀入一個表。
從原始數據表中讀取記錄並使用Delta Live Tables預期創建一個包含已清理數據的新表。
使用已清理數據表中的記錄進行Delta Live Tables查詢,以創建派生數據集。
這段代碼演示了勳章體係結構的簡化示例。看到什麼是圓形湖屋建築?。
複製Python代碼並將其粘貼到一個新的Python筆記本中。您可以將示例代碼添加到筆記本的單個單元格或多個單元格中。要查看用於創建筆記本的選項,請參見創建一個筆記本。
請注意
在使用Python接口創建管道時,默認情況下,表名由函數名定義。例如,下麵的Python示例創建了三個名為clickstream_raw
,clickstream_prepared
,top_spark_referrers
。方法重寫表名名字
參數。看到創建一個Delta Live Tables實體化視圖或流表。
導入Delta Live Tables模塊
所有Delta Live Tables Python api都在dlt
模塊。顯式導入dlt
模塊在Python筆記本和文件的頂部。
下麵的示例展示了這個導入,以及for的import語句pyspark.sql.functions
。
進口dlt從pyspark.sql.functions進口*
執行任意的Python語句
你可以在筆記本中定義Python變量和函數以及Delta Live Tables代碼。所有Python邏輯都作為Delta Live Tables解析管道圖運行。
下麵的代碼聲明了一個文本變量,用於後麵的步驟加載JSON數據文件:
json_path=“/ databricks-datasets / wikipedia-datasets /數據2015 - 001 /點擊流/ raw-uncompressed-json / _2_clickstream.json”
從對象存儲係統中的文件中創建表
Delta Live Tables支持從Databricks支持的所有格式加載數據。看到與Databricks上的外部數據交互。
的@dlt.table
decorator告訴Delta Live Tables創建一個包含函數返回的DataFrame結果的表。添加@dlt.table
在任何返回Spark DataFrame的Python函數定義之前,在Delta Live Tables中注冊一個新表。下麵的例子演示了使用函數名作為表名,並向表中添加描述性注釋:
@dlt。表格(評論=“原始維基百科點擊流數據集,從/ databicks -datasets中攝取。”)defclickstream_raw():返回(火花。讀。格式(“json”)。負載(json_path))
從管道中的上遊數據集中添加一個表
你可以使用dlt.read ()
從當前Delta Live Tables管道中聲明的其他數據集中讀取數據。以這種方式聲明新表會創建一個依賴項,Delta Live tables會在執行更新之前自動解析這個依賴項。下麵的代碼還包括使用預期監視和強製執行數據質量的示例。看到使用Delta Live Tables管理數據質量。
@dlt。表格(評論=“維基百科點擊流數據被清理並準備好進行分析。”)@dlt。預計(“valid_current_page_title”,"current_page_title不是NULL")@dlt。expect_or_fail(“valid_count”,"點擊計數> 0")defclickstream_prepared():返回(dlt。讀(“clickstream_raw”)。withColumn(“click_count”,expr(“CAST(n AS INT)”))。withColumnRenamed(“curr_title”,“current_page_title”)。withColumnRenamed(“prev_title”,“previous_page_title”)。選擇(“current_page_title”,“click_count”,“previous_page_title”))
創建具有豐富數據視圖的表
因為Delta Live Tables以一係列依賴關係圖的形式處理對管道的更新,所以您可以聲明高度豐富的視圖,通過聲明具有特定業務邏輯的表來支持儀表板、BI和分析。
Delta Live Tables表在概念上等同於物化視圖。Spark上的傳統視圖在每次查詢視圖時執行邏輯,而Delta Live Tables表將查詢結果的最新版本存儲在數據文件中。因為Delta Live Tables管理管道中所有數據集的更新,所以您可以安排管道更新以匹配物化視圖的延遲要求,並知道對這些表的查詢包含可用數據的最新版本。
下麵代碼定義的表演示了從管道中的上遊數據派生的物化視圖在概念上的相似性:
@dlt。表格(評論=包含鏈接到Apache Spark頁麵的頂部頁麵的表。)deftop_spark_referrers():返回(dlt。讀(“clickstream_prepared”)。過濾器(expr("current_page_title == 'Apache_Spark'"))。withColumnRenamed(“previous_page_title”,“referrer”)。排序(desc(“click_count”))。選擇(“referrer”,“click_count”)。限製(10))
使用其他Delta Live Tables選項的示例
Delta Live Tables實體化視圖和流表支持上麵示例中沒有顯示的其他選項。下麵的示例指定目標表的模式,包括使用Delta Lake生成的列。還定義了目標表的分區列。
請注意
對於大小小於1tb的表,Databricks建議讓Delta Live tables控製數據組織。除非您希望您的表增長超過1tb,否則通常不應該指定分區列。
@dlt。表格(評論=“銷售原始數據”,模式=”“”customer_id字符串,customer_name字符串,number_of_line_items字符串,order_datetime字符串,order_number長,(dayofweek(order_datetime))”“”,partition_cols=(“order_day_of_week”])def銷售():返回(“…”)