三角洲生活表Python語言參考<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#delta-live-tables-python-language-reference" title="">

預覽

這個特性是在<一個class="reference internal" href="//www.eheci.com/docs.gcp/release-notes/release-types.html">公共預覽

本文提供了細節的三角洲生活表Python編程接口。

SQL API的信息,請參閱<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta-live-tables/sql-ref.html">三角洲生活表SQL語言參考

具體配置自動加載程序的詳細信息,請參見<一個class="reference internal" href="//www.eheci.com/docs.gcp/ingestion/auto-loader/index.html">自動加載器是什麼?

限製<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#limitations" title="">

三角洲住Python接口表有以下限製:

  • Python視圖函數必須返回一個DataFrame。一些在DataFrames操作的函數不返回DataFrames,不應使用。因為DataFrame轉換執行完整的數據流圖已經解決,使用這樣的操作可能會產生意想不到的副作用。這些操作包括等功能收集(),count (),toPandas (),save (),saveAsTable ()。然而,您可以包括這些功能之外的視圖函數定義,因為這段代碼運行一次初始化階段在圖。

  • 主()不支持功能。的操作的火花需要立即加載輸入數據來計算輸出的模式。不支持此功能在三角洲住表。

導入dltPython模塊<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#import-the-dlt-python-module" title="">

三角洲住表的Python函數中定義dlt模塊。你的管道用Python API必須進口這個模塊:實現

進口dlt

創建一個三角洲住表物化視圖或流表<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#create-a-delta-live-tables-materialized-view-or-streaming-table" title="">

在Python中,δ住表確定更新數據集作為一個物化視圖或流表的基礎上,定義查詢。的@ table修飾符是用來定義物化視圖和流表。

定義一個物化視圖在Python中,適用@ table查詢執行靜態讀對一個數據源。定義一個流表,適用@ table查詢執行流讀取數據源。這兩個數據集類型有相同的語法規範如下:

進口dlt@dlt(的名字=“<名稱>”,評論=“< >評論”,spark_conf={“<鍵>”:“<價值”,“<鍵”:“< >價值”},table_properties={“<鍵>”:“< >價值”,“<鍵>”:“< >價值”},路徑=“< storage-location-path >”,partition_cols=(“<劃分字段>”,“<劃分字段>”),模式=“模式定義”,臨時=)@dlt預計@dltexpect_or_fail@dltexpect_or_drop@dltexpect_all@dltexpect_all_or_drop@dltexpect_all_or_faildef<函數- - - - - -的名字>():返回(<查詢>)

創建一個三角洲住表視圖<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#create-a-delta-live-tables-view" title="">

定義一個視圖在Python中,應用@view裝飾。就像@ table修飾符,您可以使用視圖在三角洲住表靜態或流媒體數據集。下麵是用Python語法定義視圖:

進口dlt@dlt視圖(的名字=“<名稱>”,評論=“< >評論”)@dlt預計@dltexpect_or_fail@dltexpect_or_drop@dltexpect_all@dltexpect_all_or_drop@dltexpect_all_or_faildef<函數- - - - - -的名字>():返回(<查詢>)

例如:定義表和視圖<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#example-define-tables-and-views" title="">

定義一個表或視圖在Python中,應用@dlt.view@dlt.table裝飾功能。您可以使用函數名或的名字參數指定表或視圖的名稱。下麵的例子定義了兩個不同的數據集:一個視圖taxi_raw這需要JSON文件作為輸入源和一個表filtered_data這需要的taxi_raw認為輸入:

進口dlt@dlt視圖deftaxi_raw():返回火花格式(“json”)負載(“/ databricks-datasets / nyctaxi /樣本/ json /”)#使用函數名作為表名@dltdeffiltered_data():返回dlt(“taxi_raw”)在哪裏()#使用名稱參數作為表名@dlt(的名字=“filtered_data”)defcreate_filtered_data():返回dlt(“taxi_raw”)在哪裏()

例如:訪問數據集定義在相同的管道<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#example-access-a-dataset-defined-in-the-same-pipeline" title="">

除了閱讀從外部數據源,您可以訪問數據集定義在相同的管道與三角洲生活表read ()函數。下麵的例子演示了創建一個beplay体育app下载地址customers_filtered數據集使用read ()功能:

@dltdefbeplay体育app下载地址customers_raw():返回火花格式(“csv”)負載(“/數據/ cbeplay体育app下载地址ustomers.csv”)@dltdefbeplay体育app下载地址customers_filteredA():返回dlt(“beplay体育app下载地址customers_raw”)在哪裏()

您還可以使用spark.table ()函數來訪問數據集定義在相同的管道。當使用spark.table ()函數來訪問數據集定義的管道,在函數參數預先考慮生活數據集名稱關鍵字:

