表刪除、更新和合並<一個class="headerlink" href="//www.eheci.com/docs/delta/#table-deletes-updates-and-merges" title="">

Delta Lake支持一些語句,以方便從Delta表中刪除數據和更新數據。

要了解Delta Lake中刪除和更新數據的概述和演示,請觀看這個YouTube視頻(54分鍾)。

有關從Delta Lake捕獲變化數據的更多信息,請觀看這段YouTube視頻(53分鍾)。

從表中刪除<一個class="headerlink" href="//www.eheci.com/docs/delta/#delete-from-a-table" title="">

可以從Delta表中刪除與謂詞匹配的數據。例如,在一個名為people10m或者一條路徑/ tmp /δ/ people-10m的值,刪除與人員對應的所有行生日之前的專欄1955,可執行以下命令:

刪除people10m在哪裏生日<“1955-01-01”刪除δ' /tmp/δ/-10在哪裏生日<“1955-01-01”

請注意

Python API在Databricks Runtime 6.1及以上版本中可用。

delta.tables進口pyspark.sql.functions進口deltaTableDeltaTableforPath火花“/ tmp /δ/ people-10m”使用sql格式的字符串聲明謂詞。deltaTable刪除"出生日期< '1955-01-01'"使用Spark SQL函數聲明謂詞。deltaTable刪除上校“生日”<“1960-01-01”

請注意

Scala API在Databricks Runtime 6.0及以上版本中可用。

進口ioδ_瓦爾deltaTableDeltaTableforPath火花“/ tmp /δ/ people-10m”//使用sql格式的字符串聲明謂詞。deltaTable刪除"出生日期< '1955-01-01'"進口orgapache火花sql功能_進口火花值得一提的_//使用Spark SQL函數和隱式來聲明謂詞。deltaTable刪除上校“生日”<“1955-01-01”

請注意

Java API可在Databricks Runtime 6.0及以上版本中使用。

進口io.delta.tables。*進口org.apache.spark.sql.functionsDeltaTabledeltaTableDeltaTableforPath火花“/ tmp /δ/ people-10m”);//使用sql格式的字符串聲明謂詞。deltaTable刪除"出生日期< '1955-01-01'");//使用Spark SQL函數聲明謂詞deltaTable刪除功能上校“生日”).lt功能點燃“1955-01-01”)));

看到<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/delta-apidoc.html">Delta Lake api獲取詳細信息。

重要的

刪除從Delta表的最新版本中刪除數據,但不會從物理存儲中刪除數據,直到顯式地清空舊版本。看到<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/delta-utility.html">真空獲取詳細信息。

提示

在可能的情況下,為分區的Delta表在分區列上提供謂詞,因為這樣的謂詞可以顯著加快操作。

更新表<一個class="headerlink" href="//www.eheci.com/docs/delta/#update-a-table" title="">

可以更新與Delta表中的謂詞匹配的數據。例如,在一個名為people10m或者一條路徑/ tmp /δ/ people-10m中的縮寫,以更改性別列從orgydF4y2BaF男性orgydF4y2Ba,可執行以下命令:

更新people10m性別“女”在哪裏性別“F”更新people10m性別“男”在哪裏性別“米”更新δ' /tmp/δ/-10性別“女”在哪裏性別“F”更新δ' /tmp/δ/-10性別“男”在哪裏性別“米”

請注意

Python API在Databricks Runtime 6.1及以上版本中可用。

delta.tables進口pyspark.sql.functions進口deltaTableDeltaTableforPath火花“/ tmp /δ/ people-10m”使用sql格式的字符串聲明謂詞。deltaTable更新條件"性別= 'F'"“性別”“女”使用Spark SQL函數聲明謂詞。deltaTable更新條件上校“性別”= =“米”“性別”點燃“男”

請注意

Scala API在Databricks Runtime 6.0及以上版本中可用。

進口ioδ_瓦爾deltaTableDeltaTableforPath火花“/ tmp /δ/ people-10m”//使用sql格式的字符串聲明謂詞。deltaTableupdateExpr"性別= 'F'"地圖“性別”->“女”進口orgapache火花sql功能_進口火花值得一提的_//使用Spark SQL函數和隱式來聲明謂詞。deltaTable更新上校“性別”= = =“M”地圖“性別”->點燃“男性”)));

