這些文章可以幫助您了解結構化流和Spark流(遺留的Apache Spark流特性)。
21本類別中的物品
如果您仍有疑問或希望直接從代理處獲得幫助,請提交申請。我們會盡快回複您的。
請輸入您的請求的詳細信息。我們的支持人員會盡快做出回應。
使用追加模式進行聚合,返回異常錯誤消息。當流式數據幀/數據集上存在流聚合時,不支持追加輸出模式。原因分析在沒有水印的聚合數據幀上不能使用追加輸出模式。這是有意為之。處理建議您必須申請一個…
最後更新:2022年5月17日通過亞當Pavlacka
您試圖在Databricks流作業中使用Spark離散化流(DStream),但作業正在失敗。產生原因數據庫不支持DStreams和DStream API。解決方案不使用Spark DStream,你應該遷移到結構化流。回顧生產中的Databricks結構化流(AWS | Azure |…
當您將數據流輸入文件接收器時,您應該始終同時更改檢查點和輸出目錄。否則,您可能會得到失敗或意外的輸出。Apache Spark在輸出目錄中創建了一個名為_spark_metadata的文件夾。此文件夾包含每個批處理運行的預寫日誌。這就是Spark如何得到精確的一次保證…
當您使用Auto Loader (AWS | Azure | GCP)處理流文件時,將根據底層存儲中創建的文件記錄事件。本文向您展示如何將每個文件名的文件路徑添加到輸出DataFrame中的新列中。其中一個用例是審計。當文件被攝取到一個分區的文件夾結構,在那裏i…
最後更新:2022年5月18日通過亞當Pavlacka
本文介紹如何在AWS EC2機器上設置Apache Kafka,並將它們與Databricks連接。以下是創建Kafka集群和從Databricks筆記本連接所需的高級步驟。步驟1:在AWS中創建新VPC創建新VPC時,請將新的VPC CIDR範圍與Databricks的VPC CIDR範圍設置為不同…
如果S3中的數據按分區存儲,則使用分區列的值來命名源目錄結構中的文件夾。但是,如果使用SQS隊列作為流源,則S3-SQS源無法檢測分區列值。例如,以下DataFrame以JSON格式保存到S3: %scala val df = spark.range(1…
場景:你有一個流,運行一個窗口聚合查詢,從Apache Kafka讀取文件,並以追加模式寫入文件。您希望升級應用程序並重新啟動查詢,使偏移量等於最後寫入的偏移量。您希望丟棄所有尚未寫入接收器的狀態信息,從最早開始處理…
你有一個結構化流作業通過S3-SQS連接器運行。假設您希望以SNS數據為後盾重新創建源SQS,並且希望在同一作業和同一輸出目錄中繼續處理一個新隊列。處理建議新建SQS隊列,從SNS訂閱s3-events。在…
當你試圖讀取或寫入數據到Kafka流時,你得到一個錯誤消息。kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer原因:kafkashaded.org.apache.kafka.common.config.ConfigException: bootstrap中沒有給出可解析的引導url。如果你正在運行一個筆記本電腦,錯誤我…
您在集群上啟用了表訪問控製(AWS | Azure | GCP)。您正在嚐試運行結構化流查詢並獲得錯誤消息。py4j.security。Py4JSecurityException:方法public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.SQLContext.readStream()不在類類org.apache.s的白名單中…
最後更新:2022年5月19日通過mathan.pillai
你有一個流作業使用display()來顯示DataFrames。%scala val streamingDF = spark.readStream.schema(schema).parquet() display(streamingDF)檢查點文件正在創建,但沒有被刪除。您可以通過導航到根目錄並查看/local_disk0/tmp/文件夾來驗證問題。Ch……
最後更新:2022年5月19日通過亞當Pavlacka
你有一個流作業使用foreachBatch()來處理DataFrames。% scala streamingDF.writeStream.outputMode(“追加”)。foreachBatch {(batchDF: DataFrame, batchId: Long) => batchDF.write.format("parquet").mode(" override ").save(output_directory)}.start()檢查點文件正在創建,但沒有被刪除。你可以證實…
您有一個Apache Spark作業失敗,出現Java斷言錯誤Java .lang. assertionerror:斷言失敗:檢測到衝突的目錄結構。由:org.apache.spark.sql.streaming.StreamingQueryException:在嚐試推斷當前批文件的分區模式時出錯。請求……
最後更新:2022年5月19日通過阿施施
當你試圖使用RocksDB作為結構化流應用程序的狀態存儲時,你得到一個錯誤消息,說實例無法獲得。導致:java.lang.IllegalStateException: RocksDB實例不能被[ThreadId: 742, task: 140.3 in stage 3152, TID 553193]獲取,因為它沒有被[ThreadI…
最後更新:2023年2月25日通過亞當Pavlacka
Apache Spark不包括用於XML文件的流式API。但是,您可以將Spark批處理API的自動加載器特性與OSS庫Spark-XML結合起來,以流處理XML文件。在本文中,我們將介紹一個基於Scala的解決方案,它使用自動加載器解析XML數據。安裝Spark-XML庫必須安裝Spark-XML OSS庫…
你有一個流作業寫到一個Kinesis接收器,它是失敗的內存不足錯誤消息。Java .lang. outofmemoryerror: GC開銷限製超過Java .lang. outofmemoryerror: Java堆空間。症狀包括:Ganglia顯示JVM內存使用量逐漸增加。微批分析顯示輸入和處理率是一致的…
您正在監視一個流作業,並注意到它在處理數據時似乎卡住了。當您查看日誌時,您發現作業在向檢查點寫入數據時卡住了。INFO hdfsbackkedstatestoreprovider: delete files than 381160 for HDFSStateStoreProvider[id = (op=0,part=89),dir = dbfs:/FileStore/R_CHECKPOINT5/st…
最後更新:2022年5月19日通過何塞·岡薩雷斯
提示本文適用於Databricks Runtime 9.1 LTS及以上版本。當你得到一個IllegalArgumentException時,你正在使用自動加載器為你的ELT管道攝取數據:請提供源目錄路徑選項' path '錯誤消息。當您啟動Auto Loader作業時,如果數據路徑或數據…
最後更新:2022年10月12日通過何塞·岡薩雷斯
當運行一個使用雲存儲桶(S3, ADLS Gen2等)的結構化流應用程序時,當您訪問存儲桶時,很容易引起過多的事務。沒有在流代碼中指定.trigger選項是導致大量存儲事務的一個常見原因。當沒有指定.trigger選項時,sto…
最後更新:2022年10月26日通過chetan.kardekar
您正在運行一係列結構化流作業並寫入文件接收器。每運行10次,運行速度就會比前一次作業慢。產生原因文件sink在目標路徑下創建了一個_spark_metadata文件夾。這個元數據文件夾存儲關於每個批處理的信息,包括哪些文件是批處理的一部分。這是證明……所必需的。
最後更新:2022年10月28日通過gopinath.chandrasekaran
您正在使用Auto Loader (AWS | Azure | GCP)運行流作業,並希望從存儲帳戶獲得每個文件的最後修改時間。獲取由Auto Loader使用的文件的路徑文章描述了如何獲取由Auto Loader使用的所有文件的文件名和路徑。在本文中,我們將在此基礎上……
最後更新:2022年12月1日通過DD沙瑪