磚連接

請注意

磚建議你使用dbx由磚實驗室為當地的發展而不是磚連接。磚不計劃新功能開發的磚連接。還要留意的限製磚的連接。

磚允許您連接到連接你最喜歡的IDE (Eclipse, IntelliJ、PyCharm RStudio, Visual Studio代碼),筆記本電腦服務器(Jupyter筆記本,飛艇),和其他自定義應用程序數據磚集群。

本文解釋了磚連接是如何工作的,走你通過與磚連接的步驟開始,解釋如何解決可能出現的問題在使用磚連接時,使用磚和不同運行連接和運行在一個磚筆記本。

概述

磚是磚的客戶端庫運行時連接。它允許您使用火花api編寫工作和遠程數據磚集群上運行它們,而不是在當地引發會話。

例如,當您運行DataFrame命令spark.read.format(“鋪”).load (…) .groupBy (…) .agg(…),告訴()使用磚連接,解析和規劃工作的運行在本地機器上。然後,工作的邏輯表示發送到火花集群中的服務器運行在磚執行。

磚連接,您可以:

  • 從任何Python運行大規模刺激就業,Java, Scala,或R應用程序。任何你可以進口pyspark,進口org.apache.spark,或要求(SparkR)現在,您可以運行火花工作直接從您的應用程序,而不需要安裝任何IDE插件或使用火花提交腳本。

  • 單步調試和調試代碼在IDE甚至在處理遠程集群。

  • 快速迭代開發庫。您不需要重新啟動集群在磚連接改變Python和Java庫依賴關係後,因為每個客戶機會話集群中的相互隔離。

  • 關閉閑置集群沒有失去工作。因為客戶端應用程序是與集群脫鉤,這是影響集群重啟或升級,這通常會導致你失去所有的變量,抽樣和DataFrame對象定義在一個筆記本上。

請注意

Python開發的SQL查詢,磚建議您使用Python的磚SQL的連接器而不是磚連接。Python的磚SQL的連接器是比磚更容易建立連接。同時,磚連接解析和計劃工作在本地機器上運行,運行在遠程計算資源而工作。這可以讓它尤其難以調試運行時錯誤。Python的磚SQL連接器直接提交SQL查詢遠程計算資源和獲取結果。

需求

  • 隻有以下磚運行時版本的支持:

    • 磚運行時10.4 LTS ML,磚LTS 10.4運行時

    • 磚運行時9.1 LTS ML,磚LTS 9.1運行時

    • 磚運行時7.3 LTS ML,磚LTS 7.3運行時

  • 小版本的Python安裝客戶端必須一樣的小磚集群的Python版本。表顯示了Python版本與每個磚安裝運行時。

    磚的運行時版本的

    Python版本

    LTS 10.4 LTS ML, 10.4

    3.8

    LTS 9.1 LTS ML, 9.1

    3.8

    LTS 7.3 LTS ML, 7.3

    3.7

    例如,如果您正在使用Conda本地開發環境和集群上運行Python 3.7中,您必須創建一個環境版本,例如:

    dbconnect conda創建——名稱python=37 conda
  • 磚連接主要和次要的包的版本必須匹配你的磚運行時版本。磚建議你總是使用最新的包磚連接相匹配你的磚的運行時版本。例如,當使用磚運行時7.3 LTS集群,使用databricks-connect = = 7.3 . *包中。

    請注意

    看到磚連接的發布說明可用數據磚連接的列表發布和維護更新。

  • Java運行時環境(JRE) 8。客戶測試了OpenJDK 8 JRE。客戶端不支持Java 11。

請注意

在Windows上,如果你看到一個錯誤,磚連接找不到winutils.exe,請參閱找不到winutils。exe在Windows上

設置客戶端

請注意

在你開始之前建立磚連接的客戶端,您必須符合要求的磚的連接。

步驟1:安裝客戶端

  1. 卸載PySpark。這是必需的,因為databricks-connect包與PySpark衝突。有關詳細信息,請參見衝突PySpark安裝

    皮普卸載pyspark
  2. 安裝磚連接的客戶端。

    pip安裝- u“databricks-connect = = 7.3 *”。#或X.Y.*來match your cluster version.

    請注意

    總是指定databricks-connect = = X.Y. *而不是databricks-connect = X.Y,確保最新的安裝包。

