表刪除、更新和合並<一個class="headerlink" href="//www.eheci.com/docs/delta/#table-deletes-updates-and-merges" title="">
Delta Lake支持一些語句,以方便從Delta表中刪除數據和更新數據。
要了解Delta Lake中刪除和更新數據的概述和演示,請觀看這個YouTube視頻(54分鍾)。
有關從Delta Lake捕獲變化數據的更多信息,請觀看這段YouTube視頻(53分鍾)。
從表中刪除<一個class="headerlink" href="//www.eheci.com/docs/delta/#delete-from-a-table" title="">
可以從Delta表中刪除與謂詞匹配的數據。例如,在一個名為people10m
或者一條路徑/ tmp /δ/ people-10m
的值,刪除與人員對應的所有行生日
之前的專欄1955
,可執行以下命令:
刪除從people10m在哪裏生日<“1955-01-01”刪除從δ.' /tmp/δ/人-10米`在哪裏生日<“1955-01-01”
請注意
Python API在Databricks Runtime 6.1及以上版本中可用。
從delta.tables進口*從pyspark.sql.functions進口*deltaTable=DeltaTable.forPath(火花,“/ tmp /δ/ people-10m”)使用sql格式的字符串聲明謂詞。deltaTable.刪除("出生日期< '1955-01-01'")使用Spark SQL函數聲明謂詞。deltaTable.刪除(上校(“生日”)<“1960-01-01”)
請注意
Scala API在Databricks Runtime 6.0及以上版本中可用。
進口io.δ.表._瓦爾deltaTable=DeltaTable.forPath(火花,“/ tmp /δ/ people-10m”)//使用sql格式的字符串聲明謂詞。deltaTable.刪除("出生日期< '1955-01-01'")進口org.apache.火花.sql.功能._進口火花.值得一提的._//使用Spark SQL函數和隱式來聲明謂詞。deltaTable.刪除(上校(“生日”)<“1955-01-01”)
請注意
Java API可在Databricks Runtime 6.0及以上版本中使用。
進口io.delta.tables。*;進口org.apache.spark.sql.functions;DeltaTabledeltaTable=DeltaTable.forPath(火花,“/ tmp /δ/ people-10m”);//使用sql格式的字符串聲明謂詞。deltaTable.刪除("出生日期< '1955-01-01'");//使用Spark SQL函數聲明謂詞deltaTable.刪除(功能.上校(“生日”).lt(功能.點燃(“1955-01-01”)));
看到<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/delta-apidoc.html">Delta Lake api獲取詳細信息。
重要的
刪除
從Delta表的最新版本中刪除數據,但不會從物理存儲中刪除數據,直到顯式地清空舊版本。看到<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/delta-utility.html">真空獲取詳細信息。
提示
在可能的情況下,為分區的Delta表在分區列上提供謂詞,因為這樣的謂詞可以顯著加快操作。
更新表<一個class="headerlink" href="//www.eheci.com/docs/delta/#update-a-table" title="">
可以更新與Delta表中的謂詞匹配的數據。例如,在一個名為people10m
或者一條路徑/ tmp /δ/ people-10m
中的縮寫,以更改性別
列從米
orgydF4y2BaF
來男性
orgydF4y2Ba女
,可執行以下命令:
更新people10m集性別=“女”在哪裏性別=“F”;更新people10m集性別=“男”在哪裏性別=“米”;更新δ.' /tmp/δ/人-10米`集性別=“女”在哪裏性別=“F”;更新δ.' /tmp/δ/人-10米`集性別=“男”在哪裏性別=“米”;
請注意
Python API在Databricks Runtime 6.1及以上版本中可用。
從delta.tables進口*從pyspark.sql.functions進口*deltaTable=DeltaTable.forPath(火花,“/ tmp /δ/ people-10m”)使用sql格式的字符串聲明謂詞。deltaTable.更新(條件="性別= 'F'",集={“性別”:“女”})使用Spark SQL函數聲明謂詞。deltaTable.更新(條件=上校(“性別”)= =“米”,集={“性別”:點燃(“男”)})
請注意
Scala API在Databricks Runtime 6.0及以上版本中可用。
進口io.δ.表._瓦爾deltaTable=DeltaTable.forPath(火花,“/ tmp /δ/ people-10m”)//使用sql格式的字符串聲明謂詞。deltaTable.updateExpr("性別= 'F'",地圖(“性別”->“女”)進口org.apache.火花.sql.功能._進口火花.值得一提的._//使用Spark SQL函數和隱式來聲明謂詞。deltaTable.更新(上校(“性別”)= = =“M”,地圖(“性別”->點燃(“男性”)));
請注意
Scala API在Databricks Runtime 6.0及以上版本中可用。
進口io.delta.tables。*;進口org.apache.spark.sql.functions;進口java.util.HashMap;DeltaTabledeltaTable=DeltaTable.forPath(火花,“/數據/事件/”);//使用sql格式的字符串聲明謂詞。deltaTable.updateExpr("性別= 'F'",新HashMap<字符串,字符串>(){{把(“性別”,“女”);}});//使用Spark SQL函數聲明謂詞deltaTable.更新(功能.上校(性別).情商(“M”),新HashMap<字符串,列>(){{把(“性別”,功能.點燃(“男性”));}});
看到<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/delta-apidoc.html">Delta Lake api獲取詳細信息。
提示
與刪除類似,使用分區上的謂詞,更新操作可以獲得顯著的加速。
使用merge將插入表<一個class="headerlink" href="//www.eheci.com/docs/delta/#upsert-into-a-table-using-merge" title="">
方法可以將源表、視圖或DataFrame中的數據插入到目標Delta表中合並
SQL操作。Delta Lake支持插入、更新和刪除合並
,並且它支持SQL標準之外的擴展語法,以方便高級用例。
假設您有一個名為people10mupdates
或者源路徑/ tmp /δ/ people-10m-updates
的目標表的新數據people10m
或者目標路徑/ tmp /δ/ people-10m
.其中一些新記錄可能已經出現在目標數據中。要合並新數據,您需要更新人員所在的行id
已經存在並插入沒有匹配的新行嗎id
是禮物。您可以執行以下命令:
合並成people10m使用people10mupdates在people10m.id=people10mupdates.id當匹配然後更新集id=people10mupdates.id,firstName=people10mupdates.firstName,middleName=people10mupdates.middleName,姓=people10mupdates.姓,性別=people10mupdates.性別,生日=people10mupdates.生日,ssn=people10mupdates.ssn,工資=people10mupdates.工資當不匹配然後插入(id,firstName,middleName,姓,性別,生日,ssn,工資)值(people10mupdates.id,people10mupdates.firstName,people10mupdates.middleName,people10mupdates.姓,people10mupdates.性別,people10mupdates.生日,people10mupdates.ssn,people10mupdates.工資)
有關語法的詳細信息,請參見
Databricks RuntimeX及以上:<一個class="reference internal" href="//www.eheci.com/docs/docs/spark/latest/spark-sql/language-manual/delta-merge-into.html">合並成
Databricks運行時5.5 LTS和6.x:<一個class="reference internal" href="//www.eheci.com/docs/docs/spark/2.x/spark-sql/language-manual/merge-into.html">Merge Into (Delta Lake on Databricks)
從delta.tables進口*deltaTablePeople=DeltaTable.forPath(火花,“/ tmp /δ/ people-10m”)deltaTablePeopleUpdates=DeltaTable.forPath(火花,“/ tmp /δ/ people-10m-updates”)dfUpdates=deltaTablePeopleUpdates.toDF()deltaTablePeople.別名(“人”)\.合並(dfUpdates.別名(“更新”),”的人。id=更新s.id')\.whenMatchedUpdate(集={“id”:“updates.id”,“firstName”:“updates.firstName”,“middleName”:“updates.middleName”,“姓”:“updates.lastName”,“性別”:“updates.gender”,“生日”:“updates.birthDate”,“ssn”:“updates.ssn”,“工資”:“updates.salary”})\.whenNotMatchedInsert(值={“id”:“updates.id”,“firstName”:“updates.firstName”,“middleName”:“updates.middleName”,“姓”:“updates.lastName”,“性別”:“updates.gender”,“生日”:“updates.birthDate”,“ssn”:“updates.ssn”,“工資”:“updates.salary”})\.執行()
進口io.δ.表._進口org.apache.火花.sql.功能._瓦爾deltaTablePeople=DeltaTable.forPath(火花,“/ tmp /δ/ people-10m”)瓦爾deltaTablePeopleUpdates=DeltaTable.forPath(火花,“tmp /δ/ people-10m-updates”)瓦爾dfUpdates=deltaTablePeopleUpdates.toDF()deltaTablePeople.作為(“人”).合並(dfUpdates.作為(“更新”),”的人。id=更新s.id").whenMatched.updateExpr(地圖(“id”->“updates.id”,“firstName”->“updates.firstName”,“middleName”->“updates.middleName”,“姓”->“updates.lastName”,“性別”->“updates.gender”,“生日”->“updates.birthDate”,“ssn”->“updates.ssn”,“工資”->“updates.salary”)).whenNotMatched.insertExpr(地圖(“id”->“updates.id”,“firstName”->“updates.firstName”,“middleName”->“updates.middleName”,“姓”->“updates.lastName”,“性別”->“updates.gender”,“生日”->“updates.birthDate”,“ssn”->“updates.ssn”,“工資”->“updates.salary”)).執行()
進口io.delta.tables。*;進口org.apache.spark.sql.functions;進口java.util.HashMap;DeltaTabledeltaTable=DeltaTable.forPath(火花,“/ tmp /δ/ people-10m”)數據集<行>dfUpdates=火花.讀(“δ”).負載(“/ tmp /δ/ people-10m-updates”)deltaTable.作為(“人”).合並(dfUpdates.作為(“更新”),”的人。id=更新s.id").whenMatched().updateExpr(新HashMap<字符串,字符串>(){{把(“id”,“updates.id”);把(“firstName”,“updates.firstName”);把(“middleName”,“updates.middleName”);把(“姓”,“updates.lastName”);把(“性別”,“updates.gender”);把(“生日”,“updates.birthDate”);把(“ssn”,“updates.ssn”);把(“工資”,“updates.salary”);}}).whenNotMatched().insertExpr(新HashMap<字符串,字符串>(){{把(“id”,“updates.id”);把(“firstName”,“updates.firstName”);把(“middleName”,“updates.middleName”);把(“姓”,“updates.lastName”);把(“性別”,“updates.gender”);把(“生日”,“updates.birthDate”);把(“ssn”,“updates.ssn”);把(“工資”,“updates.salary”);}}).執行();
看到<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/delta-apidoc.html">Delta Lake apiScala, Java和Python語法細節。
Delta Lake合並操作通常需要對源數據進行兩次傳遞。如果源數據包含不確定的表達式,多次傳遞源數據可能產生不同的行,從而導致不正確的結果。不確定表達式的一些常見示例包括當前日期
而且current_timestamp
功能。如果無法避免使用非確定性函數,請考慮將源數據保存到存儲器中,例如保存為臨時Delta表。緩存源數據可能無法解決這個問題,因為緩存失效可能導致源數據部分或全部重新計算(例如,當集群在向下擴展時丟失一些執行程序時)。
操作語義<一個class="headerlink" href="//www.eheci.com/docs/delta/#operation-semantics" title="">
這裏有一個詳細的描述合並
編程操作。
可以有任意數量的
whenMatched
而且whenNotMatched
條款。請注意
在Databricks Runtime 7.2及以下版本中,
合並
最多可以有2個whenMatched
從句和最多1個whenNotMatched
條款。whenMatched
子句在源行與基於匹配條件的目標表行匹配時執行。這些子句具有以下語義。whenMatched
子句最多隻能有一個更新
和一個刪除
行動。的更新
行動合並
僅更新指定的列(類似於更新
操作)匹配的目標行。的刪除
操作刪除匹配的行。每一個
whenMatched
子句可以有可選條件。如果此子句條件存在,則更新
orgydF4y2Ba刪除
僅當子句條件為true時,才對任何匹配的源-目標行對執行操作。如果有多個
whenMatched
子句,則它們將按照指定的順序求值。所有whenMatched
子句,除了最後一個,都必須有條件。如果沒有
whenMatched
對於匹配合並條件的源和目標行對,條件計算為true,則目標行保持不變。若要使用源數據集的相應列更新目標Delta表的所有列,請使用
whenMatched(…).updateAll ()
.這相當於:whenMatched(…)。updateExpr(地圖(“col1”->“source.col1”,“col2”->“source.col2”,…))
為目標Delta表的所有列。因此,此操作假定源表與目標表中的列具有相同的列,否則查詢將拋出分析錯誤。
請注意
當啟用自動模式遷移時,此行為將更改。看到<一個class="reference internal" href="//www.eheci.com/docs/delta/#merge-schema-evolution">自動模式演化獲取詳細信息。
whenNotMatched
當源行與基於匹配條件的任何目標行不匹配時,將執行子句。這些子句具有以下語義。whenNotMatched
從句隻能有the插入
行動。根據指定的列和相應的表達式生成新行。您不需要指定目標表中的所有列。對於未指定的目標列,零
被插入。請注意
在Databricks Runtime 6.5及以下版本中,必須為目標表中的所有列提供
插入
行動。每一個
whenNotMatched
子句可以有可選條件。如果存在子句條件,則僅當源行條件為真時才插入源行。否則,源列將被忽略。如果有多個
whenNotMatched
子句,則它們將按照指定的順序求值。所有whenNotMatched
子句,除了最後一個,都必須有條件。若要將目標Delta表的所有列與源數據集的相應列插入,請使用
whenNotMatched(…).insertAll ()
.這相當於:whenNotMatched(…)。insertExpr(地圖(“col1”->“source.col1”,“col2”->“source.col2”,…))
為目標Delta表的所有列。因此,此操作假定源表與目標表中的列具有相同的列,否則查詢將拋出分析錯誤。
請注意
當啟用自動模式遷移時,此行為將更改。看到<一個class="reference internal" href="//www.eheci.com/docs/delta/#merge-schema-evolution">自動模式演化獲取詳細信息。
重要的
一個
合並
如果源數據集的多行匹配,並且merge嚐試更新目標Delta表的同行,則操作可能失敗。根據merge的SQL語義,這樣的更新操作是不明確的,因為不清楚應該使用哪個源行來更新匹配的目標行。可以對源表進行預處理,以消除多個匹配的可能性。看到<一個class="reference internal" href="//www.eheci.com/docs/delta/#write-change-data-into-a-delta-table">變更數據捕獲示例它展示了如何預處理更改數據集(即源數據集),以便在將更改應用到目標Delta表之前僅保留每個鍵的最新更改。一個
合並
如果源數據集不確定,操作可能產生不正確的結果。這是因為合並
可以對源數據集執行兩次掃描,如果兩次掃描產生的數據不同,則對表所做的最終更改可能不正確。來源中的不確定性可以以多種方式出現。其中一些是如下:從非delta表讀取。例如,從CSV表中讀取,其中底層文件可以在多次掃描之間更改。
使用非確定性操作。例如,
Dataset.filter ()
使用當前時間戳過濾數據的操作可能在多次掃描之間產生不同的結果。
您可以應用SQL
合並
隻有當視圖被定義為時,才能對SQL VIEW進行操作創建視圖viewName作為選擇*從deltaTable
.
請注意
在Databricks Runtime 7.3 LTS及以上版本中,當無條件刪除匹配時,允許多個匹配(因為即使有多個匹配,無條件刪除也不會有歧義)。
模式驗證<一個class="headerlink" href="//www.eheci.com/docs/delta/#schema-validation" title="">
合並
自動驗證由插入和更新表達式生成的數據模式是否與表的模式兼容。它使用以下規則來確定是否合並
操作兼容:
為
更新
而且插入
動作,指定的目標列必須存在於目標Delta表中。為
updateAll
而且insertAll
動作,源數據集必須有目標Delta表的所有列。源數據集可以有額外的列,但它們會被忽略。對於所有操作,如果產生目標列的表達式生成的數據類型與目標Delta表中的相應列不同,
合並
嚐試將它們強製轉換為表中的類型。
自動模式演化<一個class="headerlink" href="//www.eheci.com/docs/delta/#automatic-schema-evolution" title="">
請注意
模式演化合並
已在Databricks Runtime 6.6及以上版本提供。
默認情況下,updateAll
而且insertAll
為目標Delta表中的所有列分配與源數據集同名的列。源數據集中與目標表中的列不匹配的任何列都將被忽略。然而,在某些用例中,需要自動向目標Delta表添加源列。操作期間自動更新表架構合並
操作updateAll
而且insertAll
(至少其中一個),您可以設置Spark會話配置spark.databricks.delta.schema.autoMerge.enabled
來真正的
在運行合並
操作。
請注意
模式演化僅在兩者之一時發生
updateAll
(更新集*
)或insertAll
(插入*
)行動,或兩者兼有。更新
而且插入
操作不能顯式地引用目標表中不存在的目標列(即使存在)updateAll
orgydF4y2BainsertAll
作為從句之一)。請看下麵的例子。
請注意
在Databricks Runtime 7.4及以下版本中,合並
僅支持頂級列的模式演化,而不支持嵌套列的模式演化。
這裏有幾個例子說明合並
有和沒有模式演化的操作。
列 |
查詢(Scala) |
沒有模式演進的行為(默認) |
模式演變的行為 |
---|---|---|---|
目標列: 源列: |
targetDeltaTable.別名(“t”).合並(sourceDataFrame.別名(“s”),"t.key = s.key").whenMatched()。updateAll().whenNotMatched()。insertAll().執行()
|
表模式保持不變;隻列 |
表模式更改為 |
目標列: 源列: |
targetDeltaTable.別名(“t”).合並(sourceDataFrame.別名(“s”),"t.key = s.key").whenMatched()。updateAll().whenNotMatched()。insertAll().執行()
|
|
表模式更改為 |
目標列: 源列: |
targetDeltaTable.別名(“t”).合並(sourceDataFrame.別名(“s”),"t.key = s.key").whenMatched()。更新(地圖(“newValue”->上校(“s.newValue”))).whenNotMatched()。insertAll().執行()
|
|
|
目標列: 源列: |
targetDeltaTable.別名(“t”).合並(sourceDataFrame.別名(“s”),"t.key = s.key").whenMatched()。updateAll().whenNotMatched()。插入(地圖(“關鍵”->上校(“s.key”),“newValue”->上校(“s.newValue”))).執行()
|
|
|
包含結構數組的模式的特殊注意事項<一個class="headerlink" href="//www.eheci.com/docs/delta/#special-considerations-for-schemas-that-contain-arrays-of-structs" title="">
δ合並成
支持按名稱解析結構字段,並為結構數組演進模式。啟用模式進化後,目標表模式將為結構數組進化,這也適用於數組內的任何嵌套結構。
請注意
該特性在Databricks Runtime 9.1及以上版本中可用。對於Databricks Runtime 9.0及以下版本,隱式Spark強製轉換用於結構數組,以按位置解析結構字段,並且合並操作的效果與數組中結構的模式演化不一致。
下麵是幾個例子,說明合並操作對結構數組的影響,包括模式演化和不包括模式演化。
源模式 |
目標模式 |
沒有模式演進的行為(默認) |
模式演變的行為 |
---|---|---|---|
數組 |
數組 |
表模式保持不變。列將按名稱解析並更新或插入。 |
表模式保持不變。列將按名稱解析並更新或插入。 |
數組 |
數組 |
|
表架構更改為array |
數組 |
數組 |
|
目標表架構更改為array |
性能調優<一個class="headerlink" href="//www.eheci.com/docs/delta/#performance-tuning" title="">
您可以使用以下方法減少合並所花費的時間:
減少匹配的搜索空間:默認為
合並
操作將搜索整個Delta表以查找源表中的匹配項。加快速度的一種方法合並
是通過在匹配條件中添加已知約束來縮小搜索空間。例如,假設您有一個表,該表是由國家
而且日期
你想用合並
更新最後一天和特定國家的信息。添加條件事件.日期=當前日期()和事件.國家=“美國”
將使查詢更快,因為它隻在相關分區中查找匹配。此外,它還將減少與其他並發操作發生衝突的機會。看到<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/concurrency-control.html">並發控製欲知詳情。
緊湊的文件:如果數據存儲在許多小文件中,讀取數據以搜索匹配可能會變慢。您可以將小文件壓縮成大文件以提高讀吞吐量。看到<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/best-practices.html">緊湊的文件獲取詳細信息。
為寫操作控製shuffle分區:
合並
操作多次打亂數據以計算和寫入更新的數據。用於shuffle的任務數量由Spark會話配置控製spark.sql.shuffle.partitions
.設置該參數不僅可以控製並行度,還可以決定輸出文件的數量。增大該值會增加並行度,但也會生成更多的小數據文件。
啟用優化的寫入:對於分區表,
合並
可以產生的小文件數量遠遠大於shuffle分區的數量。這是因為每個shuffle任務都可以在多個分區中寫入多個文件,這可能成為性能瓶頸。您可以通過啟用來減少文件數量<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/optimizations/auto-optimize.html">優化的寫.
請注意
在Databricks Runtime 7.4及以上版本中,<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/optimizations/auto-optimize.html">優化的寫自動啟用。合並
對分區表的操作。
調優表中的文件大小:在Databricks運行時8.2及以上版本中,Databricks可以自動檢測Delta表是否有頻繁
合並
重寫文件的操作,可能會選擇減少已重寫文件的大小,以預期將來會進一步重寫文件。請參閱有關<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/optimizations/file-mgmt.html">調優文件大小獲取詳細信息。低Shuffle合並:在Databricks Runtime 9.0及以上版本,<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/optimizations/low-shuffle-merge.html">低Shuffle合並的優化實現
合並
這為大多數常見工作負載提供了更好的性能。此外,它還保留了現有的數據布局優化,例如<一個class="reference internal" href="//www.eheci.com/docs/docs/delta/optimizations/file-mgmt.html">z值在未修改的數據上。
合並的例子<一個class="headerlink" href="//www.eheci.com/docs/delta/#merge-examples" title="">
下麵是一些如何使用的例子合並
在不同的場景中。
寫入Delta表時重複數據刪除一個><一個class="headerlink" href="//www.eheci.com/docs/delta/#data-deduplication-when-writing-into-delta-tables" title="">
一個常見的ETL用例是通過將日誌附加到一個表中來收集到Delta表中。但是,源通常會生成重複的日誌記錄,需要後續的重複數據刪除步驟來處理它們。與合並
時,可避免插入重複的記錄。
合並成日誌使用newDedupedLogs在日誌.uniqueId=newDedupedLogs.uniqueId當不匹配然後插入*
deltaTable.別名(“日誌”).合並(newDedupedLogs.別名(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs.uniqueId")\.whenNotMatchedInsertAll()\.執行()
deltaTable.作為(“日誌”).合並(newDedupedLogs.作為(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs.uniqueId").whenNotMatched().insertAll().執行()
deltaTable.作為(“日誌”).合並(newDedupedLogs.作為(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs.uniqueId").whenNotMatched().insertAll().執行();
請注意
包含新日誌的數據集本身需要重複數據刪除。通過merge的SQL語義,它將新數據與表中的現有數據進行匹配並重複刪除,但如果新數據集中有重複數據,則插入它。因此,在合並到表之前,要對新數據進行重複數據刪除。
如果您知道可能隻會在幾天內得到重複的記錄,那麼您可以通過按日期對表進行分區,然後指定要匹配的目標表的日期範圍,從而進一步優化查詢。
合並成日誌使用newDedupedLogs在日誌.uniqueId=newDedupedLogs.uniqueId和日誌.日期>當前日期()-時間間隔7天當不匹配和newDedupedLogs.日期>當前日期()-時間間隔7天然後插入*
deltaTable.別名(“日誌”).合並(newDedupedLogs.別名(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs。uniqueId和日誌。日期>當前日期()-時間間隔7天")\.whenNotMatchedInsertAll(“newDedupedLogs。日期>當前日期()-時間間隔7天")\.執行()
deltaTable.作為(“日誌”).合並(newDedupedLogs.作為(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs。uniqueId和日誌。日期>當前日期()-時間間隔7天").whenNotMatched(“newDedupedLogs。日期>當前日期()-時間間隔7天").insertAll().執行()
deltaTable.作為(“日誌”).合並(newDedupedLogs.作為(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs。uniqueId和日誌。日期>當前日期()-時間間隔7天").whenNotMatched(“newDedupedLogs。日期>當前日期()-時間間隔7天").insertAll().執行();
這比前一個命令更有效,因為它隻查找最近7天的日誌中的副本,而不是整個表。此外,您可以使用這種僅插入的合並和結構化流來執行連續的重複數據刪除日誌。
在流查詢中,可以使用合並操作
foreachBatch
使用重複數據刪除將任何流數據連續寫入Delta表。參見以下內容<一個class="reference internal" href="//www.eheci.com/docs/delta/#merge-in-streaming">流的例子欲知更多有關foreachBatch
.在另一個流查詢中,您可以連續地從這個Delta表中讀取重複數據刪除後的數據。這是可能的,因為僅插入合並僅向Delta表追加新數據。
請注意
僅插入合並在Databricks Runtime 6.2及以上版本中被優化為僅追加數據。在Databricks Runtime 6.1及以下版本中,僅插入合並操作的寫操作不能作為流讀取。
緩慢改變數據(SCD)對Delta表的類型2操作一個><一個class="headerlink" href="//www.eheci.com/docs/delta/#slowly-changing-data-scd-type-2-operation-into-delta-tables" title="">
另一個常見操作是SCD Type 2,它維護維度表中每個鍵的所有更改的曆史記錄。這樣的操作需要更新現有行,將鍵的先前值標記為舊值,並將新行插入為最新值。給定一個包含更新的源表和包含維度數據的目標表,可以用合並
.
下麵是維護客戶地址曆史以及每個地址的活動日期範圍的具體示例。當客戶的地址需要更新時,必須將以前的地址標記為當前地址,更新其活動日期範圍,並將新地址添加為當前地址。
將更改數據寫入Delta表一個><一個class="headerlink" href="//www.eheci.com/docs/delta/#write-change-data-into-a-delta-table" title="">
與SCD類似,另一個常見的用例(通常稱為更改數據捕獲(CDC))是將從外部數據庫生成的所有數據更改應用到Delta表中。換句話說,應用於外部表的一組更新、刪除和插入需要應用於Delta表。你可以用合並
如下。
Upsert從流查詢使用foreachBatch
你可以使用的組合合並
而且foreachBatch
(見<一個class="reference external" href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html">foreachbatch一個>(獲取更多信息)將複雜的upserts從流查詢寫入Delta表。例如:
在更新模式下寫入流聚合:這比完全模式更有效率。
將數據庫更改流寫入Delta表:<一個class="reference internal" href="//www.eheci.com/docs/delta/#merge-in-cdc">合並查詢,用於寫入更改數據可用於
foreachBatch
來連續地對Delta表應用更改流。使用重複數據刪除功能將流數據寫入Delta表:<一個class="reference internal" href="//www.eheci.com/docs/delta/#merge-in-dedup">用於重複數據刪除的僅插入合並查詢可用於
foreachBatch
使用自動重複數據刪除功能,連續向Delta表寫入數據(帶重複)。
請注意
確保你的
合並
聲明內foreachBatch
是冪等的,因為重新啟動流查詢可以對同一批數據應用多次操作。當
合並
用於foreachBatch
,流查詢的輸入數據速率(通過StreamingQueryProgress
在筆記本速率圖中可見)可以報告為數據在源處生成的實際速率的倍數。這是因為合並
多次讀取輸入數據,導致輸入指標相乘。如果這是一個瓶頸,您可以在此之前緩存批DataFrame合並
然後將其解緩存合並
.