這是我的代碼寫數據從三角洲表事件中心(從消費者團隊將消耗數據):
進口org.apache.spark.eventhubs._進口org.apache.spark.sql.streaming。觸發._進口org.apache.spark.sql.types._進口org.apache.spark.sql.functions._進口java.util。屬性進口com.microsoft.azure.eventhubs。{EventData,PartitionSender}進口org.apache.spark.eventhubs。EventHubsConf進口io.delta.tables._進口org.apache.spark.sql.streaming。觸發進口. io .PrintWriter進口java.time。ZonedDateTime進口java.time.format。DateTimeFormatter進口scala.concurrent.duration._進口java.nio.file。{路徑,文件}/ /配置Azure事件中心的細節瓦爾namespaceNameOut =“des - ent prod cus -流- eventhub - 001”瓦爾eventHubNameOut =“hvc-prodstats-output”瓦爾sasKeyNameOut =“作家”瓦爾sasKeyOut = = dbutils.secrets.get(範圍“deskvscope”、關鍵=“des-ent-prod-cus-stream-e venthub - 001作家”)/ /配置檢查點和壞數據路徑瓦爾checkpoint_dir_path =“/ mnt / hvc-wenco / prodstats /流/檢查站”瓦爾baddata_path =“/ mnt / hvc-wenco / prodstats /流/ Bad_data”/ /定義時間戳檢查點的道路瓦爾tbl_version_timestamp_path =“/ mnt / hvc - wenco / equip_status_history_table /檢查點/檢查站”/ /配置其他參數瓦爾MaxEvents=5000年/ /讀最後一個檢查點的時間戳瓦爾last_checkpoint_string = dbutils.fs.head (tbl_version_timestamp_path)/ /解析最後的檢查點時間戳瓦爾time_format =“yyyy-MM-dd HH: mm: ss.SSSz”瓦爾格式化程序=DateTimeFormatter.ofPattern (time_format)瓦爾last_checkpoint =ZonedDateTime.parse (last_checkpoint_string格式化程序)/ /建立連接事件中心瓦爾connStrOut =新com.microsoft.azure.eventhubs。ConnectionStringBuilder().setNamespaceName (namespaceNameOut) .setEventHubName (eventHubNameOut) .setSasKeyName (sasKeyNameOut) .setSasKey (sasKeyOut)瓦爾ehWriteConf =EventHubsConf(connStrOut.toString ())/ /創建一個流dataframe從三角洲表瓦爾InputStreamingDF=火花.readStream .option (“maxFilesPerTrigger”,1).option (“startingTimestamp”last_checkpoint_string) .option (“readChangeFeed”,“真正的”).table (“wencohvc.equip_status_history_table”)瓦爾dropPreTransform =InputStreamingDF.filter (InputStreamingDF(“_change_type”)=。=“update_preimage”)瓦爾operationTransform = dropPreTransform.withColumn (“操作”,當($“_change_type”= = =“插入”,2當($).otherwise (“_change_type”= = =“update_postimage”,4)))瓦爾transformedDF = operationTransform.withColumn (“DeletedIndicator”,當($“_change_type”= = =“刪除”,“Y”).otherwise (“N”))瓦爾finalDF = transformedDF.drop (“_change_type”,“_commit_version”,“_commit_timestamp”)/ /寫入事件中心重試和檢查點var重試=真正的varretryCount =0瓦爾maxRetries =3而(重試& & retryCount < maxRetries) {試一試{瓦爾流= finalDF .select (to_json(結構體(* / / *列列表).alias (“身體”).writeStream .format (“eventhubs”).options ehWriteConf.toMap .option (“checkpointLocation”checkpoint_dir_path) .trigger (觸發。AvailableNow重試=).start () stream.awaitTermination ()假}抓{情況下艾凡:異常= > retryCount + =1如果(retryCount < maxRetries) {瓦爾延遲=2。秒* retryCount println (“流嚐試retryCount美元失敗,重試$ {delay.toSeconds}秒……”)線程.sleep (delay.toMillis)}}}/ /寫檢查點瓦爾emptyDF =Seq((1).toDF (“>”)瓦爾checkpoint_timestamp = emptyDF.withColumn (“current_timestamp”current_timestamp())當代().getTimestamp (1)+“+ 00:00”dbutils.fs。把(tbl_version_timestamp_path checkpoint_timestamp.toString (),真正的)
問題是超時之前最後一個命令和最後一個檢查點命令不會運行。我已經嚐試重試機製,但是,它超時。從源數據量很大,我不想流複製數據通過運行筆記本一次又一次,如果沒有適當的檢查點發生。我怎麼解決這個問題? ?我希望工作正確運行和儲存最後的檢查點時間戳,所以拿起在下一次運行形式,但它超時之前最後一個命令。錯誤的是:
錯誤:一些流終止之前這命令可以完成!流嚐試1失敗,重試2秒……流嚐試2失敗,重試4秒……重試:布爾=真正的retryCount:Int=3maxRetries:Int=3錯誤:一些流終止之前這命令可以完成!命令花了0.04秒
嗨@Ruby8376,你麵臨的問題是,流媒體的工作是超時之前最後一個命令,和前麵的檢查點命令不會運行。
為了解決這個問題,嚐試以下步驟:
1。增加了通過設置超時時間流的工作spark.sql.streaming.stopTimeout
配置屬性。你可以設置一個更高的價值,比如600年代
(10分鍾),允許更多的時間來完成工作。
2。檢查資源分配給流媒體工作。如果工作耗盡資源,比如內存或CPU,它可能導致超時。你可以嚐試增加資源分配給的工作,比如每個執行者的執行人或內存數量,是否就可以解決這個問題。
3所示。驗證檢查點目錄和破壞性的數據路徑和擁有必要的權限訪問。如果有任何問題與訪問或寫這些路徑,它可以導致工作失敗或超時。保證的方式存在,運行工作必要的權限的用戶讀和寫這些路徑。
4所示。檢查磚集群之間的網絡連接和Azure事件中心。如果有任何網絡問題或者連接問題,它可以導致流工作失敗或超時。確保集群訪問事件的中心,沒有防火牆或網絡限製阻止連接。
5。檢查生成的日誌和錯誤消息流的工作確定任何特定的錯誤或問題導致超時。你的錯誤消息顯示,一些流終止命令之前可以完成,但它不提供具體細節的原因。分析日誌和錯誤的筆記可以幫助你找到確切的問題,采取適當的措施來解決它。
來源:
- - - - - -文檔:event-hubs