取消
顯示的結果
而不是尋找
你的意思是:

讀取專有文件和轉換表的內容——錯誤彈性過程需要的

格哈德
新的貢獻者三世

我們有數據存儲在HDF5文件“專有”。這些數據需要讀取、轉換和轉換,才能插入到三角洲表。

所有這些轉換在一個定製的python函數需要HDF5文件並返回一個元組的列表。輸入文件大約50 mb的大小。輸出

與大約1500行和2000列一個表為每個文件。

這就是我做的事情:

以並行處理的文件我創建了一個抽樣的文件內容(二進製)和分區的數量等於文件的數量。然後我使用rdd.map()運行每個文件的自定義函數。並行化部隊使用一個任務在每個文件。輸出是一個抽樣,然後轉換為dataframe和插入到三角洲表。

df_files # dataframe與要處理的文件列表rcount = df_files.count df_files.rdd ()。地圖(λx: x.FQNFile) .collect () rdd_files = spark.read.format (binaryFile) .load (list_of_files) .repartition (rcount)。抽樣def parse_hdf5(行,pandas_df_with_mapping):文件=行(“路徑”)file_name = os.path.basename(文件)# hdf文件讀取從二進製生物= io.BytesIO(字節(行(“內容”)))h5 = h5py。文件(生物,“r”) #處理h5文件#有一些步驟來提取數據,# mapp轉置,等data_list =[]的時間戳hdf5_file:錘頭= (file_name、時間戳、dict_of_data) data_list.append(錘頭)返回data_list rdd2 = rdd_files。地圖(λx: parse_hdf5 (x, pandas_df_with_mapping))原理圖=火花模式df =火花。createDataFrame (rdd2原理圖)df2 = df.select(爆炸(df.value) .alias (t)) \ .select (' t。file_name”、“t。時間戳,t.struct_of_data) #進一步過程df2三角洲表

問題:

1。遺囑執行人超時

整個不時引發工作失敗與錯誤消息”遺囑執行人心跳超時”。這是一個錯誤我不能繁殖。有時候工作,有時候不能用這個錯誤。我馬上重新啟動工作時,它可以工作,沒有改變。

我已經搜查了日誌(司機、遺囑執行人),沒有發現一個暗示,點我的方向如何解決這個問題。集群的指標不顯示任何重大問題與CPU或內存利用率。

任何想法如何解決這是受歡迎的!

2。腐敗HDF5文件停止整個工作

如果其中一個HDF5文件是腐敗的任務失敗了整個工作,忽視了所有正確的工作的其他任務。

我的主要問題是:是否有更好的方法來處理“專有”文件與一個python函數映射抽樣像我一樣嗎?

有辦法有一個錯誤彈性過程,即使一個任務失敗完成剩下的工作?理想情況下的任務/文件失敗的提示。

唯一的選擇,我看到的是在一個循環中處理它的驅動程序,這不是平行的。

請注意:

兩件事要注意,因為他們不直接。

1。因為需要保護我們的磚的工作區中所有數據使用憑證透傳。這導致我們不能直接讀取HDF5文件從python函數從一個山。

相反,我有閱讀HDF5文件在抽樣´spark.read.format (binaryFile)´然後映射函數。

2。列的列表存儲在HDF5文件可以各不相同,因此我決定用一個結構體在結果表中。所以,從函數返回的元組實際上是包含鍵/淡水河穀地圖。但這應該不是問題。

提前謝謝你!

0回答0
歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map