表流讀和寫作
達美湖與火花結構化流通過readstream
和writestream
。達美湖克服了許多與流係統和文件相關的許多限製,包括:
與低潛伏期攝入產生的小文件合並
使用多個流(或並發批處理作業)維護“完全符合”處理
有效地發現使用文件作為流的源時,哪些文件是新的
也可以看看生產中的結構化流。
Delta表作為來源
當您加載Delta表作為流源並將其用於流查詢時,查詢會處理表中存在的所有數據以及啟動流之後到達的任何新數據。
您可以將路徑和表作為流加載。
火花。readstream。格式((“三角洲”)。加載((“/tmp/delta/事件”)進口io。三角洲。隱含。_火花。readstream。三角洲((“/tmp/delta/事件”)
或者
進口io。三角洲。隱含。_火花。readstream。格式((“三角洲”)。桌子((“事件”)
限製輸入率
以下選項可用於控製微分:
maxfilespertrigger
:在每個微批次中要考慮多少個新文件。默認值為1000。maxbytespertrigger
:在每個微批次中處理多少數據。此選項設置了“軟最大”,這意味著批處理過程大約是此數量的數據,並且可能會在最小的輸入單元大於此限製的情況下使流查詢向前移動。如果您使用trigger.once
對於您的流媒體,此選項將被忽略。默認情況下,這不是設置。
如果您使用maxbytespertrigger
和這個結合maxfilespertrigger
,微批量處理數據,直到maxfilespertrigger
或者maxbytespertrigger
達到限製。
筆記
如果由於logretentionDuration
配置Delta Lake在處理中的流滯後處理與源表的最新可用交易曆史記錄相對應的數據,但並未使流。這可能導致數據刪除。
忽略更新和刪除
結構化流不處理不是附加的輸入,並且如果表格在表上用作源,則會引發異常。處理更改的主要策略有兩種無法自動傳播下遊的變化的策略:
您可以刪除輸出和檢查點,並從一開始就重新啟動流。
您可以設置這兩個選項中的任何一個:
忽略
:忽略在分區邊界上刪除數據的交易。無知
:重新處理更新是否必須在源表中重寫數據,因為數據更改操作,例如更新
,,,,合並進入
,,,,刪除
(在分區中)或覆蓋
。不變的行可能仍會發出,因此您的下遊消費者應該能夠處理重複。刪除不會在下遊傳播。無知
元素忽略
。因此,如果您使用無知
,您的流不會因刪除或對源表的更新而破壞。
例子
例如,假設您有一個表user_events
和日期
,,,,USER_EMAIL
, 和行動
由分區的列日期
。你從user_events
表,您需要由於GDPR而從中刪除數據。
當您在分區邊界上刪除時(即在哪裏
在分區列上),文件已經按值進行了細分,因此刪除僅將這些文件從元數據中刪除。因此,如果您隻想從某些分區中刪除數據,則可以使用:
火花。readstream。格式((“三角洲”)。選項((“忽略”,,,,“真的”)。加載((“/tmp/delta/user_events”)
但是,如果您必須根據USER_EMAIL
,那麼您將需要使用:
火花。readstream。格式((“三角洲”)。選項((“無知”,,,,“真的”)。加載((“/tmp/delta/user_events”)
如果您更新USER_EMAIL
與更新
語句,包含USER_EMAIL
有問題的重寫。當您使用時無知
,新記錄在同一文件中的所有其他未改變記錄下遊傳播。您的邏輯應該能夠處理這些傳入的重複記錄。
指定初始位置
筆記
此功能可在Databricks Runtime 7.3 LTS及更高版本上獲得。
您可以使用以下選項來指定三角洲湖流源的起點,而無需處理整個表。
起點
:從頭開始的三角洲湖版本。從此版本開始的所有表更改(包括)將由流源讀取。您可以從版本
列描述曆史命令輸出。在Databricks運行時7.4及以上,僅返回最新更改,請指定
最新的
。開始Timestamp
:從時間戳開始。時間戳或時間戳(包含)或之後的所有表更改將由流源讀取。之一:時間戳字符串。例如,
“ 2019-01-01T00:00:00.000Z”
。日期字符串。例如,
“ 2019-01-01”
。
您不能同時設置這兩個選項;您隻能使用其中之一。它們僅在啟動新的流詢問時才生效。如果已經啟動流媒體查詢,並且在其檢查點中記錄了進度,則這些選項將被忽略。
重要的
盡管您可以從指定版本或時間戳啟動流源,但流源的架構始終是增量表的最新架構。您必須確保在指定版本或時間戳之後,不會更改Delta表的不兼容架構。否則,流媒體源用不正確的模式讀取數據時可能會返回錯誤的結果。
Delta表作為水槽
您還可以使用結構化流媒體將數據寫入三角洲表。交易日誌使三角洲湖能夠確切地確保處理,即使還有其他流或批次查詢同時在桌子上運行。
筆記
三角洲湖真空
功能刪除所有未由Delta Lake管理的文件,但跳過任何以開始的目錄_
。您可以使用目錄結構(例如
。
指標
筆記
在Databricks運行時8.1及以上可用。
您可以找出字節數和尚待處理的文件數量流查詢過程作為基於數字
和基於數字
指標。如果您在筆記本中運行流,則可以在下麵看到這些指標原始數據在流查詢進度儀表板:
{“來源”:[[{“描述”:“ deltasource [file:/path/to/source]”,,,,“指標”:{“基於數字”:“ 3456”,,,,“基於數字”:“ 8”},,}這是給予的}
附加模式
默認情況下,流以附錄模式運行,該模式將新記錄添加到表中。
您可以使用路徑方法:
事件。writestream。格式((“三角洲”)。OutputMode((“附加”)。選項((“ checkpointlocation”,,,,“/tmp/delta/_checkpoints/”)。開始((“/delta/Events”)
事件。writestream。格式((“三角洲”)。OutputMode((“附加”)。選項((“ checkpointlocation”,,,,“/tmp/delta/events/_checkpoints/”)。開始((“/tmp/delta/事件”)進口io。三角洲。隱含。_事件。writestream。OutputMode((“附加”)。選項((“ checkpointlocation”,,,,“/tmp/delta/events/_checkpoints/”)。三角洲((“/tmp/delta/事件”)
或者totable
如下,Spark 3.1及更高版本的方法(Databricks運行時8.3及以上)。(在3.1之前的Spark版本(Databricks運行時8.2及以下),請使用桌子
方法。)
事件。writestream。格式((“三角洲”)。OutputMode((“附加”)。選項((“ checkpointlocation”,,,,“/tmp/delta/events/_checkpoints/”)。totable((“事件”)
事件。writestream。OutputMode((“附加”)。選項((“ checkpointlocation”,,,,“/tmp/delta/events/_checkpoints/”)。totable((“事件”)
完成模式
您還可以使用結構化流量用每批替換整個表。一個示例用例是使用聚合計算摘要:
((火花。readstream。格式((“三角洲”)。加載((“/tmp/delta/事件”)。通過...分組((“客戶ID”)。數數()。writestream。格式((“三角洲”)。OutputMode((“完全的”)。選項((“ checkpointlocation”,,,,“/tmp/delta/eventsbycustomer/_checkpoints/”)。開始((“/tmp/delta/eventsbycustomer”))
火花。readstream。格式((“三角洲”)。加載((“/tmp/delta/事件”)。通過...分組((“客戶ID”)。數數()。writestream。格式((“三角洲”)。OutputMode((“完全的”)。選項((“ checkpointlocation”,,,,“/tmp/delta/eventsbycustomer/_checkpoints/”)。開始((“/tmp/delta/eventsbycustomer”)
前麵的示例連續更新一個表,該表包含客戶的總數事件數量。
對於具有更寬鬆延遲要求的應用程序,您可以使用一次性觸發器保存計算資源。使用這些來在給定的時間表上更新摘要聚合表,僅處理自上次更新以來到達的新數據。
diadempotent桌子在foreachbatch
筆記
在Databricks運行時8.4及以上可用。
命令foreachbatch允許您指定流中查詢中任意轉換後,在每個微批次的輸出上執行的函數。這允許實施foreachbatch
可以將微批量輸出寫入一個或多個目標三角洲表目的地的功能。然而,foreachbatch
沒有使這些寫作的寫入嚐試,因為寫作嚐試缺乏有關批次是否重新執行的信息。例如,重讀失敗的批處理可能會導致重複的數據寫入。
為了解決這個問題,三角洲表支持以下DataFrameWriter
使寫作的選項:
txnappid
:您可以在每個的獨特字符串數據框架
寫。例如,您可以將流Query ID作為txnappid
。txnversion
:單調增加的數字,充當交易版本。
三角洲表使用txnappid
和txnversion
識別重複寫作並忽略它們。
如果批處理寫入故障中斷,則重新設計批次使用相同的應用程序和批次ID,這將有助於運行時正確識別重複寫入並忽略它們。應用ID(txnappid
)可以是任何用戶生成的唯一字符串,不必與流ID相關。
警告
如果刪除流檢查點並使用新檢查點重新啟動查詢,則必須提供不同的蘋果
;否則,從重新啟動查詢中的寫入將被忽略,因為它將包含相同的txnappid
批處理ID將從0開始。
相同DataFrameWriter
可以使用選項來實現非流程作業中的基本寫作。有關詳細信息願意寫作。
例子
app_id=...#用作應用程序ID的唯一字符串。防守writetodeltalaketableablempotent((batch_df,,,,batch_id):batch_df。寫。格式((...)。選項((“ txnversion”,,,,batch_id)。選項((“ txnappid”,,,,app_id)。節省((...)#位置1batch_df。寫。格式((...)。選項((“ txnversion”,,,,batch_id)。選項((“ txnappid”,,,,app_id)。節省((...)#位置2
瓦爾蘋果=...//用作應用程序ID的唯一字符串。Streamingdf。writestream。foreachbatch{((batchdf:數據框架,,,,批處理:長)=>batchdf。寫。格式(...)。選項((“ txnversion”,,,,批處理)。選項((“ txnappid”,,,,蘋果)。節省(...)//位置1batchdf。寫。格式(...)。選項((“ txnversion”,,,,批處理)。選項((“ txnappid”,,,,蘋果)。節省(...)//位置2}
執行溪流靜態連接
您可以依靠三角洲湖的交易保證和版本管理協議進行表演流靜態加入。流靜態加入使用無狀態加入將Delta表(靜態數據)(靜態數據)的最新有效版本加入到數據流。
當Databricks在流靜態加入中處理微批量數據時,來自靜態三角洲表的最新有效數據與當前微批次中的記錄相連。因為聯接是無狀態的,因此您無需配置水印,並且可以以低延遲處理結果。聯接中使用的靜態三角洲表中的數據應緩慢改變。
Streamingdf=火花。readstream。桌子((“訂單”)staticdf=火花。讀。桌子((“beplay体育app下载地址顧客”)詢問=((Streamingdf。加入((staticdf,,,,Streamingdf。客戶ID==staticdf。ID,,,,“內”)。writestream。選項((“ checkpointlocation”,,,,checkpoint_path)。桌子((“ orders_with_customer_info”))