Apache火花的結構化流到生產
這是第五篇由多部分組成的係列如何你可以執行複雜的流分析使用Apache火花。
在磚,我們我們的生產管道遷移到結構化流過去幾個月,想分享我們的開箱即用的部署模型允許我們的客戶快速構建生產管道磚。beplay体育app下载地址
一個生產應用程序需要監控、報警和自動故障恢複(原生雲)方法。這篇文章不僅介紹可用的api解決這些挑戰但也將展示如何磚使結構化流在生產運行簡單。
度量和監控
結構化流在Apache火花提供了一個簡單的編程API來獲得當前執行流的信息。有兩個關鍵的命令,您可以運行在當前活動流為了得到相關信息的查詢執行進展:獲取當前的命令狀態查詢和命令recentProgress的查詢。
狀態
你可能會問的第一個問題是,“我現在流執行處理是什麼?”The status maintains information about the current state of the stream, and is accessible through the object that was returned when you started the query. For example, you might have a simple counts stream that provides counts of IOT devices defined by the following query.
查詢= streamingCountsDF \.writeStream \。格式(“記憶”)\.queryName (“計數”)\.outputMode (“完整的”)\.start ()
運行query.status
將返回流的當前狀態。這給了我們的細節發生了什麼在那個時間點上的流。
{“消息”:“從FileStreamSource得到補償(dbfs / databricks-datasets / structured-streaming /事件):“,“isDataAvailable”:真正的,“isTriggerActive”:真正的}
磚的筆記本給你一個簡單的方法來發現任何流查詢的狀態。你隻需將鼠標停在
在流媒體查詢圖標可用。你會得到相同的信息,使其更方便快速了解你的狀態流。最近的進展
雖然查詢狀態無疑是重要的,同樣重要的是一個能夠查看查詢的曆史進步。進步的元數據將使我們能夠回答這樣的問題“我處理元組以什麼速度?”或“元組到達從源的速度有多快?”
通過運行stream.recentProgress
你會獲得更多的基於時間的信息處理速度和批處理時間。然而,一張圖片勝過一千年JSON斑點,所以在磚,我們創建可視化為了方便快速分析的最新進展流。
讓我們了解一下為什麼我們選擇顯示這些指標和為什麼他們對你理解很重要。
輸入速度和處理速度
輸入率指定多少數據流入結構化流從一個係統就像卡夫卡或運動。處理速度是多快我們可以分析這些數據。在理想的情況下,這些應該變化始終在一起;然而,他們多少會有所不同根據輸入數據處理開始時存在。如果輸入速度遠遠超過的處理速度,我們流將落後,我們需要集群規模更大的大小來處理更大的負載。
批處理時間
幾乎所有的流媒體係統利用批處理操作在任何合理的吞吐量(一些有一個選項高延時,以換取更低的吞吐量)。結構化流實現。運營數據,您可能會看到這個振蕩結構化流過程不同數量的事件。這一核心集群在Community Edition中,我們可以看到,我們的批處理時間是振蕩持續三秒鍾左右。更大的集群自然會有更快的處理速度,以及批處理時間要短得多。
生產報警流工作
度量和監控都是很好,但是為了快速反應出現的任何問題,而無需照顧你流工作一整天,你需要一個健壯的警示故事。磚簡化了報警,允許你運行流工作生產管道。
例如,讓我們定義一個磚工作以下規格:
注意到我們設置一個電子郵件地址在PagerDuty觸發警報。這將觸發警報產品(或您指定的級別)當作業失敗。
自動故障恢複
同時提醒方便,不得不迫使人類應對停機不方便在最好的情況下,是不可能的。為了真正productionize結構化流,你會希望能夠盡快恢複自動失敗,同時確保數據一致性和數據丟失。磚使得這種無縫的:隻是之前設置重試的次數不可恢複的故障和磚將嚐試恢複流自動工作。在每一個失敗,你可以觸發通知生產中斷。
你得到兩全其美。係統將嚐試自我修複,同時保持員工和開發人員的通知狀態。
更新應用程序
有兩種情況下,你需要思考當你更新你的流媒體應用程序。在大多數情況下,如果你不改變重要的業務邏輯(如輸出模式)您可以簡單地重新啟動流使用相同的檢查站工作目錄。新更新的流媒體應用程序將離開,繼續運作。
然而,如果你改變狀態操作(如聚合或輸出模式),更新更多的參與。你必須用一個新的開始一種全新的流檢查點目錄。幸運的是,很容易啟動另一個並行流在磚為了運行時轉換到新流。
先進的報警和監控
還有其他幾種先進的監測技術,磚支持。例如,您可以使用係統輸出的通知Datadog,Apache卡夫卡,或Coda Hale指標。這些先進的技術可以用於實現外部監測和報警係統。
下麵是一個例子,您可以創建一個StreamingQueryListener將所有查詢進展信息轉發給卡夫卡。
類KafkaMetrics(服務器:字符串)擴展StreamingQueryListener{瓦爾kafkaProperties=新屬性()kafkaProperties。把(“bootstrap.servers”、服務器)kafkaProperties。把(“key.serializer”,“kafkashaded.org.apache.kafka.common.serialization.StringSerializer”)kafkaProperties。把(“value.serializer”,“kafkashaded.org.apache.kafka.common.serialization.StringSerializer”)瓦爾生產商=新KafkaProducer(字符串,字符串)(kafkaProperties)defonQueryProgress(事件:org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent):單位= {生產商。發送(新ProducerRecord (“streaming-metrics”,event.progress.json))}defonQueryStarted(事件:org.apache.spark.sql.streaming.StreamingQueryListener.QueryStartedEvent):單位= {}defonQueryTerminated(事件:org.apache.spark.sql.streaming.StreamingQueryListener.QueryTerminatedEvent):單位= {}}
結論
在這篇文章中,我們展示了簡單的結構化流從原型到生產使用磚。閱讀更多關於結構化流的其他方麵,閱讀我們的一係列的博客: