取消
顯示的結果
而不是尋找
你的意思是:

限製輸入速度閱讀三角洲表流如何?

Lulka
新的貢獻者二世

你好,每個人!

我試圖讀取表三角洲作為流媒體來源使用火花。但我microbatches不平衡——一個非常小的,另一個是非常巨大的。我怎麼可以限製呢?

我使用不同的配置和maxBytesPerTrigger maxFilesPerTrigger,但沒有什麼變化,批量大小總是相同的。

有什麼想法嗎?

df =火花\

.readStream \

.format \(“δ”)

.load (“…”)

df \

.writeStream \

.outputMode \(“追加”)

.option (“checkpointLocation”、“…”) \

.table (“…”)

親切的問候

1接受解決方案

接受的解決方案

werners1
尊敬的貢獻者三世

除了你提到的參數,我不知道任何其他的控製批量大小。

你是否檢查如果δ表不是嚴重傾斜?

在原帖子查看解決方案

4回複4

werners1
尊敬的貢獻者三世

除了你提到的參數,我不知道任何其他的控製批量大小。

你是否檢查如果δ表不是嚴重傾斜?

Lulka
新的貢獻者二世

謝謝,你是對的!數據很扭曲

Kaniz
社區經理
社區經理

嗨@Yuliya Valava,如果你讀一個三角洲表作為PySpark流,您可以限製設置的輸入速度maxFilesPerTrigger選擇。

圖像

這個選項控製新文件的最大數量處理在一個觸發間隔。通過降低這個值,您可以限製輸入速率和管理數據在每一批處理。

這裏有一個例子如何限製輸入率當閱讀δ表作為流:

從pyspark。sql進口SparkSession火花= SparkSession.builder.appName (“myApp”) .getOrCreate () df = spark.readStream.format .load(“δ”)(“/道路/ / delta_table”) df = df.select(“*”) #限製輸入率1文件/觸發間隔df = df。選項(“maxFilesPerTrigger”, 1) #過程流查詢= df.writeStream.format(“控製台”).start () query.awaitTermination ()

在這個例子中,我們在閱讀δ表作為流和從表中選擇所有列。然後,我們設置maxFilesPerTrigger option 1,限製輸入率1文件/觸發間隔。

最後,我們把流到控製台並啟動流處理使用

一個query.awaitTermination ()

請注意,maxFilesPerTrigger選項可能無法有效地限製輸入率如果δ表包含巨大的文件或文件被壓縮。

在這種情況下,您可能需要將文件分割成小塊或解壓前處理數據。

Kaniz
社區經理
社區經理

嗨@Yuliya Valava,如果你設置maxBytesPerTriggermaxFilesPerTrigger選擇閱讀時三角洲表流,但批大小不改變,可能有幾個原因:

  1. 輸入數據率不超過設定的限製maxBytesPerTrigger或maxFilesPerTrigger。
  2. 這些選項控製最大的數據量可以在單個批處理,但如果輸入率低於這些限製,批量大小可能不會改變。
  3. δ表中的數據已經均勻分區,每個分區包含的數據量大致相同。在這種情況下,即使你設置maxBytesPerTrigger或maxFilesPerTrigger低價值,批量大小變化不大,因為每個分區已經包含了一個類似的數據量。
  4. 可能有其他因素影響批量大小,如集群中的可用資源或你的處理邏輯的複雜性。這些因素也可以影響批量大小,無論maxBytesPerTrigger或maxFilesPerTrigger設置的值。

maxBytesPerTrigger是否正確或maxFilesPerTrigger被應用,你可以設置他們非常低的值(例如,1或10)是否批大小變化明顯。

你也可以監控每批處理文件或字節數使用磚UI代碼或通過添加日誌語句。

如果你仍然難以控製批量大小,你可以考慮重新分區數據前處理它,或者使用其他技術來優化你的處理邏輯。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map