取消
顯示的結果
而不是尋找
你的意思是:

流數據從三角洲表後eventhub合並數據,得到超時錯誤! !

Ruby8376
貢獻者

這是我的代碼寫數據從三角洲表事件中心(從消費者團隊將消耗數據):

進口org.apache.spark.eventhubs._進口org.apache.spark.sql.streaming。觸發._進口org.apache.spark.sql.types._進口org.apache.spark.sql.functions._進口java.util。屬性進口com.microsoft.azure.eventhubs。{EventData,PartitionSender}進口org.apache.spark.eventhubs。EventHubsConf進口io.delta.tables._進口org.apache.spark.sql.streaming。觸發進口. io .PrintWriter進口java.time。ZonedDateTime進口java.time.format。DateTimeFormatter進口scala.concurrent.duration._進口java.nio.file。{路徑,文件}/ /配置Azure事件中心的細節瓦爾namespaceNameOut =“des - ent prod cus -流- eventhub - 001”瓦爾eventHubNameOut =“hvc-prodstats-output”瓦爾sasKeyNameOut =“作家”瓦爾sasKeyOut = = dbutils.secrets.get(範圍“deskvscope”、關鍵=“des-ent-prod-cus-stream-e venthub - 001作家”)/ /配置檢查點和壞數據路徑瓦爾checkpoint_dir_path =“/ mnt / hvc-wenco / prodstats /流/檢查站”瓦爾baddata_path =“/ mnt / hvc-wenco / prodstats /流/ Bad_data”/ /定義時間戳檢查點的道路瓦爾tbl_version_timestamp_path =“/ mnt / hvc - wenco / equip_status_history_table /檢查點/檢查站”/ /配置其他參數瓦爾MaxEvents=5000年/ /讀最後一個檢查點的時間戳瓦爾last_checkpoint_string = dbutils.fs.head (tbl_version_timestamp_path)/ /解析最後的檢查點時間戳瓦爾time_format =“yyyy-MM-dd HH: mm: ss.SSSz”瓦爾格式化程序=DateTimeFormatter.ofPattern (time_format)瓦爾last_checkpoint =ZonedDateTime.parse (last_checkpoint_string格式化程序)/ /建立連接事件中心瓦爾connStrOut =com.microsoft.azure.eventhubs。ConnectionStringBuilder().setNamespaceName (namespaceNameOut) .setEventHubName (eventHubNameOut) .setSasKeyName (sasKeyNameOut) .setSasKey (sasKeyOut)瓦爾ehWriteConf =EventHubsConf(connStrOut.toString ())/ /創建一個流dataframe從三角洲表瓦爾InputStreamingDF=火花.readStream .option (“maxFilesPerTrigger”,1).option (“startingTimestamp”last_checkpoint_string) .option (“readChangeFeed”,“真正的”).table (“wencohvc.equip_status_history_table”)瓦爾dropPreTransform =InputStreamingDF.filter (InputStreamingDF(“_change_type”)=。=“update_preimage”)瓦爾operationTransform = dropPreTransform.withColumn (“操作”,當($“_change_type”= = =“插入”,2當($).otherwise (“_change_type”= = =“update_postimage”,4)))瓦爾transformedDF = operationTransform.withColumn (“DeletedIndicator”,當($“_change_type”= = =“刪除”,“Y”).otherwise (“N”))瓦爾finalDF = transformedDF.drop (“_change_type”,“_commit_version”,“_commit_timestamp”)/ /寫入事件中心重試和檢查點var重試=真正的varretryCount =0瓦爾maxRetries =3(重試& & retryCount < maxRetries) {試一試{瓦爾流= finalDF .select (to_json(結構體(* / / *列列表).alias (“身體”).writeStream .format (“eventhubs”).options ehWriteConf.toMap .option (“checkpointLocation”checkpoint_dir_path) .trigger (觸發AvailableNow重試=).start () stream.awaitTermination ()}{情況下艾凡:異常= > retryCount + =1如果(retryCount < maxRetries) {瓦爾延遲=2。秒* retryCount println (“流嚐試retryCount美元失敗,重試$ {delay.toSeconds}秒……”)線程.sleep (delay.toMillis)}}}/ /寫檢查點瓦爾emptyDF =Seq((1).toDF (“>”)瓦爾checkpoint_timestamp = emptyDF.withColumn (“current_timestamp”current_timestamp())當代().getTimestamp (1)+“+ 00:00”dbutils.fs。把(tbl_version_timestamp_path checkpoint_timestamp.toString (),真正的)

問題是超時之前最後一個命令和最後一個檢查點命令不會運行。我已經嚐試重試機製,但是,它超時。從源數據量很大,我不想流複製數據通過運行筆記本一次又一次,如果沒有適當的檢查點發生。我怎麼解決這個問題? ?我希望工作正確運行和儲存最後的檢查點時間戳,所以拿起在下一次運行形式,但它超時之前最後一個命令。錯誤的是:

錯誤:一些流終止之前命令可以完成!嚐試1失敗,重試2秒……嚐試2失敗,重試4秒……重試:布爾=真正的retryCount:Int=3maxRetries:Int=3錯誤:一些流終止之前命令可以完成!命令花了0.04

13日回複13

最初是沒有循環。我不得不將它添加重試機製所以不超時之前完成寫操作。

werners1
尊敬的貢獻者三世

擺脫它,讓我們試著找到原因。

你能將maxfilespertrigger設置為默認(1000)?

還要檢查Kaniz提到的點。

我的感覺是,太多的數據處理。

werners1
尊敬的貢獻者三世

也試著用maxBytesPerTrigger

工作階段失敗而終止:任務0階段24.0失敗了4次,最近的失敗:在舞台上失去了任務0.3 24.0 (TID 44)(10.188.133.16執行人0):com.microsoft.azure.eventhubs。TimeoutException:實體(qb2-testprodstats-output):發送操作超時2023 - 08 - 09 - t16:42:30.177z(等/ UTC)。,errorContext[NS: des-ent-prod-cus-stream-eventhub-001.servicebus.windows.net, PATH: qb2-testprodstats-output, REFERENCE_ID: 479AC68402153E0B02153E2864D3C08F_G2, LINK_CREDIT: 0]
com.microsoft.azure.eventhubs.impl.MessageSender.throwSenderTimeout (MessageSender.java: 947)
com.microsoft.azure.eventhubs.impl.MessageSender.access 1800美元(MessageSender.java: 62)
com.microsoft.azure.eventhubs.impl.MessageSender SendTimeout.run美元(MessageSender.java: 1069)
com.microsoft.azure.eventhubs.impl.Timer ScheduledTask.onEvent美元(Timer.java: 48)
com.microsoft.azure.eventhubs.impl.DispatchHandler.onTimerTask (DispatchHandler.java: 12)
org.apache.qpid.proton.engine.BaseHandler.handle (BaseHandler.java: 233)
org.apache.qpid.proton.engine.impl.EventImpl.dispatch (EventImpl.java: 108)
org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch (ReactorImpl.java: 324)
org.apache.qpid.proton.reactor.impl.ReactorImpl.process (ReactorImpl.java: 291)
com.microsoft.azure.eventhubs.impl.MessagingFactory RunReactor.run美元(MessagingFactory.java: 784)
java.util.concurrent.Executors RunnableAdapter.call美元(Executors.java: 511)
java.util.concurrent.FutureTask.run (FutureTask.java: 266)
在java.util.concurrent.ScheduledThreadPoolExecutor ScheduledFutureTask.access 201美元(ScheduledThreadPoolExecutor.java: 180)
java.util.concurrent.ScheduledThreadPoolExecutor ScheduledFutureTask.run美元(ScheduledThreadPoolExecutor.java: 293)
java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java: 1149)
java.util.concurrent.ThreadPoolExecutor Worker.run美元(ThreadPoolExecutor.java: 624)
java.lang.Thread.run (Thread.java: 750)

得到這個錯誤後修改。

Kaniz
社區經理
社區經理

@Ruby8376,你麵臨的問題是,流媒體的工作是超時之前最後一個命令,和前麵的檢查點命令不會運行。

為了解決這個問題,嚐試以下步驟:

1。增加了通過設置超時時間流的工作spark.sql.streaming.stopTimeout配置屬性。你可以設置一個更高的價值,比如600年代(10分鍾),允許更多的時間來完成工作。

2。檢查資源分配給流媒體工作。如果工作耗盡資源,比如內存或CPU,它可能導致超時。你可以嚐試增加資源分配給的工作,比如每個執行者的執行人或內存數量,是否就可以解決這個問題。

3所示。驗證檢查點目錄和破壞性的數據路徑和擁有必要的權限訪問。如果有任何問題與訪問或寫這些路徑,它可以導致工作失敗或超時。保證的方式存在,運行工作必要的權限的用戶讀和寫這些路徑。

4所示。檢查磚集群之間的網絡連接和Azure事件中心。如果有任何網絡問題或者連接問題,它可以導致流工作失敗或超時。確保集群訪問事件的中心,沒有防火牆或網絡限製阻止連接。

5。檢查生成的日誌和錯誤消息流的工作確定任何特定的錯誤或問題導致超時。你的錯誤消息顯示,一些流終止命令之前可以完成,但它不提供具體細節的原因。分析日誌和錯誤的筆記可以幫助你找到確切的問題,采取適當的措施來解決它。

來源:
- - - - - -文檔:event-hubs

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map