你好,每個人!
我試圖讀取表三角洲作為流媒體來源使用火花。但我microbatches不平衡——一個非常小的,另一個是非常巨大的。我怎麼可以限製呢?
我使用不同的配置和maxBytesPerTrigger maxFilesPerTrigger,但沒有什麼變化,批量大小總是相同的。
有什麼想法嗎?
df =火花\
.readStream \
.format \(“δ”)
.load (“…”)
df \
.writeStream \
.outputMode \(“追加”)
.option (“checkpointLocation”、“…”) \
.table (“…”)
親切的問候
嗨@Yuliya Valava,如果你讀一個三角洲表作為PySpark流,您可以限製設置的輸入速度maxFilesPerTrigger選擇。
這個選項控製新文件的最大數量處理在一個觸發間隔。通過降低這個值,您可以限製輸入速率和管理數據在每一批處理。
這裏有一個例子如何限製輸入率當閱讀δ表作為流:
從pyspark。sql進口SparkSession火花= SparkSession.builder.appName (“myApp”) .getOrCreate () df = spark.readStream.format .load(“δ”)(“/道路/ / delta_table”) df = df.select(“*”) #限製輸入率1文件/觸發間隔df = df。選項(“maxFilesPerTrigger”, 1) #過程流查詢= df.writeStream.format(“控製台”).start () query.awaitTermination ()
在這個例子中,我們在閱讀δ表作為流和從表中選擇所有列。然後,我們設置maxFilesPerTrigger option 1,限製輸入率1文件/觸發間隔。
最後,我們把流到控製台並啟動流處理使用
一個query.awaitTermination ()。
請注意,maxFilesPerTrigger選項可能無法有效地限製輸入率如果δ表包含巨大的文件或文件被壓縮。
在這種情況下,您可能需要將文件分割成小塊或解壓前處理數據。
嗨@Yuliya Valava,如果你設置maxBytesPerTrigger和maxFilesPerTrigger選擇閱讀時三角洲表流,但批大小不改變,可能有幾個原因:
maxBytesPerTrigger是否正確或maxFilesPerTrigger被應用,你可以設置他們非常低的值(例如,1或10)是否批大小變化明顯。
你也可以監控每批處理文件或字節數使用磚UI代碼或通過添加日誌語句。
如果你仍然難以控製批量大小,你可以考慮重新分區數據前處理它,或者使用其他技術來優化你的處理邏輯。