請注意

Scala API在Databricks Runtime 6.0及以上版本中可用。

進口io.delta.tables。*進口org.apache.spark.sql.functions進口java.util.HashMapDeltaTabledeltaTableDeltaTableforPath火花“/數據/事件/”);//使用sql格式的字符串聲明謂詞。deltaTableupdateExpr"性別= 'F'"HashMap<字符串字符串>(){{“性別”“女”);}});//使用Spark SQL函數聲明謂詞deltaTable更新功能上校性別).情商“M”),HashMap<字符串>(){{“性別”功能點燃“男性”));}});

看到<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/delta-apidoc.html">Delta Lake api獲取詳細信息。

提示

與刪除類似,使用分區上的謂詞,更新操作可以獲得顯著的加速。

使用merge將插入表<一個class="headerlink" href="//www.eheci.com/docs/delta/#upsert-into-a-table-using-merge" title="">

方法可以將源表、視圖或DataFrame中的數據插入到目標Delta表中合並SQL操作。Delta Lake支持插入、更新和刪除合並,並且它支持SQL標準之外的擴展語法,以方便高級用例。

假設您有一個名為people10mupdates或者源路徑/ tmp /δ/ people-10m-updates的目標表的新數據people10m或者目標路徑/ tmp /δ/ people-10m.其中一些新記錄可能已經出現在目標數據中。要合並新數據,您需要更新人員所在的行id已經存在並插入沒有匹配的新行嗎id是禮物。您可以執行以下命令:

合並people10m使用people10mupdatespeople10midpeople10mupdatesid匹配然後更新idpeople10mupdatesidfirstNamepeople10mupdatesfirstNamemiddleNamepeople10mupdatesmiddleNamepeople10mupdates性別people10mupdates性別生日people10mupdates生日ssnpeople10mupdatesssn工資people10mupdates工資匹配然後插入idfirstNamemiddleName性別生日ssn工資people10mupdatesidpeople10mupdatesfirstNamepeople10mupdatesmiddleNamepeople10mupdatespeople10mupdates性別people10mupdates生日people10mupdatesssnpeople10mupdates工資

有關語法的詳細信息,請參見

  • Databricks RuntimeX及以上:<一個class="reference internal" href="//www.eheci.com/docs/docs/spark/latest/spark-sql/language-manual/delta-merge-into.html">合並成

  • Databricks運行時5.5 LTS和6.x:<一個class="reference internal" href="//www.eheci.com/docs/docs/spark/2.x/spark-sql/language-manual/merge-into.html">Merge Into (Delta Lake on Databricks)

