我最近開始探索領域的數據工程和遇到一些困難。我有一桶GCS數以百萬計的鑲花的文件,我想創建一個異常檢測模型。我想攝取數據成磚,它存儲在一個δ表,使用自動裝卸機,然後使用水蘇打水的異常檢測模型。你推薦什麼?
我有遇到一些內存問題當我讀到三角洲表成Pyspark Dataframe並試圖將其傳遞給一個水框架。
我使用一個集群30 GB的內存和4芯和Dataframe形狀(30448674,213)。我得到如下錯誤:火花驅動程序意外停止並重新啟動。你的筆記本將自動複位。Ganglia UI中通過分析我的司機圖執行時總是用紅色asH2OFrame()函數。
這是我使用的代碼:
#從pyspark進口從pysparkling進口進口水*。#創建一個sql進口SparkSession H2OContext對象。hc = H2OContext.getOrCreate(火花)#從三角洲表讀取數據PySpark Dataframe df = spark.read.format .load(“δ”)(“路徑/ / delta_table”) #將PySpark Dataframe一H2OFrame h2o_df = hc.asH2OFrame (df) #水模型的異常檢測isolation_model = H2OIsolationForestEstimator isolation_model (model_id ntrees,種子)。火車(training_frame = h2o_df) #預測預測= isolation_model.predict (h2o_df)
@Pedro巴博薩:
似乎你的記憶當試圖把PySpark dataframe H2O框架。一個可能的方法來解決這個問題是分區PySpark dataframe之前將它轉換為一個水框架。
您可以使用重新分配()方法的PySpark dataframes dataframe劃分成較小的分區,然後將每個分區轉換成一個單獨水框架。然後您可以將生成的水幀來創建最終的水。
下麵是一個示例代碼片段展示了如何分區PySpark dataframe和每個分區轉換成水框架:
#從pyspark進口從pysparkling進口進口水*。#創建一個sql進口SparkSession H2OContext對象。hc = H2OContext.getOrCreate(火花)#從三角洲表讀取數據PySpark Dataframe df = spark.read.format .load(“δ”)(“路徑/ / delta_table”) #分區Dataframe num_partitions = 10 df = df.repartition (num_partitions) #每個分區轉換成一個水框架h2o_frames =[]我的範圍(num_partitions): partition_df = df.where (df.partition_id() = =我)h2o_frame = hc.asH2OFrame (partition_df) h2o_frames.append (h2o_frame) #連接水幀來創建最終的水框架h2o_df = h2o.frames.concat (h2o_frames) #水模型的異常檢測isolation_model = H2OIsolationForestEstimator isolation_model (model_id ntrees,種子)。火車(training_frame = h2o_df) #預測預測= isolation_model.predict (h2o_df)
這種方法可以幫助你避免內存耗盡時將大PySpark dataframes H2O幀。