@dltdefbeplay体育app下载地址customers_raw():返回火花格式(“csv”)負載(“/數據/ cbeplay体育app下载地址ustomers.csv”)@dltdefbeplay体育app下载地址customers_filteredB():返回火花(“LIVE.beplay体育app下载地址customers_raw”)在哪裏()

例如:在metastore讀取注冊表<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#example-read-from-a-table-registered-in-a-metastore" title="">

讀取數據從一個表在蜂巢metastore注冊,在函數參數省略了生活關鍵字選擇限定表名和數據庫名稱:

@dltdefbeplay体育app下载地址():返回火花(“sales.beplay体育app下载地址customers”)在哪裏()

例如閱讀從統一編目表,看看<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta-live-tables/unity-catalog.html">數據攝取到統一目錄管道

例如:訪問數據集使用spark.sql

您也可以返回一個數據集使用spark.sql表達一個查詢功能。閱讀從一個內部數據集,預謀生活。數據集名稱:

@dltdefchicago_beplay体育app下载地址customers():返回火花sql(“SELECT * FROM生活。beplay体育app下载地址customers_cleaned城市=“芝加哥”)

控製表是如何實現的<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#control-how-tables-are-materialized" title="">

表也提供額外的控製他們的具體化:

  • 指定如何表<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta-live-tables/#schema-partition-example">分區使用partition_cols。您可以使用分區加快查詢速度。

  • 你可以設置表屬性定義一個視圖或表。看到<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta-live-tables/properties.html">三角洲生活表屬性表

  • 設置表數據使用的存儲位置路徑設置。默認情況下,管道表數據存儲在存儲位置路徑不設置。

  • 您可以使用<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta/generated-columns.html">生成的列在你的模式定義。看到<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta-live-tables/#schema-partition-example">例如:指定一個模式和分區列

請注意

表的大小小於1 TB,磚建議讓三角洲生活表控製數據的組織。除非你希望你表超出tb,通常不應當指定分區列。

例如:指定一個模式和分區列<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#example-specify-a-schema-and-partition-columns" title="">

您可以指定一個表模式使用PythonStructType或一個SQL DDL字符串。指定一個DDL字符串時,可以包括定義<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta/generated-columns.html">生成的列

下麵的示例創建一個表銷售指定一個模式使用PythonStructType:

sales_schema=StructType([StructField(“customer_id”,StringType(),真正的),StructField(“customer_name”,StringType(),真正的),StructField(“number_of_line_items”,StringType(),真正的),StructField(“order_datetime”,StringType(),真正的),StructField(“order_number”,LongType(),真正的)])@dlt(評論=“銷售原始數據”,模式=sales_schema)def銷售():返回(“…”)

下麵的示例指定的模式使用DDL字符串表,定義了一個生成的列,並定義一個分區列:

@dlt(評論=“銷售原始數據”,模式=”“”customer_id字符串,customer_name字符串,number_of_line_items字符串,order_datetime字符串,order_number長,order_day_of_week字符串生成總是像(dayofweek (order_datetime))”“”,partition_cols=(“order_day_of_week”])def銷售():返回(“…”)

默認情況下,三角洲生活表推斷的模式如果你不指定一個模式定義。

配置一個流表忽略流源表的變化<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#configure-a-streaming-table-to-ignore-changes-in-a-source-streaming-table" title="">

請注意

  • 使用skipChangeCommits國旗,你必須選擇預覽在管道設置通道。

  • skipChangeCommits國旗隻能與spark.readStream使用選擇()函數。你不能使用這個標誌dlt.read_stream ()函數。

默認情況下,流表需要擴展來源。當一個流表使用另一個流表源,源流表需要更新或者刪除,例如,GDPR“被遺忘”處理skipChangeCommits國旗可以設置在目標流表無視這些變化。更多信息關於這個國旗,看到的<一個class="reference internal" href="//www.eheci.com/docs.gcp/structured-streaming/delta-lake.html">忽略更新和刪除

@ tabledefb():返回火花readStream選項(“skipChangeCommits”,“真正的”)(“LIVE.A”)

Python三角洲生活表屬性<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#python-delta-live-tables-properties" title="">

下麵的表描述的選項和屬性可以指定與δ定義表和視圖時生活表:

@ table或@view

的名字

類型:str

一個可選的表或視圖的名稱。如果沒有定義,函數名稱用作表或視圖名稱。

評論

類型:str

一個可選描述表。

spark_conf

類型:dict

一個可選的火花配置列表這個查詢的執行。

table_properties

類型:dict

一個可選列表<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta-live-tables/properties.html">表屬性為表。

路徑

類型:str

一個可選的表數據的存儲位置。如果沒有設置,係統將默認存儲位置。

partition_cols

類型:一個集合str

一個可選的集合,例如,一個列表的一個或多個列用於分區表。

模式

類型:strStructType