delta.tables進口deltaTablePeopleDeltaTableforPath火花“/ tmp /δ/ people-10m”deltaTablePeopleUpdatesDeltaTableforPath火花“/ tmp /δ/ people-10m-updates”dfUpdatesdeltaTablePeopleUpdatestoDF()deltaTablePeople別名“人”合並dfUpdates別名“更新”),”的人。id=更新s.id'whenMatchedUpdate“id”“updates.id”“firstName”“updates.firstName”“middleName”“updates.middleName”“姓”“updates.lastName”“性別”“updates.gender”“生日”“updates.birthDate”“ssn”“updates.ssn”“工資”“updates.salary”whenNotMatchedInsert“id”“updates.id”“firstName”“updates.firstName”“middleName”“updates.middleName”“姓”“updates.lastName”“性別”“updates.gender”“生日”“updates.birthDate”“ssn”“updates.ssn”“工資”“updates.salary”執行()
進口ioδ_進口orgapache火花sql功能_瓦爾deltaTablePeopleDeltaTableforPath火花“/ tmp /δ/ people-10m”瓦爾deltaTablePeopleUpdatesDeltaTableforPath火花“tmp /δ/ people-10m-updates”瓦爾dfUpdatesdeltaTablePeopleUpdatestoDF()deltaTablePeople作為“人”合並dfUpdates作為“更新”),”的人。id=更新s.id"whenMatchedupdateExpr地圖“id”->“updates.id”“firstName”->“updates.firstName”“middleName”->“updates.middleName”“姓”->“updates.lastName”“性別”->“updates.gender”“生日”->“updates.birthDate”“ssn”->“updates.ssn”“工資”->“updates.salary”))whenNotMatchedinsertExpr地圖“id”->“updates.id”“firstName”->“updates.firstName”“middleName”->“updates.middleName”“姓”->“updates.lastName”“性別”->“updates.gender”“生日”->“updates.birthDate”“ssn”->“updates.ssn”“工資”->“updates.salary”))執行()
進口io.delta.tables。*進口org.apache.spark.sql.functions進口java.util.HashMapDeltaTabledeltaTableDeltaTableforPath火花“/ tmp /δ/ people-10m”數據集<>dfUpdates火花“δ”).負載“/ tmp /δ/ people-10m-updates”deltaTable作為“人”合並dfUpdates作為“更新”),”的人。id=更新s.id"whenMatched()updateExprHashMap<字符串字符串>(){{“id”“updates.id”);“firstName”“updates.firstName”);“middleName”“updates.middleName”);“姓”“updates.lastName”);“性別”“updates.gender”);“生日”“updates.birthDate”);“ssn”“updates.ssn”);“工資”“updates.salary”);}})whenNotMatched()insertExprHashMap<字符串字符串>(){{“id”“updates.id”);“firstName”“updates.firstName”);“middleName”“updates.middleName”);“姓”“updates.lastName”);“性別”“updates.gender”);“生日”“updates.birthDate”);“ssn”“updates.ssn”);“工資”“updates.salary”);}})執行();

看到<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/delta-apidoc.html">Delta Lake apiScala, Java和Python語法細節。

Delta Lake合並操作通常需要對源數據進行兩次傳遞。如果源數據包含不確定的表達式,多次傳遞源數據可能產生不同的行,從而導致不正確的結果。不確定表達式的一些常見示例包括當前日期而且current_timestamp功能。如果無法避免使用非確定性函數,請考慮將源數據保存到存儲器中,例如保存為臨時Delta表。緩存源數據可能無法解決這個問題,因為緩存失效可能導致源數據部分或全部重新計算(例如,當集群在向下擴展時丟失一些執行程序時)。

操作語義<一個class="headerlink" href="//www.eheci.com/docs/delta/#operation-semantics" title="">

這裏有一個詳細的描述合並編程操作。

  • 可以有任意數量的whenMatched而且whenNotMatched條款。

    請注意

    在Databricks Runtime 7.2及以下版本中,合並最多可以有2個whenMatched從句和最多1個whenNotMatched條款。

  • whenMatched子句在源行與基於匹配條件的目標表行匹配時執行。這些子句具有以下語義。

    • whenMatched子句最多隻能有一個更新和一個刪除行動。的更新行動合並僅更新指定的列(類似於更新操作)匹配的目標行。的刪除操作刪除匹配的行。

    • 每一個whenMatched子句可以有可選條件。如果此子句條件存在,則更新orgydF4y2Ba刪除僅當子句條件為true時,才對任何匹配的源-目標行對執行操作。

    • 如果有多個whenMatched子句,則它們將按照指定的順序求值。所有whenMatched子句,除了最後一個,都必須有條件。

    • 如果沒有whenMatched對於匹配合並條件的源和目標行對,條件計算為true,則目標行保持不變。

    • 若要使用源數據集的相應列更新目標Delta表的所有列,請使用whenMatched(…).updateAll ().這相當於:

      whenMatched(…)。updateExpr地圖“col1”->“source.col1”“col2”->“source.col2”…))

      為目標Delta表的所有列。因此,此操作假定源表與目標表中的列具有相同的列,否則查詢將拋出分析錯誤。

      請注意

      當啟用自動模式遷移時,此行為將更改。看到<一個class="reference internal" href="//www.eheci.com/docs/delta/#merge-schema-evolution">自動模式演化獲取詳細信息。

  • whenNotMatched當源行與基於匹配條件的任何目標行不匹配時,將執行子句。這些子句具有以下語義。

    • whenNotMatched從句隻能有the插入行動。根據指定的列和相應的表達式生成新行。您不需要指定目標表中的所有列。對於未指定的目標列,被插入。

      請注意

      在Databricks Runtime 6.5及以下版本中,必須為目標表中的所有列提供插入行動。

    • 每一個whenNotMatched子句可以有可選條件。如果存在子句條件,則僅當源行條件為真時才插入源行。否則,源列將被忽略。

    • 如果有多個whenNotMatched子句,則它們將按照指定的順序求值。所有whenNotMatched子句,除了最後一個,都必須有條件。

    • 若要將目標Delta表的所有列與源數據集的相應列插入,請使用whenNotMatched(…).insertAll ().這相當於:

      whenNotMatched(…)。insertExpr地圖“col1”->“source.col1”“col2”->“source.col2”…))

      為目標Delta表的所有列。因此,此操作假定源表與目標表中的列具有相同的列,否則查詢將拋出分析錯誤。

      請注意

      當啟用自動模式遷移時,此行為將更改。看到<一個class="reference internal" href="//www.eheci.com/docs/delta/#merge-schema-evolution">自動模式演化獲取詳細信息。

