How to handle blob data contained in an XML file

Learn how to handle blob data contained in an XML file.

Written by Adam Pavlacka

Last published at: March 4th, 2022

If you log events in XML format, then every XML event is recorded as a base64 string. To run analytics on this data with Apache Spark, you need to use the spark-xml library and the BASE64Decoder API to transform the data into an analyzable form.

Problem

You need to use Spark to analyze base64-encoded strings in an XML-formatted log file. For example, the file input.xml below shows this format:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log [<!ENTITY %log SYSTEM "instance"> %log;]>
<message>
  <detail>
    <blob>aW5zdGFuY2VJZCxzdGFydFRpbWUsZGVsZXRlVGltZSxob3Vycw0KaS0wMjdmYTdjY2RhMjEwYjRmNCwyLzE3LzE3VDIwOjIxLDIvMTcvMTdUMjE6MTEsNQ0KaS0wN2NkNzEwMGUzZjU0YmY2YSwyLzE3LzE3VDIwOjE5LDIvMTcvMTdUMjE6MTEsNA0KaS0wYTJjNGFkYmYwZGMyNTUxYywyLzE3LzE3VDIwOjE5LDIvMTcvMTdUMjE6MTEsMg0KaS0wYjQwYjE2MjM2Mzg4OTczZiwyLzE3LzE3VDIwOjE4LDIvMTcvMTdUMjE6MTEsNg0KaS0wY2ZkODgwNzIyZTE1ZjE5ZSwyLzE3LzE3VDIwOjE4LDIvMTcvMTdUMjE6MTEsMg0KaS0wY2YwYzczZWZlZWExNGY3NCwyLzE3LzE3VDE2OjIxLDIvMTcvMTdUMTc6MTEsMQ0KaS0wNTA1ZTk1YmZlYmVjZDZlNiwyLzE3LzE3VDE2OjIxLDIvMTcvMTdUMTc6MTEsOA==</blob>
  </detail>
</message>
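The blob element is simply base64-encoded CSV text. As a quick sanity check outside Spark, the opening segment of the blob above can be decoded with the standard java.util.Base64 API (the string below is copied from the start of that blob):

```scala
import java.util.Base64
import java.nio.charset.StandardCharsets

object DecodePreview extends App {
  // Opening segment of the blob from input.xml above.
  val prefix = "aW5zdGFuY2VJZCxzdGFydFRpbWUsZGVsZXRlVGltZSxob3Vycw0K"

  // Decode base64 -> bytes -> UTF-8 string.
  val decoded = new String(Base64.getDecoder.decode(prefix), StandardCharsets.UTF_8)
  print(decoded) // prints the CSV header line: instanceId,startTime,deleteTime,hours
}
```

The remainder of the blob decodes to the CSV data rows in the same way, one instance record per line.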

Solution

To parse the XML file:

  1. Load the XML data.
  2. Use the spark-xml library to create a raw DataFrame.
  3. Apply the base64 decoder (BASE64Decoder API) on the blob column.
  4. Save the decoded data in a text file (optional).
  5. Load the text file into a Spark DataFrame and parse it.
  6. Create the DataFrame as a Spark SQL table.

The following Scala code processes the file:

val xmlfile = "/mnt//input.xml"
val readxml = spark.read.format("com.databricks.spark.xml").option("rowTag", "message").load(xmlfile)
val decoded = readxml.selectExpr("_source as source", "_time as time", "_type as type", "detail.blob")
decoded.show() // Displays the raw blob data.

// Apply the base64 decoder on every blob, as shown here.
val decodethisxmlblob = decoded.rdd
  .map(str => str(3).toString)
  .map(str1 => new String(new sun.misc.BASE64Decoder().decodeBuffer(str1)))

// Store it temporarily in a text file.
decodethisxmlblob.saveAsTextFile("/mnt/vgiri/ec2blobtotxt")

// Parse the text file as a Spark DataFrame as needed.
val readAsDF = spark.sparkContext.textFile("/mnt/vgiri/ec2blobtotxt")
val header = readAsDF.first()
val finalTextFile = readAsDF.filter(row => row != header)
val finalDF = finalTextFile.toDF()
  .selectExpr(
    "split(value, ',')[0] as instanceId",
    "split(value, ',')[1] as startTime",
    "split(value, ',')[2] as deleteTime",
    "split(value, ',')[3] as hours")
finalDF.show()
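One caveat: sun.misc.BASE64Decoder is an internal JDK API that was removed in JDK 9, so the decode step above only runs on a Java 8 runtime. As an alternative, Spark's built-in unbase64 and decode SQL functions can perform the same conversion directly on the DataFrame, with no internal API and no intermediate text file. This is a minimal standalone sketch; the single-column DataFrame is a hypothetical stand-in for the detail.blob column that spark-xml would produce:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, decode, unbase64}

object Unbase64Sketch extends App {
  val spark = SparkSession.builder.master("local[1]").appName("unbase64-sketch").getOrCreate()
  import spark.implicits._

  // Hypothetical stand-in for the detail.blob column read by spark-xml.
  val blobs = Seq("aW5zdGFuY2VJZCxzdGFydFRpbWUsZGVsZXRlVGltZSxob3Vycw0K").toDF("blob")

  // unbase64 yields binary; decode turns those bytes into a UTF-8 string.
  val decodedDF = blobs.select(decode(unbase64(col("blob")), "UTF-8").as("csv"))
  decodedDF.show(truncate = false)

  spark.stop()
}
```

From there, the csv column can be split into instanceId, startTime, deleteTime, and hours with the same split(value, ',') expressions used in the code above.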

The Spark code generates the following output:

18/03/24 22:54:31 INFO DAGScheduler: ResultStage 4 (show at SparkXMLBlob.scala:42) finished in 0.016 s
18/03/24 22:54:31 INFO DAGScheduler: Job 4 finished: show at SparkXMLBlob.scala:42, took 0.019120 s
18/03/24 22:54:31 INFO SparkContext: Invoking stop() from shutdown hook

+-------------------+-------------+-------------+-----+
|         instanceId|    startTime|   deleteTime|hours|
+-------------------+-------------+-------------+-----+
|i-027fa7ccda210b4f4|2/17/17T20:21|2/17/17T21:11|    5|
|i-07cd7100e3f54bf6a|2/17/17T20:19|2/17/17T21:11|    4|
|i-0a2c4adbf0dc2551c|2/17/17T20:19|2/17/17T21:11|    2|
|i-0b40b16236388973f|2/17/17T20:18|2/17/17T21:11|    6|
|i-0cfd880722e15f19e|2/17/17T20:18|2/17/17T21:11|    2|
|i-0cf0c73efeea14f74|2/17/17T16:21|2/17/17T17:11|    1|
|i-0505e95bfebecd6e6|2/17/17T16:21|2/17/17T17:11|    8|
+-------------------+-------------+-------------+-----+