我試圖在磚parallelise拷貝文件的執行。利用多個執行器是一種方法。所以,這是一段代碼,我在pyspark寫道。
def parallel_copy_execution (src_path: str target_path: str): files_in_path = dbutils.fs.ls (src_path) file_paths_df = spark.sparkContext.parallelize (files_in_path) .toDF file_paths_df ()。foreach(λx: dbutils.fs.cp (x.path.toString (), target_path,遞歸= True))
我拿來所有文件複製和創造了Dataframe。當試圖運行一個foreach在DataFrame我得到以下錯誤。它說
“你不能使用內dbutils火花工作”
你不能使用dbutils內引發工作或否則泡菜。如果你需要使用內getArguments火花的工作,你必須在使用前得到的參數。例如,如果您有以下代碼:myRdd。地圖(λ我:dbutils.args.getArgument (X) + str(我)那麼你應該這樣使用它:argX = dbutils.args.getArgument myRdd (X)。地圖(λ我:argX + str(我))
但當我試著在Scala中相同。它的工作原理。內部使用dbutils火花工作。將這段代碼。
def parallel_copy_execution (p:字符串,t: String):單位= {dbutils.fs.ls (p) . map (_.path) .toDF。= > dbutils.fs.cp foreach{文件(文件(0)。toString, t,遞歸= true) println (s“cp文件:$文件”)}}
Pyspark API不是更新來處理呢?
如果是的,請建議替代過程並行dbutils命令。
如果你有火花會話,您可以使用火花隱藏的文件係統:
#從SparkSession fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get獲取文件係統(spark._jsc.hadoopConfiguration()) #讓路徑類路徑字符串轉換為fs = spark._jvm.org.apache.hadoop.fs.Path #列表文件路徑路徑fs.listStatus(路徑(路徑“/ / /數據”))#應與安裝點#重命名文件fs.rename(路徑(“OriginalName”),路徑(新名稱))#刪除文件fs.delete(路徑(路徑“/ / /數據”))#上傳文件DBFS根fs.copyFromLocalFile(路徑(local_file_path),路徑(remote_file_path)) #上傳文件DBFS根fs.copyToLocalFile(路徑(remote_file_path),路徑(local_file_path))
如果你有一個Azure存儲,你應該安裝集群,然後你可以訪問它與“abfss: / /”或“/ mnt /”