你好,
我運行自動裝卸機持續運行,每1分鍾檢查新文件。我需要存儲文件接收/處理,但它給我當自動裝卸機開始日期。
這是我的代碼。
df =(火花
.readStream
.format (“cloudFiles”)
.option (“cloudFiles。格式”、“json”)
.option (“cloudFiles。在cludeExistingFiles", "true")
.option (“cloudFiles。validateOptions”、“真正的”)
.option (“cloudFiles。區域”、“us-east-1”)
.option (“cloudFiles。backfillInterval”、“一天”)
.option (“cloudFiles。fetchParallelism”, 100年)
.option (“cloudFiles。useNotifications”、“真正的”)
. schema (streamSchema)
.load (raw_path)
.withColumn (process_date,點燃(date.today ()))
)
(df
.writeStream
.format(“δ”)
.outputMode(“追加”)
bronze_checkpoint_path .option (“checkpointLocation”)
bronze_path .option(“路徑”)
.option (“mergeSchema”,真的)
.trigger (processingTime = 1分鍾)#或設置任何有意義的數據來源
.start ()
)
感謝任何幫助。
問候,
桑傑