使用JDBC查詢數據庫
Databricks支持使用JDBC連接外部數據庫。本文通過Python、SQL和Scala中的示例提供配置和使用這些連接的基本語法。
Partner Connect為與許多外部數據源同步數據提供了優化的集成。看到什麼是Databricks合作夥伴連接?.
重要的
本文中的示例不包括JDBC url中的用戶名和密碼。磚推薦使用秘密來存儲數據庫憑據。例如:
用戶名=dbutils.秘密.得到(範圍=“jdbc”,關鍵=“用戶名”)密碼=dbutils.秘密.得到(範圍=“jdbc”,關鍵=“密碼”)
瓦爾用戶名=dbutils.秘密.得到(範圍=“jdbc”,關鍵=“用戶名”)瓦爾密碼=dbutils.秘密.得到(範圍=“jdbc”,關鍵=“密碼”)
要用SQL引用Databricks的秘密,您必須在集群初始化過程中配置Spark配置屬性.
有關秘密管理的完整示例,請參見秘密工作流示例.
建立雲連接
Databricks vpc配置為隻支持Spark集群。當連接到另一個基礎設施時,最佳實踐是使用VPC凝視.VPC對等建立成功後,可以通過netcat
群集上的實用程序。
%sh nc -vz . sh
用JDBC讀取數據
您必須配置許多設置,以便使用JDBC讀取數據。注意,每個數據庫使用不同的格式< jdbc_url >
.
employees_table=(火花.讀.格式(“jdbc”).選項(“url”,“< jdbc_url >”).選項(“數據表”,“< table_name >”).選項(“用戶”,“<用戶名>”).選項(“密碼”,“<密碼>”).負載())
創建臨時視圖employees_table_vw使用JDBC選項(url“< jdbc_url >”,數據表“< table_name >”,用戶“<用戶名>”,密碼' <密碼> ')
瓦爾employees_table=火花.讀.格式(“jdbc”).選項(“url”,“< jdbc_url >”).選項(“數據表”,“< table_name >”).選項(“用戶”,“<用戶名>”).選項(“密碼”,“<密碼>”).負載()
Spark自動從數據庫表中讀取模式,並將其類型映射回Spark SQL類型。
employees_table.printSchema
描述employees_table_vw
employees_table.printSchema
您可以對這個JDBC表運行查詢:
顯示(employees_table.選擇(“年齡”,“工資”).groupBy(“年齡”).avg(“工資”))
選擇年齡,avg(工資)作為工資從employees_table_vw集團通過年齡
顯示(employees_table.選擇(“年齡”,“工資”).groupBy(“年齡”).avg(“工資”))
用JDBC寫數據
使用JDBC將數據保存到表中使用與讀取類似的配置。請看下麵的例子:
(employees_table.寫.格式(“jdbc”).選項(“url”,“< jdbc_url >”).選項(“數據表”,“< new_table_name >”).選項(“用戶”,“<用戶名>”).選項(“密碼”,“<密碼>”).保存())
創建表格new_employees_table使用JDBC選項(url“< jdbc_url >”,數據表“< table_name >”,用戶“<用戶名>”,密碼' <密碼> ')作為選擇*從employees_table_vw
employees_table.寫.格式(“jdbc”).選項(“url”,“< jdbc_url >”).選項(“數據表”,“< new_table_name >”).選項(“用戶”,“<用戶名>”).選項(“密碼”,“<密碼>”).保存()
默認行為嚐試創建一個新表,如果同名表已經存在,則拋出一個錯誤。
可以使用以下語法向現有表追加數據:
(employees_table.寫.格式(“jdbc”).選項(“url”,“< jdbc_url >”).選項(“數據表”,“< new_table_name >”).選項(“用戶”,“<用戶名>”).選項(“密碼”,“<密碼>”).模式(“添加”).保存())
創建表格如果不存在new_employees_table使用JDBC選項(url“< jdbc_url >”,數據表“< table_name >”,用戶“<用戶名>”,密碼' <密碼> ');插入成new_employees_table選擇*從employees_table_vw;
employees_table.寫.格式(“jdbc”).選項(“url”,“< jdbc_url >”).選項(“數據表”,“< new_table_name >”).選項(“用戶”,“<用戶名>”).選項(“密碼”,“<密碼>”).模式(“添加”).保存()
您可以使用以下語法覆蓋現有的表:
(employees_table.寫.格式(“jdbc”).選項(“url”,“< jdbc_url >”).選項(“數據表”,“< new_table_name >”).選項(“用戶”,“<用戶名>”).選項(“密碼”,“<密碼>”).模式(“覆蓋”).保存())
創建或取代表格new_employees_table使用JDBC選項(url“< jdbc_url >”,數據表“< table_name >”,用戶“<用戶名>”,密碼' <密碼> ')作為選擇*從employees_table_vw;
employees_table.寫.格式(“jdbc”).選項(“url”,“< jdbc_url >”).選項(“數據表”,“< new_table_name >”).選項(“用戶”,“<用戶名>”).選項(“密碼”,“<密碼>”).模式(“覆蓋”).保存()
控製JDBC查詢的並行性
默認情況下,JDBC驅動程序隻使用一個線程查詢源數據庫。為了提高讀取的性能,需要指定一些選項來控製Databricks同時對數據庫進行多少次查詢。對於小型集群,設置numPartitions
選項等於集群中執行器核的數量,確保所有節點並行查詢數據。
警告
設置numPartitions
在大型集群上設置過高的值可能會導致遠程數據庫的性能下降,因為同時進行的查詢太多可能會使服務不堪重負。這對於應用程序數據庫來說尤其麻煩。注意不要將該值設置在50以上。
請注意
類的源數據庫中計算索引的列可以加快查詢速度partitionColumn
.
下麵的代碼示例演示了配置8核集群的並行性:
employees_table=(火花.讀.格式(“jdbc”).選項(“url”,“< jdbc_url >”).選項(“數據表”,“< table_name >”).選項(“用戶”,“<用戶名>”).選項(“密碼”,“<密碼>”)#可使用的列,該列具有可用於並行化的均勻分布的值範圍.選項(“partitionColumn”,“< partition_key >”)使用partitionColumn提取數據的#最小值.選項(“下界”,“< min_value >”)使用partitionColumn提取數據的最大值.選項(“upperBound”,“< max_value >”)#分配數據的分區數量。不要設置的非常大(~數百).選項(“numPartitions”,8).負載())
創建臨時視圖employees_table_vw使用JDBC選項(url“< jdbc_url >”,數據表“< table_name >”,用戶“<用戶名>”,密碼' <密碼> ',partitionColumn“< partition_key >”,下界“< min_value >”,upperBound“< max_value >”,numPartitions8)
瓦爾employees_table=火花.讀.格式(“jdbc”).選項(“url”,“< jdbc_url >”).選項(“數據表”,“< table_name >”).選項(“用戶”,“<用戶名>”).選項(“密碼”,“<密碼>”)//一個可以使用的列,該列具有均勻分布的可用於並行化的值範圍.選項(“partitionColumn”,“< partition_key >”)//使用partitionColumn提取數據的最小值.選項(“下界”,“< min_value >”)//使用partitionColumn提取數據的最大值.選項(“upperBound”,“< max_value >”)//分配數據的分區數量。不要設置的非常大(~數百).選項(“numPartitions”,8).負載()
請注意
Databricks支持所有Apache Spark配置JDBC的選項.
當使用JDBC寫入數據庫時,Apache Spark使用內存中的分區數量來控製並行性。在寫入數據以控製並行性之前,可以對數據進行重新分區。避免在大型集群上使用大量分區,以避免使遠程數據庫不堪重負。下麵的例子演示了在寫入前重新分區到8個分區:
(employees_table.重新分區(8).寫.格式(“jdbc”).選項(“url”,“< jdbc_url >”).選項(“數據表”,“< new_table_name >”).選項(“用戶”,“<用戶名>”).選項(“密碼”,“<密碼>”).保存())
創建表格new_employees_table使用JDBC選項(url“< jdbc_url >”,數據表“< table_name >”,用戶“<用戶名>”,密碼' <密碼> ')作為選擇* / / * +重新分區(8)*從employees_table_vw
employees_table.重新分區(8).寫.格式(“jdbc”).選項(“url”,“< jdbc_url >”).選項(“數據表”,“< new_table_name >”).選項(“用戶”,“<用戶名>”).選項(“密碼”,“<密碼>”).保存()
下推一個查詢到數據庫引擎
您可以將整個查詢下推到數據庫,隻返回結果。的表格
參數標識要讀取的JDBC表。您可以使用任何在SQL查詢中有效的內容從
條款。
pushdown_query="(select * from where emp_no < 10008) as emp_alias"employees_table=(火花.讀.格式(“jdbc”).選項(“url”,“< jdbc_url >”).選項(“數據表”,pushdown_query).選項(“用戶”,“<用戶名>”).選項(“密碼”,“<密碼>”).負載())
創建臨時視圖employees_table_vw使用JDBC選項(url“< jdbc_url >”,數據表"(select * from where emp_no < 10008) as emp_alias",用戶“<用戶名>”,密碼' <密碼> ')
瓦爾pushdown_query="(select * from where emp_no < 10008) as emp_alias"瓦爾employees_table=火花.讀.格式(“jdbc”).選項(“url”,“< jdbc_url >”).選項(“數據表”,pushdown_query).選項(“用戶”,“<用戶名>”).選項(“密碼”,“<密碼>”).負載()
控製每個查詢獲取的行數
JDBC驅動程序有一個fetchSize
控製一次從遠程數據庫獲取的行數的參數。
設置 |
結果 |
---|---|
過低 |
由於多次往返導致的高延遲(每個查詢返回的行很少) |
太高了 |
內存不足錯誤(一次查詢返回的數據太多) |
最優值與工作負載相關。注意事項包括:
查詢返回多少列?
返回什麼數據類型?
每一列返回的字符串有多長?
係統可能有非常小的默認值,並從調優中受益。例如:Oracle的默認值fetchSize
是10。將其增加到100可以將需要執行的查詢總數減少10倍。JDBC結果是網絡流量,因此要避免非常大的數字,但對於許多數據集來說,最佳值可能是數千。
使用fetchSize
選項,如下例所示:
employees_table=(火花.讀.格式(“jdbc”).選項(“url”,“< jdbc_url >”).選項(“數據表”,“< table_name >”).選項(“用戶”,“<用戶名>”).選項(“密碼”,“<密碼>”).選項(“fetchSize”,“100”).負載())
創建臨時視圖employees_table_vw使用JDBC選項(url“< jdbc_url >”,數據表“< table_name >”,用戶“<用戶名>”,密碼' <密碼> '.fetchSizeOne hundred.)
瓦爾employees_table=火花.讀.格式(“jdbc”).選項(“url”,“< jdbc_url >”).選項(“數據表”,“< table_name >”).選項(“用戶”,“<用戶名>”).選項(“密碼”,“<密碼>”).選項(“fetchSize”,“100”).負載()