DBConnect的GeoSpark未定義函數錯誤

在DBConnect會話中使用GeoSpark代碼。

寫的arjun.kaimaparambilrajan

最後發布日期:2022年6月1日

問題

您正在嚐試使用GeoSpark功能st_geofromwkt使用DBConnect (AWS|Azure|GCP),你會得到一個Apache Spark錯誤消息。

分析異常:未定義函數:'st_geomfromwkt'。這個函數既不是注冊的臨時函數,也不是注冊在數據庫'default'中的永久函數。

此示例代碼在使用DBConnect時出現錯誤而失敗。

%scala val sc = spark。sparkContext sc.setLogLevel("DEBUG") val sqlContext = spark。sqlContext spark. sparkcontext . addjar ("~/jars/geospark-sql_2.3-1.2.0.jar") spark. sparkcontext . addjar ("~/jars/geospark-1.2.0.jar")sql("select ST_GeomFromWKT(area) AS geometry from polygon").show()

導致

DBConnect不支持將客戶端udf自動同步到服務器。

解決方案

類在集群上注冊UDF的代碼可以使用自定義實用程序jarSparkSessionExtensions類。

  1. 創建一個實用工具jar,用於注冊GeoSpark函數SparkSessionExtensions.這個實用程序類定義可以構建到實用程序jar中。
    %scala package com.databricks.spark.utils import org.apache.spark.sql.SparkSessionExtensions import org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator類GeoSparkUdfExtension extends (SparkSessionExtensions => Unit) {def apply(e: SparkSessionExtensions): Unit = {e.b injectcheckrule (spark => {println(" injection UDF") GeoSparkSQLRegistrator.registerAll(spark) _ => Unit})}}
  2. 將GeoSpark jar和您的工具jar複製到DBFSdbfs: /磚/ geospark-extension-jars /
  3. 創建初始化腳本(set_geospark_extension_jar.sh),將jar從DBFS位置複製到Spark類路徑,並設置spark.sql.extensions到實用程序類。
    % scala dbutils.fs。Put ("dbfs:/databricks//set_geospark_extension_jar.sh", """#!/bin/sh |sleep 10s |#複製擴展和GeoSpark依賴jar到/databricks/jars|cp -v /dbfs/databricks/geospark-extension-jars/{spark_geospark_extension_2_11_0_1.jar,geospark_sql_2_3_1_2_0.jar} /databricks/jars/ |#設置擴展名。|cat << 'EOF' > /databricks/driver/conf/00-custom-spark.conf |[driver] {| "spark.sql. conf . |cat << 'EOF' > /databricks/driver/conf/00-custom-spark.conf |[driver] {| "Extensions " = "com. databicks .spark.utils. Extensions "。GeoSparkUdfExtension" |} |EOF |""" "。stripMargin, override = true)
  4. 將初始化腳本作為集群範圍的初始化腳本安裝(AWS|Azure|GCP).您需要腳本所在位置的完整路徑(dbfs: /磚/ < init-script-folder > / set_geospark_extension_jar.sh).
  5. 重新啟動集群。
  6. 現在可以在DBConnect中使用GeoSpark代碼。