在流媒體工作,我們目前在一個目錄上運行流(cloudFiles格式)與銷售交易每5分鍾。
在這個目錄中,事務是下令在下列格式:
< streaming-checkpoint-root > / < transaction_date > / < transaction_hour > / transaction_x_y.json
隻有今天的交易感興趣的,其他所有已經過時了。
當我開始流的工作,它將處理所有曆史交易,´我不希望的。
隻可能以某種方式來處理新文件進來後流已經開始工作?
更新:
maxFileAge似乎不是一個好主意。以下的選項“includeExistingFiles”= False解決了我的問題:
streaming_df = (
spark.readStream.format (“cloudFiles”)
.option (“cloudFiles。格式”,擴展)
.option (“cloudFiles。maxFilesPerTrigger”, 20)
.option (“cloudFiles。在cludeExistingFiles", False)
.option(“多行”,真的)
.option (“pathGlobfilter”、“*”。+擴展)\
. schema(模式).load (streaming_path)
)
看來,“maxFileAge”解決問題。
streaming_df = (
spark.readStream.format .option (“cloudFiles (“cloudFiles”)。”、“json格式”)\
.option (“maxFilesPerTrigger”, 20) \
.option \(“多行”,真正的)
.option (“maxFileAge”, 1) \
. schema(模式).load (streaming_path)
)
這忽略了文件超過1周。
但如何忽略文件超過1天嗎?
是的完全cloudFiles.maxFileAge請選擇你的答案是最好的
更新:
maxFileAge似乎不是一個好主意。以下的選項“includeExistingFiles”= False解決了我的問題:
streaming_df = (
spark.readStream.format (“cloudFiles”)
.option (“cloudFiles。格式”,擴展)
.option (“cloudFiles。maxFilesPerTrigger”, 20)
.option (“cloudFiles。在cludeExistingFiles", False)
.option(“多行”,真的)
.option (“pathGlobfilter”、“*”。+擴展)\
. schema(模式).load (streaming_path)
)