結構化流寫入Azure Synapse

Azure Synapse連接器為Azure Synapse提供高效且可伸縮的結構化流寫入支持,通過批處理寫入和使用提供一致的用戶體驗複製用於Databricks集群和Azure Synapse實例之間的大數據傳輸。

Databricks和Synapse之間的結構化流支持為配置增量ETL作業提供了簡單的語義。用於將數據從Databricks加載到Synapse的模型引入了延遲,可能無法滿足近實時工作負載的SLA要求。看到在Azure Synapse Analytics中查詢數據

支持流式寫入Synapse的輸出模式

Azure Synapse連接器支持附加而且完整的記錄追加和聚合的輸出模式。有關輸出模式和兼容性矩陣的詳細信息,請參見結構化流媒體指南

突觸容錯語義

默認情況下,Azure Synapse Streaming提供端到端服務僅一次通過使用DBFS中的檢查點位置、Azure Synapse中的檢查點表和鎖定機製的組合來可靠地跟蹤查詢的進度,保證將數據寫入Azure Synapse表,以確保流可以處理任何類型的失敗、重試和查詢重啟。

此外,您還可以通過設置為Azure Synapse流選擇限製較少的“至少一次”語義spark.databricks.sqldw.streaming.exactlyOnce.enabled選項,在這種情況下,在間歇連接到Azure Synapse失敗或意外查詢終止的情況下,可能會發生數據複製。

用於寫入Azure Synapse的結構化流語法

下麵的代碼示例演示了在Scala和Python中使用結構化流將流寫入Synapse:

//在notebook session conf中設置Blob存儲帳戶訪問密鑰。火花相依“fs.azure.account.key。< your-storage-account-name > .dfs.core.windows.net”“< your-storage-account-access-key >”//準備流媒體源;這可以是卡夫卡或者一個簡單的速率流。瓦爾dfDataFrame火花readStream格式“速度”選項“rowsPerSecond”“100000”選項“numPartitions”“16”負載()//應用一些轉換到數據,然後使用//在Azure Synapse中連續寫入數據到表的結構化流API。dfwriteStream格式“com.databricks.spark.sqldw”選項“url”" jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”選項“tempDir”“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”選項“forwardSparkAzureStorageCredentials”“真正的”選項“數據表”“< your-table-name >”選項“checkpointLocation”“/ tmp_checkpoint_location”開始()
#在notebook會話conf中設置Blob存儲帳戶訪問密鑰火花相依“fs.azure.account.key。< your-storage-account-name > .dfs.core.windows.net”“< your-storage-account-access-key >”#準備流媒體源;這可以是卡夫卡或者一個簡單的速率流。df火花readStream格式“速度”選項“rowsPerSecond”“100000”選項“numPartitions”“16”負載()#對數據應用一些轉換,然後使用#結構化流API連續寫入數據到Azure Synapse的表中。dfwriteStream格式“com.databricks.spark.sqldw”選項“url”" jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”選項“tempDir”“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”選項“forwardSparkAzureStorageCredentials”“真正的”選項“數據表”“< your-table-name >”選項“checkpointLocation”“/ tmp_checkpoint_location”開始()

有關配置的完整列表,請參見在Azure Synapse Analytics中查詢數據

突觸流檢查點表管理

Azure Synapse連接器刪除在啟動新的流查詢時創建的流檢查點表。這種行為與checkpointLocation通常指定給對象存儲。Databricks建議您定期刪除將來不會運行的查詢的檢查點表。

默認情況下,所有檢查點表都有這個名稱<前綴> _ < query_id >,在那裏<前綴>是否具有默認值的可配置前綴databricks_streaming_checkpoint而且query_id流查詢ID與_角色刪除。

要查找過期或已刪除的流查詢的所有檢查點表,運行查詢:

選擇sys在哪裏的名字就像“databricks_streaming_checkpoint %”

可以通過Spark SQL配置選項配置前綴spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix

Databricks Synapse連接器流選項參考

選項提供的Spark SQL支持以下流選項批處理選項

參數

要求

默認的

筆記

checkpointLocation

是的

沒有默認的

DBFS上的位置,結構化流將使用該位置寫入元數據和檢查點信息。看到使用檢查點從失敗中恢複在結構化流編程指南。

numStreamingTempDirsToKeep

沒有

0

指示流中為定期清理微批而保留的(最新)臨時目錄的數量。當設置為0,微批提交後立即觸發目錄刪除,否則保留最新提供的微批數量,並刪除其餘目錄。使用-1禁用定期清理。

請注意

checkpointLocation而且numStreamingTempDirsToKeep隻與從Databricks到Azure Synapse中的新表的流寫入相關。