磚連接
磚允許您連接到連接你最喜歡的IDE (Eclipse, IntelliJ、PyCharm RStudio, Visual Studio代碼),筆記本電腦服務器(Jupyter筆記本,飛艇),和其他自定義應用程序數據磚集群。
本文解釋了磚連接是如何工作的,走你通過與磚連接的步驟開始,解釋如何解決可能出現的問題在使用磚連接時,使用磚和不同運行連接和運行在一個磚筆記本。
概述
磚是磚的客戶端庫運行時連接。它允許您使用火花api編寫工作和遠程數據磚集群上運行它們,而不是在當地引發會話。
例如,當您運行DataFrame命令spark.read.format(“鋪”).load (…) .groupBy (…) .agg(…),告訴()
使用磚連接,解析和規劃工作的運行在本地機器上。然後,工作的邏輯表示發送到火花集群中的服務器運行在磚執行。
磚連接,您可以:
從任何Python運行大規模刺激就業,Java, Scala,或R應用程序。任何你可以
進口pyspark
,進口org.apache.spark
,或要求(SparkR)
現在,您可以運行火花工作直接從您的應用程序,而不需要安裝任何IDE插件或使用火花提交腳本。單步調試和調試代碼在IDE甚至在處理遠程集群。
快速迭代開發庫。您不需要重新啟動集群在磚連接改變Python和Java庫依賴關係後,因為每個客戶機會話集群中的相互隔離。
關閉閑置集群沒有失去工作。因為客戶端應用程序是與集群脫鉤,這是影響集群重啟或升級,這通常會導致你失去所有的變量,抽樣和DataFrame對象定義在一個筆記本上。
請注意
Python開發的SQL查詢,磚建議您使用Python的磚SQL的連接器而不是磚連接。Python的磚SQL的連接器是比磚更容易建立連接。同時,磚連接解析和計劃工作在本地機器上運行,運行在遠程計算資源而工作。這可以讓它尤其難以調試運行時錯誤。Python的磚SQL連接器直接提交SQL查詢遠程計算資源和獲取結果。
需求
隻有以下磚運行時版本的支持:
磚運行時10.4 LTS ML,磚LTS 10.4運行時
磚運行時9.1 LTS ML,磚LTS 9.1運行時
磚運行時7.3 LTS ML,磚LTS 7.3運行時
小版本的Python安裝客戶端必須一樣的小磚集群的Python版本。表顯示了Python版本與每個磚安裝運行時。
磚的運行時版本的
Python版本
LTS 10.4 LTS ML, 10.4
3.8
LTS 9.1 LTS ML, 9.1
3.8
LTS 7.3 LTS ML, 7.3
3.7
例如,如果您正在使用Conda本地開發環境和集群上運行Python 3.7中,您必須創建一個環境版本,例如:
dbconnect conda創建——名稱python=37 conda
磚連接主要和次要的包的版本必須匹配你的磚運行時版本。磚建議你總是使用最新的包磚連接相匹配你的磚的運行時版本。例如,當使用磚運行時7.3 LTS集群,使用
databricks-connect = = 7.3 . *
包中。請注意
看到磚連接的發布說明可用數據磚連接的列表發布和維護更新。
Java運行時環境(JRE) 8。客戶測試了OpenJDK 8 JRE。客戶端不支持Java 11。
請注意
在Windows上,如果你看到一個錯誤,磚連接找不到winutils.exe
,請參閱找不到winutils。exe在Windows上。
設置客戶端
請注意
在你開始之前建立磚連接的客戶端,您必須符合要求的磚的連接。
步驟1:安裝客戶端
卸載PySpark。這是必需的,因為
databricks-connect
包與PySpark衝突。有關詳細信息,請參見衝突PySpark安裝。皮普卸載pyspark
安裝磚連接的客戶端。
pip安裝- u“databricks-connect = = 7.3 *”。#或X.Y.*來match your cluster version.
請注意
總是指定
databricks-connect = = X.Y. *
而不是databricks-connect = X.Y
,確保最新的安裝包。
步驟2:配置連接屬性
收集以下配置屬性:
配置連接。您可以使用CLI、SQL配置或環境變量。從最高到最低配置方法的優先級是:SQL配置鑰匙,CLI和環境變量。
CLI
運行
databricks-connect
。databricks-connect配置
許可證將顯示:
版權(2018年)磚,公司。這圖書館(的“軟件”)可能不是使用除了在連接與的被許可方的磚的使用平台服務依照達成協議Beplay体育安卓版本…
接受許可和供應配置值。為磚的主機和磚的令牌,輸入工作區URL和個人訪問令牌您在步驟1中指出。
你接受以上協議嗎?[y / N] y設置新的配置值(離開輸入空接受默認):磚主機(目前沒有價值,必須從https://]: < databricks-url >磚令牌(沒有當前值):< databricks-token >集群ID(例如,0921 - 001415 jelly628)(沒有當前值):< cluster-id > Org ID (Azure-only,看到了嗎? o = orgId URL) [0]: < org-id >端口[15001]:<口>
SQL配置或環境變量。下麵的表顯示了SQL配置鍵和對應的環境變量配置屬性您在步驟1中指出。設置SQL配置鍵,使用
sql(“集配置=值”)
。例如:sql(“集spark.databricks.service.clusterId = 0304 - 201045 - abcdefgh”)
。參數
SQL配置關鍵
環境變量名稱
磚的主機
spark.databricks.service.address
DATABRICKS_ADDRESS
磚的令牌
spark.databricks.service.token
DATABRICKS_API_TOKEN
集群ID
spark.databricks.service.clusterId
DATABRICKS_CLUSTER_ID
Org ID
spark.databricks.service.orgId
DATABRICKS_ORG_ID
港口
spark.databricks.service.port
DATABRICKS_PORT
測試連接數據磚。
databricks-connect測試
如果您配置的集群沒有運行,測試配置的集群將繼續運行,直到它開始autotermination時間。輸出應該類似:
* PySpark是安裝在/…/ 3.5.6 / lib / python3.5 /網站/ PySpark *檢查java版本的java版本“1.8.0_152”java (TM)(構建1.8.0_152-b16) java SE運行時環境熱點(TM) 64位服務器虛擬機(構建25.152 b16轉椅,混合模式)*測試scala命令18/12/10 16:38:44 NativeCodeLoader警告:無法加載native-hadoop庫為您的平台……Beplay体育安卓版本使用builtin-java類,適用的使用引發的違約log4j配置文件:org/apache/spark/log4j-defaults。屬性默認日誌級別設置為“警告”。調整日誌級別使用sc.setLogLevel(中的)。對於SparkR,使用setLogLevel(中的)。18/12/10 16:38:50警告MetricsSystem:使用默認名稱SparkStatusTracker因為無論是spark.metrics.namespace還是spark.app來源。我d is set. 18/12/10 16:39:53 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state 18/12/10 16:39:59 WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.4.0-SNAPSHOT /_/ Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_152) Type in expressions to have them evaluated. Type :help for more information. scala> spark.range(100).reduce(_ + _) Spark context Web UI available at https://10.8.5.214:4040 Spark context available as 'sc' (master = local[*], app id = local-1544488730553). Spark session available as 'spark'. View job details at
/?o=0#/setting/clusters/ /sparkUi View job details at ?o=0#/setting/clusters/ /sparkUi res0: Long = 4950 scala> :quit * Testing python command 18/12/10 16:40:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 18/12/10 16:40:17 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set. 18/12/10 16:40:28 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state View job details at /?o=0#/setting/clusters/ /sparkUi
設置您的IDE或筆記本電腦服務器
部分描述了如何配置您的首選IDE或筆記本電腦服務器使用磚連接的客戶端。
在本節中:
Jupyter筆記本
磚連接配置腳本自動添加包到您的項目配置。開始一個Python內核中運行:
從pyspark.sql進口SparkSession火花=SparkSession。構建器。getOrCreate()
要啟用%的sql
簡稱跑步和可視化的SQL查詢,使用以下代碼片段:
從IPython.core.magic進口line_magic,line_cell_magic,魔法,magics_class@magics_class類DatabricksConnectMagics(魔法):@line_cell_magicdefsql(自我,行,細胞=沒有一個):如果細胞和行:提高ValueError(細胞魔法”“線必須是空的,行)試一試:從autovizwidget.widget.utils進口display_dataframe除了ImportError:打印(“請運行“pip安裝autovizwidget”啟用可視化部件。”)display_dataframe=λx:x返回display_dataframe(自我。get_spark()。sql(細胞orgydF4y2Ba行)。toPandas())defget_spark(自我):user_ns=get_ipython()。user_ns如果“火花”在user_ns:返回user_ns(“火花”]其他的:從pyspark.sql進口SparkSessionuser_ns(“火花”]=SparkSession。構建器。getOrCreate()返回user_ns(“火花”]知識產權=get_ipython()知識產權。register_magics(DatabricksConnectMagics)
PyCharm
磚連接配置腳本自動添加包到您的項目配置。
Python 3集群
當你創建一個PyCharm項目,選擇現有的翻譯。從下拉菜單中,選擇您創建(參見Conda環境需求)。
去Run >編輯配置。
添加
PYSPARK_PYTHON = python3
作為一個環境變量。
SparkR和RStudio桌麵
下載並解壓開源的火花到您的本地機器上。選擇相同的版本在你的磚集群(Hadoop 2.7)。
運行
databricks-connectget-jar-dir
。這個命令返回一個路徑/usr/local/lib/python3.5/dist-packages / pyspark / jar
。複製的文件路徑上麵一個目錄例如,JAR目錄文件路徑/usr/local/lib/python3.5/dist-packages / pyspark
,這是SPARK_HOME
目錄中。配置火花自由路徑和火花家裏通過將它們添加到你的R腳本。集
< spark-lib-path >
你打開的目錄開源火花包在步驟1中。集< spark-home-path >
從步驟2磚連接目錄。#指向OSS包路徑,例如,/ / /…/ spark-2.4.0-bin-hadoop2.7道路圖書館(SparkR,lib.loc=.libPaths(c(file.path(“< spark-lib-path >”,“R”,“自由”),.libPaths())))#點磚PySpark連接安裝,例如,/ / /…/ PySpark路徑Sys.setenv(SPARK_HOME=“< spark-home-path >”)
發起一個火花會話並開始運行SparkR命令。
sparkR.session()df< -as.DataFrame(忠實的)頭(df)df1< -有斑紋的(df,函數(x){x},模式(df))收集(df1)
sparklyr和RStudio桌麵
預覽
這個特性是在公共預覽。
請注意
磚建議你使用dbx由磚實驗室為當地的發展而不是磚連接。磚不計劃新功能開發的磚連接。還要留意的限製磚的連接。
在你開始使用磚連接之前,必須符合要求的和設置客戶端磚的連接。
你可以複製sparklyr-dependent代碼開發本地使用磚連接並運行它在你的磚磚筆記本或RStudio托管服務器的工作區以最小的不需要修改代碼。
需求
sparklyr 1.2或以上。
與匹配磚磚運行時的7.3或以上連接。
安裝、配置和使用sparklyr
在RStudio桌麵,從凹口安裝sparklyr 1.2或以上或從GitHub安裝最新的主版本。
#安裝從凹口install.packages(“sparklyr”)從GitHub或安裝最新的主版本號install.packages(“devtools”)devtools::install_github(“sparklyr / sparklyr”)
激活Python環境磚連接安裝和運行以下命令在終端得到
< spark-home-path >
:databricks-connect get-spark-home
發起一個火花會話並開始運行sparklyr命令。
圖書館(sparklyr)sc< -spark_connect(方法=“磚”,spark_home=“< spark-home-path >”)iris_tbl< -copy_to(sc,虹膜,覆蓋=真正的)圖書館(dplyr)src_tbls(sc)iris_tbl% > %數
關閉連接。
spark_disconnect(sc)
IntelliJ (Scala或Java)
運行
databricks-connectget-jar-dir
。的依賴關係指向的目錄返回的命令。去文件> >項目結構>模塊依賴項>“+”符號> jar或目錄。
為了避免衝突,我們強烈建議刪除任何其他火花從類路徑中安裝。如果這是不可能的,確保jar添加在前麵的類路徑中。特別是,他們必須提前安裝其他版本的火花(否則你會使用其中一個其他火花版本和本地運行或扔一個
ClassDefNotFoundError
)。檢查在IntelliJ突破的設置選項。默認值是所有並將導致網絡超時如果你設置斷點調試。將其設置為線程為了避免停止網絡後台線程。
Eclipse
運行
databricks-connectget-jar-dir
。點的外部jar配置目錄返回的命令。去圖書館項目菜單>屬性> Java構建路徑> >添加外部jar。
為了避免衝突,我們強烈建議刪除任何其他火花從類路徑中安裝。如果這是不可能的,確保jar添加在前麵的類路徑中。特別是,他們必須提前安裝其他版本的火花(否則你會使用其中一個其他火花版本和本地運行或扔一個
ClassDefNotFoundError
)。
Visual Studio代碼
驗證Python擴展安裝。
打開命令麵板(命令+ Shift + P在macOS和Ctrl + Shift + P在Windows / Linux)。
選擇一個Python解釋器。去代碼> Preferences >設置,並選擇python的設置。
運行
databricks-connectget-jar-dir
。從命令返回的目錄添加到用戶設置JSON
python.venvPath
。這應該被添加到Python配置。禁用短絨。單擊…在右邊編輯json設置。修改後的設置如下:
如果與一個虛擬環境中運行,這是推薦的方式為Python開發在VS代碼中,在命令麵板類型
選擇python翻譯
並指向您的環境匹配集群的Python版本。例如,如果您的集群是Python 3.5,你的當地環境應該是Python 3.5。
SBT
使用SBT,您必須配置您的build.sbt
文件鏈接對磚的連接罐而不是通常的火花庫依賴關係。你這樣做的unmanagedBase
下麵的示例構建文件指令,假設一個Scala應用的com.example.Test
主要對象:
從IDE運行示例
進口java.util.ArrayList;進口並不知道;進口java.sql.Date;進口org.apache.spark.sql.SparkSession;進口org.apache.spark.sql.types。*;進口org.apache.spark.sql.Row;進口org.apache.spark.sql.RowFactory;進口org.apache.spark.sql.Dataset;公共類應用程序{公共靜態無效主要(字符串[]arg遊戲)拋出異常{SparkSession火花=SparkSession。構建器()。瀏覽器名稱(“臨時工演示”)。配置(“spark.master”,“本地”)。getOrCreate();/ /創建一個火花DataFrame組成的高和低的溫度/ /通過機場代碼和日期。StructType模式=新StructType(新StructField[]{新StructField(“AirportCode”,數據類型。StringType,假,元數據。空()),新StructField(“日期”,數據類型。DateType,假,元數據。空()),新StructField(“TempHighF”,數據類型。IntegerType,假,元數據。空()),新StructField(“TempLowF”,數據類型。IntegerType,假,元數據。空()),});列表<行>dataList=新ArrayList<行>();dataList。添加(RowFactory。創建(“BLI”,日期。返回對象的值(“2021-04-03”),52,43));dataList。添加(RowFactory。創建(“BLI”,日期。返回對象的值(“2021-04-02”),50,38));dataList。添加(RowFactory。創建(“BLI”,日期。返回對象的值(“2021-04-01”),52,41));dataList。添加(RowFactory。創建(“PDX”,日期。返回對象的值(“2021-04-03”),64年,45));dataList。添加(RowFactory。創建(“PDX”,日期。返回對象的值(“2021-04-02”),61年,41));dataList。添加(RowFactory。創建(“PDX”,日期。返回對象的值(“2021-04-01”),66年,39));dataList。添加(RowFactory。創建(“海”,日期。返回對象的值(“2021-04-03”),57,43));dataList。添加(RowFactory。創建(“海”,日期。返回對象的值(“2021-04-02”),54,39));dataList。添加(RowFactory。創建(“海”,日期。返回對象的值(“2021-04-01”),56,41));數據集<行>臨時工=火花。createDataFrame(dataList,模式);/ /創建一個表的數據磚集群,然後填滿/ /表DataFrame的內容。/ /從先前的運行,如果表已經存在/ /先刪除它。火花。sql(“使用默認”);火花。sql(“如果存在demo_temps_table”刪除表);臨時工。寫()。saveAsTable(“demo_temps_table”);/ /查詢磚集群上的表,返回的行/ /在機場代碼不是BLI和日期晚/ /比2021-04-01。組織和秩序的結果高/ /溫度按照降序排列。數據集<行>df_temps=火花。sql(“從demo_temps_table SELECT *”+“AirportCode ! = BLI和日期>‘2021-04-01’”+“GROUP BY AirportCode,日期、TempHighF TempLowF”+“TempHighF DESC秩序”);df_temps。顯示();/ /結果:/ // / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ / | AirportCode | |日期TempHighF | TempLowF |/ / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ / | PDX | 64 | 2021-04-03 | |/ / | PDX | 61 | 2021-04-02 | 41 |/ /海洋57 43 | | | 2021-04-03 | |54海/ / | | 2021-04-02 | | |/ / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ /清理被刪除的表數據磚集群。火花。sql(“DROP TABLE demo_temps_table”);}}
從pyspark.sql進口SparkSession從pyspark.sql.types進口*從datetime進口日期火花=SparkSession。構建器。瀏覽器名稱(“temps-demo”)。getOrCreate()#創建一個火花DataFrame組成的高和低的溫度#機場代碼和日期。模式=StructType([StructField(“AirportCode”,StringType(),假),StructField(“日期”,DateType(),假),StructField(“TempHighF”,IntegerType(),假),StructField(“TempLowF”,IntegerType(),假)])數據=((“BLI”,日期(2021年,4,3),52,43),(“BLI”,日期(2021年,4,2),50,38),(“BLI”,日期(2021年,4,1),52,41),(“PDX”,日期(2021年,4,3),64年,45),(“PDX”,日期(2021年,4,2),61年,41),(“PDX”,日期(2021年,4,1),66年,39),(“海”,日期(2021年,4,3),57,43),(“海”,日期(2021年,4,2),54,39),(“海”,日期(2021年,4,1),56,41]]臨時工=火花。createDataFrame(數據,模式)#磚集群的創建一個表,然後填滿# DataFrame的表的內容。#從先前的運行,如果表已經存在#刪除它。火花。sql(使用默認的)火花。sql(“刪除表如果存在demo_temps_table”)臨時工。寫。saveAsTable(“demo_temps_table”)#查詢磚集群上的表,返回的行#在機場代碼不是BLI和日期晚比2021-04-01 #。組織和秩序的結果高#溫度按照降序排列。df_temps=火花。sql(“從demo_temps_table SELECT *”\“AirportCode ! = BLI和日期>‘2021-04-01’”\“GROUP BY AirportCode,日期、TempHighF TempLowF”\“TempHighF DESC秩序”)df_temps。顯示()#結果:## + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +# | AirportCode | |日期TempHighF | TempLowF |# + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +45 # | PDX | 64 | 2021-04-03 | |# | PDX | 61 | 2021-04-02 | 41 |43 57 #海| | 2021-04-03 | | |54 #海| | 2021-04-02 | | |# + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +#清理被刪除的表數據磚集群。火花。sql(“DROP TABLE demo_temps_table”)
進口org。apache。火花。sql。SparkSession進口org。apache。火花。sql。類型。_進口org。apache。火花。sql。行進口java。sql。日期對象演示{def主要(arg遊戲:數組(字符串]){瓦爾火花=SparkSession。構建器。主(“本地”)。getOrCreate()/ /創建一個火花DataFrame組成的高和低的溫度/ /通過機場代碼和日期。瓦爾模式=StructType(數組(StructField(“AirportCode”,StringType,假),StructField(“日期”,DateType,假),StructField(“TempHighF”,IntegerType,假),StructField(“TempLowF”,IntegerType,假)))瓦爾數據=列表(行(“BLI”,日期。返回對象的值(“2021-04-03”),52,43),行(“BLI”,日期。返回對象的值(“2021-04-02”),50,38),行(“BLI”,日期。返回對象的值(“2021-04-01”),52,41),行(“PDX”,日期。返回對象的值(“2021-04-03”),64年,45),行(“PDX”,日期。返回對象的值(“2021-04-02”),61年,41),行(“PDX”,日期。返回對象的值(“2021-04-01”),66年,39),行(“海”,日期。返回對象的值(“2021-04-03”),57,43),行(“海”,日期。返回對象的值(“2021-04-02”),54,39),行(“海”,日期。返回對象的值(“2021-04-01”),56,41))瓦爾抽樣=火花。sparkContext。makeRDD(數據)瓦爾臨時工=火花。createDataFrame(抽樣,模式)/ /創建一個表的數據磚集群,然後填滿/ /表DataFrame的內容。/ /從先前的運行,如果表已經存在/ /先刪除它。火花。sql(“使用默認”)火花。sql(“如果存在demo_temps_table”刪除表)臨時工。寫。saveAsTable(“demo_temps_table”)/ /查詢磚集群上的表,返回的行/ /在機場代碼不是BLI和日期晚/ /比2021-04-01。組織和秩序的結果高/ /溫度按照降序排列。瓦爾df_temps=火花。sql(“從demo_temps_table SELECT *”+“AirportCode ! = BLI和日期>‘2021-04-01’”+“GROUP BY AirportCode,日期、TempHighF TempLowF”+“TempHighF DESC秩序”)df_temps。顯示()/ /結果:/ // / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ / | AirportCode | |日期TempHighF | TempLowF |/ / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ / | PDX | 64 | 2021-04-03 | |/ / | PDX | 61 | 2021-04-02 | 41 |/ /海洋57 43 | | | 2021-04-03 | |54海/ / | | 2021-04-02 | | |/ / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ /清理被刪除的表數據磚集群。火花。sql(“DROP TABLE demo_temps_table”)}}
使用依賴關係
通常你的主類或Python文件將有其他依賴jar文件和文件。您可以添加這樣的依賴jar文件和文件通過調用sparkContext.addJar (“path-to-the-jar”)
orgydF4y2BasparkContext.addPyFile(文件路徑)
。你也可以加入雞蛋和zip文件的文件addPyFile ()
接口。每次你在IDE運行代碼,依賴jar文件和文件在集群上安裝。
從自由進口噴火從pyspark.sql進口SparkSession火花=SparkSession。構建器。getOrCreate()sc=火花。sparkContext# sc.setLogLevel(“信息”)打印(“測試簡單的計數”)打印(火花。範圍(One hundred.)。數())打印(“測試addPyFile隔離”)sc。addPyFile(“lib.py”)打印(sc。並行化(範圍(10))。地圖(λ我:噴火(2))。收集())類噴火(對象):def__init__(自我,x):自我。x=x
Python + Java udf
從pyspark.sql進口SparkSession從pyspark.sql.column進口_to_java_column,_to_seq,列# #在這個例子中,udf。jar包含編譯後的Java / Scala udf的:#包com.example##進口org.apache.spark.sql._#進口org.apache.spark.sql.expressions._#進口org.apache.spark.sql.functions.udf##{對象測試# val plusOne: UserDefinedFunction = udf((我:長)= > i + 1)#}火花=SparkSession。構建器\。配置(“spark.jars”,“/道路/ / udf.jar”)\。getOrCreate()sc=火花。sparkContextdefplus_one_udf(上校):f=sc。_jvm。com。例子。測試。plusOne()返回列(f。應用(_to_seq(sc,(上校),_to_java_column)))sc。_jsc。addJar(“/道路/ / udf.jar”)火花。範圍(One hundred.)。withColumn(“plusOne”,plus_one_udf(“id”))。顯示()
包com。例子進口org。apache。火花。sql。SparkSession情況下類噴火(x:字符串)對象測試{def主要(arg遊戲:數組(字符串):單位={瓦爾火花=SparkSession。構建器()…。getOrCreate();火花。sparkContext。setLogLevel(“信息”)println(“運行簡單的顯示查詢……”)火花。讀。格式(“鋪”)。負載(“/ tmp / x”)。顯示()println(“運行簡單的UDF查詢……”)火花。sparkContext。addJar(”。/目標/ scala - 2.11 / hello-world_2.11-1.0.jar”)火花。udf。注冊(“f”,(x:Int)= >x+1)火花。範圍(10)。selectExpr(“f (id)”)。顯示()println(“運行定製對象查詢……”)瓦爾obj=火花。sparkContext。並行化(Seq(噴火(“再見”),噴火(“嗨”)))。收集()println(obj。toSeq)}}
訪問DBUtils
您可以使用dbutils.fs
和dbutils.secrets
公用事業的磚公用事業模塊。支持命令dbutils.fs.cp
,dbutils.fs.head
,dbutils.fs.ls
,dbutils.fs.mkdirs
,dbutils.fs.mv
,dbutils.fs.put
,dbutils.fs.rm
,dbutils.secrets.get
,dbutils.secrets.getBytes
,dbutils.secrets.list
,dbutils.secrets.listScopes
。看到文件係統實用程序(dbutils.fs)或運行dbutils.fs.help ()
和秘密效用(dbutils.secrets)或運行dbutils.secrets.help ()
。
從pyspark.sql進口SparkSession從pyspark.dbutils進口DBUtils火花=SparkSession。構建器。getOrCreate()dbutils=DBUtils(火花)打印(dbutils。fs。ls(“dbfs: /))打印(dbutils。秘密。listScopes())
使用磚運行時的7.3 LTS以上時,訪問DBUtils模塊的方式在當地或磚集群的工作,使用以下get_dbutils ()
:
defget_dbutils(火花):從pyspark.dbutils進口DBUtils返回DBUtils(火花)
否則,使用以下get_dbutils ()
:
defget_dbutils(火花):如果火花。相依。得到(“spark.databricks.service.client.enabled”)= =“真正的”:從pyspark.dbutils進口DBUtils返回DBUtils(火花)其他的:進口IPython返回IPython。get_ipython()。user_ns(“dbutils”]
瓦爾dbutils=com。磚。服務。DBUtilsprintln(dbutils。fs。ls(“dbfs: /))println(dbutils。秘密。listScopes())
訪問Hadoop文件係統
您也可以直接訪問DBFS使用標準的Hadoop文件係統接口:
>進口org。apache。hadoop。fs。_/ /得到新的DBFS連接>瓦爾dbfs=文件係統。得到(火花。sparkContext。hadoopConfiguration)dbfs:org。apache。hadoop。fs。文件係統=com。磚。後端。守護進程。數據。客戶端。DBFS@二維036335年/ /列表文件>dbfs。listStatus(新路徑(“dbfs: /))res1:數組(org。apache。hadoop。fs。FileStatus]=數組(FileStatus{路徑=dbfs:/美元;isDirectory=真正的;…})/ /打開文件>瓦爾流=dbfs。開放(新路徑(“dbfs: /道路/ / your_file”))流:org。apache。hadoop。fs。FSDataInputStream=org。apache。hadoop。fs。FSDataInputStream@7aa4ef24/ /獲取文件內容為字符串>進口org。apache。下議院。io。_>println(新字符串(IOUtils。toByteArray(流)))
Hadoop設置配置
在客戶端可以設置使用Hadoop的配置spark.conf.set
API,它適用於SQL和DataFrame操作。Hadoop配置設置sparkContext
必須在集群配置中設置或使用一個筆記本。這是因為配置設置sparkContext
不與用戶會話,但適用於整個集群。
故障排除
運行databricks-connect測試
檢查連接問題。本節介紹您可能遇到的一些常見問題,以及如何解決它們。
Python版本不匹配
檢查您使用的Python版本當地至少有相同的小版本版本在集群上(例如,3.5.1
與3.5.2
是好的,3.5
與3.6
不是)。
如果你有多個Python版本安裝在本地,確保磚使用正確的連接是通過設置PYSPARK_PYTHON
環境變量(例如,PYSPARK_PYTHON = python3
)。
服務器未啟用
確保集群火花服務器啟用了spark.databricks.service.server.enabled真正的
。您應該看到下麵的線在司機日誌如果它是:
18/10/25 21:39:18信息SparkConfUtils美元:設置火花配置:spark.databricks.service.server。啟用- >真實……18/10/25 21:39:21信息SparkContext:加載火花服務RPC服務器18/10/25 21:39:21信息SparkServiceRPCServer:火花服務RPC服務器開始18/10/25 21:39:21信息服務器:jetty-9.3.20。v20170531 18/10/25 21:39:21信息AbstractConnector:開始ServerConnector@6a6c7f42 {HTTP / 1.1, (HTTP / 1.1)}{0.0.0.0:15001} 18/10/25 21:39:21信息服務器:@5879ms開始
衝突PySpark安裝
的databricks-connect
包與PySpark衝突。安裝兩個初始化時將導致錯誤引發上下文在Python中。這可以體現在幾個方麵,包括“流破壞”或“找不到”的錯誤。如果你有PySpark安裝到您的Python環境,確保安裝databricks-connect之前卸載。卸載PySpark之後,一定要完全重新安裝磚連接的包:
pip卸載pyspark pip卸載databricks-connect pip安裝- u“databricks-connect = = 9.1 *”。#或X.Y.*來match your cluster version.
相互衝突的SPARK_HOME
如果您以前使用過火花機,IDE可以配置為使用一個其他版本的火花而不是磚連接的火花。這可以體現在幾個方麵,包括“流破壞”或“找不到”的錯誤。你可以看到哪個版本的火花被檢查的價值SPARK_HOME
環境變量:
係統。出。println(係統。采用(“SPARK_HOME”));
進口操作係統打印(操作係統。環境(“SPARK_HOME”])
println(sys。env。得到(“SPARK_HOME”))
衝突或失蹤路徑
二進製文件的條目
可以配置路徑,這樣的命令spark-shell
將運行其他之前安裝的二進製代替磚提供的一個連接。這可能會導致databricks-connect測試
失敗。你應該確保優先考慮磚連接的二進製文件,或刪除之前安裝的。
如果你不能運行命令spark-shell
,也有可能你的路徑並不是自動建立的皮普安裝
,你將需要添加安裝本
手動dir到您的路徑。可以使用磚與ide,即使這不是設置。然而,databricks-connect測試
命令將不能正常工作。
衝突的序列化設置在集群上
如果你看到“流損壞”運行時錯誤databricks-connect測試
,這可能是由於不兼容的集群序列化配置。例如,設置spark.io.compression.codec
配置會導致這個問題。為了解決這個問題,考慮從集群移除這些配置設置,或設置配置在磚連接的客戶端。
找不到winutils.exe
在Windows上
如果您使用的是磚連接在Windows上看:
錯誤殼牌:失敗的來定位的winutils二進製在的hadoop二進製路徑java。io。IOException:可以不定位可執行的零\本\winutils。exe在的Hadoop二進製文件。
按照指示在Windows上配置Hadoop路徑。
限製
磚連接不支持下麵的磚的特點和第三方平台:Beplay体育安卓版本
結構化的流。
運行任意代碼不是一個遠程集群上火花工作的一部分。
本機Scala、Python和Rδ表操作的api(例如,
DeltaTable.forPath
不支持)。然而,SQL API (spark.sql (…)
)和三角洲湖操作和火花的API(例如,spark.read.load
三角洲表上)都支持。進入副本。
Apache飛艇0.7。x和是low.
連接到集群訪問控製表。
連接到集群處理隔離(換句話說,啟用
spark.databricks.pyspark.enableProcessIsolation
被設置為真正的
)。δ
克隆
SQL命令。全局臨時視圖。
考拉。
創建表表作為選擇…
SQL命令並不總是工作。相反,使用spark.sql(“選擇……”).write.saveAsTable(“表”)
。