結構化流寫入Azure Synapse
Azure Synapse連接器為Azure Synapse提供高效且可伸縮的結構化流寫入支持,通過批處理寫入和使用提供一致的用戶體驗複製
用於Databricks集群和Azure Synapse實例之間的大數據傳輸。
Databricks和Synapse之間的結構化流支持為配置增量ETL作業提供了簡單的語義。用於將數據從Databricks加載到Synapse的模型引入了延遲,可能無法滿足近實時工作負載的SLA要求。看到在Azure Synapse Analytics中查詢數據.
支持流式寫入Synapse的輸出模式
Azure Synapse連接器支持附加
而且完整的
記錄追加和聚合的輸出模式。有關輸出模式和兼容性矩陣的詳細信息,請參見結構化流媒體指南.
突觸容錯語義
默認情況下,Azure Synapse Streaming提供端到端服務僅一次通過使用DBFS中的檢查點位置、Azure Synapse中的檢查點表和鎖定機製的組合來可靠地跟蹤查詢的進度,保證將數據寫入Azure Synapse表,以確保流可以處理任何類型的失敗、重試和查詢重啟。
此外,您還可以通過設置為Azure Synapse流選擇限製較少的“至少一次”語義spark.databricks.sqldw.streaming.exactlyOnce.enabled
選項假
,在這種情況下,在間歇連接到Azure Synapse失敗或意外查詢終止的情況下,可能會發生數據複製。
用於寫入Azure Synapse的結構化流語法
下麵的代碼示例演示了在Scala和Python中使用結構化流將流寫入Synapse:
//在notebook session conf中設置Blob存儲帳戶訪問密鑰。火花.相依.集(“fs.azure.account.key。< your-storage-account-name > .dfs.core.windows.net”,“< your-storage-account-access-key >”)//準備流媒體源;這可以是卡夫卡或者一個簡單的速率流。瓦爾df:DataFrame=火花.readStream.格式(“速度”).選項(“rowsPerSecond”,“100000”).選項(“numPartitions”,“16”).負載()//應用一些轉換到數據,然後使用//在Azure Synapse中連續寫入數據到表的結構化流API。df.writeStream.格式(“com.databricks.spark.sqldw”).選項(“url”," jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”).選項(“tempDir”,“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”).選項(“forwardSparkAzureStorageCredentials”,“真正的”).選項(“數據表”,“< your-table-name >”).選項(“checkpointLocation”,“/ tmp_checkpoint_location”).開始()
#在notebook會話conf中設置Blob存儲帳戶訪問密鑰火花.相依.集(“fs.azure.account.key。< your-storage-account-name > .dfs.core.windows.net”,“< your-storage-account-access-key >”)#準備流媒體源;這可以是卡夫卡或者一個簡單的速率流。df=火花.readStream\.格式(“速度”)\.選項(“rowsPerSecond”,“100000”)\.選項(“numPartitions”,“16”)\.負載()#對數據應用一些轉換,然後使用#結構化流API連續寫入數據到Azure Synapse的表中。df.writeStream\.格式(“com.databricks.spark.sqldw”)\.選項(“url”," jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”)\.選項(“tempDir”,“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”)\.選項(“forwardSparkAzureStorageCredentials”,“真正的”)\.選項(“數據表”,“< your-table-name >”)\.選項(“checkpointLocation”,“/ tmp_checkpoint_location”)\.開始()
有關配置的完整列表,請參見在Azure Synapse Analytics中查詢數據.
突觸流檢查點表管理
Azure Synapse連接器不刪除在啟動新的流查詢時創建的流檢查點表。這種行為與checkpointLocation
通常指定給對象存儲。Databricks建議您定期刪除將來不會運行的查詢的檢查點表。
默認情況下,所有檢查點表都有這個名稱<前綴> _ < query_id >
,在那裏<前綴>
是否具有默認值的可配置前綴databricks_streaming_checkpoint
而且query_id
流查詢ID與_
角色刪除。
要查找過期或已刪除的流查詢的所有檢查點表,運行查詢:
選擇*從sys.表在哪裏的名字就像“databricks_streaming_checkpoint %”
可以通過Spark SQL配置選項配置前綴spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix
.
Databricks Synapse連接器流選項參考
的選項
提供的Spark SQL支持以下流選項批處理選項:
參數 |
要求 |
默認的 |
筆記 |
---|---|---|---|
|
是的 |
沒有默認的 |
DBFS上的位置,結構化流將使用該位置寫入元數據和檢查點信息。看到使用檢查點從失敗中恢複在結構化流編程指南。 |
|
沒有 |
0 |
指示流中為定期清理微批而保留的(最新)臨時目錄的數量。當設置為 |
請注意
checkpointLocation
而且numStreamingTempDirsToKeep
隻與從Databricks到Azure Synapse中的新表的流寫入相關。