問題
您正在嚐試使用GeoSpark功能st_geofromwkt使用DBConnect (AWS|Azure|GCP),你會得到一個Apache Spark錯誤消息。
分析異常:未定義函數:'st_geomfromwkt'。這個函數既不是注冊的臨時函數,也不是注冊在數據庫'default'中的永久函數。
此示例代碼在使用DBConnect時出現錯誤而失敗。
%scala val sc = spark。sparkContext sc.setLogLevel("DEBUG") val sqlContext = spark。sqlContext spark. sparkcontext . addjar ("~/jars/geospark-sql_2.3-1.2.0.jar") spark. sparkcontext . addjar ("~/jars/geospark-1.2.0.jar")sql("select ST_GeomFromWKT(area) AS geometry from polygon").show()
導致
DBConnect不支持將客戶端udf自動同步到服務器。
解決方案
類在集群上注冊UDF的代碼可以使用自定義實用程序jarSparkSessionExtensions類。
- 創建一個實用工具jar,用於注冊GeoSpark函數SparkSessionExtensions.這個實用程序類定義可以構建到實用程序jar中。
%scala package com.databricks.spark.utils import org.apache.spark.sql.SparkSessionExtensions import org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator類GeoSparkUdfExtension extends (SparkSessionExtensions => Unit) {def apply(e: SparkSessionExtensions): Unit = {e.b injectcheckrule (spark => {println(" injection UDF") GeoSparkSQLRegistrator.registerAll(spark) _ => Unit})}}
- 將GeoSpark jar和您的工具jar複製到DBFSdbfs: /磚/ geospark-extension-jars /.
- 創建初始化腳本(set_geospark_extension_jar.sh),將jar從DBFS位置複製到Spark類路徑,並設置spark.sql.extensions到實用程序類。
% scala dbutils.fs。Put ("dbfs:/databricks/
/set_geospark_extension_jar.sh", """#!/bin/sh |sleep 10s |#複製擴展和GeoSpark依賴jar到/databricks/jars|cp -v /dbfs/databricks/geospark-extension-jars/{spark_geospark_extension_2_11_0_1.jar,geospark_sql_2_3_1_2_0.jar} /databricks/jars/ |#設置擴展名。|cat << 'EOF' > /databricks/driver/conf/00-custom-spark.conf |[driver] {| "spark.sql. conf . |cat << 'EOF' > /databricks/driver/conf/00-custom-spark.conf |[driver] {| "Extensions " = "com. databicks .spark.utils. Extensions "。GeoSparkUdfExtension" |} |EOF |""" "。stripMargin, override = true) - 將初始化腳本作為集群範圍的初始化腳本安裝(AWS|Azure|GCP).您需要腳本所在位置的完整路徑(dbfs: /磚/ < init-script-folder > / set_geospark_extension_jar.sh).
- 重新啟動集群。
- 現在可以在DBConnect中使用GeoSpark代碼。