使用自動加載器流XML文件

通過結合Spark批處理API和OSS庫Spark-XML的自動加載特性,在Databricks上流XML文件。

寫的亞當Pavlacka

最後發布日期:2022年5月19日

Apache Spark不包括用於XML文件的流式API。但是,您可以將Spark批處理API的自動加載器特性與OSS庫Spark-XML結合起來,以流處理XML文件。

在本文中,我們將介紹一個基於Scala的解決方案,它使用自動加載器解析XML數據。

安裝Spark-XML庫

您必須安裝Spark-XML在Databricks集群上的OSS庫。

檢查在集群上安裝庫(AWS|Azure)文件,以了解更多詳情。

刪除

信息

您必須確保您正在安裝的Spark- xml版本與您集群上的Spark版本匹配。

創建XML文件

創建XML文件並使用DBUtils (AWS|Azure),保存到您的集群。

% scala val xml2 = " " " <人> <人> <年齡出生= " 1990-02-24 " > 25年齡< / > < /人> <人> <年齡出生= " 1985-01-01 " > 30歲< / > < /人> <人> <年齡出生= " 1980-01-01 " > 30歲< / > < /人> < / >”“dbutils.fs.put (" / < path-to-save-xml-file > / < name-of-file > . xml”,xml2)

定義進口

導入所需的函數。

%scala import com. databicks .spark.xml.functions.from_xmlSchema_of_xml導入spark.implicit。_ import com. databicks .spark.xml。_ import org.apache.spark.sql.functions.{}

定義一個UDF來將二進製轉換為字符串

流DataFrame要求數據為字符串格式。

您應該定義一個用戶定義的函數來將二進製數據轉換為字符串數據。

%scala val toStrUDF = udf((bytes: Array[Byte]) => new String(bytes, "UTF-8"))

提取XML模式

在實現流DataFrame之前,必須提取XML模式。

方法可以從文件中推斷出這一點schema_of_xmlSpark-XML中的方法。

XML字符串作為輸入從二進製Spark數據傳遞。

%scala val df_schema = spark.read.format("binaryFile").load("/FileStore/tables/test/xml/data/age/").select(toStrUDF($"content").alias("text")) val payloadSchema = schema_of_xml(df_schema.select("text").as[String])

實現流讀取器

至此,所有所需的依賴項都已滿足,因此您可以實現流讀取器。

使用readStream時啟用二進製和自動加載器列表模式選項。

刪除

信息

列表模式在處理少量數據時使用。你可以利用fileNotificationMode如果您需要擴展應用程序。

toStrUDF用於將二進製數據轉換為字符串格式(文本)。

from_xml用於將字符串轉換為具有用戶定義模式的複雜結構類型。

%scala val df = spark.readStream.format("cloudFiles") .option("cloudFiles. format ")usenotifnotifications ", "false") //使用列表模式,因此false被使用。option("cloudFiles. conf "). format", "binaryFile") .load("/FileStore/tables/test/xml/data/age/") .select(toStrUDF($"content").alias("text")) // UDF將二進製轉換為字符串。select(from_xml($"text", payloadSchema).alias("parsed")) //將字符串轉換為複雜類型的函數。withcolumn ("path",input_file_name) // input_file_name用於提取輸入文件的路徑

視圖輸出

設置好一切之後,查看的輸出顯示器(df)在筆記本上。

樣本代碼輸出在筆記本電腦。

例如筆記本電腦

本示例筆記本將所有步驟組合成一個單獨的、有效的示例。

將其導入集群以運行示例。

流式XML示例筆記本

檢查流式XML示例筆記本


這篇文章有用嗎?