一個可選的模式定義為表。模式可以被定義為一個SQL DDL字符串,或PythonStructType

臨時

類型:bool

創建一個臨時表。不持續這個表的元數據。

默認值是“假”。

表或視圖的定義

def <函數名> ()

一個Python函數,定義了數據集。如果的名字參數沒有設置,那麼<函數名>用作目標數據集名稱。

查詢

火花SQL語句返回一個火花數據集或考拉DataFrame。

使用dlt.read ()spark.table ()執行一個完整的閱讀從一個數據集定義在相同的管道。當使用spark.table ()函數來讀取數據集定義在相同的管道,預謀生活數據集名稱的關鍵字函數參數。例如,閱讀從一個數據集命名beplay体育app下载地址:

spark.table (“LIVEbeplay体育app下载地址.customers”)

您還可以使用spark.table ()函數來讀取注冊表的metastore省略生活關鍵字,選擇符合條件的表名與數據庫名稱:

spark.table (“salesbeplay体育app下载地址.customers”)

使用dlt.read_stream ()執行流讀取數據集定義在相同的管道。

使用spark.sql函數定義創建SQL查詢返回的數據集。

使用<一個class="reference external" href="//www.eheci.com/api-docs/python/pyspark/latest/pyspark.sql/dataframe.html">PySpark與Python語法來定義三角洲生活表查詢。

預期

@expect(“描述”、“約束”)

聲明一個數據質量約束了描述。如果違反了期望,一行包含目標數據集的行。

@expect_or_drop(“描述”、“約束”)

聲明一個數據質量約束了描述。如果連續違反了期望,把目標數據集的行。

@expect_or_fail(“描述”、“約束”)

聲明一個數據質量約束了描述。如果連續違反了期望,立即停止執行。

@expect_all(期望)

聲明一個或多個數據質量約束。預期Python字典,關鍵是期望描述和價值期望約束。如果連續違反任何預期,包括在目標數據集的行。

@expect_all_or_drop(期望)

聲明一個或多個數據質量約束。預期Python字典,關鍵是期望描述和價值期望約束。如果連續違反任何預期,從目標數據集的行。

@expect_all_or_fail(期望)

聲明一個或多個數據質量約束。預期Python字典,關鍵是期望描述和價值期望約束。如果連續違反任何期望,立即停止執行。

改變δ生活表中數據獲取與Python<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#change-data-capture-with-python-in-delta-live-tables" title="">

使用apply_changes ()函數在Python API使用三角洲住表疾病預防控製中心的功能。三角洲生活表Python疾控中心還提供了接口<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta-live-tables/#create-target-fn">create_streaming_table ()函數。您可以使用此函數創建所需的目標表apply_changes ()函數。

apply_changes(目標=“<目標表>”,=“<數據源>”,=(“key1”,“key2”,“keyN”),sequence_by=“< sequence-column >”,ignore_null_updates=,apply_as_deletes=沒有一個,apply_as_truncates=沒有一個,column_list=沒有一個,except_column_list=沒有一個,stored_as_scd_type=<類型>,track_history_column_list=沒有一個,track_history_except_column_list=沒有一個)

請注意

的默認行為插入更新事件是插入疾控中心事件從源:更新任何目標表中的行匹配指定的鍵(s)或時插入一個新行匹配的目標表中的記錄不存在。處理的刪除可以指定的事件應用作為刪除條件。

重要的

你必須聲明一個目標流表應用更改。您可以選擇指定目標表的模式。當指定的模式apply_changes目標表,你還必須包括__START_AT__END_AT列有相同的數據類型sequence_by字段。

看到<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta-live-tables/cdc.html">改變數據獲取與三角洲生活表

參數

目標

類型:str

要更新的表的名稱。您可以使用<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta-live-tables/#create-target-fn">create_streaming_table ()函數執行之前創建目標表apply_changes ()函數。

這個參數是必需的。

類型:str

數據源包含疾病預防控製中心記錄。

這個參數是必需的。

類型:列表

列或列的組合唯一地標識源數據中的一行。這是用來確定哪些疾病預防控製中心事件適用於目標表中的特定記錄。