步驟2:配置連接屬性

  1. 收集以下配置屬性:

    • 工作空間的URL

    • 個人訪問令牌

    • 集群創建的ID。你可以從URL獲取集群ID。這裏的集群ID0304 - 201045 xxxxxxxx

      集群ID
    • 磚連接的連接的端口。設置為15001年

  2. 配置連接。您可以使用CLI、SQL配置或環境變量。從最高到最低配置方法的優先級是:SQL配置鑰匙,CLI和環境變量。

    • CLI

      1. 運行databricks-connect

        databricks-connect配置

        許可證將顯示:

        版權(2018年),公司圖書館(“軟件”)可能使用除了連接被許可方的磚的使用平台服務依照達成協議Beplay体育安卓版本
      2. 接受許可和供應配置值。為磚的主機磚的令牌,輸入工作區URL和個人訪問令牌您在步驟1中指出。

        你接受以上協議嗎?[y / N] y設置新的配置值(離開輸入空接受默認):磚主機(目前沒有價值,必須從https://]: < databricks-url >磚令牌(沒有當前值):< databricks-token >集群ID(例如,0921 - 001415 jelly628)(沒有當前值):< cluster-id > Org ID (Azure-only,看到了嗎? o = orgId URL) [0]: < org-id >端口[15001]:<口>
    • SQL配置或環境變量。下麵的表顯示了SQL配置鍵和對應的環境變量配置屬性您在步驟1中指出。設置SQL配置鍵,使用sql(“集配置=值”)。例如:sql(“集spark.databricks.service.clusterId = 0304 - 201045 - abcdefgh”)

      參數

      SQL配置關鍵

      環境變量名稱

      磚的主機

      spark.databricks.service.address

      DATABRICKS_ADDRESS

      磚的令牌

      spark.databricks.service.token

      DATABRICKS_API_TOKEN

      集群ID

      spark.databricks.service.clusterId

      DATABRICKS_CLUSTER_ID

      Org ID

      spark.databricks.service.orgId

      DATABRICKS_ORG_ID

      港口

      spark.databricks.service.port

      DATABRICKS_PORT

  3. 測試連接數據磚。

    databricks-connect測試

    如果您配置的集群沒有運行,測試配置的集群將繼續運行,直到它開始autotermination時間。輸出應該類似:

    * PySpark是安裝在/…/ 3.5.6 / lib / python3.5 /網站/ PySpark *檢查java版本的java版本“1.8.0_152”java (TM)(構建1.8.0_152-b16) java SE運行時環境熱點(TM) 64位服務器虛擬機(構建25.152 b16轉椅,混合模式)*測試scala命令18/12/10 16:38:44 NativeCodeLoader警告:無法加載native-hadoop庫為您的平台……Beplay体育安卓版本使用builtin-java類,適用的使用引發的違約log4j配置文件:org/apache/spark/log4j-defaults。屬性默認日誌級別設置為“警告”。調整日誌級別使用sc.setLogLevel(中的)。對於SparkR,使用setLogLevel(中的)。18/12/10 16:38:50警告MetricsSystem:使用默認名稱SparkStatusTracker因為無論是spark.metrics.namespace還是spark.app來源。我d is set. 18/12/10 16:39:53 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state 18/12/10 16:39:59 WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.4.0-SNAPSHOT /_/ Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_152) Type in expressions to have them evaluated. Type :help for more information. scala> spark.range(100).reduce(_ + _) Spark context Web UI available at https://10.8.5.214:4040 Spark context available as 'sc' (master = local[*], app id = local-1544488730553). Spark session available as 'spark'. View job details at /?o=0#/setting/clusters//sparkUi View job details at ?o=0#/setting/clusters//sparkUi res0: Long = 4950 scala> :quit * Testing python command 18/12/10 16:40:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 18/12/10 16:40:17 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set. 18/12/10 16:40:28 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state View job details at /?o=0#/setting/clusters//sparkUi

設置您的IDE或筆記本電腦服務器

部分描述了如何配置您的首選IDE或筆記本電腦服務器使用磚連接的客戶端。

Jupyter筆記本

請注意

磚建議你使用dbx由磚實驗室為當地的發展而不是磚連接。磚不計劃新功能開發的磚連接。還要留意的限製磚的連接。

在你開始使用磚連接之前,必須符合要求的設置客戶端磚的連接。

磚連接配置腳本自動添加包到您的項目配置。開始一個Python內核中運行:

pyspark.sql進口SparkSession火花=SparkSession構建器getOrCreate()

要啟用%的sql簡稱跑步和可視化的SQL查詢,使用以下代碼片段:

