取消
顯示的結果
而不是尋找
你的意思是:

使用與autolaoder遠大前程

Chhaya
新的貢獻者三世

大家好!

我實現了一個數據管道使用自動裝卸機銅- - >銀- - >金。

現在當我做這個我想執行一些數據質量檢查,為此我用遠大前程圖書館。

但是我堅持下麵的錯誤當試圖驗證數據

validator.expect_column_values_to_not_be_null(列= " col1”)

validator.expect_column_values_to_be_in_set (

列= " col2 ",

value_set = (1,6)

)

MetricResolutionError:查詢與流媒體來源必須執行writeStream.start ();

看起來像遠大前程隻能使用靜態/批處理數據。

有人建議我如何可以得到它的工作呢?

我跟著下麵的磚與great_expectations筆記本開始

https://docs.greatexpectations.io/docs/deployment_patterns/how_to_use_great_expectations_in_databric..。

這是自動裝卸機代碼

從pyspark.sql。功能導入坳、to_date date_format pyspark.sql。類型進口StructType、StructField StringType、IntegerType FloatType, DateType導入時間#自動裝卸機表和檢查點路徑basepath =“/ mnt / autoloaderdemodl / datagenerator /”bronzeTable = basepath +“青銅/”bronzeCheckpoint = basepath +“檢查點/銅/”bronzeSchema = basepath +“模式/銅/”silverTable = basepath +“銀/”silverCheckpoint = basepath +“檢查點/銀/“landingZoneLocation =“/ mnt / autoloaderdemodl / datageneratorraw / customerdata_csv”#加載數據從CSV文件使用自動加載器銅表使用救援模式演化選項raw_df = spark.readStream.format (cloudFiles) \ .option (“cloudFiles。格式”、“csv”) \ .option (“cloudFiles。schemaEvolutionMode”、“救援”)\ .option(“標題”,真的)\ .option (“cloudFiles。schemaLocation”, bronzeSchema) \ .option (“cloudFiles。inferSchema”、“真實”)\ .option (“cloudFiles。inferColumnTypes”,真的)\ .load (landingZoneLocation) #寫原始數據到青銅層bronze_df = raw_df.writeStream.format(“δ”)\ .trigger(一次= True) \ .queryName (bronzeLoader) \ .option (“checkpointLocation”, bronzeCheckpoint) \ .option (“mergeSchema”,“真正的”)\ .outputMode(“追加”)\ .start (bronzeTable) #等待青銅流完成bronze_df.awaitTermination()青銅= spark.read.format(“δ”).load (bronzeTable) bronze_count = bronze.count()顯示(青銅)打印(“在銅表的行數:{}”.format (bronze_count)) bronze_df = spark.readStream.format(“δ”).load (bronzeTable) #日期格式轉換應用到DataFrame # silver_df = bronze_df變換日期列。withColumn (date1 to_date(坳(“date1”)、“yyyyDDD”)) \ .withColumn (“date2 to_date(坳(“date2”)、“yyyyDDD”)) \ .withColumn (“date3 to_date(坳(“date3”)、“MMddyy”)) #寫silver_stream = silver_df DataFrame變成銀層。writeStream \ .format(“δ”)\ .outputMode(“追加”)\ .option (“mergeSchema”,“真正的”)\ .option (“checkpointLocation”, silverCheckpoint) \ .trigger(一次= true) \ .start (silverTable) #等待寫流完成silver_stream.awaitTermination() #數銀銀表的行數= spark.read.format(“δ”).load (silverTable)顯示(銀)silver_count = silver.count()打印(“銀表的行數:{}”.format (silver_count))

PS -客戶不想使用DLT呢。

6個回答6

匿名
不適用

@Chhaya Vishwakarma:不錯的選擇!與DLT所有最好的。

Vidula_Khanna
主持人
主持人

嗨@Chhaya Vishwakarma

謝謝你的問題!幫助你更好的,請花一些時間來檢查答案,讓我知道它是否最適合您的需要。

請幫助我們選擇最好的解決方案通過點擊“選擇最佳”如果它。

您的反饋將幫助我們確保我們提供最好的服務給你。

謝謝你!

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map