Azure Synapse Analytics
Azure Synapse Analytics(以前稱為SQL數據倉庫)是一個基於雲的企業數據倉庫,它利用大規模並行處理(MPP)跨pb級數據快速運行複雜查詢。使用Azure作為大數據解決方案的關鍵組件。導入大數據到Azure簡單混合基T-SQL查詢,或者複製語句,然後使用MPP的功能來運行高性能分析。當您進行集成和分析時,數據倉庫將成為您的業務可以依賴的唯一真相版本。
您可以使用Azure Synapse連接器從Databricks訪問Azure Synapse,這是Apache Spark使用的數據源實現Azure Blob存儲、PolyBase或複製
語句在Databricks集群和Azure Synapse實例之間有效傳輸大量數據。
Databricks集群和Azure Synapse實例都訪問一個公共Blob存儲容器,以便在這兩個係統之間交換數據。在Databricks中,Apache Spark作業是由Azure Synapse連接器觸發的,用於從Blob存儲容器讀取數據和向Blob存儲容器寫入數據。在Azure Synapse端,由PolyBase執行的數據加載和卸載操作由Azure Synapse連接器通過JDBC觸發。在Databricks Runtime 7.0及以上版本中,複製
默認情況下,Azure Synapse連接器通過JDBC將數據加載到Azure Synapse中。
請注意
複製
僅在Azure Synapse Gen2實例上可用,該實例提供更好的性能.如果您的數據庫仍然使用Gen1實例,我們建議您將數據庫遷移到Gen2。
Azure Synapse連接器更適合ETL而不是交互式查詢,因為每次查詢執行都可以將大量數據提取到Blob存儲中。如果您計劃對同一個Azure Synapse表執行多個查詢,我們建議您以Parquet等格式保存提取的數據。
需求
一個數據庫主密鑰Azure Synapse。
身份驗證
Azure Synapse連接器使用三種類型的網絡連接:
Spark驅動程序到Azure Synapse
Spark驅動程序和執行程序到Azure存儲帳戶
Azure Synapse到Azure存儲帳戶
┌─────────┐┌─────────────────────────>存儲││<────────────────────────┐│存儲acc鍵/││帳戶存儲acc鍵/││管理服務ID /└─────────┘OAuth 2.0 /│││││││││存儲acc鍵/│││OAuth 2.0 /││││v v┌──────v────┐┌──────────┐┌──────────┐│┌──────────┴┐突觸│││火花│││火花│││分析<────────────────────>司機││<───────────────>│執行人│└──────────┘JDBC與└──────────┘配置└───────────┘用戶名&密碼/火花
下麵幾節描述每個連接的身份驗證配置選項。
Spark驅動程序到Azure Synapse
Spark驅動程序可以使用帶有用戶名和密碼的JDBC連接到Azure Synapse,也可以使用帶有服務主體的OAuth 2.0進行身份驗證。
用戶名和密碼
我們建議您使用Azure門戶提供的連接字符串進行這兩種身份驗證類型,這兩種類型為Spark驅動程序和Azure Synapse實例之間通過JDBC連接發送的所有數據啟用安全套接字層(SSL)加密。如果需要驗證SSL加密是否已啟用,可以搜索加密= true
在連接字符串中。
為了允許Spark驅動程序到達Azure Synapse,我們建議您設置允許訪問Azure服務來在通過Azure門戶在Azure Synapse服務器的防火牆窗格上。此設置允許來自所有Azure IP地址和所有Azure子網的通信,這允許Spark驅動程序到達Azure Synapse實例。
帶有服務主體的OAuth 2.0
您可以使用具有訪問底層存儲帳戶權限的服務主體對Azure Synapse Analytics進行身份驗證。有關使用服務主體憑據訪問Azure存儲帳戶的詳細信息,請參見通過Azure服務主體使用OAuth 2.0訪問Azure數據湖存儲Gen2.您必須設置enableServicePrincipalAuth
選項真正的
在連接配置中參數使連接器能夠使用服務主體進行身份驗證。
您可以選擇為Azure Synapse Analytics連接使用不同的服務主體。為存儲帳戶配置服務主體憑據,為Synapse配置可選服務主體憑據:
;為Azure存儲帳戶定義服務主體憑據fs.azure.account.auth.type OAuthfs.azure.account.oauth.provider.type org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProviderfs.azure.account.oauth2.client。id <應用程序id >fs.azure.account.oauth2.client。秘密< service-credential >fs.azure.account.oauth2.client。端點https://login.microsoftonline.com/ < directory-id > / oauth2 /令牌;為Azure Synapse Analytics定義一組單獨的服務主體憑證(如果沒有定義,連接器將使用Azure存儲帳戶憑證)spark.databricks.sqldw.jdbc.service.principal.client.id <應用程序id >spark.databricks.sqldw.jdbc.service.principal.client.secret < service-credential >
//為Azure存儲帳戶定義Service Principal憑據火花.相依.集(“fs.azure.account.auth.type”,“OAuth”)火花.相依.集(“fs.azure.account.oauth.provider.type”,“org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider”)火花.相依.集(“fs.azure.account.oauth2.client.id”,“<應用程序id >”)火花.相依.集(“fs.azure.account.oauth2.client.secret”,“< service-credential >”)火花.相依.集(“fs.azure.account.oauth2.client.endpoint”,“https://login.microsoftonline.com/ < directory-id > / oauth2 /令牌”)//為Azure Synapse Analytics定義一個單獨的服務主體憑證集(如果沒有定義,連接器將使用Azure存儲帳戶憑證)火花.相依.集(“spark.databricks.sqldw.jdbc.service.principal.client.id”,“<應用程序id >”)火花.相依.集(“spark.databricks.sqldw.jdbc.service.principal.client.secret”,“< service-credential >”)
#為Azure存儲帳戶定義服務主體憑證火花.相依.集(“fs.azure.account.auth.type”,“OAuth”)火花.相依.集(“fs.azure.account.oauth.provider.type”,“org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider”)火花.相依.集(“fs.azure.account.oauth2.client.id”,“<應用程序id >”)火花.相依.集(“fs.azure.account.oauth2.client.secret”,“< service-credential >”)火花.相依.集(“fs.azure.account.oauth2.client.endpoint”,“https://login.microsoftonline.com/ < directory-id > / oauth2 /令牌”)#為Azure Synapse Analytics定義一組單獨的服務主體憑證(如果沒有定義,連接器將使用Azure存儲帳戶憑證)火花.相依.集(“spark.databricks.sqldw.jdbc.service.principal.client.id”,“<應用程序id >”)火花.相依.集(“spark.databricks.sqldw.jdbc.service.principal.client.secret”,“< service-credential >”)
#加載SparkR圖書館(SparkR)相依<-sparkR.callJMethod(sparkR.session(),“配置”)#為Azure存儲帳戶定義服務主體憑證sparkR.callJMethod(相依,“設置”,“fs.azure.account.auth.type”,“OAuth”)sparkR.callJMethod(相依,“設置”,“fs.azure.account.oauth.provider.type”,“org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider”)sparkR.callJMethod(相依,“設置”,“fs.azure.account.oauth2.client.id”,“<應用程序id >”)sparkR.callJMethod(相依,“設置”,“fs.azure.account.oauth2.client.secret”,“< service-credential >”)sparkR.callJMethod(相依,“設置”,“fs.azure.account.oauth2.client.endpoint”,“https://login.microsoftonline.com/ < directory-id > / oauth2 /令牌”)#為Azure Synapse Analytics定義一組單獨的服務主體憑證(如果沒有定義,連接器將使用Azure存儲帳戶憑證)sparkR.callJMethod(相依,“設置”,“spark.databricks.sqldw.jdbc.service.principal.client.id”,“<應用程序id >”)sparkR.callJMethod(相依,“設置”,“spark.databricks.sqldw.jdbc.service.principal.client.secret”,“< service-credential >”)
Spark驅動程序和執行程序到Azure存儲帳戶
Azure存儲容器充當從Azure Synapse讀取或寫入大量數據時存儲大量數據的中介。Spark通過一個內置連接器連接到存儲容器:Azure Blob存儲或Azure數據湖存儲(ADLS) Gen2.Azure數據湖存儲Gen1是不支持並且隻允許SSL加密的HTTPS訪問。因此,唯一支持的URI方案是wasbs
而且abfss
.
認證選項如下:
Azure Blob存儲(
wasbs
)存儲帳戶訪問密鑰和秘密
Azure數據湖存儲Gen2 (
abfss
)存儲帳戶訪問密鑰和秘密
OAuth 2.0認證。有關OAuth 2.0和服務主體的更多信息,請參見通過Azure服務主體使用OAuth 2.0訪問Azure數據湖存儲Gen2).
下麵的示例使用存儲帳戶訪問密鑰方法演示了這兩種方法。這同樣適用於OAuth 2.0配置。
筆記本會話配置(首選)
使用這種方法,帳戶訪問密鑰在與運行該命令的筆記本相關聯的會話配置中設置。此配置不會影響附加到同一集群的其他筆記本電腦。火花
是SparkSession
對象在筆記本中提供。
火花.相依.集(“fs.azure.account.key。< your-storage-account-name > .blob.core.windows.net”,“< your-storage-account-access-key >”)
Hadoop全局配置
類關聯的全局Hadoop配置將更新SparkContext
所有筆記本共享的對象。
sc.hadoopConfiguration.集(“fs.azure.account.key。< your-storage-account-name > .blob.core.windows.net”,“< your-storage-account-access-key >”)
hadoopConfiguration
並非在所有版本的PySpark中都公開。雖然下麵的命令依賴於一些Spark內部,但它應該適用於所有的PySpark版本,將來不太可能被破壞或更改:
sc._jsc.hadoopConfiguration().集(“fs.azure.account.key。< your-storage-account-name > .blob.core.windows.net”,“< your-storage-account-access-key >”)
Azure Synapse到Azure存儲帳戶
Azure Synapse還在加載和卸載臨時數據期間連接到存儲帳戶。
如果您已經為存儲帳戶設置了帳戶密鑰和秘密,您可以設置forwardSparkAzureStorageCredentials
來真正的
,在這種情況下,Azure Synapse連接器自動發現筆記本會話配置或全局Hadoop配置中設置的帳戶訪問密鑰,並通過創建臨時Azure將存儲帳戶訪問密鑰轉發給連接的Azure Synapse實例數據庫範圍的憑據.
或者,如果您使用ADLS Gen2 + OAuth 2.0身份驗證,或者您的Azure Synapse實例配置為具有托管服務標識(通常與VNet +服務端點設置),你必須設置useAzureMSI
來真正的
.在這種情況下,連接器將指定身份=的管理服務身份的
對於數據庫作用域憑據和no秘密
.
流媒體支持
Azure Synapse連接器為Azure Synapse提供了高效且可伸縮的結構化流寫入支持,提供了一致的批處理寫入用戶體驗,並使用PolyBase或複製
用於Databricks集群和Azure Synapse實例之間的大數據傳輸。與批處理寫類似,流主要是為ETL設計的,因此提供了更高的延遲,在某些情況下可能不適合實時數據處理。
使用(批處理)
您可以通過Scala、Python、SQL和R筆記本中的數據源API使用此連接器。
//否則,在notebook會話conf中設置Blob存儲帳戶訪問密鑰。火花.相依.集(“fs.azure.account.key。< your-storage-account-name > .blob.core.windows.net”,“< your-storage-account-access-key >”)//從Azure Synapse表中獲取數據瓦爾df:DataFrame=火花.讀.格式(“com.databricks.spark.sqldw”).選項(“url”," jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”).選項(“tempDir”,“wasbs: / / < your-container-name > @ < your-storage-account-name > .blob.core.windows.net/ < your-directory-name >”).選項(“forwardSparkAzureStorageCredentials”,“真正的”).選項(“數據表”,“< your-table-name >”).負載()//從Azure Synapse查詢中加載數據。瓦爾df:DataFrame=火花.讀.格式(“com.databricks.spark.sqldw”).選項(“url”," jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”).選項(“tempDir”,“wasbs: / / < your-container-name > @ < your-storage-account-name > .blob.core.windows.net/ < your-directory-name >”).選項(“forwardSparkAzureStorageCredentials”,“真正的”).選項(“查詢”,select x, count(*) as CNT from table group by x).負載()//對數據應用一些轉換,然後使用//將數據寫回Azure Synapse中的另一個表的數據源API。df.寫.格式(“com.databricks.spark.sqldw”).選項(“url”," jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”).選項(“forwardSparkAzureStorageCredentials”,“真正的”).選項(“數據表”,“< your-table-name >”).選項(“tempDir”,“wasbs: / / < your-container-name > @ < your-storage-account-name > .blob.core.windows.net/ < your-directory-name >”).保存()
#否則,在notebook會話conf中設置Blob存儲帳戶訪問密鑰。火花.相依.集(“fs.azure.account.key。< your-storage-account-name > .blob.core.windows.net”,“< your-storage-account-access-key >”)#從Azure Synapse表中獲取一些數據。df=火花.讀\.格式(“com.databricks.spark.sqldw”)\.選項(“url”," jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”)\.選項(“tempDir”,“wasbs: / / < your-container-name > @ < your-storage-account-name > .blob.core.windows.net/ < your-directory-name >”)\.選項(“forwardSparkAzureStorageCredentials”,“真正的”)\.選項(“數據表”,“< your-table-name >”)\.負載()#從Azure Synapse查詢中加載數據df=火花.讀\.格式(“com.databricks.spark.sqldw”)\.選項(“url”," jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”)\.選項(“tempDir”,“wasbs: / / < your-container-name > @ < your-storage-account-name > .blob.core.windows.net/ < your-directory-name >”)\.選項(“forwardSparkAzureStorageCredentials”,“真正的”)\.選項(“查詢”,select x, count(*) as CNT from table group by x)\.負載()對數據應用一些轉換,然後使用#數據源API將數據寫回Azure Synapse中的另一個表。df.寫\.格式(“com.databricks.spark.sqldw”)\.選項(“url”," jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”)\.選項(“forwardSparkAzureStorageCredentials”,“真正的”)\.選項(“數據表”,“< your-table-name >”)\.選項(“tempDir”,“wasbs: / / < your-container-name > @ < your-storage-account-name > .blob.core.windows.net/ < your-directory-name >”)\.保存()
——否則,在notebook session conf中設置Blob存儲帳戶訪問密鑰。集fs.azure.賬戶.關鍵.<你的-存儲-賬戶-的名字>.團.核心.窗戶.網= <你的-存儲-賬戶-訪問-關鍵>;——使用SQL讀取數據。創建表格example_table_in_spark_read使用com.磚.火花.sqldw選項(url" jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”,forwardSparkAzureStorageCredentials“真正的”,數據表' < your-table-name >”,tempDir“wasbs: / / < your-container-name > @ < your-storage-account-name > .blob.core.windows.net/ < your-directory-name >”);——使用SQL寫數據。——創建一個新表,如果已經存在同名的表,則拋出一個錯誤:創建表格example_table_in_spark_write使用com.磚.火花.sqldw選項(url" jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”,forwardSparkAzureStorageCredentials“真正的”,數據表' < your-table-name >”,tempDir“wasbs: / / < your-container-name > @ < your-storage-account-name > .blob.core.windows.net/ < your-directory-name >”)作為選擇*從table_to_save_in_spark;
#加載SparkR圖書館(SparkR)#否則,在notebook會話conf中設置Blob存儲帳戶訪問密鑰。相依<-sparkR.callJMethod(sparkR.session(),“配置”)sparkR.callJMethod(相依,“設置”,“fs.azure.account.key。< your-storage-account-name > .blob.core.windows.net”,“< your-storage-account-access-key >”)#從Azure Synapse表中獲取一些數據。df<-read.df(源=“com.databricks.spark.sqldw”,url=" jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”,forward_spark_azure_storage_credentials=“真正的”,數據表=“< your-table-name >”,tempDir=“wasbs: / / < your-container-name > @ < your-storage-account-name > .blob.core.windows.net/ < your-directory-name >”)#從Azure Synapse查詢中加載數據df<-read.df(源=“com.databricks.spark.sqldw”,url=" jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”,forward_spark_azure_storage_credentials=“真正的”,查詢=select x, count(*) as CNT from table group by x,tempDir=“wasbs: / / < your-container-name > @ < your-storage-account-name > .blob.core.windows.net/ < your-directory-name >”)對數據應用一些轉換,然後使用#數據源API將數據寫回Azure Synapse中的另一個表。write.df(df,源=“com.databricks.spark.sqldw”,url=" jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”,forward_spark_azure_storage_credentials=“真正的”,數據表=“< your-table-name >”,tempDir=“wasbs: / / < your-container-name > @ < your-storage-account-name > .blob.core.windows.net/ < your-directory-name >”)
使用(流)
你可以在Scala和Python筆記本中使用結構化流寫數據。
//在notebook session conf中設置Blob存儲帳戶訪問密鑰。火花.相依.集(“fs.azure.account.key。< your-storage-account-name > .blob.core.windows.net”,“< your-storage-account-access-key >”)//準備流媒體源;這可以是卡夫卡或者一個簡單的速率流。瓦爾df:DataFrame=火花.readStream.格式(“速度”).選項(“rowsPerSecond”,“100000”).選項(“numPartitions”,“16”).負載()//應用一些轉換到數據,然後使用//在Azure Synapse中連續寫入數據到表的結構化流API。df.writeStream.格式(“com.databricks.spark.sqldw”).選項(“url”," jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”).選項(“tempDir”,“wasbs: / / < your-container-name > @ < your-storage-account-name > .blob.core.windows.net/ < your-directory-name >”).選項(“forwardSparkAzureStorageCredentials”,“真正的”).選項(“數據表”,“< your-table-name >”).選項(“checkpointLocation”,“/ tmp_checkpoint_location”).開始()
#在notebook會話conf中設置Blob存儲帳戶訪問密鑰火花.相依.集(“fs.azure.account.key。< your-storage-account-name > .blob.core.windows.net”,“< your-storage-account-access-key >”)#準備流媒體源;這可以是卡夫卡或者一個簡單的速率流。df=火花.readStream\.格式(“速度”)\.選項(“rowsPerSecond”,“100000”)\.選項(“numPartitions”,“16”)\.負載()#對數據應用一些轉換,然後使用#結構化流API連續寫入數據到Azure Synapse的表中。df.writeStream\.格式(“com.databricks.spark.sqldw”)\.選項(“url”," jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”)\.選項(“tempDir”,“wasbs: / / < your-container-name > @ < your-storage-account-name > .blob.core.windows.net/ < your-directory-name >”)\.選項(“forwardSparkAzureStorageCredentials”,“真正的”)\.選項(“數據表”,“< your-table-name >”)\.選項(“checkpointLocation”,“/ tmp_checkpoint_location”)\.開始()
配置
本節描述如何為連接器配置寫語義、所需的權限和其他配置參數。
本節:
支持批量寫入的保存模式
Azure Synapse連接器支持ErrorIfExists
,忽略
,附加
,覆蓋
保存模式,默認模式為ErrorIfExists
.有關Apache Spark中支持的保存模式的更多信息,請參見Spark SQL文檔中的保存模式.
支持流寫入的輸出模式
Azure Synapse連接器支持附加
而且完整的
記錄追加和聚合的輸出模式。有關輸出模式和兼容性矩陣的詳細信息,請參見結構化流媒體指南.
編寫語義
請注意
複製
已在Databricks Runtime 7.0及以上版本提供。
除了PolyBase, Azure Synapse連接器還支持複製
聲明。的複製
語句提供了一種更方便的方式將數據加載到Azure Synapse,而不需要創建外部表,需要更少的權限來加載數據,並提高了數據輸入到Azure Synapse的性能。
默認情況下,連接器自動發現最佳寫語義(複製
當目標是Azure Synapse Gen2實例時,否則是PolyBase)。你也可以用下麵的配置來指定寫語義:
//在notebook會話conf中配置Azure Synapse連接器的寫入語義。火花.相依.集(“spark.databricks.sqldw.writeSemantics”,“< write-semantics >”)
#在notebook會話conf中配置Azure Synapse連接器的寫入語義火花.相依.集(“spark.databricks.sqldw.writeSemantics”,“< write-semantics >”)
——在notebook會話conf中配置Azure Synapse連接器的寫入語義。集火花.磚.sqldw.writeSemantics= <寫-語義>;
#加載SparkR圖書館(SparkR)#在notebook會話conf中配置Azure Synapse連接器的寫入語義相依<-sparkR.callJMethod(sparkR.session(),“配置”)sparkR.callJMethod(相依,“設置”,“spark.databricks.sqldw.writeSemantics”,“< write-semantics >”)
在哪裏< write-semantics >
要麼是混合基
使用PolyBase,或複製
使用複製
聲明。
PolyBase需要Azure Synapse權限
當您使用PolyBase時,Azure Synapse連接器要求JDBC連接用戶具有在連接的Azure Synapse實例中運行以下命令的權限:
作為第一個命令的先決條件,連接器期望指定的Azure Synapse實例已經存在數據庫主密鑰。方法創建密鑰創建主密鑰命令。
此外,讀取Azure Synapse表設置通過數據表
或參考表格查詢
, JDBC用戶必須擁有訪問所需Azure Synapse表的權限。將數據寫入通過設置的Azure Synapse表數據表
, JDBC用戶必須有權限寫入這個Azure Synapse表。
下表總結了使用PolyBase的所有操作所需的權限:
操作 |
權限 |
使用時的權限外部數據源 |
---|---|---|
批處理寫 |
控製 |
看到批處理寫 |
流寫 |
控製 |
看到流寫 |
讀 |
控製 |
看到讀 |
需要Azure Synapse權限的PolyBase與外部數據源選項
請注意
在Databricks Runtime 8.4及以上版本中可用。
您可以將PolyBase與預先配置的外部數據源一起使用。看到externalDataSource
參數參數獲取更多信息。
要將PolyBase與預先配置的外部數據源一起使用,Azure Synapse連接器要求JDBC連接用戶擁有在連接的Azure Synapse實例中運行以下命令的權限:
要創建外部數據源,首先應該創建一個數據庫範圍的憑據。以下鏈接描述了如何為服務主體和ABFS位置的外部數據源創建有作用域的憑據:
請注意
外部數據源位置必須指向容器。如果位置是容器中的目錄,則連接器將不起作用。
下表總結了使用外部數據源選項進行PolyBase寫操作的權限:
操作 |
權限(插入到現有表中) |
權限(插入到新表中) |
---|---|---|
批處理寫 |
管理數據庫批量操作 插入 創建表 修改任何模式 修改任何外部數據源 更改任何外部文件格式 |
管理數據庫批量操作 插入 創建表 修改任何模式 修改任何外部數據源 更改任何外部文件格式 |
流寫 |
管理數據庫批量操作 插入 創建表 修改任何模式 修改任何外部數據源 更改任何外部文件格式 |
管理數據庫批量操作 插入 創建表 修改任何模式 修改任何外部數據源 更改任何外部文件格式 |
下表總結了帶有外部數據源選項的PolyBase讀操作的權限:
操作 |
權限 |
---|---|
讀 |
創建表 修改任何模式 修改任何外部數據源 更改任何外部文件格式 |
您可以使用此連接器通過Scala、Python、SQL和R筆記本中的數據源API進行讀取。
//從Azure Synapse表中獲取數據瓦爾df:DataFrame=火花.讀.格式(“com.databricks.spark.sqldw”).選項(“url”," jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”).選項(“tempDir”,“wasbs: / / < your-container-name > @ < your-storage-account-name > .blob.core.windows.net/ < your-directory-name >”).選項(“externalDataSource”,“< your-pre-provisioned-data-source >”).選項(“數據表”,“< your-table-name >”).負載()
#從Azure Synapse表中獲取一些數據。df=火花.讀\.格式(“com.databricks.spark.sqldw”)\.選項(“url”," jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”)\.選項(“tempDir”,“wasbs: / / < your-container-name > @ < your-storage-account-name > .blob.core.windows.net/ < your-directory-name >”)\.選項(“externalDataSource”,“< your-pre-provisioned-data-source >”)\.選項(“數據表”,“< your-table-name >”)\.負載()
——使用SQL讀取數據。創建表格example_table_in_spark_read使用com.磚.火花.sqldw選項(url" jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”,forwardSparkAzureStorageCredentials“真正的”,數據表' < your-table-name >”,tempDir“wasbs: / / < your-container-name > @ < your-storage-account-name > .blob.core.windows.net/ < your-directory-name >”,externalDataSource' < your-pre-provisioned-data-source >”);
#從Azure Synapse表中獲取一些數據。df<-read.df(源=“com.databricks.spark.sqldw”,url=" jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”,forward_spark_azure_storage_credentials=“真正的”,數據表=“< your-table-name >”,tempDir=“wasbs: / / < your-container-name > @ < your-storage-account-name > .blob.core.windows.net/ < your-directory-name >”externalDataSource=“< your-pre-provisioned-data-source >”)
所需的Azure Synapse權限複製
聲明
請注意
在Databricks Runtime 7.0及以上版本中可用。
當你使用複製
語句,Azure Synapse連接器要求JDBC連接用戶有權限在連接的Azure Synapse實例中運行以下命令:
如果目標表在Azure Synapse中不存在,除了上麵的命令外,還需要運行以下命令的權限:
下表總結了用於批處理和流式寫的權限複製
:
操作 |
權限(插入到現有表中) |
權限(插入到新表中) |
---|---|---|
批處理寫 |
管理數據庫批量操作 插入 |
管理數據庫批量操作 插入 創建表 ALTER ON SCHEMA:: dbo |
流寫 |
管理數據庫批量操作 插入 |
管理數據庫批量操作 插入 創建表 ALTER ON SCHEMA:: dbo |
參數
參數映射或選項
Spark SQL中提供的支持以下設置:
參數 |
要求 |
默認的 |
筆記 |
---|---|---|---|
|
是的,除非 |
沒有默認的 |
在Azure Synapse中要創建或讀取的表。當將數據保存回Azure Synapse時,此參數是必需的。 你也可以使用 以前支持的 |
|
是的,除非 |
沒有默認的 |
在Azure Synapse中讀取的查詢。 對於查詢中引用的表,也可以使用 |
|
沒有 |
沒有默認的 |
Azure Synapse用戶名。必須配合使用嗎 |
|
沒有 |
沒有默認的 |
Azure Synapse密碼。必須配合使用嗎 |
|
是的 |
沒有默認的 |
JDBC URL |
|
沒有 |
由JDBC URL的子協議決定 |
要使用的JDBC驅動程序的類名。這個類必須在類路徑上。在大多數情況下,應該沒有必要指定這個選項,因為適當的驅動程序類名應該由JDBC URL的子協議自動確定。 以前支持的 |
|
是的 |
沒有默認的 |
一個 以前支持的 |
|
沒有 |
|
寫入Azure Synapse時將臨時文件保存到blob存儲的格式。默認為 |
|
沒有 |
|
Spark和Azure Synapse用於臨時編碼/解碼的壓縮算法。目前支持的值為: |
|
沒有 |
假 |
如果 當前版本的Azure Synapse連接器需要(確切地)其中之一 以前支持的 |
|
沒有 |
假 |
如果 當前版本的Azure Synapse連接器需要(確切地)其中之一 |
|
沒有 |
假 |
如果 當前版本的Azure Synapse連接器需要(確切地)其中之一 |
|
沒有 |
|
用於指定的字符串表選項在創建Azure Synapse表時設置 以前支持的 |
|
沒有 |
無默認值(空字符串) |
一個 如果這些命令中的任何一個失敗,都將被視為錯誤,並且不會執行寫操作。 |
|
沒有 |
無默認值(空字符串) |
一個 如果這些命令中的任何一個失敗,它將被視為一個錯誤,並且在成功地將數據寫入Azure Synapse實例後,您將得到一個異常。 |
|
沒有 |
256 |
以前支持的 |
|
是的 |
沒有默認的 |
DBFS上的位置,結構化流將使用該位置寫入元數據和檢查點信息。看到使用檢查點從失敗中恢複在結構化流編程指南。 |
|
沒有 |
0 |
指示流中為定期清理微批而保留的(最新)臨時目錄的數量。當設置為 |
|
沒有 |
|
每個查詢的連接標記。如果未指定或該值為空字符串,則將標記的默認值添加到JDBC URL中。默認值防止Azure DB監控工具對查詢發出虛假的SQL注入警報。 |
|
沒有 |
沒有默認的 |
的列長度 |
|
沒有 |
假 |
設置為 |
|
沒有 |
沒有默認的 |
預先配置的外部數據源,用於從Azure Synapse讀取數據。外部數據源隻能與PolyBase一起使用,並且刪除了CONTROL權限要求,因為連接器不需要創建作用域憑據和外部數據源來加載數據。 有關使用外部數據源時所需的用法和權限列表,請參見需要Azure Synapse權限的PolyBase與外部數據源選項. |
|
沒有 |
0 |
取消加載操作(PolyBase或COPY)之前在讀寫期間可拒絕的最大行數。被拒絕的行將被忽略。例如,如果10個記錄中有2個有錯誤,那麼將隻處理8個記錄。 |
請注意
tableOptions
,預作用
,postActions
,maxStrLength
隻有在將數據從Databricks寫入Azure Synapse中的新表時才相關。externalDataSource
僅當從Azure Synapse讀取數據和從Databricks寫入具有PolyBase語義的Azure Synapse新表時才相關。使用時不應指定其他存儲身份驗證類型externalDataSource
如forwardSparkAzureStorageCredentials
或useAzureMSI
.checkpointLocation
而且numStreamingTempDirsToKeep
隻與從Databricks到Azure Synapse中的新表的流寫入相關。盡管所有數據源選項名稱都不區分大小寫,但為了清晰起見,我們建議您使用“駝峰大小寫”指定它們。
查詢下推到Azure Synapse
Azure Synapse連接器實現了一組優化規則,將以下操作符推入Azure Synapse:
過濾器
項目
限製
的項目
而且過濾器
操作符支持以下表達式:
大多數布爾邏輯運算符
比較
基本算術運算
數字和字符串類型轉換
為限製
操作符,隻有在沒有指定順序時才支持下推。例如:
選擇(10)*從表格
,但不是選擇(10)*從表格訂單通過上校
.
請注意
Azure Synapse連接器不會下推操作字符串、日期或時間戳的表達式。
默認情況下,使用Azure Synapse連接器構建的查詢下推是啟用的。您可以通過設置禁用它spark.databricks.sqldw.pushdown
來假
.
臨時數據管理
Azure Synapse連接器不刪除它在Blob存儲容器中創建的臨時文件。因此,我們建議您定期刪除用戶提供的臨時文件tempDir
的位置。
為了方便數據清理,Azure Synapse連接器不直接在下麵存儲數據文件tempDir
,而是創建表單的子目錄:< tempDir > / < yyyy-MM-dd > / < HH-mm-ss-SSS > / < randomUUID > /
.您可以設置周期作業(使用Databricks . properties)工作功能或其他)遞歸刪除任何子目錄的曆史超過給定的閾值(例如,2天),假設沒有Spark作業運行的時間超過該閾值。
一個更簡單的替代方法是定期刪除整個容器,並創建一個具有相同名稱的新容器。這要求您為Azure Synapse連接器生成的臨時數據使用專用容器,並且您可以找到一個時間窗口,在該時間窗口中可以保證沒有涉及該連接器的查詢正在運行。
臨時對象管理
Azure Synapse連接器自動在Databricks集群和Azure Synapse實例之間傳輸數據。要從Azure Synapse表讀取數據或查詢數據,或將數據寫入Azure Synapse表,Azure Synapse連接器將創建臨時對象,包括數據庫作用域憑證
,外部數據源
,外部文件格式
,外部表格
幕後。這些對象隻在相應的Spark作業期間存在,之後應該自動刪除。
當集群使用Azure Synapse連接器運行查詢時,如果Spark驅動程序進程崩潰或強製重新啟動,或者集群強製終止或重新啟動,則可能不會刪除臨時對象。為了便於識別和手動刪除這些對象,Azure Synapse連接器在Azure Synapse實例中創建的所有中間臨時對象的名稱前加上一個這樣的標記:tmp_databricks_ < yyyy_MM_dd_HH_mm_ss_SSS > _ < randomUUID > _ < internalObject >
.
我們建議您定期使用以下查詢查找泄漏對象:
選擇*從sys.database_scoped_credentials在哪裏的名字就像“tmp_databricks_ %”
選擇*從sys.external_data_sources在哪裏的名字就像“tmp_databricks_ %”
選擇*從sys.external_file_formats在哪裏的名字就像“tmp_databricks_ %”
選擇*從sys.external_tables在哪裏的名字就像“tmp_databricks_ %”
流檢查點表管理
Azure Synapse連接器不刪除在啟動新的流查詢時創建的流檢查點表。這種行為與checkpointLocation
DBFS。因此,我們建議您在刪除DBFS上的檢查點位置的同時,定期刪除檢查點表,以便將來不會運行或已經刪除了檢查點位置的查詢。
默認情況下,所有檢查點表都有這個名稱<前綴> _ < query_id >
,在那裏<前綴>
是否具有默認值的可配置前綴databricks_streaming_checkpoint
而且query_id
流查詢ID與_
角色刪除。要查找過期或已刪除的流查詢的所有檢查點表,運行查詢:
選擇*從sys.表在哪裏的名字就像“databricks_streaming_checkpoint %”
可以通過Spark SQL配置選項配置前綴spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix
.
常見問題(FAQ)
我在使用Azure Synapse連接器時收到一個錯誤。如何判斷此錯誤來自Azure Synapse還是Databricks?
為幫助您調試錯誤,特定於Azure Synapse連接器的代碼引發的任何異常都包裝在擴展SqlDWException
特征。異常也有以下區別:
SqlDWConnectorException
表示Azure Synapse連接器拋出的錯誤SqlDWSideException
表示連接的Azure Synapse實例拋出的錯誤
如果我的查詢失敗,錯誤“在會話配置或全局Hadoop配置中沒有找到訪問鍵”,我該怎麼辦?
中指定的存儲帳戶的筆記本會話配置或全局Hadoop配置中,Azure Synapse連接器無法找到存儲帳戶訪問密鑰tempDir
.看到使用(批處理)有關如何正確配置存儲帳戶訪問的例子。如果使用Azure Synapse連接器創建Spark表,則仍然必須提供存儲帳戶訪問憑據,以便對Spark表進行讀寫。
是否可以使用共享訪問簽名(SAS)訪問指定的Blob存儲容器tempDir
?
Azure Synapse不支持使用SAS訪問Blob存儲.因此Azure Synapse連接器不支持情景應用程序來訪問指定的Blob存儲容器tempDir
.
我使用Azure Synapse連接器創建了一個Spark表數據表
選項,寫入一些數據到這個Spark表,然後刪除這個Spark表。在Azure Synapse端創建的表會被刪除嗎?
不。Azure Synapse被認為是一個外部數據源。通過設置名稱的Azure Synapse表數據表
當Spark表被刪除時不會被刪除。
當寫一個DataFrame到Azure Synapse,為什麼我需要說.option(“數據表”,表).save ()
而不僅僅是.saveAsTable(表)
?
這是因為我們想要明確以下區別:.option(“數據表”,表名)
指數據庫(即Azure Synapse)表,而.saveAsTable(表)
為Spark表。事實上,你甚至可以將兩者結合起來:df.write。....option(“數據表”,tableNameDW) .saveAsTable (tableNameSpark)
它在Azure Synapse中創建了一個名為tableNameDW
以及Spark中的一個外部表tableNameSpark
由Azure Synapse表支持。
警告
注意以下兩者之間的區別.save ()
而且.saveAsTable ()
:
為
df.write。....option(“數據表”,tableNameDW) .mode(寫模式).save ()
,寫方式
在Azure Synapse表上執行,正如預期的那樣。為
df.write。....option(“數據表”,tableNameDW) .mode(寫模式).saveAsTable (tableNameSpark)
,寫方式
作用於Spark表,而tableNameDW
被無聲地覆蓋如果它已經存在於Azure Synapse中。
這種行為與寫入任何其他數據源沒有什麼不同。這隻是Spark的一個警告DataFrameWriter API.