IPython.core.magic進口line_magic,line_cell_magic,魔法,magics_class@magics_classDatabricksConnectMagics(魔法):@line_cell_magicdefsql(自我,,細胞=沒有一個):如果細胞:提高ValueError(細胞魔法”“線必須是空的,)試一試:autovizwidget.widget.utils進口display_dataframe除了ImportError:打印(“請運行“pip安裝autovizwidget”啟用可視化部件。”)display_dataframe=λx:x返回display_dataframe(自我get_spark()sql(細胞orgydF4y2Ba)toPandas())defget_spark(自我):user_ns=get_ipython()user_ns如果“火花”user_ns:返回user_ns(“火花”]其他的:pyspark.sql進口SparkSessionuser_ns(“火花”]=SparkSession構建器getOrCreate()返回user_ns(“火花”]知識產權=get_ipython()知識產權register_magics(DatabricksConnectMagics)

PyCharm

請注意

磚建議你使用dbx由磚實驗室為當地的發展而不是磚連接。磚不計劃新功能開發的磚連接。還要留意的限製磚的連接。

在你開始使用磚連接之前,必須符合要求的設置客戶端磚的連接。

磚連接配置腳本自動添加包到您的項目配置。

Python 3集群

  1. 當你創建一個PyCharm項目,選擇現有的翻譯。從下拉菜單中,選擇您創建(參見Conda環境需求)。

    選擇翻譯
  2. Run >編輯配置

  3. 添加PYSPARK_PYTHON = python3作為一個環境變量。

    Python 3集群配置

SparkR和RStudio桌麵

請注意

磚建議你使用dbx由磚實驗室為當地的發展而不是磚連接。磚不計劃新功能開發的磚連接。還要留意的限製磚的連接。

在你開始使用磚連接之前,必須符合要求的設置客戶端磚的連接。

  1. 下載並解壓開源的火花到您的本地機器上。選擇相同的版本在你的磚集群(Hadoop 2.7)。

  2. 運行databricks-connectget-jar-dir。這個命令返回一個路徑/usr/local/lib/python3.5/dist-packages / pyspark / jar。複製的文件路徑上麵一個目錄例如,JAR目錄文件路徑/usr/local/lib/python3.5/dist-packages / pyspark,這是SPARK_HOME目錄中。

  3. 配置火花自由路徑和火花家裏通過將它們添加到你的R腳本。集< spark-lib-path >你打開的目錄開源火花包在步驟1中。集< spark-home-path >從步驟2磚連接目錄。

    #指向OSS包路徑,例如,/ / /…/ spark-2.4.0-bin-hadoop2.7道路圖書館(SparkR,lib.loc=.libPaths(c(file.path(“< spark-lib-path >”,“R”,“自由”),.libPaths())))#點磚PySpark連接安裝,例如,/ / /…/ PySpark路徑Sys.setenv(SPARK_HOME=“< spark-home-path >”)
  4. 發起一個火花會話並開始運行SparkR命令。

    sparkR.session()df< -as.DataFrame(忠實的)(df)df1< -有斑紋的(df,函數(x){x},模式(df))收集(df1)

sparklyr和RStudio桌麵

預覽

這個特性是在公共預覽

請注意

磚建議你使用dbx由磚實驗室為當地的發展而不是磚連接。磚不計劃新功能開發的磚連接。還要留意的限製磚的連接。

在你開始使用磚連接之前,必須符合要求的設置客戶端磚的連接。

你可以複製sparklyr-dependent代碼開發本地使用磚連接並運行它在你的磚磚筆記本或RStudio托管服務器的工作區以最小的不需要修改代碼。

需求

  • sparklyr 1.2或以上。

  • 與匹配磚磚運行時的7.3或以上連接。

安裝、配置和使用sparklyr

  1. 在RStudio桌麵,從凹口安裝sparklyr 1.2或以上或從GitHub安裝最新的主版本。

    #安裝從凹口install.packages(“sparklyr”)從GitHub或安裝最新的主版本號install.packages(“devtools”)devtools::install_github(“sparklyr / sparklyr”)
  2. 激活Python環境磚連接安裝和運行以下命令在終端得到< spark-home-path >:

    databricks-connect get-spark-home
  3. 發起一個火花會話並開始運行sparklyr命令。

    圖書館(sparklyr)sc< -spark_connect(方法=“磚”,spark_home=“< spark-home-path >”)iris_tbl< -copy_to(sc,虹膜,覆蓋=真正的)圖書館(dplyr)src_tbls(sc)iris_tbl% > %
  4. 關閉連接。

    spark_disconnect(sc)

資源

有關更多信息,請參見sparklyr GitHub自述

代碼示例,請參閱sparklyr

sparklyr和RStudio桌麵的局限性

不支持以下特性:

  • sparklyr流api

  • sparklyr毫升api

  • 掃帚api

  • csv_file序列化方式

  • 火花提交

