大家好!
我實現了一個數據管道使用自動裝卸機銅- - >銀- - >金。
現在當我做這個我想執行一些數據質量檢查,為此我用遠大前程圖書館。
但是我堅持下麵的錯誤當試圖驗證數據
validator.expect_column_values_to_not_be_null(列= " col1”)
validator.expect_column_values_to_be_in_set (
列= " col2 ",
value_set = (1,6)
)
MetricResolutionError:查詢與流媒體來源必須執行writeStream.start ();
看起來像遠大前程隻能使用靜態/批處理數據。
有人建議我如何可以得到它的工作呢?
我跟著下麵的磚與great_expectations筆記本開始
這是自動裝卸機代碼
從pyspark.sql。功能導入坳、to_date date_format pyspark.sql。類型進口StructType、StructField StringType、IntegerType FloatType, DateType導入時間#自動裝卸機表和檢查點路徑basepath =“/ mnt / autoloaderdemodl / datagenerator /”bronzeTable = basepath +“青銅/”bronzeCheckpoint = basepath +“檢查點/銅/”bronzeSchema = basepath +“模式/銅/”silverTable = basepath +“銀/”silverCheckpoint = basepath +“檢查點/銀/“landingZoneLocation =“/ mnt / autoloaderdemodl / datageneratorraw / customerdata_csv”#加載數據從CSV文件使用自動加載器銅表使用救援模式演化選項raw_df = spark.readStream.format (cloudFiles) \ .option (“cloudFiles。格式”、“csv”) \ .option (“cloudFiles。schemaEvolutionMode”、“救援”)\ .option(“標題”,真的)\ .option (“cloudFiles。schemaLocation”, bronzeSchema) \ .option (“cloudFiles。inferSchema”、“真實”)\ .option (“cloudFiles。inferColumnTypes”,真的)\ .load (landingZoneLocation) #寫原始數據到青銅層bronze_df = raw_df.writeStream.format(“δ”)\ .trigger(一次= True) \ .queryName (bronzeLoader) \ .option (“checkpointLocation”, bronzeCheckpoint) \ .option (“mergeSchema”,“真正的”)\ .outputMode(“追加”)\ .start (bronzeTable) #等待青銅流完成bronze_df.awaitTermination()青銅= spark.read.format(“δ”).load (bronzeTable) bronze_count = bronze.count()顯示(青銅)打印(“在銅表的行數:{}”.format (bronze_count)) bronze_df = spark.readStream.format(“δ”).load (bronzeTable) #日期格式轉換應用到DataFrame # silver_df = bronze_df變換日期列。withColumn (date1 to_date(坳(“date1”)、“yyyyDDD”)) \ .withColumn (“date2 to_date(坳(“date2”)、“yyyyDDD”)) \ .withColumn (“date3 to_date(坳(“date3”)、“MMddyy”)) #寫silver_stream = silver_df DataFrame變成銀層。writeStream \ .format(“δ”)\ .outputMode(“追加”)\ .option (“mergeSchema”,“真正的”)\ .option (“checkpointLocation”, silverCheckpoint) \ .trigger(一次= true) \ .start (silverTable) #等待寫流完成silver_stream.awaitTermination() #數銀銀表的行數= spark.read.format(“δ”).load (silverTable)顯示(銀)silver_count = silver.count()打印(“銀表的行數:{}”.format (silver_count))
PS -客戶不想使用DLT呢。
@Chhaya Vishwakarma:
您可以考慮的替代方案來執行數據質量檢查作為流管道的一部分,使用火花內置的驗證功能。您可以使用斷言如isNotNull isin()()或檢查的質量數據寫到下一層之前管道。例如,您可以將以下代碼添加到您的銅銀轉換:(請核實和修改代碼,以滿足您的用例)
silver_df = bronze_df。withColumn (date1 to_date(坳(“date1”)、“yyyyDDD”)) \ .withColumn (“date2 to_date(坳(“date2”)、“yyyyDDD”)) \ .withColumn (“date3 to_date(坳(“date3”)、“MMddyy”)) \ .filter(坳(col1) .isNotNull ()) \ .filter(坳(col2) .isin ((1,6)))
這將過濾掉任何行,col1是null或col2不在一組(1,6)。你同樣可以根據需要添加額外的質量檢查。
請記住upvote最好的答案,幫助你!旁白,如果你需要更多的後續,在線程做回複,高興圓回來。