從工作區文件導入Python模塊
本文提供了指導如何Python模塊和包從工作區文件導入到< DLT >管道。
你可以存儲在<磚>工作區或Python代碼磚回購模塊或包。你可以進口管道筆記本的Python代碼。有關管理文件的更多信息在磚回購,明白了使用Python和R模塊。
導入一個Python模塊從回購< DLT >
下麵的例子適應< DLT >的例子教程通過導入數據集查詢從回購作為一個Python模塊。運行這個示例中,使用以下步驟:
為您的Python代碼創建一個回購,單擊<回購圖標>回購在側邊欄,然後單擊添加回購。
取消選擇通過複製Git存儲庫創建回購和回購的輸入一個名稱庫名稱例如,
dlt-quickstart-repo
。創建一個模塊讀取源數據表:回購的名字旁邊單擊向下箭頭,選擇創建>文件文件,輸入一個名稱,例如,
clickstream_raw_module.py
。文件編輯器打開。在編輯器窗口中輸入以下:從dlt進口*json_path=“/ databricks-datasets / wikipedia-datasets /數據2015 - 001 /點擊流/ raw-uncompressed-json / _2_clickstream.json”defcreate_clickstream_raw_table(火花):@ tabledefclickstream_raw():返回(火花。讀。json(json_path))
創建一個模塊來創建一個新的表包含準備數據:選擇創建>文件輸入文件的名稱,例如,
clickstream_prepared_module.py
。在新的編輯器窗口中輸入以下:從clickstream_raw_module進口*從dlt進口讀從pyspark.sql.functions進口*從pyspark.sql.types進口*defcreate_clickstream_prepared_table(火花):create_clickstream_raw_table(火花)@ table@expect(“valid_current_page_title”,“current_page_title NOT NULL”)@expect_or_fail(“valid_count”,“click_count > 0 ")defclickstream_prepared():返回(讀(“clickstream_raw”)。withColumn(“click_count”,expr(“鑄(n為INT)”))。withColumnRenamed(“curr_title”,“current_page_title”)。withColumnRenamed(“prev_title”,“previous_page_title”)。選擇(“current_page_title”,“click_count”,“previous_page_title”))
創建一個管道筆記本:去你<磚>著陸頁並選擇創建一個筆記本,或單擊<新圖標>新在側邊欄並選擇筆記本。的創建筆記本對話框出現了。您還可以創建回購的筆記本通過點擊旁邊的向下箭頭回購名稱並選擇創建>筆記本。
在創建筆記本對話,給你的筆記本一個名稱並選擇Python從默認的語言下拉菜單。你可以把集群設置為默認值。
點擊創建。
輸入筆記本中的示例代碼。
如果你在工作區中創建了筆記本或回購路徑不同於Python模塊的路徑,輸入以下代碼的第一個細胞筆記本:
進口sys,操作係統sys。路徑。附加(操作係統。路徑。abspath(“< repo-path >”))進口dlt從clickstream_prepared_module進口*從pyspark.sql.functions進口*從pyspark.sql.types進口*create_clickstream_prepared_table(火花)@dlt。表(評論=“一個表包含頁麵鏈接到Apache火花頁麵頂部。”)deftop_spark_referrers():返回(dlt。讀(“clickstream_prepared”)。過濾器(expr(“current_page_title = = Apache_Spark”))。withColumnRenamed(“previous_page_title”,“referrer”)。排序(desc(“click_count”))。選擇(“referrer”,“click_count”)。限製(10))
取代
< repo-path >
與磚的路徑包含Python模塊導入的回購。如果你創建的管道筆記本在同一個回購作為你導入的模塊,您不需要指定回購路徑
sys.path.append
。輸入以下代碼的第一個細胞筆記本:進口sys,操作係統進口dlt從clickstream_prepared_module進口*從pyspark.sql.functions進口*從pyspark.sql.types進口*create_clickstream_prepared_table(火花)@dlt。表(評論=“一個表包含頁麵鏈接到Apache火花頁麵頂部。”)deftop_spark_referrers():返回(dlt。讀(“clickstream_prepared”)。過濾器(expr(“current_page_title = = Apache_Spark”))。withColumnRenamed(“previous_page_title”,“referrer”)。排序(desc(“click_count”))。選擇(“referrer”,“click_count”)。限製(10))
創建一個管道使用新的筆記本。
運行管道,管道的細節頁麵,點擊開始。
你也可以導入Python代碼作為一個包。下麵的代碼片段從< DLT >筆記本進口test_utils
包的dlt_packages
目錄裏麵的回購根筆記本。的dlt_packages
目錄包含文件test_utils.py
和__init__ . py
,test_utils.py
定義的函數create_test_table ()
:
進口dlt@dlt。表defmy_table():返回dlt。讀(…)#……進口dlt_packages.test_utils作為test_utilstest_utils。create_test_table(火花)