IntelliJ (Scala或Java)

請注意

磚建議你使用dbx由磚實驗室為當地的發展而不是磚連接。磚不計劃新功能開發的磚連接。還要留意的限製磚的連接。

在你開始使用磚連接之前,必須符合要求的設置客戶端磚的連接。

  1. 運行databricks-connectget-jar-dir

  2. 的依賴關係指向的目錄返回的命令。去文件> >項目結構>模塊依賴項>“+”符號> jar或目錄

    IntelliJ罐子

    為了避免衝突,我們強烈建議刪除任何其他火花從類路徑中安裝。如果這是不可能的,確保jar添加在前麵的類路徑中。特別是,他們必須提前安裝其他版本的火花(否則你會使用其中一個其他火花版本和本地運行或扔一個ClassDefNotFoundError)。

  3. 檢查在IntelliJ突破的設置選項。默認值是所有並將導致網絡超時如果你設置斷點調試。將其設置為線程為了避免停止網絡後台線程。

    IntelliJ線程

Eclipse

請注意

磚建議你使用dbx由磚實驗室為當地的發展而不是磚連接。磚不計劃新功能開發的磚連接。還要留意的限製磚的連接。

在你開始使用磚連接之前,必須符合要求的設置客戶端磚的連接。

  1. 運行databricks-connectget-jar-dir

  2. 點的外部jar配置目錄返回的命令。去圖書館項目菜單>屬性> Java構建路徑> >添加外部jar

    Eclipse外部JAR配置

    為了避免衝突,我們強烈建議刪除任何其他火花從類路徑中安裝。如果這是不可能的,確保jar添加在前麵的類路徑中。特別是,他們必須提前安裝其他版本的火花(否則你會使用其中一個其他火花版本和本地運行或扔一個ClassDefNotFoundError)。

    Eclipse火花配置

Visual Studio代碼

請注意

磚建議你使用dbx由磚實驗室為當地的發展而不是磚連接。磚不計劃新功能開發的磚連接。還要留意的限製磚的連接。

在你開始使用磚連接之前,必須符合要求的設置客戶端磚的連接。

  1. 驗證Python擴展安裝。

  2. 打開命令麵板(命令+ Shift + P在macOS和Ctrl + Shift + P在Windows / Linux)。

  3. 選擇一個Python解釋器。去代碼> Preferences >設置,並選擇python的設置

  4. 運行databricks-connectget-jar-dir

  5. 從命令返回的目錄添加到用戶設置JSONpython.venvPath。這應該被添加到Python配置。

  6. 禁用短絨。單擊在右邊編輯json設置。修改後的設置如下:

    VS代碼配置
  7. 如果與一個虛擬環境中運行,這是推薦的方式為Python開發在VS代碼中,在命令麵板類型選擇python翻譯並指向您的環境匹配集群的Python版本。

    選擇Python解釋器

    例如,如果您的集群是Python 3.5,你的當地環境應該是Python 3.5。

    Python版本

SBT

請注意

磚建議你使用dbx由磚實驗室為當地的發展而不是磚連接。磚不計劃新功能開發的磚連接。還要留意的限製磚的連接。

在你開始使用磚連接之前,必須符合要求的設置客戶端磚的連接。

使用SBT,您必須配置您的build.sbt文件鏈接對磚的連接罐而不是通常的火花庫依賴關係。你這樣做的unmanagedBase下麵的示例構建文件指令,假設一個Scala應用的com.example.Test主要對象:

build.sbt

名稱:= " hello world "版本:= " 1.0 " scalaVersion: = " 2.11.6 " / /這應該被設置為“返回的路徑databricks-connect get-jar-dir“unmanagedBase: = new java.io.File (“/ usr /地方/ lib / python2.7 / dist-packages pyspark / jars”) mainClass: =一些(“com.example.Test”)

從IDE運行示例

