@Krishnamoorthy Natarajan: Try using the foreachBatch() method to apply custom processing to each micro-batch of the output data. Sample code below:
from pyspark.sql.functions import input_file_name
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define the schema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Define the streaming data source
input_path = "/mnt/input_folder/*/*/*.csv"
df = (spark.readStream
      .schema(schema)
      .option("maxFilesPerTrigger", 1)
      .csv(input_path)
      .withColumn("input_file", input_file_name()))

# Define the foreachBatch function to write to Delta
def write_to_delta(df, epoch_id):
    # Get the input file path for this micro-batch
    input_file = df.select("input_file").first()[0]
    # Derive the output path from the input file path
    output_path = "/mnt/output_folder/" + input_file.split("/")[3] + "/" + input_file.split("/")[2]
    # Write the data to Delta
    df.write.format("delta").mode("append").option("path", output_path).save()

# Apply the foreachBatch function to the output data
df.writeStream.foreachBatch(write_to_delta).start().awaitTermination()
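Because maxFilesPerTrigger is set to 1, each micro-batch contains rows from a single input file, which is why taking first() on the input_file column is enough to derive the output path. As a minimal variation (not part of the original answer; the path /mnt/checkpoints/demo is an assumed placeholder), you would typically also set a checkpointLocation so the streaming query can recover its progress after a restart:

# Same query as above, with a checkpoint location added for fault tolerance.
# "/mnt/checkpoints/demo" is a placeholder path, not from the original post.
query = (df.writeStream
         .foreachBatch(write_to_delta)
         .option("checkpointLocation", "/mnt/checkpoints/demo")
         .start())
query.awaitTermination()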