取消
顯示的結果
而不是尋找
你的意思是:

流數據從三角洲表後eventhub合並數據,得到超時錯誤! !

Ruby8376
貢獻者

這是我的代碼寫數據從三角洲表事件中心(從消費者團隊將消耗數據):

進口org.apache.spark.eventhubs._進口org.apache.spark.sql.streaming。觸發._進口org.apache.spark.sql.types._進口org.apache.spark.sql.functions._進口java.util。屬性進口com.microsoft.azure.eventhubs。{EventData,PartitionSender}進口org.apache.spark.eventhubs。EventHubsConf進口io.delta.tables._進口org.apache.spark.sql.streaming。觸發進口. io .PrintWriter進口java.time。ZonedDateTime進口java.time.format。DateTimeFormatter進口scala.concurrent.duration._進口java.nio.file。{路徑,文件}/ /配置Azure事件中心的細節瓦爾namespaceNameOut =“des - ent prod cus -流- eventhub - 001”瓦爾eventHubNameOut =“hvc-prodstats-output”瓦爾sasKeyNameOut =“作家”瓦爾sasKeyOut = = dbutils.secrets.get(範圍“deskvscope”、關鍵=“des-ent-prod-cus-stream-e venthub - 001作家”)/ /配置檢查點和壞數據路徑瓦爾checkpoint_dir_path =“/ mnt / hvc-wenco / prodstats /流/檢查站”瓦爾baddata_path =“/ mnt / hvc-wenco / prodstats /流/ Bad_data”/ /定義時間戳檢查點的道路瓦爾tbl_version_timestamp_path =“/ mnt / hvc - wenco / equip_status_history_table /檢查點/檢查站”/ /配置其他參數瓦爾MaxEvents=5000年/ /讀最後一個檢查點的時間戳瓦爾last_checkpoint_string = dbutils.fs.head (tbl_version_timestamp_path)/ /解析最後的檢查點時間戳瓦爾time_format =“yyyy-MM-dd HH: mm: ss.SSSz”瓦爾格式化程序=DateTimeFormatter.ofPattern (time_format)瓦爾last_checkpoint =ZonedDateTime.parse (last_checkpoint_string格式化程序)/ /建立連接事件中心瓦爾connStrOut =com.microsoft.azure.eventhubs。ConnectionStringBuilder().setNamespaceName (namespaceNameOut) .setEventHubName (eventHubNameOut) .setSasKeyName (sasKeyNameOut) .setSasKey (sasKeyOut)瓦爾ehWriteConf =EventHubsConf(connStrOut.toString ())/ /創建一個流dataframe從三角洲表瓦爾InputStreamingDF=火花.readStream .option (“maxFilesPerTrigger”,1).option (“startingTimestamp”last_checkpoint_string) .option (“readChangeFeed”,“真正的”).table (“wencohvc.equip_status_history_table”)瓦爾dropPreTransform =InputStreamingDF.filter (InputStreamingDF(“_change_type”)=。=“update_preimage”)瓦爾operationTransform = dropPreTransform.withColumn (“操作”,當($“_change_type”= = =“插入”,2當($).otherwise (“_change_type”= = =“update_postimage”,4)))瓦爾transformedDF = operationTransform.withColumn (“DeletedIndicator”,當($“_change_type”= = =“刪除”,“Y”).otherwise (“N”))瓦爾finalDF = transformedDF.drop (“_change_type”,“_commit_version”,“_commit_timestamp”)/ /寫入事件中心重試和檢查點var重試=真正的varretryCount =0瓦爾maxRetries =3(重試& & retryCount < maxRetries) {試一試{瓦爾流= finalDF .select (to_json(結構體(* / / *列列表).alias (“身體”).writeStream .format (“eventhubs”).options ehWriteConf.toMap .option (“checkpointLocation”checkpoint_dir_path) .trigger (觸發AvailableNow重試=).start () stream.awaitTermination ()}{情況下艾凡:異常= > retryCount + =1如果(retryCount < maxRetries) {瓦爾延遲=2。秒* retryCount println (“流嚐試retryCount美元失敗,重試$ {delay.toSeconds}秒……”)線程.sleep (delay.toMillis)}}}/ /寫檢查點瓦爾emptyDF =Seq((1).toDF (“>”)瓦爾checkpoint_timestamp = emptyDF.withColumn (“current_timestamp”current_timestamp())當代().getTimestamp (1)+“+ 00:00”dbutils.fs。把(tbl_version_timestamp_path checkpoint_timestamp.toString (),真正的)

問題是超時之前最後一個命令和最後一個檢查點命令不會運行。我已經嚐試重試機製,但是,它超時。從源數據量很大,我不想流複製數據通過運行筆記本一次又一次,如果沒有適當的檢查點發生。我怎麼解決這個問題? ?我希望工作正確運行和儲存最後的檢查點時間戳,所以拿起在下一次運行形式,但它超時之前最後一個命令。錯誤的是:

錯誤:一些流終止之前命令可以完成!嚐試1失敗,重試2秒……嚐試2失敗,重試4秒……重試:布爾=真正的retryCount:Int=3maxRetries:Int=3錯誤:一些流終止之前命令可以完成!命令花了0.04

13日回複13

Kaniz
社區經理
社區經理

@Ruby8376,幫助我們構建一個充滿活力和足智多謀的社區通過識別和強調洞察力的貢獻。請接受這個解決方案和表達你的感激!

謝謝Kaniz !

werners1
尊敬的貢獻者三世

好的,錯誤消息顯然狀態超時事件中心一側。
所以最好的方法就是看事件中心配置(而不是通過各種技巧在火花)。
試圖設置receiverTimeout / operationTimeout連接字符串,或maxRatePerPartition。
第一寫整個三角洲表將讀取事件中心應該準備什麼實際上是一個大型的批量裝載。
我沒有一個明確的答案的設置將在你的工作情況,你必須嚐試一些值(甚至加上火花流設置如前所述)。

事件引發流中心配置

Ruby8376
貢獻者

謝謝你!@werners1

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map