進口java.util.ArrayList;進口並不知道;進口java.sql.Date;進口org.apache.spark.sql.SparkSession;進口org.apache.spark.sql.types。*;進口org.apache.spark.sql.Row;進口org.apache.spark.sql.RowFactory;進口org.apache.spark.sql.Dataset;公共應用程序{公共靜態無效主要(字符串[]arg遊戲)拋出異常{SparkSession火花=SparkSession構建器()瀏覽器名稱(“臨時工演示”)配置(“spark.master”,“本地”)getOrCreate();/ /創建一個火花DataFrame組成的高和低的溫度/ /通過機場代碼和日期。StructType模式=StructType(StructField[]{StructField(“AirportCode”,數據類型StringType,,元數據()),StructField(“日期”,數據類型DateType,,元數據()),StructField(“TempHighF”,數據類型IntegerType,,元數據()),StructField(“TempLowF”,數據類型IntegerType,,元數據()),});列表<>dataList=ArrayList<>();dataList添加(RowFactory創建(“BLI”,日期返回對象的值(“2021-04-03”),52,43));dataList添加(RowFactory創建(“BLI”,日期返回對象的值(“2021-04-02”),50,38));dataList添加(RowFactory創建(“BLI”,日期返回對象的值(“2021-04-01”),52,41));dataList添加(RowFactory創建(“PDX”,日期返回對象的值(“2021-04-03”),64年,45));dataList添加(RowFactory創建(“PDX”,日期返回對象的值(“2021-04-02”),61年,41));dataList添加(RowFactory創建(“PDX”,日期返回對象的值(“2021-04-01”),66年,39));dataList添加(RowFactory創建(“海”,日期返回對象的值(“2021-04-03”),57,43));dataList添加(RowFactory創建(“海”,日期返回對象的值(“2021-04-02”),54,39));dataList添加(RowFactory創建(“海”,日期返回對象的值(“2021-04-01”),56,41));數據集<>臨時工=火花createDataFrame(dataList,模式);/ /創建一個表的數據磚集群,然後填滿/ /表DataFrame的內容。/ /從先前的運行,如果表已經存在/ /先刪除它。火花sql(“使用默認”);火花sql(“如果存在demo_temps_table”刪除表);臨時工()。saveAsTable(“demo_temps_table”);/ /查詢磚集群上的表,返回的行/ /在機場代碼不是BLI和日期晚/ /比2021-04-01。組織和秩序的結果高/ /溫度按照降序排列。數據集<>df_temps=火花sql(“從demo_temps_table SELECT *”+“AirportCode ! = BLI和日期>‘2021-04-01’”+“GROUP BY AirportCode,日期、TempHighF TempLowF”+“TempHighF DESC秩序”);df_temps顯示();/ /結果:/ // / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ / | AirportCode | |日期TempHighF | TempLowF |/ / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ / | PDX | 64 | 2021-04-03 | |/ / | PDX | 61 | 2021-04-02 | 41 |/ /海洋57 43 | | | 2021-04-03 | |54海/ / | | 2021-04-02 | | |/ / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ /清理被刪除的表數據磚集群。火花sql(“DROP TABLE demo_temps_table”);}}
pyspark.sql進口SparkSessionpyspark.sql.types進口*datetime進口日期火花=SparkSession構建器瀏覽器名稱(“temps-demo”)getOrCreate()#創建一個火花DataFrame組成的高和低的溫度#機場代碼和日期。模式=StructType([StructField(“AirportCode”,StringType(),),StructField(“日期”,DateType(),),StructField(“TempHighF”,IntegerType(),),StructField(“TempLowF”,IntegerType(),)])數據=((“BLI”,日期(2021年,4,3),52,43),(“BLI”,日期(2021年,4,2),50,38),(“BLI”,日期(2021年,4,1),52,41),(“PDX”,日期(2021年,4,3),64年,45),(“PDX”,日期(2021年,4,2),61年,41),(“PDX”,日期(2021年,4,1),66年,39),(“海”,日期(2021年,4,3),57,43),(“海”,日期(2021年,4,2),54,39),(“海”,日期(2021年,4,1),56,41]]臨時工=火花createDataFrame(數據,模式)#磚集群的創建一個表,然後填滿# DataFrame的表的內容。#從先前的運行,如果表已經存在#刪除它。火花sql(使用默認的)火花sql(“刪除表如果存在demo_temps_table”)臨時工saveAsTable(“demo_temps_table”)#查詢磚集群上的表,返回的行#在機場代碼不是BLI和日期晚比2021-04-01 #。組織和秩序的結果高#溫度按照降序排列。df_temps=火花sql(“從demo_temps_table SELECT *”\“AirportCode ! = BLI和日期>‘2021-04-01’”\“GROUP BY AirportCode,日期、TempHighF TempLowF”\“TempHighF DESC秩序”)df_temps顯示()#結果:## + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +# | AirportCode | |日期TempHighF | TempLowF |# + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +45 # | PDX | 64 | 2021-04-03 | |# | PDX | 61 | 2021-04-02 | 41 |43 57 #海| | 2021-04-03 | | |54 #海| | 2021-04-02 | | |# + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +#清理被刪除的表數據磚集群。火花sql(“DROP TABLE demo_temps_table”)
進口orgapache火花sqlSparkSession進口orgapache火花sql類型_進口orgapache火花sql進口javasql日期對象演示{def主要(arg遊戲:數組(字符串]){瓦爾火花=SparkSession構建器(“本地”)。getOrCreate()/ /創建一個火花DataFrame組成的高和低的溫度/ /通過機場代碼和日期。瓦爾模式=StructType(數組(StructField(“AirportCode”,StringType,),StructField(“日期”,DateType,),StructField(“TempHighF”,IntegerType,),StructField(“TempLowF”,IntegerType,)))瓦爾數據=列表((“BLI”,日期返回對象的值(“2021-04-03”),52,43),(“BLI”,日期返回對象的值(“2021-04-02”),50,38),(“BLI”,日期返回對象的值(“2021-04-01”),52,41),(“PDX”,日期返回對象的值(“2021-04-03”),64年,45),(“PDX”,日期返回對象的值(“2021-04-02”),61年,41),(“PDX”,日期返回對象的值(“2021-04-01”),66年,39),(“海”,日期返回對象的值(“2021-04-03”),57,43),(“海”,日期返回對象的值(“2021-04-02”),54,39),(“海”,日期返回對象的值(“2021-04-01”),56,41))瓦爾抽樣=火花sparkContextmakeRDD(數據)瓦爾臨時工=火花createDataFrame(抽樣,模式)/ /創建一個表的數據磚集群,然後填滿/ /表DataFrame的內容。/ /從先前的運行,如果表已經存在/ /先刪除它。火花sql(“使用默認”)火花sql(“如果存在demo_temps_table”刪除表)臨時工saveAsTable(“demo_temps_table”)/ /查詢磚集群上的表,返回的行/ /在機場代碼不是BLI和日期晚/ /比2021-04-01。組織和秩序的結果高/ /溫度按照降序排列。瓦爾df_temps=火花sql(“從demo_temps_table SELECT *”+“AirportCode ! = BLI和日期>‘2021-04-01’”+“GROUP BY AirportCode,日期、TempHighF TempLowF”+“TempHighF DESC秩序”)df_temps顯示()/ /結果:/ // / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ / | AirportCode | |日期TempHighF | TempLowF |/ / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ / | PDX | 64 | 2021-04-03 | |/ / | PDX | 61 | 2021-04-02 | 41 |/ /海洋57 43 | | | 2021-04-03 | |54海/ / | | 2021-04-02 | | |/ / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ /清理被刪除的表數據磚集群。火花sql(“DROP TABLE demo_temps_table”)}}

使用依賴關係

通常你的主類或Python文件將有其他依賴jar文件和文件。您可以添加這樣的依賴jar文件和文件通過調用sparkContext.addJar (“path-to-the-jar”)orgydF4y2BasparkContext.addPyFile(文件路徑)。你也可以加入雞蛋和zip文件的文件addPyFile ()接口。每次你在IDE運行代碼,依賴jar文件和文件在集群上安裝。

自由進口噴火pyspark.sql進口SparkSession火花=SparkSession構建器getOrCreate()sc=火花sparkContext# sc.setLogLevel(“信息”)打印(“測試簡單的計數”)打印(火花範圍(One hundred.)())打印(“測試addPyFile隔離”)scaddPyFile(“lib.py”)打印(sc並行化(範圍(10))地圖(λ:噴火(2))收集())噴火(對象):def__init__(自我,x):自我x=x

Python + Java udf

pyspark.sql進口SparkSessionpyspark.sql.column進口_to_java_column,_to_seq,# #在這個例子中,udf。jar包含編譯後的Java / Scala udf的:#包com.example##進口org.apache.spark.sql._#進口org.apache.spark.sql.expressions._#進口org.apache.spark.sql.functions.udf##{對象測試# val plusOne: UserDefinedFunction = udf((我:長)= > i + 1)#}火花=SparkSession構建器\配置(“spark.jars”,“/道路/ / udf.jar”)\getOrCreate()sc=火花sparkContextdefplus_one_udf(上校):f=sc_jvmcom例子測試plusOne()返回(f應用(_to_seq(sc,(上校),_to_java_column)))sc_jscaddJar(“/道路/ / udf.jar”)火花範圍(One hundred.)withColumn(“plusOne”,plus_one_udf(“id”))顯示()
com例子進口orgapache火花sqlSparkSession情況下噴火(x:字符串)對象測試{def主要(arg遊戲:數組(字符串):單位={瓦爾火花=SparkSession構建器()getOrCreate();火花sparkContextsetLogLevel(“信息”)println(“運行簡單的顯示查詢……”)火花格式(“鋪”)。負載(“/ tmp / x”)。顯示()println(“運行簡單的UDF查詢……”)火花sparkContextaddJar(”。/目標/ scala - 2.11 / hello-world_2.11-1.0.jar”)火花udf注冊(“f”,(x:Int)= >x+1)火花範圍(10)。selectExpr(“f (id)”)。顯示()println(“運行定製對象查詢……”)瓦爾obj=火花sparkContext並行化(Seq(噴火(“再見”),噴火(“嗨”)))。收集()println(objtoSeq)}}

訪問DBUtils

您可以使用dbutils.fsdbutils.secrets公用事業的磚公用事業模塊。支持命令dbutils.fs.cp,dbutils.fs.head,dbutils.fs.ls,dbutils.fs.mkdirs,dbutils.fs.mv,dbutils.fs.put,dbutils.fs.rm,dbutils.secrets.get,dbutils.secrets.getBytes,dbutils.secrets.list,dbutils.secrets.listScopes。看到文件係統實用程序(dbutils.fs)或運行dbutils.fs.help ()秘密效用(dbutils.secrets)或運行dbutils.secrets.help ()

pyspark.sql進口SparkSessionpyspark.dbutils進口DBUtils火花=SparkSession構建器getOrCreate()dbutils=DBUtils(火花)打印(dbutilsfsls(“dbfs: /))打印(dbutils秘密listScopes())

使用磚運行時的7.3 LTS以上時,訪問DBUtils模塊的方式在當地或磚集群的工作,使用以下get_dbutils ():

defget_dbutils(火花):pyspark.dbutils進口DBUtils返回DBUtils(火花)

否則,使用以下get_dbutils ():

defget_dbutils(火花):如果火花相依得到(“spark.databricks.service.client.enabled”)= =“真正的”:pyspark.dbutils進口DBUtils返回DBUtils(火花)其他的:進口IPython返回IPythonget_ipython()user_ns(“dbutils”]
瓦爾dbutils=com服務DBUtilsprintln(dbutilsfsls(“dbfs: /))println(dbutils秘密listScopes())

本地和遠程文件係統之間複製文件

您可以使用dbutils.fs您的客戶端和遠程文件係統之間複製文件。計劃文件:/指的是在客戶端本地文件係統。

pyspark.dbutils進口DBUtilsdbutils=DBUtils(火花)dbutilsfscp(“文件:/ home / user / data.csv”,“dbfs: /上傳”)dbutilsfscp(“dbfs: / / results.csv輸出”,下載的文件:/ home / user / / ')

最大文件大小,可以轉移方式是250 MB。

啟用dbutils.secrets.get

由於安全限製,調用的能力dbutils.secrets.get默認情況下是禁用的。接觸磚支持您的工作區來啟用這個特性。

訪問Hadoop文件係統

您也可以直接訪問DBFS使用標準的Hadoop文件係統接口:

>進口orgapachehadoopfs_/ /得到新的DBFS連接>瓦爾dbfs=文件係統得到(火花sparkContexthadoopConfiguration)dbfs:orgapachehadoopfs文件係統=com後端守護進程數據客戶端DBFS@二維036335年/ /列表文件>dbfslistStatus(路徑(“dbfs: /))res1:數組(orgapachehadoopfsFileStatus]=數組(FileStatus{路徑=dbfs:/美元;isDirectory=真正的;…})/ /打開文件>瓦爾=dbfs開放(路徑(“dbfs: /道路/ / your_file”)):orgapachehadoopfsFSDataInputStream=orgapachehadoopfsFSDataInputStream@7aa4ef24/ /獲取文件內容為字符串>進口orgapache下議院io_>println(字符串(IOUtilstoByteArray()))

Hadoop設置配置

在客戶端可以設置使用Hadoop的配置spark.conf.setAPI,它適用於SQL和DataFrame操作。Hadoop配置設置sparkContext必須在集群配置中設置或使用一個筆記本。這是因為配置設置sparkContext不與用戶會話,但適用於整個集群。

故障排除

運行databricks-connect測試檢查連接問題。本節介紹您可能遇到的一些常見問題,以及如何解決它們。

Python版本不匹配

檢查您使用的Python版本當地至少有相同的小版本版本在集群上(例如,3.5.13.5.2是好的,3.53.6不是)。

如果你有多個Python版本安裝在本地,確保磚使用正確的連接是通過設置PYSPARK_PYTHON環境變量(例如,PYSPARK_PYTHON = python3)。

服務器未啟用

確保集群火花服務器啟用了spark.databricks.service.server.enabled真正的。您應該看到下麵的線在司機日誌如果它是:

18/10/25 21:39:18信息SparkConfUtils美元:設置火花配置:spark.databricks.service.server。啟用- >真實……18/10/25 21:39:21信息SparkContext:加載火花服務RPC服務器18/10/25 21:39:21信息SparkServiceRPCServer:火花服務RPC服務器開始18/10/25 21:39:21信息服務器:jetty-9.3.20。v20170531 18/10/25 21:39:21信息AbstractConnector:開始ServerConnector@6a6c7f42 {HTTP / 1.1, (HTTP / 1.1)}{0.0.0.0:15001} 18/10/25 21:39:21信息服務器:@5879ms開始

衝突PySpark安裝

databricks-connect包與PySpark衝突。安裝兩個初始化時將導致錯誤引發上下文在Python中。這可以體現在幾個方麵,包括“流破壞”或“找不到”的錯誤。如果你有PySpark安裝到您的Python環境,確保安裝databricks-connect之前卸載。卸載PySpark之後,一定要完全重新安裝磚連接的包:

pip卸載pyspark pip卸載databricks-connect pip安裝- u“databricks-connect = = 9.1 *”。#或X.Y.*來match your cluster version.

相互衝突的SPARK_HOME

如果您以前使用過火花機,IDE可以配置為使用一個其他版本的火花而不是磚連接的火花。這可以體現在幾個方麵,包括“流破壞”或“找不到”的錯誤。你可以看到哪個版本的火花被檢查的價值SPARK_HOME環境變量:

係統println(係統采用(“SPARK_HOME”));
進口操作係統打印(操作係統環境(“SPARK_HOME”])
println(sysenv得到(“SPARK_HOME”))

決議

如果SPARK_HOME將另一個版本的火花在客戶端,您應該設置SPARK_HOME變量和再試一次。

檢查您的IDE環境變量設置,你的. bashrc,. zshrc,或. bash_profile文件,和其他環境變量可能。你很可能會退出並重新啟動IDE來清除舊的狀態,甚至你可能需要創建一個新項目,如果問題依然存在。

你不應該需要設置SPARK_HOME一個新值;複位應該足夠了。

衝突或失蹤路徑二進製文件的條目

可以配置路徑,這樣的命令spark-shell將運行其他之前安裝的二進製代替磚提供的一個連接。這可能會導致databricks-connect測試失敗。你應該確保優先考慮磚連接的二進製文件,或刪除之前安裝的。

如果你不能運行命令spark-shell,也有可能你的路徑並不是自動建立的皮普安裝,你將需要添加安裝手動dir到您的路徑。可以使用磚與ide,即使這不是設置。然而,databricks-connect測試命令將不能正常工作。

衝突的序列化設置在集群上

如果你看到“流損壞”運行時錯誤databricks-connect測試,這可能是由於不兼容的集群序列化配置。例如,設置spark.io.compression.codec配置會導致這個問題。為了解決這個問題,考慮從集群移除這些配置設置,或設置配置在磚連接的客戶端。

找不到winutils.exe在Windows上

如果您使用的是磚連接在Windows上看:

錯誤殼牌:失敗的定位winutils二進製hadoop二進製路徑javaioIOException:可以定位可執行的\\winutilsexeHadoop二進製文件

按照指示在Windows上配置Hadoop路徑

文件名、目錄名或卷標在Windows上語法是不正確的

如果您使用的是磚連接在Windows上看:

文件名,目錄的名字,orgydF4y2Ba體積標簽語法不正確的

Java或磚連接被安裝到一個目錄空間在你的路徑。您可以解決通過安裝到一個目錄路徑沒有空間,使用或配置路徑短名稱形式

限製

磚連接不支持下麵的磚的特點和第三方平台:Beplay体育安卓版本

  • 結構化的流。

  • 運行任意代碼不是一個遠程集群上火花工作的一部分。

  • 本機Scala、Python和Rδ表操作的api(例如,DeltaTable.forPath不支持)。然而,SQL API (spark.sql (…))和三角洲湖操作和火花的API(例如,spark.read.load三角洲表上)都支持。

  • 進入副本。

  • Apache飛艇0.7。x和是low.

  • 連接到集群訪問控製表

  • 連接到集群處理隔離(換句話說,啟用spark.databricks.pyspark.enableProcessIsolation被設置為真正的)。

  • δ克隆SQL命令。

  • 全局臨時視圖。

  • 考拉

  • 創建作為選擇SQL命令並不總是工作。相反,使用spark.sql(“選擇……”).write.saveAsTable(“表”)