取消
顯示的結果
而不是尋找
你的意思是:

如何限製在每一批數量的文件流批處理

桑傑
價值貢獻

你好,

我運行批處理作業流程的文件。我試圖限製在每個批處理文件數量所以添加maxFilesPerTrigger選項。但它不工作。它處理所有輸入文件。

(spark.readStream.format(“δ”).load (silver_path)

.writeStream

gold_checkpoint_path .option (“checkpointLocation”)

.option (“maxFilesPerTrigger”, 200年)

.trigger(一旦= True)

.foreachBatch (foreachBatchFunction)

.start ()

.awaitTermination ()

)

請建議。

問候,

桑傑

1接受解決方案

接受的解決方案

Sandeep
貢獻者三世

@Sanjay Jain抱歉錯過了一件事。.trigger(一旦= True)不支持速率限製器。您可以使用.trigger (availableNow = True)。

裁判:https://docs.www.eheci.com/structured-streaming/triggers.html configuring-incremental-batch-process……

spark.readStream.format(“δ”)

.option (“maxFilesPerTrigger”, 200年)

.load (silver_path)

.writeStream

gold_checkpoint_path .option (“checkpointLocation”)

.trigger (availableNow = True)

.foreachBatch (foreachBatchFunction)

.start ()

在原帖子查看解決方案

20個回複20.

werners1
尊敬的貢獻者三世

你可以嚐試與觸發= availablenow

桑傑
價值貢獻

現在嚐試可用,但它還處理所有數據進行處理。我想在批處理過程,最大200個文件我每一批雖然我有1000個文件的過程。

werners1
尊敬的貢獻者三世

好吧,你怎麼知道1000個文件被選中?

我問,因為三角洲湖(源代碼)也將舊版本的數據存儲,這將不會被發送到流中。身體三角洲湖可能有1000個文件但當前狀態也許隻有150個文件- > 1 microbatch。

這有可能嗎?

我已經發送1000個文件處理前層和我不想處理所有。我可以看到所有1000收到了在當前批

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map