取消
顯示的結果
而不是尋找
你的意思是:

如何限製在每一批數量的文件流批處理

桑傑
價值貢獻

你好,

我運行批處理作業流程的文件。我試圖限製在每個批處理文件數量所以添加maxFilesPerTrigger選項。但它不工作。它處理所有輸入文件。

(spark.readStream.format(“δ”).load (silver_path)

.writeStream

gold_checkpoint_path .option (“checkpointLocation”)

.option (“maxFilesPerTrigger”, 200年)

.trigger(一旦= True)

.foreachBatch (foreachBatchFunction)

.start ()

.awaitTermination ()

)

請建議。

問候,

桑傑

20個回複20.

桑傑
價值貢獻

仍然滿載。

df = (spark.readStream.format(“δ”)

.option (“maxFilesPerTrigger”、“100”)

.load (silver_path)

)

(df.writeStream

gold_checkpoint_path .option (“checkpointLocation”)

.trigger(一旦= True)

.foreachBatch (foreachBatchFunction)

.start ()

.awaitTermination ())

werners1
尊敬的貢獻者三世

你能試著也在水槽中設置maxFilespertrigger嗎?

spark.readStream.format(“δ”)

.option (“maxFilesPerTrigger”、“100”)

.load (silver_path)

.writeStream

gold_checkpoint_path .option (“checkpointLocation”)

.option (“maxFilesPerTrigger”、“100”)

.trigger(一旦= True)

.foreachBatch (foreachBatchFunction)

.start ()

werners1
尊敬的貢獻者三世

桑傑
價值貢獻

不,仍然把所有1000個文件

werners1
尊敬的貢獻者三世

奇怪,它應該工作。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map