我們要遷移到Azure三角洲住表作為數據工廠管道負載CSV文件和輸出δ表數據磚。
管道通過外部應用程序觸發需求哪些地方文件存儲文件夾,然後管道運行和過程。文件包含事務,我們的目的是刪除這些交易在最後一層,取而代之的是每個公司和新數據字段。
我們試著在許多方麵實現,但不明白apply_changes函數。在所有的示例和文檔假定舊數據包括隨著新的,這並非如此。我們想刪除一切在這最後一層,新到達的數據文件但遇到不同的錯誤。
我們認為我們封裝在一個方法從流讀取新的數據和最後一個表的數據幀,用類似下麵的結合。萎靡不振的兩個數據集,apply_changes知道刪除gld層:
def create_tmp_my_table (): @dlt。表(name = " tmp_my_table "臨時= True, table_properties ={“質量”:“黃金”,“三角洲。minReaderVersion”:“2”、“三角洲。minWriterVersion ": " 5 "}) def tmp_my_table(): #假設兩個完全相同的模式df_old = dlt.read .withColumn (“slv_my_old_data”) (“DeleteFlag”,點燃(1))df_stream = dlt.read_stream .withColumn (“slv_my_new_data”) (“DeleteFlag”,亮了(0))df_final = df_stream.union (df_old)返回(df_final) create_tmp_my_table (dlt)。create_target_table (name = " gld_my_table table_properties ={“質量”:“黃金”,“三角洲。minReaderVersion”:“2”、“三角洲。dlt minWriterVersion ": " 5 "})。apply_changes(目標=“gld_my表”,源=“tmp_my表”,鍵= [“Hash_Key”], apply_as_deletes = expr (“DeleteFlag = 1”), #刪除條件sequence_by =坳(“Load_Date”))
未能開始流tmp_cash_flows append模式或完整的模式。Append模式錯誤:結合流媒體和批處理DataFrames /不支持數據集
然後我們認為好的,我們隻讀舊的批處理數據,看看至少我們可以刪除:
def create_tmp_my_table (): @dlt。表(name = " tmp_my_table "臨時= True, table_properties ={“質量”:“黃金”,“三角洲。minReaderVersion”:“2”、“三角洲。minWriterVersion ": " 5 "}) def tmp_my_table(): #以前堅持的表不是流df = dlt.read .withColumn (“slv_my_old_data”) (“DeleteFlag”,點燃(1))返回(df) create_tmp_my_table (dlt)。create_target_table (name = " gld_my_table table_properties ={“質量”:“黃金”,“三角洲。minReaderVersion”:“2”、“三角洲。dlt minWriterVersion ": " 5 "})。apply_changes(目標=“gld_my表”,源=“tmp_my表”,鍵= [“Hash_Key”], apply_as_deletes = expr (“DeleteFlag = 1”), #刪除條件sequence_by =坳(“Load_Date”))
但是產生了另一個錯誤:
檢測到一個數據更新(例如xxxxxxx.snappy.parquet)在源表版本25。這是目前不支持。如果你想忽略更新,設置選項“ignoreChanges”到“真正的”。如果你想反映的數據更新,請重啟該查詢以全新的關卡目錄
這是添加更多的挑戰,仍在考慮如何處理但不會暴露在這裏為了不混合。
所以拆開,替換應如何妥善處理在三角洲住表嗎?有人某處有一個很好的例子嗎?
@Enric Llop:
當使用達美住表執行“拆開,替換”的操作,你想替換現有的數據與新的數據表中,有幾件事要記住。
首先,apply_changes函數用於從一個源表適用於目標表。源表可以是δ表或流DataFrame,而目標表必須是三角洲表。執行“拆開,替換”的操作,您可以創建一個臨時表,隻包含新數據,然後使用apply_changes apply_as_delete選項刪除目標表中的現有數據之前插入新數據。這裏有一個例子:
從三角洲。表從pyspark.sql進口*。功能導入點燃#創建臨時表隻有new_data = DeltaTable的新數據。forPath(火花,“路徑/ /新/數據”)new_data.createOrReplaceTempView (“tmp_new_data”) tmp_new_data =火花。sql (“SELECT *, 0 DeleteFlag tmp_new_data”) #創建目標表target_table = DeltaTable如果它不存在。forPath(火花,“路徑/ /目標/表”)target_table.createOrReplaceTempView (tmp_target_table) #應用更改目標表,刪除現有數據#和插入新的數據target_table.alias(“目標”)。合並(tmp_new_data.alias(“源”),“目標。Hash_Key =來源。Hash_Key”) .whenMatchedDelete(條件= "來源。DeleteFlag = 1”) .whenNotMatchedInsertAll () . execute ()
在本例中,我們創建一個臨時表tmp_new_data隻包含新數據,DeleteFlag列設置為0。我們還創建一個臨時視圖tmp_target_table指的是目標表。然後我們使用合並函數從tmp_new_data tmp_target_table應用更改。whenMatchedDelete選項刪除目標表的任何行DeleteFlag是1,有效地消除現有數據。從tmp_new_data whenNotMatchedInsertAll選項插入所有行不匹配任何目標表中的行。
注意,這個例子假設Hash_Key列作為表的主鍵。您可能需要調整合並函數匹配的聯接條件列在你的桌子上。還要注意,合並功能是Delta-specific操作,不得使用其他類型的表。