低延遲流數據管道與達美住表和Apache卡夫卡
三角洲生活表(DLT)是第一個ETL框架,使用一個簡單的聲明性方法用於創建可靠的數據管道和全麵管理批處理和底層基礎設施的規模嗎流數據。許多用例需要可行的見解來自附近的實時數據。增量住表可以低延遲流數據管道支持這樣的用例通過直接攝取低延遲的數據事件等公交車Apache卡夫卡,AWS運動,彙合的雲,亞馬遜MSK的,或Azure的活動中心。
本文將介紹使用Apache的DLT卡夫卡,同時提供攝取所需的Python代碼流。推薦係統架構將會解釋說,和相關的DLT設置值得考慮將探索。
流媒體平台Beplay体育安卓版本
事件總線或消息總線分離消息生產者與消費者。一個流行的流媒體用例是點擊率數據的收集來自用戶的導航網站,每一個用戶交互是Apache卡夫卡存儲為一個事件。然後從卡夫卡事件流是用於實時流數據分析。多個消息的消費者可以從卡夫卡讀取相同的數據和使用數據,了解受眾利益,轉化率,反彈的原因。從用戶交互實時流媒體事件數據通常也需要與實際購買存儲在數據庫計費。
Apache卡夫卡
Apache卡夫卡是一個很受歡迎的開放源代碼事件總線。卡夫卡使用主題的概念,一個擴展分布式日誌事件的消息是一定的時間緩衝。雖然在卡夫卡不刪除消息一旦消耗他們,他們也不會無限期保存。卡夫卡的消息保留可以配置每個主題,默認為7天。過期的信息最終將被刪除。
這篇文章是圍繞Apache卡夫卡;然而,概念也適用於許多討論其他事件總線或消息傳遞係統。
流數據管道
在數據流管道,三角洲住表及其依賴關係可以用一個標準的SQL聲明創建表選擇DLT (cta)語句和關鍵字“生活”。
當發展與Python的DLT,@dlt.table
修飾符是用來創建一個三角洲住表。確保數據質量在一個管道,DLT用途預期簡單的SQL的約束條款定義管道的行為與無效記錄。
由於流媒體工作負載常常有不可預測的數據量,數據磚使用增強自動定量為數據流管道,以減少整體的端到端延時,同時降低成本通過關閉不必要的基礎設施。
三角洲生活表完全重新計算,以正確的順序,每個管道運行一次。
相比之下,流三角洲生活表是有狀態的,增量計算和處理數據,添加了自上次管道運行。如果查詢的定義了一個流媒體直播表變化,新的數據處理基於新的查詢,但現有的數據重新計算。流媒體直播表總是使用流媒體來源,隻有在擴展流,如卡夫卡,運動,或者自動加載程序。流dlt是基於結構化流火花。
可以鏈多個流管道,例如,工作量非常大的數據量和低延遲的要求。
直接攝入流引擎
三角洲生活表寫在Python中可以直接攝取數據從一個事件總線使用結構化流火花就像卡夫卡。你可以設置一個短停留時間對卡夫卡的話題避免合規問題,降低成本和受益於廉價,三角洲所提供的彈性和可控製的存儲。
作為第一步,我們建議攝入數據是青銅(生)表,避免複雜的轉換,可以將重要數據。像任何三角洲表銅表將保留曆史和允許執行GDPR和其他執行任務。
當編寫DLT管道在Python中,您使用@dlt.table
創建一個DLT表注釋。沒有特殊的屬性標記流dlt在Python中;簡單地使用spark.readStream ()
訪問流。示例代碼創建一個DLT表的名稱kafka_bronze
消費數據從卡夫卡主題如下:
進口dlt從pyspark.sql.functions進口*從pyspark.sql.types進口*主題=“tracker-events”KAFKA_BROKER = spark.conf.get (“KAFKA_SERVER”)#在KAFKA_BROKER訂閱話題raw_kafka_events = (spark.readStream。格式(“卡夫卡”).option (“訂閱”、主題).option (“kafka.bootstrap.servers”KAFKA_BROKER).option (“startingOffsets”,“最早”).load ())@dlt.table (table_properties = {“pipelines.reset.allowed”:“假”})defkafka_bronze():返回raw_kafka_events
pipelines.reset.allowed
注意事件公共汽車通常過期消息一段時間後,而三角洲是專為無限的保留。
這可能導致在卡夫卡的影響源數據已經刪除DLT管道在運行一個完整的刷新。在這種情況下,並不是所有的曆史數據可以從消息傳遞平台、回填和數據會丟失DLT表。Beplay体育安卓版本為了防止刪除數據,使用以下DLT表屬性:
pipelines.reset.allowed = false
設置pipelines.reset.allowed
假防止刷新表但並不妨礙增量寫入表或新流入的數據表。
檢查點
如果你是一個有經驗的火花結構化流開發人員,你會發現沒有檢查點在上麵的代碼。在火花結構化流檢查點需要持續進步已成功處理信息數據失敗,這個元數據用於重新啟動失敗的查詢具體位置。
而對故障恢複檢查點是必要的火花結構化流僅一次保證,DLT自動處理狀態,而不需要任何手動配置或顯式檢查點。
混合SQL和Python DLT管道
DLT管道可以包含多個筆記本,但一個DLT筆記本需要完全用SQL編寫或Python(不像其他磚筆記本,你可以有細胞不同的語言在一個筆記本)。
現在,如果你的偏好是SQL,您可以從Apache代碼數據攝入卡夫卡在一個筆記本在Python中,然後實現數據的轉換邏輯管道在另一個筆記本在SQL。
模式映射
從消息傳遞平台讀取數據時,數據流是不透明的,必須提供一個模式。Beplay体育安卓版本
下麵的Python示例顯示了從健身追蹤事件的模式定義,以及價值的一部分卡夫卡消息映射這個模式。
event_schema = StructType ([\StructField (“時間”、TimestampType ()真正的),\StructField (“版本”、StringType ()真正的),\StructField (“模型”、StringType ()真正的),\StructField (“heart_bpm”、IntegerType ()真正的),\StructField (“千卡”、IntegerType ()真正的)\])
#臨時表,可見在管道而不是在數據瀏覽器,#不能查詢交互@dlt.table (評論=“真正的Kakfa負載模式”,臨時=真正的)
defkafka_silver():返回(#卡夫卡流(時間戳值)#價值包含了卡夫卡的有效載荷dlt.read_stream (“kafka_bronze”).select(坳(“時間戳”),from_json(坳(“價值”).cast (“字符串”),event_schema) .alias (“事件”)).select (“時間戳”,“。*”))
好處
閱讀流數據直接從message broker DLT最小化架構複雜性並提供較低的端到端延時自數據直接從消息傳遞代理和中介流的步驟。
流攝取與雲對象存儲媒介
用於一些特定的情況下,您可能希望將數據從Apache卡夫卡,例如,使用卡夫卡連接器和存儲流數據在雲對象中介。磚的工作區,雲通過磚可以映射到特定於供應商的對象存儲文件係統(DBFS)作為cloud-independent文件夾。一旦數據卸載,磚自動加載程序可以攝取的文件。
自動加載器可以攝取數據與一行的SQL代碼。語法來攝取的JSON文件到一個DLT表如下所示(它是包裹在兩行可讀性)。
——攝取自動加載程序創建或更換現場直播表生作為選擇*從cloud_files (“dbfs: /數據/ twitter”,“json”)
注意自動加載器本身就是一個流數據源和新來的所有文件將被處理一次,因此,原始表顯示數據流媒體關鍵字是攝取增量表。
自卸載流數據到雲對象存儲了一個額外的步驟在您的係統架構也會增加端到端延遲和創建額外的存儲成本。記住,卡夫卡連接器寫事件數據到雲對象存儲需要管理,增加操作的複雜性。
因此磚建議作為最佳實踐的直接訪問事件總線數據使用火花從DLT結構化流如上所述。
其他事件總線或消息傳遞係統
這篇文章是圍繞Apache卡夫卡;然而,討論的概念也適用於其他事件總線或消息傳遞係統。DLT支持任何數據源數據磚運行時直接支持。
亞馬遜運動
在運動中,你寫消息完全托管serverless流。卡夫卡一樣,動作不永久存儲消息。默認的消息保留在運動是一天。
當使用亞馬遜動作,替換格式(“卡夫卡”
)格式(“運動”
)在上麵的Python代碼流攝入和增加亞馬遜Kinesis-specific設置選項
()。關於更多信息,檢查部分運動的集成在火花結構化流文檔。
Azure的活動中心
Azure活動中心設置,檢查官員微軟文檔和這篇文章三角洲表食譜:生活消費從Azure事件中心。
總結
DLT不僅僅是ETL的“T”。DLT,您可以很容易地從流和批處理來源攝取,清理和轉換數據磚Lakehouse平台上任何雲保證數據質量。Beplay体育安卓版本
數據從Apache卡夫卡可攝入通過直接連接到卡夫卡從Python中的DLT筆記本代理。數據丟失是可以預防的一個完整的管道刷新即使卡夫卡的源數據流層過期了。
開始
如果你是一個磚客戶,隻是遵循導遊開始。閱讀發布說明了解更多關於什麼是包含在該通用版本。如果你沒有一個現有的磚的客戶,注冊一個免費試用,您可以查看我們的詳細的DLT定價在這裏。
加入的談話磚社區data-obsessed同行在哪裏聊天關於數據+人工智能峰會2022公告和更新。學習。網絡。
最後但並非最不重要,享受深入研究數據工程會話的峰會。在該會話,我通過另一個流數據的代碼例子微博直播,自動加載程序,三角洲住表的SQL,擁抱麵臨情緒分析。