利用火花結構化流擴展分析
2022年7月14日 在beplay体育app下载地址
這篇文章的米科學數據科學與工程團隊。
現代數據不會停止生長
“工程師教的生活經驗,做一些快速和做正確的事是互斥的!與結構化流從磚,M科學得到速度和準確度分析平台,而不需要每次都從頭開始重建我們的基礎設施。”Beplay体育安卓版本- Ben Tallman, CTO
假設你,大數據時代的“卑微的數據管道工”和創建一個任務是分析解決方案在線零售數據集:
發票 沒有 |
股票 代碼 |
描述 | 數量 | 發票 日期 |
單位 價格 |
客戶 ID |
國家 |
---|---|---|---|---|---|---|---|
536365年 | 85123一個 | 他叫白掛 | 6 | 2012-01-10 | 2.55 | 17850年 | 聯合王國 |
536365年 | 71053年 | 白色金屬燈 | 6 | 2012-01-10 | 3.39 | 17850年 | 聯合王國 |
536365年 | 84406 b | 奶油丘比特的心 | 8 | 2012-01-10 | 2.75 | 17850年 | 聯合王國 |
… | … | … | … | … | … | … | … |
分析你一直要求很簡單——一個聚合數量的美元,單位出售,和獨特的用戶每天,在每隻股票代碼。隻有幾行PySpark,我們可以將我們的原始數據轉換成有用的聚合:
進口pyspark.sql.functions作為Fdf=spark.table (“default.online_retail_data”)agg_df=(df#集團數據通過月、項目代碼和國家.groupBy (“InvoiceDate”,“StockCode”,)#返回彙總的美元,銷量,和獨特的用戶.agg (F。總和(“UnitPrice”).alias(“美元”),F。總和(“數量”).alias(“單位”),F.countDistinct (“CustomerID”).alias(“用戶”),))
(agg_df.write.format (“δ”).mode (“覆蓋”).saveAsTable (“analytics.online_retail_aggregations”))
一起和你的新聚合數據,你可以把一個很好的可視化做……業務的事情。
這工作,對吧?
ETL過程的一個靜態分析,你不期望的數據被更新,假設您已經將是唯一的數據數據你曾經擁有的。一個靜態分析的問題嗎?
你打算做什麼,當你得到更多的數據?
天真的回答是每天隻運行相同的代碼,但是你處理文檔的所有數據每次運行代碼,和每一個新的更新意味著你已經處理加工數據。當你的數據變得足夠大時,你會加倍的在時間和計算成本。
使用靜態分析,你把錢花在你已經處理加工數據。
很少有現代數據源,不會被更新。如果你想保持你的分析與數據來源和計算成本節省一大筆錢,你需要一個更好的解決方案。
我們做什麼當我們的數據增長?
在過去的幾年裏,“大數據”已經成為…缺乏。大量的數據增長和更多的生活已經在線,大數據的時代已經成為時代的“幫助我們,它隻是不會停止越來越大的數據。”A good data source doesn't stop growing while you work; this growth can make keeping data products up-to-date a monumental task.
在米科學,我們的使命是用典型的季度報告以外的替代數據-數據或股票趨勢數據來源,分析、提煉,並預測市場的變化和經濟。
每一天,我們的分析師和工程師麵臨一個挑戰:替代數據增長非常快。我甚至說,如果我們的數據停止增長,一些經濟已經非常非常錯誤的。
隨著數據的增長,我們的分析解決方案需要處理這一增長。我們不僅需要考慮經濟增長,但我們也需要考慮可能會在後期或無序的數據。這是我們的使命的一個重要部分——每一批新的數據可以批處理經濟信號一個戲劇性的變化。
分析產品,使可伸縮解決方案米科學分析師和客戶依賴每一天,我們使用磚結構的流,一個Apache火花™API具有可擴展性和容錯性流處理建立在火花的SQL引擎與磚Lakehouse的平台。Beplay体育安卓版本結構化流媒體向我們保證,隨著數據的增長,我們的解決方案也將規模。
使用火花結構化流
結構化流進場當新批次的數據被引入您的數據源。結構化流利用三角洲湖跟蹤數據變化的能力確定哪些數據是一個更新的一部分,重新計算的部分分析影響的新數據。
重新考慮你如何思考是很重要的流數據。對許多人來說,“流”是指實時數據流電影,查看Twitter,查看天氣,等等。如果你分析師、工程師或科學家,任何數據,更新是一個流。更新的頻率並不重要。秒,幾個小時,幾天,甚至幾個月——如果數據更新,數據流。如果數據流,那麼結構化流可以節省很多的麻煩。
使用結構化的流,您可以避免的成本加工之前的數據
讓我們回到我們的假設——你有一個聚合分析,今天你需要提供和保持更新新的數據卷。這一次,我們有DeliveryDate
列提醒我們我們之前的單發徒勞的分析:
發票 沒有 |
股票 代碼 |
描述 | 數量 | 發票 日期 |
交付 日期 |
單位 價格 |
客戶 ID |
國家 |
---|---|---|---|---|---|---|---|---|
536365年 | 85123一個 | 他叫白掛 | 6 | 2012-01-10 | 2012-01-17 | 2.55 | 17850年 | 聯合王國 |
536365年 | 71053年 | 白色金屬燈 | 6 | 2012-01-10 | 2012-01-15 | 3.39 | 17850年 | 聯合王國 |
536365年 | 84406 b | 奶油丘比特的心 | 8 | 2012-01-10 | 2012-01-16 | 2.75 | 17850年 | 聯合王國 |
… | … | … | … | … | … | … | … | … |
值得慶幸的是,結構化的界麵流非常類似於您的原始PySpark片段。這是你的原始靜態批量分析代碼:
# = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =# = = = = =老靜態批代碼= = = = =# = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =進口pyspark.sql.functions作為Fdf = spark.table (“default.online_retail_data”)
agg_df = (df#組數據按日期和項目代碼.groupBy (“InvoiceDate”,“StockCode”,)#返回彙總美元,銷量,和獨特的用戶.agg (F。總和(“UnitPrice”).alias (“美元”),F。總和(“數量”).alias (“單位”),F.countDistinct (“CustomerID”).alias (“用戶”),))
(agg_df.write。格式(“δ”).mode (“覆蓋”).saveAsTable (“analytics.online_retail_aggregations”))
隻有一些調整,我們可以調整這個利用結構化流。把你以前的代碼,你會:
- 閱讀我們的輸入表流而不是一個靜態批數據
- 做一個目錄在您的文件係統檢查點將存儲
- 設置一個水印建立邊界多晚數據可以被忽略在分析前到達
- 修改你的一些轉換防止保存的檢查點狀態太大
- 寫下你的最終分析表作為流增量處理輸入數據
我們應用這些調整,貫穿每一個變化,給你幾個選擇如何配置你流的行為。
這是‚‚“stream-ified”
舊版本的代碼:
#=========================================#=====新結構化流代碼=====#=========================================+CHECKPOINT_DIRECTORY=“/δ/檢查點/ online_retail_analysis”+dbutils.fs.mkdirs (CHECKPOINT_DIRECTORY)+df=spark.readStream.table (“default.online_retail_data”)agg_df=(df+#水印數據與一個InvoiceDate的7天+.withWatermark (“InvoiceDate”, f“7天”)#集團數據通過日期&項目代碼.groupBy (“InvoiceDate”,“StockCode”,)#返回彙總的美元,銷量,和獨特的用戶.agg (F。總和(“UnitPrice”).alias(“美元”),F。總和(“數量”).alias(“單位”),+F.approx_count_distinct (“CustomerID”,0.05).alias(“用戶”),))
(+agg_df.writeStream.format(“δ”)+.outputMode(“更新”)+。觸發(一次=真正的)+CHECKPOINT_DIR .option (“checkpointLocation”)+.toTable (“analytics.online_retail_aggregations”))
讓我們重複每個調整我們的結構化流工作:
1。流從三角洲表
+ df = spark.readStream.table (“default.online_retail_data”)
δ所有表的漂亮的特性,這可能成為:你可以把它當做一個流。因為增量跟蹤更新,你可以使用.readStream.table ()
流新更新每次運行流程。
重要的是要注意,你的輸入表必須是一個三角洲表工作。有可能流其他數據格式有不同的方法,但是.readStream.table ()
需要三角洲的表
+#創建檢查點目錄+ CHECKPOINT_DIRECTORY =“/δ/檢查點/ online_retail_analysis”+ dbutils.fs.mkdirs (CHECKPOINT_DIRECTORY)
在結構化Streaming-jargon,在這個分析是一個聚合有狀態的轉換。不太遠的雜草、結構化流節省聚合的狀態作為一個檢查點每次更新分析。
這就是救了你在計算成本:而不是加工所有更新的數據每次都從頭開始,簡單的收拾,最後更新。
3所示。定義一個水印
+#水印數據的InvoiceDate 7天+ .withWatermark (“InvoiceDate”f“7天”)
當你得到新的數據,有一個好的機會,你可以接收數據無序。水印你的數據允許您定義一個截止追溯到總量如何被更新。從某種意義上說,它會創建一個“活”和“定居”數據之間的界限。
產品說明:假設這數據包含的7月數據。我們設置水印到7天。這意味著骨料從7日到1日還“活”。新的更新可以改變聚合物從1日到7日,但任何新的數據,落後超過7天不被包括在更新- - - - - -聚合前1日“定居”,和更新的時期將被忽略。
新數據以外的水印不納入分析。
重要的是要注意,列使用水印必須是一個時間戳或窗口。
4所示。使用結構化Streaming-compatible轉換
+ F.approx_count_distinct (“CustomerID”,0.05)
為了保持你的檢查站從膨脹狀態,你可能需要更換你的一些轉換更多storage-efficient替代品。列,可能包含很多獨特的個人價值,approx_count_distinct
函數將得到你的結果在一個定義的相對標準偏差。
5。創建輸出流
+ agg_df.writeStream。格式(“δ”)+ .outputMode (“更新”)= + .trigger(一次真正的)+ .option (“checkpointLocation”CHECKPOINT_DIR)+ .toTable (“analytics.online_retail_aggregations”)
最後一步是輸出分析三角洲表。這幾個選項,確定你流的行為:
.outputMode(“更新”)
配置流,聚合將上次每次運行的代碼,而不是從頭開始運行。從頭開始重新聚合,可以使用“完整的”
——實際上,做一批傳統聚合,同時仍然保留的聚合狀態的未來“更新”
運行。觸發(一旦= True)
將觸發查詢一次,當行輸出代碼開始,然後停止查詢,一旦所有的新數據被處理。“checkpointLocation”
讓程序知道檢查點應該存儲。
這些配置選項使流最接近像原來的一次性的解決方案。
這一起來創建一個可伸縮的解決日益增長的數據。如果新數據添加到你的來源,你會考慮新的數據分析沒有成本一隻手臂和一條腿。
你會很難找到任何上下文數據不會被更新。軟協議,數據分析師,工程師和科學家當我們工作與現代數據——它會增長,我們必須找到方法來處理這種增長。
火花結構化流,我們可以使用最新和最優數據提供最好的產品,沒有規模的頭痛。