重要的

  • 一個合並如果源數據集的多行匹配,並且merge嚐試更新目標Delta表的同行,則操作可能失敗。根據merge的SQL語義,這樣的更新操作是不明確的,因為不清楚應該使用哪個源行來更新匹配的目標行。可以對源表進行預處理,以消除多個匹配的可能性。看到<一個class="reference internal" href="//www.eheci.com/docs/delta/#write-change-data-into-a-delta-table">變更數據捕獲示例它展示了如何預處理更改數據集(即源數據集),以便在將更改應用到目標Delta表之前僅保留每個鍵的最新更改。

  • 一個合並如果源數據集不確定,操作可能產生不正確的結果。這是因為合並可以對源數據集執行兩次掃描,如果兩次掃描產生的數據不同,則對表所做的最終更改可能不正確。來源中的不確定性可以以多種方式出現。其中一些是如下:

    • 從非delta表讀取。例如,從CSV表中讀取,其中底層文件可以在多次掃描之間更改。

    • 使用非確定性操作。例如,Dataset.filter ()使用當前時間戳過濾數據的操作可能在多次掃描之間產生不同的結果。

  • 您可以應用SQL合並隻有當視圖被定義為時,才能對SQL VIEW進行操作創建視圖viewName作為選擇deltaTable

請注意

在Databricks Runtime 7.3 LTS及以上版本中,當無條件刪除匹配時,允許多個匹配(因為即使有多個匹配,無條件刪除也不會有歧義)。

模式驗證<一個class="headerlink" href="//www.eheci.com/docs/delta/#schema-validation" title="">

合並自動驗證由插入和更新表達式生成的數據模式是否與表的模式兼容。它使用以下規則來確定是否合並操作兼容:

  • 更新而且插入動作,指定的目標列必須存在於目標Delta表中。

  • updateAll而且insertAll動作,源數據集必須有目標Delta表的所有列。源數據集可以有額外的列,但它們會被忽略。

  • 對於所有操作,如果產生目標列的表達式生成的數據類型與目標Delta表中的相應列不同,合並嚐試將它們強製轉換為表中的類型。

自動模式演化<一個class="headerlink" href="//www.eheci.com/docs/delta/#automatic-schema-evolution" title="">

請注意

模式演化合並已在Databricks Runtime 6.6及以上版本提供。

默認情況下,updateAll而且insertAll為目標Delta表中的所有列分配與源數據集同名的列。源數據集中與目標表中的列不匹配的任何列都將被忽略。然而,在某些用例中,需要自動向目標Delta表添加源列。操作期間自動更新表架構合並操作updateAll而且insertAll(至少其中一個),您可以設置Spark會話配置spark.databricks.delta.schema.autoMerge.enabled真正的在運行合並操作。

請注意

  • 模式演化僅在兩者之一時發生updateAll更新)或insertAll插入)行動,或兩者兼有。

  • 更新而且插入操作不能顯式地引用目標表中不存在的目標列(即使存在)updateAllorgydF4y2BainsertAll作為從句之一)。請看下麵的例子。

