我需要閱讀和變換幾CSV文件,然後將它們附加到一個數據幀。我能做到這一點在磚使用簡單的循環,但我想加快解決。
下麵是我的代碼的大致結構:
在filepath all_filepaths: df1 = read_file (filepath) df2 =變換(df1) df3 = df3.append (df2)
而不是一次處理1文件有並行方式處理它們嗎?有很多解決方案在線但是我隻能在磚得到以下工作:
與ThreadPoolExecutor (max_workers = 20)池:df3 = pd.concat(池。地圖(read_and_transform_df all_filepaths))
153個文件,第一種方法花費了3.35分鍾,第二種方法3.87分鍾。
有沒有一種方法來優化第二種方法還是另一種更快的方法?
謝謝,
Tanjil
請考慮使用一個自動裝卸機加載多個CSV。
通過這種方式,您將會加速
https://docs.microsoft.com/en-us/azure/databricks/ingestion/auto-loader/schema
我知道它可以令人困惑,因為它是一個流請添加
.trigger (availableNow = True)
過程隻是一個時間和完成加載後一切。
你可以閱讀所有的csv文件路徑使用通配符像spark.read.csv(“路徑/ * . csv”)
然而,由於這是並行處理的,你不會讓你的記錄以完全相同的順序,因為他們存在於你的csv文件。
為了達到這個目的,你需要你dataframe使用列進行排序,可以幫助你確定哪些文件一行。
如果你沒有這樣的列,然後不創建一個。你可以首先編寫一個簡單的代碼添加一列“row_num”你的每一個csv文件和行號。
如。file1 100行的值是0到99然後下文件,100行就100年至199年,等等。
然後你可以閱讀所有文件一次列row_num和秩序。
你可以過濾特定文件行嗎?如果是的,那麼取子集dataframe使用,”(df.where())的條款和適用各自的轉換。
請考慮使用一個自動裝卸機加載多個CSV。
通過這種方式,您將會加速
https://docs.microsoft.com/en-us/azure/databricks/ingestion/auto-loader/schema
我知道它可以令人困惑,因為它是一個流請添加
.trigger (availableNow = True)
過程隻是一個時間和完成加載後一切。