取消
顯示的結果
而不是尋找
你的意思是:

使用與autolaoder遠大前程

Chhaya
新的貢獻者三世

大家好!

我實現了一個數據管道使用自動裝卸機銅- - >銀- - >金。

現在當我做這個我想執行一些數據質量檢查,為此我用遠大前程圖書館。

但是我堅持下麵的錯誤當試圖驗證數據

validator.expect_column_values_to_not_be_null(列= " col1”)

validator.expect_column_values_to_be_in_set (

列= " col2 ",

value_set = (1,6)

)

MetricResolutionError:查詢與流媒體來源必須執行writeStream.start ();

看起來像遠大前程隻能使用靜態/批處理數據。

有人建議我如何可以得到它的工作呢?

我跟著下麵的磚與great_expectations筆記本開始

https://docs.greatexpectations.io/docs/deployment_patterns/how_to_use_great_expectations_in_databric..。

這是自動裝卸機代碼

從pyspark.sql。功能導入坳、to_date date_format pyspark.sql。類型進口StructType、StructField StringType、IntegerType FloatType, DateType導入時間#自動裝卸機表和檢查點路徑basepath =“/ mnt / autoloaderdemodl / datagenerator /”bronzeTable = basepath +“青銅/”bronzeCheckpoint = basepath +“檢查點/銅/”bronzeSchema = basepath +“模式/銅/”silverTable = basepath +“銀/”silverCheckpoint = basepath +“檢查點/銀/“landingZoneLocation =“/ mnt / autoloaderdemodl / datageneratorraw / customerdata_csv”#加載數據從CSV文件使用自動加載器銅表使用救援模式演化選項raw_df = spark.readStream.format (cloudFiles) \ .option (“cloudFiles。格式”、“csv”) \ .option (“cloudFiles。schemaEvolutionMode”、“救援”)\ .option(“標題”,真的)\ .option (“cloudFiles。schemaLocation”, bronzeSchema) \ .option (“cloudFiles。inferSchema”、“真實”)\ .option (“cloudFiles。inferColumnTypes”,真的)\ .load (landingZoneLocation) #寫原始數據到青銅層bronze_df = raw_df.writeStream.format(“δ”)\ .trigger(一次= True) \ .queryName (bronzeLoader) \ .option (“checkpointLocation”, bronzeCheckpoint) \ .option (“mergeSchema”,“真正的”)\ .outputMode(“追加”)\ .start (bronzeTable) #等待青銅流完成bronze_df.awaitTermination()青銅= spark.read.format(“δ”).load (bronzeTable) bronze_count = bronze.count()顯示(青銅)打印(“在銅表的行數:{}”.format (bronze_count)) bronze_df = spark.readStream.format(“δ”).load (bronzeTable) #日期格式轉換應用到DataFrame # silver_df = bronze_df變換日期列。withColumn (date1 to_date(坳(“date1”)、“yyyyDDD”)) \ .withColumn (“date2 to_date(坳(“date2”)、“yyyyDDD”)) \ .withColumn (“date3 to_date(坳(“date3”)、“MMddyy”)) #寫silver_stream = silver_df DataFrame變成銀層。writeStream \ .format(“δ”)\ .outputMode(“追加”)\ .option (“mergeSchema”,“真正的”)\ .option (“checkpointLocation”, silverCheckpoint) \ .trigger(一次= true) \ .start (silverTable) #等待寫流完成silver_stream.awaitTermination() #數銀銀表的行數= spark.read.format(“δ”).load (silverTable)顯示(銀)silver_count = silver.count()打印(“銀表的行數:{}”.format (silver_count))

PS -客戶不想使用DLT呢。

6個回答6

匿名
不適用

@Chhaya Vishwakarma:

錯誤消息顯示,相關的問題是數據流的特性,這就需要使用一個特定的方法來執行查詢,包括流媒體資源。遠大前程是設計用於處理批處理/靜態數據,這意味著它不能被直接用來驗證流數據來源。

這個問題的一個解決方案是使用foreachBatch火花結構化流提供的API,它允許您使用一個批處理流數據的函數。在這個函數中,你可以把你的數據質量檢查使用遠大前程。

希望這有助於讓你前進,

Chhaya
新的貢獻者三世

謝謝你的回複,我已經嚐試foreachbatch並不工作。你是對通用電氣不支持流媒體數據的。

匿名
不適用

@Chhaya Vishwakarma:

您可以考慮的替代方案來執行數據質量檢查作為流管道的一部分,使用火花內置的驗證功能。您可以使用斷言如isNotNull isin()()或檢查的質量數據寫到下一層之前管道。例如,您可以將以下代碼添加到您的銅銀轉換:(請核實和修改代碼,以滿足您的用例)

silver_df = bronze_df。withColumn (date1 to_date(坳(“date1”)、“yyyyDDD”)) \ .withColumn (“date2 to_date(坳(“date2”)、“yyyyDDD”)) \ .withColumn (“date3 to_date(坳(“date3”)、“MMddyy”)) \ .filter(坳(col1) .isNotNull ()) \ .filter(坳(col2) .isin ((1,6)))

這將過濾掉任何行,col1是null或col2不在一組(1,6)。你同樣可以根據需要添加額外的質量檢查。

請記住upvote最好的答案,幫助你!旁白,如果你需要更多的後續,在線程做回複,高興圓回來。

Chhaya
新的貢獻者三世

由於@Suteja卡努裏人對你的建議,客戶認為使用DLT搬到:)。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map