請注意

在Databricks Runtime 7.4及以下版本中,合並僅支持頂級列的模式演化,而不支持嵌套列的模式演化。

這裏有幾個例子說明合並有和沒有模式演化的操作。

查詢(Scala)

沒有模式演進的行為(默認)

模式演變的行為

目標列:鍵,價值

源列:鍵,值,newValue

targetDeltaTable別名“t”合並sourceDataFrame別名“s”),"t.key = s.key"whenMatched()。updateAll()whenNotMatched()。insertAll()執行()

表模式保持不變;隻列關鍵價值更新/插入。

表模式更改為(關鍵值,newValue)updateAll更新列價值而且newValue,insertAll插入行(關鍵值,newValue)

目標列:鍵,oldValue

源列:鍵,newValue

targetDeltaTable別名“t”合並sourceDataFrame別名“s”),"t.key = s.key"whenMatched()。updateAll()whenNotMatched()。insertAll()執行()

updateAll而且insertAll操作拋出錯誤,因為目標列oldValue不在源頭。

表模式更改為(關鍵oldValue,newValue)updateAll更新列關鍵而且newValue離開oldValue保持不變,insertAll插入行(關鍵空,newValue)(即,oldValue插入為).

目標列:鍵,oldValue

源列:鍵,newValue

targetDeltaTable別名“t”合並sourceDataFrame別名“s”),"t.key = s.key"whenMatched()。更新地圖“newValue”->上校“s.newValue”)))whenNotMatched()。insertAll()執行()

更新因為列拋出錯誤newValue目標表中不存在。

更新仍然拋出錯誤,因為列newValue目標表中不存在。

目標列:鍵,oldValue

源列:鍵,newValue

targetDeltaTable別名“t”合並sourceDataFrame別名“s”),"t.key = s.key"whenMatched()。updateAll()whenNotMatched()。插入地圖“關鍵”->上校“s.key”),“newValue”->上校“s.newValue”)))執行()

插入因為列拋出錯誤newValue目標表中不存在。

插入仍然拋出作為列的錯誤newValue目標表中不存在。

包含結構數組的模式的特殊注意事項<一個class="headerlink" href="//www.eheci.com/docs/delta/#special-considerations-for-schemas-that-contain-arrays-of-structs" title="">

δ合並支持按名稱解析結構字段,並為結構數組演進模式。啟用模式進化後,目標表模式將為結構數組進化,這也適用於數組內的任何嵌套結構。

請注意

該特性在Databricks Runtime 9.1及以上版本中可用。對於Databricks Runtime 9.0及以下版本,隱式Spark強製轉換用於結構數組,以按位置解析結構字段,並且合並操作的效果與數組中結構的模式演化不一致。

下麵是幾個例子,說明合並操作對結構數組的影響,包括模式演化和不包括模式演化。

源模式

目標模式

沒有模式演進的行為(默認)

模式演變的行為

數組>

數組>

表模式保持不變。列將按名稱解析並更新或插入。

表模式保持不變。列將按名稱解析並更新或插入。

數組>

數組>

更新而且插入拋出錯誤是因為c而且d目標表中不存在。

表架構更改為array>。c而且d插入為用於目標表中的現有項。更新而且插入用填充源表中的條目一個鑄造成字符串和b作為

數組>>

數組>>

更新而且插入拋出錯誤是因為d目標表中不存在。

目標表架構更改為array>>。d插入為用於目標表中的現有項。

性能調優<一個class="headerlink" href="//www.eheci.com/docs/delta/#performance-tuning" title="">

