取消
顯示的結果
而不是尋找
你的意思是:

使用自動加載器大數據攝入問題

bzh
新的因素

這個項目的目標是攝取1000 +文件每個文件(100 mb)從S3成磚。由於這是增量更改,我們使用自動裝卸機持續攝入和轉換使用一個集群(i3.xlarge)。
當前進程非常緩慢。感覺它可能需要幾天才能完成。
1。E上述文件大約是100000 +行。但當我運行代碼時,火花UI顯示有超過3000000行(見附件)正在處理完全即使我設置maxFilesPerTrigger = 1
2。我們使用udf。我們理解udf可以是一個昂貴的操作相比Pyspark dataframe。但一些python邏輯行級轉換是很難轉換成dataframe。
這是我們的代碼的流程:

#從s3山讀取流
df = (
spark.readStream.format (“cloudFiles”)
.option (“cloudFiles。格式”、“csv”)
.option (“9”、“| | | |”)
. schema (transaction_schema)
.option (“maxFilesPerTrigger”, 1)
.load (“dbfs: / mnt / s3 /公共/ test_transactions”)
)

#外負載contracts_df一旦流操作
contracts_df = spark.read.table (“hive_metastore.spindl.contracts”)

#轉換應用到整個流DataFrame
df = df。withColumn (“transaction_hash F.col (" id "))
#……更多的轉換…

#定義udf
contract_info_udf = F。udf (contract_info,……

#應用udf
df = df。withColumn (contract_info contract_info_udf (F.struct (contracts_df.columns)))
#……更多的轉換…

#寫入事務表
df.write.mode(“追加”).insertInto (“hive_metastore.spindl.test_transactions”)

#寫流
查詢= df.writeStream \
.format \(“δ”)
.option (“checkpointLocation”、“/ tmp /δ/測試/ _write_stream_checkpoints /”) \
.start ()

query.awaitTermination ()

這是我們的代碼的流程。
3回複3

Tharun-Kumar
重視貢獻三世
重視貢獻三世

@bzh

我強烈懷疑,一個文件有300萬條記錄。確認如果數據來自單個文件或多個文件,你可以添加一個新列的值作為input_file_name ()。這將有助於我們理解是否配置maxFilesPerTrigger被認為是。

文檔-https://docs.www.eheci.com/en/sql/language-manual/functions/input_file_name.html

Lakshay
尊敬的貢獻者三世
尊敬的貢獻者三世

它看起來像來自一個文件的3 m公司記錄。和處理這些記錄,你可能需要更多的核集群。

youssefmrini
尊敬的貢獻者二世
尊敬的貢獻者二世

有幾個可能的方法來提高性能的火花流攝取大量的S3的工作文件。這裏有一些建議:

  1. 調優spark.sql.shuffle.partitions配置參數:

默認情況下,調整分區的數量設置為200,這可能是太低的工作負載。該參數控製火花時應該使用多少個分區之間移動數據階段,和太少的分區會導致較低的並行性和緩慢的性能。你可以試試這個參數到一個更高的數量增加,基於數據的大小和核集群節點的數量,提高並行性和提高性能。

例如:

python
spark.conf。(“spark.sql.shuffle.partitions”,“1000”)

                         
  1. 合並輸出dataframe下筆前三角洲湖:

寫作三角洲湖時,火花首先將輸出寫入到臨時文件,然後將其合並到主表使用背景的工作。如果你有太多的小文件,這可能導致低劣的性能。為了幫助緩解這個問題,你可以合並輸出dataframe減少創建的文件的數量。

例如:

python
df.coalesce (16).write.mode (“添加”)格式(“δ”)
  1. 簡化/優化udf

執行行級轉換使用udf可以是昂貴的,特別是如果函數沒有優化。你可以嚐試優化你的udf:

  • 廣播小DataFrames,使它們可以在所有節點。
  • 利用矢量化udf
  • 盡可能避免Python udf。
  1. 增加實例類型,自動定量調整:

您可能需要考慮使用更大的實例類型。或者,你可以調整你的自動定量政策動態集群規模大小根據傳入的工作量。你可以使用磚自動定量的特性來幫助最大化集群利用率和降低整體成本。

  1. 使用達美航空的自動最優化特性

考慮顯式打開三角洲湖的自動最優化特性。當你保持攝取數據,選擇優化/緊湊可以提高查詢性能。

這些最佳實踐改善火花流媒體應用程序的性能從S3使用自動裝卸機攝取大量的數據。

我希望會有幫助!

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map