我們有數據存儲在HDF5文件“專有”。這些數據需要讀取、轉換和轉換,才能插入到三角洲表。
所有這些轉換在一個定製的python函數需要HDF5文件並返回一個元組的列表。輸入文件大約50 mb的大小。輸出
與大約1500行和2000列一個表為每個文件。
這就是我做的事情:
以並行處理的文件我創建了一個抽樣的文件內容(二進製)和分區的數量等於文件的數量。然後我使用rdd.map()運行每個文件的自定義函數。並行化部隊使用一個任務在每個文件。輸出是一個抽樣,然後轉換為dataframe和插入到三角洲表。
df_files # dataframe與要處理的文件列表rcount = df_files.count df_files.rdd ()。地圖(λx: x.FQNFile) .collect () rdd_files = spark.read.format (binaryFile) .load (list_of_files) .repartition (rcount)。抽樣def parse_hdf5(行,pandas_df_with_mapping):文件=行(“路徑”)file_name = os.path.basename(文件)# hdf文件讀取從二進製生物= io.BytesIO(字節(行(“內容”)))h5 = h5py。文件(生物,“r”) #處理h5文件#有一些步驟來提取數據,# mapp轉置,等data_list =[]的時間戳hdf5_file:錘頭= (file_name、時間戳、dict_of_data) data_list.append(錘頭)返回data_list rdd2 = rdd_files。地圖(λx: parse_hdf5 (x, pandas_df_with_mapping))原理圖=火花模式df =火花。createDataFrame (rdd2原理圖)df2 = df.select(爆炸(df.value) .alias (t)) \ .select (' t。file_name”、“t。時間戳,t.struct_of_data) #進一步過程df2三角洲表
問題:
1。遺囑執行人超時
整個不時引發工作失敗與錯誤消息”遺囑執行人心跳超時”。這是一個錯誤我不能繁殖。有時候工作,有時候不能用這個錯誤。我馬上重新啟動工作時,它可以工作,沒有改變。
我已經搜查了日誌(司機、遺囑執行人),沒有發現一個暗示,點我的方向如何解決這個問題。集群的指標不顯示任何重大問題與CPU或內存利用率。
任何想法如何解決這是受歡迎的!
2。腐敗HDF5文件停止整個工作
如果其中一個HDF5文件是腐敗的任務失敗了整個工作,忽視了所有正確的工作的其他任務。
我的主要問題是:是否有更好的方法來處理“專有”文件與一個python函數映射抽樣像我一樣嗎?
有辦法有一個錯誤彈性過程,即使一個任務失敗完成剩下的工作?理想情況下的任務/文件失敗的提示。
唯一的選擇,我看到的是在一個循環中處理它的驅動程序,這不是平行的。
請注意:
兩件事要注意,因為他們不直接。
1。因為需要保護我們的磚的工作區中所有數據使用憑證透傳。這導致我們不能直接讀取HDF5文件從python函數從一個山。
相反,我有閱讀HDF5文件在抽樣´spark.read.format (binaryFile)´然後映射函數。
2。列的列表存儲在HDF5文件可以各不相同,因此我決定用一個結構體在結果表中。所以,從函數返回的元組實際上是包含鍵/淡水河穀地圖。但這應該不是問題。
提前謝謝你!