這不會工作,如果你是第一次創建表的流,例如當首次運行下麵的代碼。我需要一種方法來捕捉文件名的流
#配置自動加載程序
streaming_query = (spark.readStream
。
格式
(
“cloudFiles”
)
.option (
“cloudFiles.format”
,
“csv”
)
.option (
“cloudFiles.schemaLocation”
raw_checkpoint_path)
.option (
“9”
,
“|”
)
.option (
“inferSchema”
,
“真正的”
)
.option (
“lineSep”
,
“\ r \ n”
)
#指定樣式的EOL字符(CRLF)
.option (
“pathGlobfilter”
file_pattern)
.load (
f”
{file_path}
”
)
.select (
“*”
,input_file_name ().alias (
“source_file”
),current_timestamp () .alias (
“processing_time”
))
.writeStream
.option (
“checkpointLocation”
raw_checkpoint_path)
.trigger (availableNow =
真正的
)
.toTable (raw_table_name))