場景
假設您需要刪除一個由一年,月,日期,地區,服務.然而,這個表非常大,大約有1000個部分每個分區的文件。您可以列出每個分區中的所有文件,然後使用Apache Spark作業刪除它們。
例如,假設你有一個被a、b和c分割的表:
% scala Seq((1、2、3、4、5),(2、3、4、5、6),(3、4、5、6、7),(4、5、6、7、8)).toDF(“a”、“b”、“c”,“d”,“e”).write.mode .partitionBy(“覆蓋”)(“a”、“b”、“c”).parquet(“/ mnt /道路/表”)
列表文件
你可以使用這個函數列出所有的零件文件:
%scala import org.apache.hadoop.conf.Configuration{路徑,文件係統}import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql. execute .datasource . inmemoryfileindex import java.net.URI def listFiles(basep: String, globp: String): Seq[String] = {val conf = new Configuration(sc.hadoopConfiguration) val fs =文件係統。get(new URI(basep), conf) def validated(path: String): path = {if(path startwith "/") new path (path) else new path ("/" + path)} val fileCatalog = InMemoryFileIndex。bulkListLeafFiles(paths = SparkHadoopUtil.get。globPath(fs, Path.mergePaths(validated(basep), validated(globp))), hadoopConf = conf, filter = null, sparkSession = spark, areRootPaths=true) // If you are using Databricks Runtime 6.x and below, // removefrom the bulkListLeafFiles function parameter. fileCatalog.flatMap(_._2.map(_.path)) } val root = "/mnt/path/table" val globp = "[^_]*" // glob pattern, e.g. "service=webapp/date=2019-03-31/*log4j*" val files = listFiles(root, globp) files.toDF("path").show()
+------------------------------------------------------------------------------------------------------------------------------+ | 路徑 | +------------------------------------------------------------------------------------------------------------------------------+ | dbfs: / mnt /道路/表/ = 1 / b = 2 / c = 3 / - 00000部分tid - 444 - afa32967 b7db - 5046671251912249212 e - b895 d12d68c05500 c000.snappy——5.。拚花| | dbfs: / mnt /道路/表/ = 2 / b = 3 / c = 4 /部分- 00001 tid - 444 - afa32967 b7db - 5046671251912249212 e - b895 d12d68c05500 c000.snappy——6.。拚花| | dbfs: / mnt /道路/表/ = 3 / b = 4 / c - 00002 = 5 /部分tid - 444 - afa32967 b7db - 5046671251912249212 e - b895 d12d68c05500 c000.snappy——7.。拚花| | dbfs: / mnt /道路/表/ = 4 / b = 5 / - 00003 c = 6 /部分tid - 444 - afa32967 b7db - 5046671251912249212 e - b895 d12d68c05500 c000.snappy——8.。拚花 | +------------------------------------------------------------------------------------------------------------------------------+
的listFiles函數接受一個基地路徑和一個一團路徑作為參數,掃描文件並與一團模式,然後返回作為字符串序列匹配的所有葉文件。
這個函數也使用效用函數globPath從SparkHadoopUtil包中。此函數列出具有指定前綴的目錄中的所有路徑,而不進一步列出葉子(文件)。傳遞的路徑列表InMemoryFileIndex.bulkListLeafFiles方法,這是一個用於分布式文件列表的Spark內部API。
這兩個清單實用程序函數單獨都不能很好地工作。通過組合它們,您可以使用globPath函數獲得您想要列出的頂級目錄列表,該函數將在驅動程序上運行,您可以將頂級目錄的所有子葉子的列表分發到Spark worker使用bulkListLeafFiles.
根據阿姆達爾定律,速度可以提高20-50倍。原因是,你可以很容易地控製一團路徑根據真實文件的物理布局和控製的並行度通過spark.sql.sources.parallelPartitionDiscovery.parallelism為InMemoryFileIndex.
刪除文件
當從非托管表刪除文件或分區時,可以使用Databricks實用程序函數dbutils.fs.rm.該功能利用本機雲存儲文件係統API,該API針對所有文件操作進行了優化。但是,您不能直接使用刪除一個巨大的表dbutils.fs.rm(“路徑/ / /表”).
您可以使用上麵的腳本有效地列出文件。對於較小的表,要刪除的文件的收集路徑適合驅動程序內存,因此可以使用Spark作業來分發文件刪除任務。
對於巨大的表,即使是單個頂級分區,文件路徑的字符串表示也無法放入驅動程序內存中。解決這個問題最簡單的方法是遞歸地收集內部分區的路徑,列出這些路徑,然後並行地刪除它們。
% scala scala.util進口。{Try, Success, Failure} def delete(p: String):單位= {dbutls .fs.ls(p).map(_.path). todf . {dbutls .fs.rm(file(0). Foreach {file =>)。toString, true) println(s"deleted file: $file")}} final def walkDelete(root: String)(level: Int): Unit = {dbutls .fs.ls(root).map(_.path). {為each { p => println(s"Deleting: $p, on level: ${level}") val deleting = Try { if(level == 0) delete(p) else if(p endsWith "/") walkDelete(p)(level-1) // // Set only n levels of recursion, so it won't be a problem // else delete(p) } deleting match { case Success(v) => { println(s"Successfully deleted $p") dbutils.fs.rm(p, true) } case Failure(e) => println(e.getMessage) } } }
該代碼刪除內部分區,同時確保要刪除的分區足夠小。它通過按每個級別遞歸地搜索分區來做到這一點,隻有當它到達您設置的級別時才開始刪除。例如,如果您想從刪除頂級分區開始,請使用walkDelete(根)(0).Spark會刪除所有文件dbfs: / mnt /道路/表/ = 1 /,然後刪除= 2 /…/按照這個模式,直到耗盡。
Spark作業使用刪除函數,列出了與dbutils.fs.ls假設這個級別上的子分區數量很小。您還可以通過替換dbutils.fs.ls函數與listFiles函數如上所示,隻稍加修改。
總結
這兩種方法突出了列出和刪除巨型表的方法。它們使用一些Spark實用程序函數和特定於Databricks環境的函數。即使不能直接使用它們,也可以創建自己的實用函數以類似的方式解決問題。