三角洲生活表Python語言參考<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#delta-live-tables-python-language-reference" title="">
預覽
這個特性是在<一個class="reference internal" href="//www.eheci.com/docs.gcp/release-notes/release-types.html">公共預覽。
本文提供了細節的三角洲生活表Python編程接口。
SQL API的信息,請參閱<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta-live-tables/sql-ref.html">三角洲生活表SQL語言參考。
具體配置自動加載程序的詳細信息,請參見<一個class="reference internal" href="//www.eheci.com/docs.gcp/ingestion/auto-loader/index.html">自動加載器是什麼?。
限製<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#limitations" title="">
三角洲住Python接口表有以下限製:
Python
表
和視圖
函數必須返回一個DataFrame。一些在DataFrames操作的函數不返回DataFrames,不應使用。因為DataFrame轉換執行後完整的數據流圖已經解決,使用這樣的操作可能會產生意想不到的副作用。這些操作包括等功能收集()
,count ()
,toPandas ()
,save ()
,saveAsTable ()
。然而,您可以包括這些功能之外的表
或視圖
函數定義,因為這段代碼運行一次初始化階段在圖。的
主()
不支持功能。的主
操作的火花需要立即加載輸入數據來計算輸出的模式。不支持此功能在三角洲住表。
導入dlt
Python模塊<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#import-the-dlt-python-module" title="">
三角洲住表的Python函數中定義dlt
模塊。你的管道用Python API必須進口這個模塊:實現
進口dlt
創建一個三角洲住表物化視圖或流表<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#create-a-delta-live-tables-materialized-view-or-streaming-table" title="">
在Python中,δ住表確定更新數據集作為一個物化視圖或流表的基礎上,定義查詢。的@ table
修飾符是用來定義物化視圖和流表。
定義一個物化視圖在Python中,適用@ table
查詢執行靜態讀對一個數據源。定義一個流表,適用@ table
查詢執行流讀取數據源。這兩個數據集類型有相同的語法規範如下:
進口dlt@dlt。表(的名字=“<名稱>”,評論=“< >評論”,spark_conf={“<鍵>”:“<價值”,“<鍵”:“< >價值”},table_properties={“<鍵>”:“< >價值”,“<鍵>”:“< >價值”},路徑=“< storage-location-path >”,partition_cols=(“<劃分字段>”,“<劃分字段>”),模式=“模式定義”,臨時=假)@dlt。預計@dlt。expect_or_fail@dlt。expect_or_drop@dlt。expect_all@dlt。expect_all_or_drop@dlt。expect_all_or_faildef<函數- - - - - -的名字>():返回(<查詢>)
創建一個三角洲住表視圖<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#create-a-delta-live-tables-view" title="">
定義一個視圖在Python中,應用@view
裝飾。就像@ table
修飾符,您可以使用視圖在三角洲住表靜態或流媒體數據集。下麵是用Python語法定義視圖:
進口dlt@dlt。視圖(的名字=“<名稱>”,評論=“< >評論”)@dlt。預計@dlt。expect_or_fail@dlt。expect_or_drop@dlt。expect_all@dlt。expect_all_or_drop@dlt。expect_all_or_faildef<函數- - - - - -的名字>():返回(<查詢>)
例如:定義表和視圖<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#example-define-tables-and-views" title="">
定義一個表或視圖在Python中,應用@dlt.view
或@dlt.table
裝飾功能。您可以使用函數名或的名字
參數指定表或視圖的名稱。下麵的例子定義了兩個不同的數據集:一個視圖taxi_raw
這需要JSON文件作為輸入源和一個表filtered_data
這需要的taxi_raw
認為輸入:
進口dlt@dlt。視圖deftaxi_raw():返回火花。讀。格式(“json”)。負載(“/ databricks-datasets / nyctaxi /樣本/ json /”)#使用函數名作為表名@dlt。表deffiltered_data():返回dlt。讀(“taxi_raw”)。在哪裏(…)#使用名稱參數作為表名@dlt。表(的名字=“filtered_data”)defcreate_filtered_data():返回dlt。讀(“taxi_raw”)。在哪裏(…)
例如:訪問數據集定義在相同的管道<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#example-access-a-dataset-defined-in-the-same-pipeline" title="">
除了閱讀從外部數據源,您可以訪問數據集定義在相同的管道與三角洲生活表read ()
函數。下麵的例子演示了創建一個beplay体育app下载地址customers_filtered
數據集使用read ()
功能:
@dlt。表defbeplay体育app下载地址customers_raw():返回火花。讀。格式(“csv”)。負載(“/數據/ cbeplay体育app下载地址ustomers.csv”)@dlt。表defbeplay体育app下载地址customers_filteredA():返回dlt。讀(“beplay体育app下载地址customers_raw”)。在哪裏(…)
您還可以使用spark.table ()
函數來訪問數據集定義在相同的管道。當使用spark.table ()
函數來訪問數據集定義的管道,在函數參數預先考慮生活
數據集名稱關鍵字:
@dlt。表defbeplay体育app下载地址customers_raw():返回火花。讀。格式(“csv”)。負載(“/數據/ cbeplay体育app下载地址ustomers.csv”)@dlt。表defbeplay体育app下载地址customers_filteredB():返回火花。表(“LIVE.beplay体育app下载地址customers_raw”)。在哪裏(…)
例如:在metastore讀取注冊表<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#example-read-from-a-table-registered-in-a-metastore" title="">
讀取數據從一個表在蜂巢metastore注冊,在函數參數省略了生活
關鍵字選擇限定表名和數據庫名稱:
@dlt。表defbeplay体育app下载地址():返回火花。表(“sales.beplay体育app下载地址customers”)。在哪裏(…)
例如閱讀從統一編目表,看看<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta-live-tables/unity-catalog.html">數據攝取到統一目錄管道。
例如:訪問數據集使用spark.sql
您也可以返回一個數據集使用spark.sql
表達一個查詢功能。閱讀從一個內部數據集,預謀生活。
數據集名稱:
@dlt。表defchicago_beplay体育app下载地址customers():返回火花。sql(“SELECT * FROM生活。beplay体育app下载地址customers_cleaned城市=“芝加哥”)
控製表是如何實現的<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#control-how-tables-are-materialized" title="">
表也提供額外的控製他們的具體化:
指定如何表<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta-live-tables/#schema-partition-example">分區使用
partition_cols
。您可以使用分區加快查詢速度。你可以設置表屬性定義一個視圖或表。看到<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta-live-tables/properties.html">三角洲生活表屬性表。
設置表數據使用的存儲位置
路徑
設置。默認情況下,管道表數據存儲在存儲位置路徑
不設置。您可以使用<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta/generated-columns.html">生成的列在你的模式定義。看到<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta-live-tables/#schema-partition-example">例如:指定一個模式和分區列。
請注意
表的大小小於1 TB,磚建議讓三角洲生活表控製數據的組織。除非你希望你表超出tb,通常不應當指定分區列。
例如:指定一個模式和分區列<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#example-specify-a-schema-and-partition-columns" title="">
您可以指定一個表模式使用PythonStructType
或一個SQL DDL字符串。指定一個DDL字符串時,可以包括定義<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta/generated-columns.html">生成的列。
下麵的示例創建一個表銷售
指定一個模式使用PythonStructType
:
sales_schema=StructType([StructField(“customer_id”,StringType(),真正的),StructField(“customer_name”,StringType(),真正的),StructField(“number_of_line_items”,StringType(),真正的),StructField(“order_datetime”,StringType(),真正的),StructField(“order_number”,LongType(),真正的)])@dlt。表(評論=“銷售原始數據”,模式=sales_schema)def銷售():返回(“…”)
下麵的示例指定的模式使用DDL字符串表,定義了一個生成的列,並定義一個分區列:
@dlt。表(評論=“銷售原始數據”,模式=”“”customer_id字符串,customer_name字符串,number_of_line_items字符串,order_datetime字符串,order_number長,order_day_of_week字符串生成總是像(dayofweek (order_datetime))”“”,partition_cols=(“order_day_of_week”])def銷售():返回(“…”)
默認情況下,三角洲生活表推斷的模式表
如果你不指定一個模式定義。
配置一個流表忽略流源表的變化<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#configure-a-streaming-table-to-ignore-changes-in-a-source-streaming-table" title="">
請注意
使用
skipChangeCommits
國旗,你必須選擇預覽在管道設置通道。的
skipChangeCommits
國旗隻能與spark.readStream
使用選擇()
函數。你不能使用這個標誌dlt.read_stream ()
函數。
默認情況下,流表需要擴展來源。當一個流表使用另一個流表源,源流表需要更新或者刪除,例如,GDPR“被遺忘”處理skipChangeCommits
國旗可以設置在目標流表無視這些變化。更多信息關於這個國旗,看到的<一個class="reference internal" href="//www.eheci.com/docs.gcp/structured-streaming/delta-lake.html">忽略更新和刪除。
@ tabledefb():返回火花。readStream。選項(“skipChangeCommits”,“真正的”)。表(“LIVE.A”)
Python三角洲生活表屬性<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#python-delta-live-tables-properties" title="">
下麵的表描述的選項和屬性可以指定與δ定義表和視圖時生活表:
@ table或@view |
---|
的名字 類型: 一個可選的表或視圖的名稱。如果沒有定義,函數名稱用作表或視圖名稱。 |
評論 類型: 一個可選描述表。 |
spark_conf 類型: 一個可選的火花配置列表這個查詢的執行。 |
table_properties 類型: 一個可選列表<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta-live-tables/properties.html">表屬性為表。 |
路徑 類型: 一個可選的表數據的存儲位置。如果沒有設置,係統將默認存儲位置。 |
partition_cols 類型: 一個可選的集合,例如,一個 |
模式 類型: 一個可選的模式定義為表。模式可以被定義為一個SQL DDL字符串,或Python |
臨時 類型: 創建一個臨時表。不持續這個表的元數據。 默認值是“假”。 |
表或視圖的定義 |
---|
def <函數名> () 一個Python函數,定義了數據集。如果 |
查詢 火花SQL語句返回一個火花數據集或考拉DataFrame。 使用
您還可以使用
使用 使用 使用<一個class="reference external" href="//www.eheci.com/api-docs/python/pyspark/latest/pyspark.sql/dataframe.html">PySpark與Python語法來定義三角洲生活表查詢。 |
預期 |
---|
@expect(“描述”、“約束”) 聲明一個數據質量約束了 |
@expect_or_drop(“描述”、“約束”) 聲明一個數據質量約束了 |
@expect_or_fail(“描述”、“約束”) 聲明一個數據質量約束了 |
@expect_all(期望) 聲明一個或多個數據質量約束。 |
@expect_all_or_drop(期望) 聲明一個或多個數據質量約束。 |
@expect_all_or_fail(期望) 聲明一個或多個數據質量約束。 |
改變δ生活表中數據獲取與Python<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#change-data-capture-with-python-in-delta-live-tables" title="">
使用apply_changes ()
函數在Python API使用三角洲住表疾病預防控製中心的功能。三角洲生活表Python疾控中心還提供了接口<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta-live-tables/#create-target-fn">create_streaming_table ()函數。您可以使用此函數創建所需的目標表apply_changes ()
函數。
apply_changes(目標=“<目標表>”,源=“<數據源>”,鍵=(“key1”,“key2”,“keyN”),sequence_by=“< sequence-column >”,ignore_null_updates=假,apply_as_deletes=沒有一個,apply_as_truncates=沒有一個,column_list=沒有一個,except_column_list=沒有一個,stored_as_scd_type=<類型>,track_history_column_list=沒有一個,track_history_except_column_list=沒有一個)
請注意
的默認行為插入
和更新
事件是插入疾控中心事件從源:更新任何目標表中的行匹配指定的鍵(s)或時插入一個新行匹配的目標表中的記錄不存在。處理的刪除
可以指定的事件應用作為刪除當
條件。
重要的
你必須聲明一個目標流表應用更改。您可以選擇指定目標表的模式。當指定的模式apply_changes
目標表,你還必須包括__START_AT
和__END_AT
列有相同的數據類型sequence_by
字段。
看到<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta-live-tables/cdc.html">改變數據獲取與三角洲生活表。
參數 |
---|
目標 類型: 要更新的表的名稱。您可以使用<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta-live-tables/#create-target-fn">create_streaming_table ()函數執行之前創建目標表 這個參數是必需的。 |
源 類型: 數據源包含疾病預防控製中心記錄。 這個參數是必需的。 |
鍵 類型: 列或列的組合唯一地標識源數據中的一行。這是用來確定哪些疾病預防控製中心事件適用於目標表中的特定記錄。 您可以指定:
參數 這個參數是必需的。 |
sequence_by 類型: 疾控中心的列名稱指定邏輯順序事件源數據。三角洲生活表使用這個序列處理變更的事件到達的順序。 您可以指定:
參數 這個參數是必需的。 |
ignore_null_updates 類型: 允許攝入更新包含目標列的一個子集。當一個事件匹配現有的行和疾控中心 這個參數是可選的。 默認值是 |
apply_as_deletes 類型: 指定當事件應該被視為一個疾控中心 您可以指定:
這個參數是可選的。 |
apply_as_truncates 類型: 指定當一個中心事件應該被視為一個完整的表 的 您可以指定:
這個參數是可選的。 |
column_listexcept_column_list 類型: 列的一個子集,包括在目標表中。使用
參數 這個參數是可選的。 默認是在目標表時沒有包含所有列 |
stored_as_scd_type 類型: 是否保存記錄SCD 1型或SCD 2型。 設置為 這一條款是可選的。 缺省值是SCD 1型。 |
track_history_column_listtrack_history_except_column_list 類型: 輸出列的子集被跟蹤目標表的曆史。當 參數 這個參數是可選的。 默認是在目標表時沒有包含所有列 使用這些參數,必須設置 |
為疾病預防控製中心創建一個目標表輸出<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#create-a-target-table-for-cdc-output" title="">
使用create_streaming_table ()
函數創建的目標表apply_changes ()
輸出記錄。
請注意
的create_target_table ()
和create_streaming_live_table ()
函數是棄用。磚建議更新現有代碼使用create_streaming_table ()
函數。
create_streaming_table(的名字=“<表名稱>”,評論=“< >評論”spark_conf={“<鍵>”:“<價值”,“<鍵”:“< >價值”},table_properties={“<鍵>”:“< >價值”,“<鍵>”:“< >價值”},partition_cols=(“<劃分字段>”,“<劃分字段>”),路徑=“< storage-location-path >”,模式=“模式定義”)
參數 |
---|
的名字 類型: 表名。 這個參數是必需的。 |
評論 類型: 一個可選描述表。 |
spark_conf 類型: 一個可選的火花配置列表這個查詢的執行。 |
table_properties 類型: 一個可選列表<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta-live-tables/properties.html">表屬性為表。 |
partition_cols 類型: 一個可選的一列或多列列表用於分區表。 |
路徑 類型: 一個可選的表數據的存儲位置。如果沒有設置,係統將默認存儲位置。 |
模式 類型: 一個可選的模式定義為表。模式可以被定義為一個SQL DDL字符串,或Python |