你好,
我運行批處理作業流程的文件。我試圖限製在每個批處理文件數量所以添加maxFilesPerTrigger選項。但它不工作。它處理所有輸入文件。
(spark.readStream.format(“δ”).load (silver_path)
.writeStream
gold_checkpoint_path .option (“checkpointLocation”)
.option (“maxFilesPerTrigger”, 200年)
.trigger(一旦= True)
.foreachBatch (foreachBatchFunction)
.start ()
.awaitTermination ()
)
請建議。
問候,
桑傑
仍然滿載。
df = (spark.readStream.format(“δ”)
.option (“maxFilesPerTrigger”、“100”)
.load (silver_path)
)
(df.writeStream
gold_checkpoint_path .option (“checkpointLocation”)
.trigger(一旦= True)
.foreachBatch (foreachBatchFunction)
.start ()
.awaitTermination ())
你能試著也在水槽中設置maxFilespertrigger嗎?
spark.readStream.format(“δ”)
.option (“maxFilesPerTrigger”、“100”)
.load (silver_path)
.writeStream
gold_checkpoint_path .option (“checkpointLocation”)
.option (“maxFilesPerTrigger”、“100”)
.trigger(一旦= True)
.foreachBatch (foreachBatchFunction)
.start ()