您可以使用以下方法減少合並所花費的時間:

  • 減少匹配的搜索空間:默認為合並操作將搜索整個Delta表以查找源表中的匹配項。加快速度的一種方法合並是通過在匹配條件中添加已知約束來縮小搜索空間。例如,假設您有一個表,該表是由國家而且日期你想用合並更新最後一天和特定國家的信息。添加條件

    事件日期當前日期()事件國家“美國”

    將使查詢更快,因為它隻在相關分區中查找匹配。此外,它還將減少與其他並發操作發生衝突的機會。看到<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/concurrency-control.html">並發控製欲知詳情。

  • 緊湊的文件:如果數據存儲在許多小文件中,讀取數據以搜索匹配可能會變慢。您可以將小文件壓縮成大文件以提高讀吞吐量。看到<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/best-practices.html">緊湊的文件獲取詳細信息。

  • 為寫操作控製shuffle分區:合並操作多次打亂數據以計算和寫入更新的數據。用於shuffle的任務數量由Spark會話配置控製spark.sql.shuffle.partitions.設置該參數不僅可以控製並行度,還可以決定輸出文件的數量。增大該值會增加並行度,但也會生成更多的小數據文件。

  • 啟用優化的寫入:對於分區表,合並可以產生的小文件數量遠遠大於shuffle分區的數量。這是因為每個shuffle任務都可以在多個分區中寫入多個文件,這可能成為性能瓶頸。您可以通過啟用來減少文件數量<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/optimizations/auto-optimize.html">優化的寫

請注意

在Databricks Runtime 7.4及以上版本中,<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/optimizations/auto-optimize.html">優化的寫自動啟用。合並對分區表的操作。

  • 調優表中的文件大小:在Databricks運行時8.2及以上版本中,Databricks可以自動檢測Delta表是否有頻繁合並重寫文件的操作,可能會選擇減少已重寫文件的大小,以預期將來會進一步重寫文件。請參閱有關<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/optimizations/file-mgmt.html">調優文件大小獲取詳細信息。

  • 低Shuffle合並:在Databricks Runtime 9.0及以上版本,<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/optimizations/low-shuffle-merge.html">低Shuffle合並的優化實現合並這為大多數常見工作負載提供了更好的性能。此外,它還保留了現有的數據布局優化,例如<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/optimizations/file-mgmt.html">z值在未修改的數據上。

合並的例子<一個class="headerlink" href="//www.eheci.com/docs/delta/#merge-examples" title="">

下麵是一些如何使用的例子合並在不同的場景中。

寫入Delta表時重複數據刪除<一個class="headerlink" href="//www.eheci.com/docs/delta/#data-deduplication-when-writing-into-delta-tables" title="">

一個常見的ETL用例是通過將日誌附加到一個表中來收集到Delta表中。但是,源通常會生成重複的日誌記錄,需要後續的重複數據刪除步驟來處理它們。與合並時,可避免插入重複的記錄。

合並日誌使用newDedupedLogs日誌uniqueIdnewDedupedLogsuniqueId匹配然後插入
deltaTable別名“日誌”合並newDedupedLogs別名“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs.uniqueId"whenNotMatchedInsertAll()執行()
deltaTable作為“日誌”合並newDedupedLogs作為“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs.uniqueId"whenNotMatched()insertAll()執行()
deltaTable作為“日誌”合並newDedupedLogs作為“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs.uniqueId"whenNotMatched()insertAll()執行();

請注意

包含新日誌的數據集本身需要重複數據刪除。通過merge的SQL語義,它將新數據與表中的現有數據進行匹配並重複刪除,但如果新數據集中有重複數據,則插入它。因此,在合並到表之前,要對新數據進行重複數據刪除。

如果您知道可能隻會在幾天內得到重複的記錄,那麼您可以通過按日期對表進行分區,然後指定要匹配的目標表的日期範圍,從而進一步優化查詢。

合並日誌使用newDedupedLogs日誌uniqueIdnewDedupedLogsuniqueId日誌日期>當前日期()-時間間隔7匹配newDedupedLogs日期>當前日期()-時間間隔7然後插入
deltaTable別名“日誌”合並newDedupedLogs別名“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs。uniqueId和日誌。日期>當前日期()-時間間隔7天"whenNotMatchedInsertAll“newDedupedLogs。日期>當前日期()-時間間隔7天"執行()
deltaTable作為“日誌”).合並newDedupedLogs作為“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs。uniqueId和日誌。日期>當前日期()-時間間隔7天"whenNotMatched“newDedupedLogs。日期>當前日期()-時間間隔7天"insertAll()執行()
deltaTable作為“日誌”).合並newDedupedLogs作為“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs。uniqueId和日誌。日期>當前日期()-時間間隔7天"whenNotMatched“newDedupedLogs。日期>當前日期()-時間間隔7天"insertAll()執行();

