取消
顯示的結果
而不是尋找
你的意思是:

三角洲編寫基於輸入文件流動態不同的文件夾

Krishna264
新的因素

我有根文件夾和文件正在攝入在子文件夾。想要構建一個工作流將編寫流基於文件被攝入

2回答2

匿名
不適用

@Krishnamoorthy Natarajan:請盡量使用foreachBatch()方法應用自定義每個micro-batch輸出數據的處理。示例代碼如下

從pyspark.sql。從pyspark.sql進口input_file_name功能。類型進口StructType、StructField StringType IntegerType #定義模式模式= StructType ([StructField(“名字”,StringType(),真的),StructField(“時代”,IntegerType(),真的)])#定義流數據源input_path = " / mnt /輸入文件夾/ * / * / *。csv“df = spark.readStream.schema(模式)。選項(“maxFilesPerTrigger”, 1) . csv (input_path)。withColumn (“input_file”, input_file_name ()) # Define foreachBatch函數來寫入三角洲def write_to_delta (df, epoch_id): #得到輸入文件路徑input_file = df.select (“input_file”)當代()[0]#定義基於輸入文件輸出路徑output_path =“/ mnt /輸出文件夾/”+ input_file.split (“/”) [3] + " + input_file.split(“/”)[2] #寫數據到三角洲df.write.format .mode(“δ”)(“追加”)。選項(“路徑”,output_path) .save() #應用foreachBatch函數在輸出數據df.writeStream.foreachBatch (write_to_delta) .start () .awaitTermination ()

Vidula_Khanna
主持人
主持人

嗨@Krishnamoorthy Natarajan

謝謝你的問題!幫助你更好的,請花一些時間來檢查答案,讓我知道它是否最適合您的需要。

請幫助我們選擇最好的解決方案通過點擊“選擇最佳”如果它。

您的反饋將幫助我們確保我們提供最好的服務給你。謝謝你!

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map