Apache Spark不包括用於XML文件的流式API。但是,您可以將Spark批處理API的自動加載器特性與OSS庫Spark-XML結合起來,以流處理XML文件。
在本文中,我們將介紹一個基於Scala的解決方案,它使用自動加載器解析XML數據。
安裝Spark-XML庫
您必須安裝Spark-XML在Databricks集群上的OSS庫。
檢查在集群上安裝庫(AWS|Azure)文件,以了解更多詳情。
創建XML文件
創建XML文件並使用DBUtils (AWS|Azure),保存到您的集群。
% scala val xml2 = " " " <人> <人> <年齡出生= " 1990-02-24 " > 25年齡< / > < /人> <人> <年齡出生= " 1985-01-01 " > 30歲< / > < /人> <人> <年齡出生= " 1980-01-01 " > 30歲< / > < /人> < / >”“dbutils.fs.put (" / < path-to-save-xml-file > / < name-of-file > . xml”,xml2)
定義進口
導入所需的函數。
%scala import com. databicks .spark.xml.functions.from_xmlSchema_of_xml導入spark.implicit。_ import com. databicks .spark.xml。_ import org.apache.spark.sql.functions.{}
定義一個UDF來將二進製轉換為字符串
流DataFrame要求數據為字符串格式。
您應該定義一個用戶定義的函數來將二進製數據轉換為字符串數據。
%scala val toStrUDF = udf((bytes: Array[Byte]) => new String(bytes, "UTF-8"))
提取XML模式
在實現流DataFrame之前,必須提取XML模式。
方法可以從文件中推斷出這一點schema_of_xmlSpark-XML中的方法。
XML字符串作為輸入從二進製Spark數據傳遞。
%scala val df_schema = spark.read.format("binaryFile").load("/FileStore/tables/test/xml/data/age/").select(toStrUDF($"content").alias("text")) val payloadSchema = schema_of_xml(df_schema.select("text").as[String])
實現流讀取器
至此,所有所需的依賴項都已滿足,因此您可以實現流讀取器。
使用readStream時啟用二進製和自動加載器列表模式選項。
toStrUDF用於將二進製數據轉換為字符串格式(文本)。
from_xml用於將字符串轉換為具有用戶定義模式的複雜結構類型。
%scala val df = spark.readStream.format("cloudFiles") .option("cloudFiles. format ")usenotifnotifications ", "false") //使用列表模式,因此false被使用。option("cloudFiles. conf "). format", "binaryFile") .load("/FileStore/tables/test/xml/data/age/") .select(toStrUDF($"content").alias("text")) // UDF將二進製轉換為字符串。select(from_xml($"text", payloadSchema).alias("parsed")) //將字符串轉換為複雜類型的函數。withcolumn ("path",input_file_name) // input_file_name用於提取輸入文件的路徑
視圖輸出
設置好一切之後,查看的輸出顯示器(df)在筆記本上。
例如筆記本電腦
本示例筆記本將所有步驟組合成一個單獨的、有效的示例。
將其導入集群以運行示例。
流式XML示例筆記本
檢查流式XML示例筆記本.