您可以指定:

  • 字符串的列表:["標識",“orderId”)

  • 火花SQL的列表坳()功能:[坳(“標識”),坳(“orderId”)

參數坳()函數不能包括限定符。例如,您可以使用坳(標識),但你不能使用坳(source.userId)

這個參數是必需的。

sequence_by

類型:str坳()

疾控中心的列名稱指定邏輯順序事件源數據。三角洲生活表使用這個序列處理變更的事件到達的順序。

您可以指定:

  • 一個字符串:“sequenceNum”

  • 一個火花的SQL坳()功能:坳(“sequenceNum”)

參數坳()函數不能包括限定符。例如,您可以使用坳(標識),但你不能使用坳(source.userId)

這個參數是必需的。

ignore_null_updates

類型:bool

允許攝入更新包含目標列的一個子集。當一個事件匹配現有的行和疾控中心ignore_null_updates真正的,列將保留現有的價值目標。這也適用於嵌套列的值。當ignore_null_updates,現有的值將被覆蓋值。

這個參數是可選的。

默認值是

apply_as_deletes

類型:strexpr ()

指定當事件應該被視為一個疾控中心刪除而不是插入。處理無序的數據,刪除行暫時保留作為一個墓碑在底層三角洲表,並創建一個視圖metastore過濾掉這些墓碑。保留時間間隔可以配置了pipelines.cdc.tombstoneGCThresholdInSeconds表屬性

您可以指定:

  • 一個字符串:”操作=“刪除”

  • 一個火花的SQLexpr ()功能:expr(“操作=“刪除”)

這個參數是可選的。

apply_as_truncates

類型:strexpr ()

指定當一個中心事件應該被視為一個完整的表截斷。因為這一條款觸發目標表的完整截斷,它應該隻用於特定的用例需要此功能。

apply_as_truncates支持參數隻對SCD 1型。化合物2型不支持截斷。

您可以指定:

  • 一個字符串:”操作=“截斷”

  • 一個火花的SQLexpr ()功能:expr(“操作=“截斷”)

這個參數是可選的。

column_listexcept_column_list

類型:列表

列的一個子集,包括在目標表中。使用column_list指定列的完整列表包括。使用except_column_list排除指定列。你可以申報價值作為一個字符串列表,或引發SQL坳()功能:

  • column_list=["標識",“名稱”,“城市”)

  • column_list=[坳(“標識”),坳(“名字”),坳(“城市”)]

  • except_column_list=["操作",“sequenceNum”)

  • except_column_list=[坳(“操作”),坳(“sequenceNum”)

參數坳()函數不能包括限定符。例如,您可以使用坳(標識),但你不能使用坳(source.userId)

這個參數是可選的。

默認是在目標表時沒有包含所有列column_listexcept_column_list參數傳遞給函數。

stored_as_scd_type

類型:strint

是否保存記錄SCD 1型或SCD 2型。

設置為1化合物1型或2化合物2型。

這一條款是可選的。

缺省值是SCD 1型。

track_history_column_listtrack_history_except_column_list

類型:列表

輸出列的子集被跟蹤目標表的曆史。當pipelines.enableTrackHistory設置、使用track_history_column_list指定列被跟蹤的完整列表。使用track_history_except_column_list指定的列被排除在跟蹤。你可以申報價值作為一個字符串列表,或引發SQL坳()功能:-track_history_column_list=["標識",“名稱”,“城市”)。- - - - - -track_history_column_list=[坳(“標識”),坳(“名字”),坳(“城市”)]- - - - - -track_history_except_column_list=["操作",“sequenceNum”)- - - - - -track_history_except_column_list=[坳(“操作”),坳(“sequenceNum”)

參數坳()函數不能包括限定符。例如,您可以使用坳(標識),但你不能使用坳(source.userId)

這個參數是可選的。

默認是在目標表時沒有包含所有列track_history_column_listtrack_history_except_column_list參數傳遞給函數。

使用這些參數,必須設置pipelines.enableTrackHistory在管道設置。否則,就會拋出一個異常。當pipelines.enableTrackHistory沒有設置,為每個輸入行生成一個曆史記錄。

為疾病預防控製中心創建一個目標表輸出<一個class="headerlink" href="//www.eheci.com/docs.gcp/delta-live-tables/#create-a-target-table-for-cdc-output" title="">

使用create_streaming_table ()函數創建的目標表apply_changes ()輸出記錄。

請注意

create_target_table ()create_streaming_live_table ()函數是棄用。磚建議更新現有代碼使用create_streaming_table ()函數。

create_streaming_table(的名字=“<表名稱>”,評論=“< >評論”spark_conf={“<鍵>”:“<價值”,“<鍵”:“< >價值”},table_properties={“<鍵>”:“< >價值”,“<鍵>”:“< >價值”},partition_cols=(“<劃分字段>”,“<劃分字段>”),路徑=“< storage-location-path >”,模式=“模式定義”)

參數

的名字

類型:str

表名。

這個參數是必需的。

評論

類型:str

一個可選描述表。

spark_conf

類型:dict

一個可選的火花配置列表這個查詢的執行。

table_properties

類型:dict

一個可選列表<一個class="reference internal" href="//www.eheci.com/docs.gcp/delta-live-tables/properties.html">表屬性為表。

partition_cols

類型:數組

一個可選的一列或多列列表用於分區表。

路徑

類型:str

一個可選的表數據的存儲位置。如果沒有設置,係統將默認存儲位置。

模式

類型:strStructType

一個可選的模式定義為表。模式可以被定義為一個SQL DDL字符串,或PythonStructType