這比前一個命令更有效,因為它隻查找最近7天的日誌中的副本,而不是整個表。此外,您可以使用這種僅插入的合並和結構化流來執行連續的重複數據刪除日誌。

  • 在流查詢中,可以使用合並操作foreachBatch使用重複數據刪除將任何流數據連續寫入Delta表。參見以下內容<一個class="reference internal" href="//www.eheci.com/docs/delta/#merge-in-streaming">流的例子欲知更多有關foreachBatch

  • 在另一個流查詢中,您可以連續地從這個Delta表中讀取重複數據刪除後的數據。這是可能的,因為僅插入合並僅向Delta表追加新數據。

請注意

僅插入合並在Databricks Runtime 6.2及以上版本中被優化為僅追加數據。在Databricks Runtime 6.1及以下版本中,僅插入合並操作的寫操作不能作為流讀取。

緩慢改變數據(SCD)對Delta表的類型2操作<一個class="headerlink" href="//www.eheci.com/docs/delta/#slowly-changing-data-scd-type-2-operation-into-delta-tables" title="">

另一個常見操作是SCD Type 2,它維護維度表中每個鍵的所有更改的曆史記錄。這樣的操作需要更新現有行,將鍵的先前值標記為舊值,並將新行插入為最新值。給定一個包含更新的源表和包含維度數據的目標表,可以用合並

下麵是維護客戶地址曆史以及每個地址的活動日期範圍的具體示例。當客戶的地址需要更新時,必須將以前的地址標記為當前地址,更新其活動日期範圍,並將新地址添加為當前地址。

SCD類型2使用合並筆記本<一個class="headerlink" href="//www.eheci.com/docs/delta/#scd-type-2-using-merge-notebook" title="">

將更改數據寫入Delta表<一個class="headerlink" href="//www.eheci.com/docs/delta/#write-change-data-into-a-delta-table" title="">

與SCD類似,另一個常見的用例(通常稱為更改數據捕獲(CDC))是將從外部數據庫生成的所有數據更改應用到Delta表中。換句話說,應用於外部表的一組更新、刪除和插入需要應用於Delta表。你可以用合並如下。

使用MERGE筆記本記錄變更數據<一個class="headerlink" href="//www.eheci.com/docs/delta/#write-change-data-using-merge-notebook" title="">

Upsert從流查詢使用foreachBatch

你可以使用的組合合並而且foreachBatch(見<一個class="reference external" href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html">foreachbatch(獲取更多信息)將複雜的upserts從流查詢寫入Delta表。例如:

  • 在更新模式下寫入流聚合:這比完全模式更有效率。

  • 將數據庫更改流寫入Delta表:<一個class="reference internal" href="//www.eheci.com/docs/delta/#merge-in-cdc">合並查詢,用於寫入更改數據可用於foreachBatch來連續地對Delta表應用更改流。

  • 使用重複數據刪除功能將流數據寫入Delta表:<一個class="reference internal" href="//www.eheci.com/docs/delta/#merge-in-dedup">用於重複數據刪除的僅插入合並查詢可用於foreachBatch使用自動重複數據刪除功能,連續向Delta表寫入數據(帶重複)。

請注意

  • 確保你的合並聲明內foreachBatch是冪等的,因為重新啟動流查詢可以對同一批數據應用多次操作。

  • 合並用於foreachBatch,流查詢的輸入數據速率(通過StreamingQueryProgress在筆記本速率圖中可見)可以報告為數據在源處生成的實際速率的倍數。這是因為合並多次讀取輸入數據,導致輸入指標相乘。如果這是一個瓶頸,您可以在此之前緩存批DataFrame合並然後將其解緩存合並

使用merge和foreachBatch notebook在更新模式下寫入流聚合<一個class="headerlink" href="//www.eheci.com/docs/delta/#write-streaming-aggregates-in-update-mode-using-merge-and-foreachbatch-notebook" title="">