創建Apache Spark™
三角洲湖更好
想看而不是讀?看看視頻在這裏.
數據湖的前景
Michael時常要:
今天我非常高興能在這裏談論如何通過使用Delta Lake使Apache Spark變得更好。然而,在我開始之前,我想先談談數據湖的概念,以及為什麼這麼多人對它感到興奮,以及為什麼當他們試圖設置這些東西時,會有很多挑戰。
那麼,首先,什麼是數據湖,它對我意味著什麼?所以,三角洲湖的承諾基本上是這樣的,組織有很多數據。它可能是OLTP係統中精心策劃的客戶數據。它可能是來自web服務器的原始點擊流,也可能是來自一堆傳感器的非結構化數據。
而數據湖的承諾就是你可以把所有的數據都倒進數據湖裏。當你將它與傳統數據庫相比時,這實際上是非常強大的。因為在傳統的數據庫中,你必須從提出一個模式開始,並做大量的清理工作。這通常稱為寫時模式。
而數據湖允許你做的是,它允許你放棄這個過程,隻是從收集所有東西開始。因為有時候直到很久以後你才知道數據為什麼有價值。如果你沒有儲存它,那麼你已經失去了它。數據湖就是文件係統中的一堆文件。它可以是S3或HDFS,或Azure Blob Storage。你可以把所有東西都放在那裏,然後再回來看。
我們的想法是,當你完成了,一旦你收集了所有的信息,你就可以從中得到真正的見解。你可以做數據科學和機器學習。您可以為您的業務構建強大的工具,如推薦引擎或欺詐檢測算法。你甚至可以做一些瘋狂的事情,比如用基因組學和DNA測序來治愈癌症。
然而,這個故事我已經看過很多很多次了。通常,不幸的是,一開始的數據都是垃圾。因此,存儲在數據湖中的數據是垃圾。結果,你從這些更高級的過程中得到垃圾,你試圖在最後做。
數據湖的挑戰
為什麼會這樣呢?為什麼從這些數據湖中獲得質量和可靠性如此困難?這個典型的項目是什麼樣的?我想帶你們看一個故事,我在許多組織中看到過一遍又一遍地發生,當他們坐下來試圖從他們的數據中提取見解時。它通常是這樣的。這被認為是當今的前沿技術,但在三角洲湖之前。
一個非常常見的模式是,你有一個事件流進入一些係統,比如Apache Kafka。你的任務是做兩件事,你需要做流分析,這樣你就可以知道你的業務中實時發生了什麼。你也想做人工智能和報告,你可以看更長的一段時間,做縱向分析,實際上是看曆史和趨勢,對未來做出預測。
那麼,我們要怎麼做呢?第一步,我坐在電腦前。我知道Spark有很好的api可以從Apache Kafka讀取數據。你可以使用數據幀,數據集,SQL和Spark SQL來處理和做聚合和時間窗口以及所有與你的流分析相關的事情。
所以,我們從那開始,很快,它運行得很好。但這給我們帶來了第一個挑戰,那就是曆史查詢。Kafka非常適合進行實時分析,但它隻能存儲一天或一周的數據。你不希望在Kafka中存儲年複一年的數據。所以,我們也可以解決這個問題。實時技術對於當前正在發生的事情很有幫助,但是對於尋找曆史趨勢就不太好了。
我最近看了很多博客文章。這裏發生的一個很常見的模式是有一種叫做lambda架構的東西,據我所知,基本上就是每件事都做兩次。你有一個實時的東西,它在做一個近似值,告訴你在這個時刻到底發生了什麼。你還有另一個管道,它可能更有策劃,運行得更慢一些。但它會將所有數據存檔到你的數據湖中。這是第一步。所以,如果我們想要解決這個曆史查詢問題,我們還要在Apache Spark之上建立lambda架構。
然後,一旦我在數據湖中獲得了所有數據,我的想法是,現在我也可以在上麵運行Spark SQL查詢。我可以做人工智能和報道。這需要一些額外的工作和額外的協調。但幸運的是,Spark為批處理和流處理提供了統一的API。所以,這是可能的,你可以建立。
但這就帶來了第二個挑戰。就像我之前說的,現實世界中的數據通常是混亂的。上遊的一些團隊在沒有通知您的情況下更改了模式,現在您遇到了問題。因此,我在這裏看到的一個模式是你需要添加驗證。因此,您需要編寫額外的Spark SQL程序,以確保您對數據的假設是正確的。如果他們錯了,它會發送一封電子郵件讓你改正。
當然,因為我們已經完成了lambda架構,我們必須在兩個不同的地方進行驗證。但是,這是我們可以做到的。我們可以用火花來做。所以,現在我們建立驗證,他們處理混亂的數據。不幸的是,我們要麵對第三個挑戰,那就是錯誤和失敗。
這些驗證很好,但有時你會忘記把它們放在適當的位置,或者你的代碼中有一個錯誤,或者更困難的是,如果你的代碼在中間崩潰,因為你在EZ2上運行,你的spot實例死亡。現在你要擔心的是如何清理。使用這些類型的分布式係統和分布式文件係統的真正問題是,如果一個作業在中間崩潰,這意味著需要清理的垃圾結果。所以,你必須自己做所有關於正確性的推理。這個係統並沒有給你太多幫助。
一個很常見的模式是人們,不是一次處理整個表,因為如果出了問題,我們需要重新計算整個表,他們會把它分成幾個分區。你有一個不同的文件夾。每個文件夾存儲一天、一個小時、一個星期,任何粒度都適合您的用例。你可以圍繞它編寫很多腳本,這樣我就可以很容易地進行讀計算。
因此,如果其中一個分區由於任何原因損壞了,無論是我的代碼中的錯誤還是作業失敗,我都可以刪除整個目錄並從頭重新處理該分區的數據。因此,通過構建這個分區和再處理引擎,現在我可以處理這些錯誤和失敗。有一點額外的代碼要寫,但現在我可以安心睡覺,知道這將工作。
這就引出了第四個挑戰。更新。做點更新是非常困難的。添加數據非常容易,但在數據湖中更改數據並正確地進行更改卻非常困難。出於GDPR的原因,你可能需要這樣做。你可能需要留住玩家。你可能需要做匿名化處理,或者其他事情,或者你可能隻是在數據中有錯誤。因此,現在你必須編寫一個完整的另一類Spark作業來執行更新和合並,因為這可能非常困難。
通常情況下,因為這太難了,我看到人們做的不是單獨更新,這非常便宜,他們實際上隻是在任何時候他們需要做一些事情,每當他們每月得到一組dsr時,他們會複製整個表,刪除任何由於GDPR而被要求遺忘的人。他們可以這樣做,但這是另一個需要運行的Spark作業,成本非常高。
這裏有一個微妙的地方,讓它變得格外困難,那就是,如果你修改了一個表格,而別人正在閱讀它生成一個報告,他們會看到報告中不一致的結果是錯誤的。因此,您需要非常小心地安排這個,以避免在執行這些修改時發生任何衝突。但這些都是人們可以解決的問題。你晚上做這個,白天做報告什麼的。現在我們有了一個進行更新的機製。
然而,這裏的問題是這變得非常複雜。這意味著你在解決係統問題上浪費了大量的時間和金錢,而不是做你真正想做的事情,也就是從數據中提取價值。我的看法是,這些都是數據湖的幹擾,阻止你真正完成手頭的工作。
總結一下我的想法,這裏最大的一個就是沒有原子性。當你運行一個分布式計算時,如果作業在中間失敗了,你仍然有一些部分的結果。不是全有或全無。因此,原子性意味著當一個作業運行時,它要麼完全正確地完成,要麼如果出現任何錯誤,它完全回滾,什麼都不會發生。因此,您不必再讓數據處於損壞狀態,而需要費力地構建這些工具來進行手動恢複。
另一個關鍵問題是沒有質量執行。在每一項工作中,手動檢查傳入數據的質量取決於您。再說一次,這都是你的假設。係統無法像傳統數據庫中的不變量那樣提供幫助,您可以說:“不,這一列是必需的,或者這必須是這種類型的模式。”所有這些事情都由你作為程序員來處理。
最後,沒有對一致性或隔離性的控製。這意味著您一次隻能對任何數據湖表執行一個正確的操作。這使得混合流處理和批處理變得非常困難,也使得人們在讀取數據時無法進行操作。這些都是你對數據存儲係統的期望。您希望能夠做到這些事情,並且人們應該總是能夠自動地看到一致的快照。
通過Delta Lake解決這些挑戰
那麼,現在讓我們後退一步,看看Delta Lake的這個過程是怎樣的。Delta Lake的想法是我們采用這種相對複雜的架構,其中很多正確性和其他事情都是由你手動編寫Spark程序來完成的,我們把它改變成這樣,你隻考慮數據流,你從你的組織中引入所有數據並不斷提高質量,直到它可以使用為止。
這個架構的特點是,Delta Lake為Apache Spark帶來了完整的ACID事務。這意味著現在運行的每個Spark作業要麼完成整個作業,要麼什麼都不完成。同時讀寫的人可以保證看到一致的快照。當一些東西被寫出來的時候,它肯定是被寫出來的,它不會丟失。這些都是ACID的特征。這讓你可以專注於實際的數據流而不是考慮所有這些額外的係統問題,一遍又一遍地解決已知的問題。
Delta Lake的另一個關鍵方麵是它基於開放標準,而且是開源的。所以,這是一個完整的Apache許可證。沒有愚蠢的通用條款或類似的東西。你可以完全免費地將它用於任何你想要的應用程序。就我個人而言,如果我要存儲pb級的數據,這對我來說真的很重要。數據有很大的引力。當你收集大量數據時,會有很多慣性。我想把它放到一個黑盒子裏這樣我就很難把它取出來。
這意味著您可以存儲大量數據而不用擔心鎖定問題。所以,它既是開源的,也是基於開放標準的。我會在後麵的演講中更詳細地討論這個問題。但實際上,Delta是將數據存儲在拚花中。所以,你可以用其他引擎來閱讀它。三角洲湖周圍有一個不斷壯大的社區,在那裏建立了這種原生支持。
但是,最壞的情況是,如果你決定離開Delta Lake,你所需要做的就是刪除事務日誌,它就變成了一個普通的拚花桌。最後,Delta Lake由Apache Spark深度驅動。因此,這意味著,如果你有現有的Spark作業,無論它們是流處理還是批處理,你都可以輕鬆地將它們轉換為獲得Delta的各種好處,而不必從頭重寫這些程序。我會在後麵的演講中詳細講一下。
Delta Lake建築:青銅、銀和金桌子
但是現在,我想把這張照片簡化一下,來談談我看到的Delta Lake建築的其他一些特點,以及我看到的人們非常成功的地方。
首先,我想談談數據質量等級的概念。這些都不是三角洲湖的基本特征。我認為這些是人們使用各種係統的東西。但我看到有人非常成功地將這種模式與Delta的特性結合在一起。這些隻是數據質量的一般分類。這裏的想法是,當你把數據帶進Delta Lake時,不要試圖一下子讓它變得完美,你要逐步提高數據的質量,直到它為消費做好準備。我會講一下為什麼我認為這實際上是一個非常強大的模式,可以幫助你更有效率。
所以,從最開始就是你的青銅等級數據。這是原始數據的傾倒場。它還在燃燒。實際上我認為這是一件好事。因為這裏的核心思想是,如果您捕獲了所有內容,而沒有對其進行大量的修改或解析,那麼在解析和修改代碼中就不可能有錯誤。你從一開始就保留了一切。你通常可以在這裏保留一年的用戶留存。我會講一下為什麼我認為這很重要。
但這意味著你可以收集所有東西。你不需要提前花大量的時間來決定哪些數據有價值,哪些數據沒有價值。你可以在分析的過程中算出來。
從青銅開始,我們轉向銀的水平數據。這是尚未準備好供消費的數據。這不是一份你要交給CEO的報告。但我已經清理過了。我過濾掉了一個特定的事件類型。我正在解析JSON並給它一個更好的模式。或者我加入了不同的數據集。他們把我想要的所有信息都集中在一個地方。
而且,您可能會問,如果這個數據還沒有準備好使用,為什麼我要花時間創建一個表來實現它。實際上有幾個不同的原因。一個是,這些中間結果通常對組織中的很多人都有用。因此,通過創建這些銀色級別的表,你在其中吸收你的領域知識並清理數據,你允許他們自動從中受益而不必自己做這些工作。
但這裏更有趣和更微妙的一點是,它也可以真正幫助調試。當我的最終報告中出現錯誤時,能夠查詢這些中間結果是非常強大的,因為我實際上可以看到哪些數據產生了那些糟糕的結果,並看到管道中的哪些地方是有意義的。這是在管道中有多個跳的一個很好的理由。
最後,我們轉向黃金級數據。這是幹淨的數據。已經可以消費了。正是這些業務級別的聚合,實際上討論了事情是如何運行的,以及事情是如何工作的,這幾乎為報告做好了準備。
在這裏,你開始使用各種不同的引擎。就像我說的,Delta Lake已經和Spark合作得很好了。同時,也有很多人對增加對Presto等人的支持感興趣。所以,你可以做你的流分析和人工智能,並報告它。
三角洲湖的流式和批量加工
現在我想談談人們是如何通過三角洲湖,通過這些不同的質量等級來移動數據的。我反複看到的一個模式是——流媒體實際上是一個非常強大的概念。在我深入研究流媒體之前,我想糾正一些我經常聽到的誤解。
所以,當人們聽到流媒體時,他們通常會想到一件事,他們認為它必須非常快。它必須非常複雜,因為你想要它非常快。Spark實際上是支持這種模式的,如果你有這樣的應用的話。有一個持續的處理,你不斷地從服務器獲取新數據,保持支持毫秒延遲的核心。
但這實際上並不是流媒體有意義的唯一應用。流媒體對我來說實際上是增量計算。它是關於一個我想在新數據到達時持續運行的查詢。所以,與其把它看作是一堆離散的工作,並把這些離散的工作的所有管理都放在我的工作流程引擎上,流媒體把這些都帶走了。
您編寫一次查詢,您說“我想從青銅表中讀取數據,我想執行這些操作。我要在銀桌子上寫字"然後你就連續地運行它。您不必考慮哪些日期是新的,哪些數據已經被處理,如何處理這些數據並以事務性的方式提交到下遊,如何檢查我的狀態,以便在作業崩潰並重新啟動時,我不會失去在流中的位置。結構化流為您解決了所有這些問題。
因此,我認為它實際上可以簡化你的數據架構,而不是變得更複雜。Apache Spark中的流實際上有一個很好的成本延遲權衡,你可以調優。因此,在遠端,您可以使用連續處理模式。你可以保留這些內核來持續流,你可以得到毫秒級的延遲。
在中間區域,您可以使用微批處理。微批處理的好處是現在你可以在集群上有很多流,它們的時間可以多路複用這些核心。所以,你運行一個非常快速的工作,然後你放棄這個核心,然後其他人進來運行它。有了這個,你可以得到幾秒到幾分鍾的延遲。這對許多人來說是一個最佳點,因為很難判斷某件事在最後一分鍾內是否最新,但你確實關心它在最後一小時內是否最新。
最後,在結構化流中還有一種叫做觸發一次的模式。因此,如果您的工作中數據每天、每周或每月隻到達一次,那麼一直啟動並運行該集群就沒有任何意義,特別是如果您在雲中運行,您可以放棄它並停止為它付費。
結構化流實際上也有這個用例的特性。它叫做trigger-once,基本上不是連續運行作業,隻要有新數據到達,你啟動它,你說trigger once,它讀取任何新數據到達,處理它,提交一個下遊事務,然後關閉。因此,這可以為您帶來流的好處,簡化協調,而沒有任何傳統上與始終運行的集群相關的成本。
當然,溪流並不是通過三角洲湖傳輸數據的唯一方式。批處理作業也非常重要。就像我之前提到的,你可能有GDPR,或者你需要做這些更正。您可能已經更改了來自其他係統的數據捕獲,其中有一組更新來自您的操作存儲區。你隻需要在你的三角洲湖中反映出來。為此,我們有UPSERTS。當然,我們也支持標準的插入和刪除以及那些命令。
因此,Delta Lake的真正優點是它支持這兩種模式,你可以使用正確的工具來做正確的工作。您可以無縫地混合流處理和批處理,而無需擔心正確性或協調問題。
我想講的最後一個模式是重新計算的概念。所以,當你有了這個早期的表格保存了你所有的原始結果當你有了很長時間的保留,也就是幾年的原始數據。
當你在Delta Lake數據圖的不同節點之間使用流時,你很容易進行重新計算。你可能想要重新計算因為你的代碼中有一個錯誤或者你可能想要重新計算因為你決定要提取一些新的東西。這裏真正的好處是,由於流媒體的工作方式,這非常簡單。
因此,為了讓您了解結構化流在Apache Spark中是如何工作的,我們基本上有這樣一個模型,即流查詢應該總是在相同數量的數據上返回相同的批查詢結果。這意味著,當你對Delta表啟動一個新流時,它會在流啟動的那一刻對那個表進行快照。
你做這個回填操作,在這個快照中處理所有的數據,把它分解成漂亮的小塊,沿途檢查你的狀態,向下遊提交。當您到達快照的末尾時,我們切換到跟蹤事務日誌,隻處理自查詢開始以來到達的新數據。這意味著您得到的結果與您在最後運行查詢相同,但是比從頭開始一遍又一遍地運行查詢工作量要少得多。
因此,如果您想在這個模型下進行重新計算,您所需要做的就是清除下遊表,創建一個新的檢查點,然後重新開始。它會自動地從時間的開始處理,並趕上我們今天的位置。這實際上是一個非常強大的模式用於糾正錯誤和做其他事情。
Delta Lake客戶用例
所以,現在我們已經討論了更高的層次,我想談談Delta Lake在降低成本和在這些Delta Lakes之上使用Apache Spark管理方麵發揮作用的一些具體用例。
德爾塔湖,我想介紹一下它的曆史。三角洲湖實際上已經有幾年曆史了。在過去的兩年裏,我們在Databricks內部把它作為一個專有的解決方案。我們的一些大客戶正在使用它。beplay体育app下载地址所以,我要特別講講康卡斯特,還有拳頭遊戲,Jam City和英偉達,這些你們都知道的大公司。他們已經用了很多年了。
大約兩個月前,在Spark峰會上,我們決定開源它,這樣每個人,甚至是在prem或其他地方運行的人都可以使用Delta Lake的電力。
康卡斯特公司
我想講一個我認為很酷的特殊用例。這是康卡斯特。所以,他們的問題是他們在世界各地都有機頂盒。為了了解人們如何與他們的編程交互,他們需要對這些信息進行會話。所以,你看這個電視節目,你換了頻道,你到這裏,你回到另一個電視節目。有了這個,他們可以通過了解人們如何消費來創造更好的內容。
你可以想象,康卡斯特有很多訂戶。這裏有pb級的數據。在Delta Lake之前,他們在Apache Spark上運行這個。問題是執行這個會話化的Spark作業太大了以至於Spark調度器會翻倒。
所以,不是運行一個作業,他們實際上要做的是,他們要把這個作業,按用戶ID劃分。取用戶ID,進行哈希。他們把它修正了10倍。所以,他們把它分成10個不同的工作。然後他們獨立地運行每一項工作。
這意味著在協調方麵有10倍的開銷。你需要確保這些都在運行。你需要為所有這些實例付費。你需要處理失敗和10倍於此的工作。這很複雜。把這個轉換到Delta的一個很酷的故事是他們可以把很多手工過程轉換到流媒體上。他們能夠極大地降低他們的成本,通過把這個工作降低到運行1/10的硬件。現在計算同樣的東西,但是開銷和成本減少了10倍。
因此,Delta的可伸縮元數據可以給Apache Spark帶來非常強大的功能。我將在後麵的演講中詳細講解這一切是如何運作的。但在深入討論之前,我想向您展示,如果您已經在Delta Lake中使用Apache Spark,那麼入門是多麼容易。
所以,開始是微不足道的。所以,你把它發布在Spark包上。在Spark集群上安裝Delta Lake所需要做的就是使用Spark包。如果你在使用pySpark,你可以做破折號包。如果你用的是星火殼,也是一樣的。如果您正在構建Java或Scala jar,並且希望依賴於Delta,那麼您所需要做的就是添加一個Maven依賴項。
更改代碼也同樣簡單。如果你在spark SQL中使用數據幀讀取器和寫入器,你所需要做的就是將數據源從parquet,或JSON或CSV或任何現在使用的數據源更改為Delta。其他的都是一樣的。唯一不同的是,現在一切都將是可擴展和事務性的,正如我們之前看到的那樣,這是非常強大的。
到目前為止,我講的大部分都是關於正確性的係統問題。如果我的工作崩潰了,我不希望它破壞這個表。如果兩個人同時向表寫入數據,我希望他們都能看到一致的快照。但數據質量實際上不止於此。您可以編寫正確運行的代碼,但代碼中可能存在錯誤並得到錯誤的答案。
這就是為什麼我們要擴展數據質量的概念,以允許您以聲明的方式談論質量約束。這是下個季度左右要做的工作。但這裏的想法是,我們允許你在一個地方,指定你的三角洲湖的布局和限製。首先,我們可以看到一些重要的東西,比如數據在哪裏。存儲後,您可以選擇打開嚴格的模式檢查。
三角洲湖有兩種不同的模式。我經常看到人們在他們的數據質量之旅中使用這兩種方法。在前麵的表中,您將使用模式和打印,其中可能隻是讀取了一堆JSON,並將其完全放入Delta Lake中。我們有很好的工具,可以自動執行安全的模式遷移。
因此,如果您正在向Delta Lake寫入數據,您可以打開合並模式標誌,它會自動將數據中出現的新列添加到表中,這樣您就可以捕獲所有內容,而無需花費大量時間寫入DDL。
當然,我們也支持標準的嚴格模式檢查,也就是說,用模式創建一個表,拒絕任何不符合該模式的數據。您可以使用alter表來更改表的模式。通常,我看到這種應用在未來的道路上,在黃金級別的表格中,你真的想要嚴格執行其中的內容。
最後,你可以在Hive metastore中注冊表,這個支持很快就會到來。同時也有人類可讀的描述,所以來到這張表的人可以看到這些數據來自這個來源,並以這種方式被解析。它屬於這支球隊。你可以用這些額外的人類信息來理解哪些數據會給你想要的答案。
最後,我最感興趣的特性是期望的概念。期望允許您將數據質量的概念實際編碼到係統中。你可以說,比如,這裏,我說我希望這個表有一個有效的時間戳。我可以說這對我和我的組織來說意味著什麼。
我希望時間戳在那裏。我希望它發生在2012年之後,因為我的組織成立於2012年。因此,如果你看到1970年的數據,由於嚴重的人為錯誤,我們知道這是不正確的,我們想要拒絕它。這和你們熟悉傳統數據庫的人很相似。這聽起來很像一個變體,你可以在表上取非null或其他東西。
但這裏有一個細微的區別。不變量的思想是關於表格的。如果違反了其中一個不變量,事務將被終止並自動失敗。
我認為大數據的問題是,為什麼隻有不變量是不夠的,如果你停止處理,每次你看到一些意想不到的東西,特別是在那些早期的青銅表中,你永遠不會處理任何東西。這真的會傷害你的敏捷性。
所以,關於期望很酷的一點是我們實際上有一個可調嚴重性的概念。因此,我們確實支持這個失敗停止,你可能想在你的財務部門正在使用的表上使用它,因為你不希望他們看到任何不正確的東西。但我們也有一些較弱的東西,在這些東西中,您隻能監控有多少記錄是有效的,有多少記錄在某個閾值處未能解析和警告。
或者更強大的是,我們有數據隔離的概念,你可以說,任何不符合我期望的記錄都不會填充管道,但也不會讓它通過。把它隔離在這裏和另一張桌子上,這樣我以後就可以來看看,然後決定我需要做什麼來補救這種情況。因此,這允許您繼續處理,但不會使用無效記錄破壞下遊結果。
所以,就像我說的,這是我們現在正在積極開發的功能。請繼續關注GitHub,了解更多關於它的工作。我認為這從根本上改變了您對Apache Spark和Delta Lake數據質量的看法。
現在他們已經了解了Delta是什麼,為什麼還要關心它呢?我想深入了解Delta是如何工作的。因為我們可以將這些完整的ACID事務引入到像Apache Spark這樣的分布式係統中,並且仍然保持良好的性能,這聽起來簡直太好了。
首先,讓我們看看Delta表實際存儲在磁盤上時是什麼樣的。所以,對於那些已經有三角洲湖的人來說,這看起來應該很熟悉。它隻是存儲在文件係統S3、HDFS、Azure Blob Storage、ABLS中的一個目錄。它隻是一個目錄,裏麵有一堆拚花文件。
還有一點非常重要。那就是我們也存儲這個事務日誌。在事務日誌內部,有不同的表版本。我馬上會講一下這些表格版本。但是我們仍然將數據存儲在分區目錄中。然而,這實際上主要用於調試。它們也是Delta模式,我們可以以最優的方式直接使用存儲係統。
例如,在S3中,他們建議您定期寫入大量數據,而不是創建日期分區,這會創建時間局部性熱點。相反,您可以隨機散列分區,由於Delta元數據的強大功能,我們也可以這樣做。最後是標準數據文件,這是普通的編碼拚字文件,可以被任何係統讀取。
那麼,這些表格版本裏到底有什麼呢?我們如何推斷表的當前狀態是什麼?每個版本的表都有一組操作應用於表並以某種方式改變它。此時表的當前狀態是所有這些動作之和的結果。那麼,我說的是哪種行為呢?
舉個例子,我們可以改變元數據。我們可以說這是表的名稱,這是表的模式,你可以向表中添加列或者其他什麼,你可以設置表的分區。因此,您可以采取的操作是更改元數據。
其他操作是添加文件和刪除文件。我們寫出一個拚花文件。然後,為了使它在表中可見,它還需要添加到事務日誌中。我一會兒會講到為什麼這種額外的間接層次是一個非常強大的技巧。這裏的另一個細節是,當我們將文件添加到Delta時,我們可以保留很多關於它們的可選統計信息。
在某些版本中,我們實際上可以為每一列保留最小值和最大值,我們可以使用它們來執行數據跳過或快速計算表上的聚合值。最後,你還可以通過刪除文件從表中刪除數據。這又是一個惰性操作。這種間接的層次真的很強大。當我們從表中刪除一個文件時,我們不一定會立即刪除該數據,這讓我們可以做其他很酷的事情,比如時間旅行。
因此,獲取所有這些東西的結果是你最終得到當前的元數據,一個文件列表,然後還有一些細節,比如已經提交的事務列表和我們所在的協議版本。
那麼,這是如何讓ACID真正獲得事務性數據庫的這些良好屬性的呢?這裏有一個細節,當我們創建這些表版本時,我們將它們存儲為一個有序的原子單元,稱為提交。我之前講過這個。我們通過創建這個文件0.json創建表的0版本。這裏的思想是,當Delta在文件係統中構造那個文件時,我們將使用底層原子原語。
在S3上,為了保證原子性,您所需要做的就是上傳到係統。他們這樣做的方式是你開始上傳,說我希望上傳這麼多字節。除非您實際上成功上傳了那麼多字節,否則S3不會接受該權限。所以,你保證在7月得到整個文件或一個文件都沒有。
另一個係統是Azure或HDFS。我們要做的是創建一個包含全部內容的臨時文件。然後我們會做一個原子重命名,這樣整個文件就被創建了。然後你可以有連續的版本。在版本1中,我們添加了這兩個文件,不好意思,在版本0中,我們添加了這兩個文件。在版本一中,我們去掉了它們,放入了第三個。例如,您可以在這裏進行壓縮,您可以自動地將這兩個文件壓縮成一個更大的文件。
現在,這裏的另一個重要細節是,我們希望為每個提交添加原子性,但我們也希望可序列化性。我們希望每個人都同意穩定的變化順序,這樣我們就可以正確地做一些事情,比如合並到變化數據捕獲和其他需要這個屬性的事情。
所以,為了在有多個作者的情況下達成一致,我們需要這個叫做互斥的性質。如果兩個人嚐試創建同一個版本的Delta表,那麼隻有一個人能夠成功。為了更清楚一點,用戶1可以寫表的0版本。用戶2可以寫版本1。但是如果他們都嚐試編寫版本2,那麼其中一個可以成功,但另一個必須得到一個錯誤消息,說“對不起,您的事務沒有通過”。
現在你可能會說,“等一下。但如果兩個人同時做一件事,就會失敗。聽起來我浪費了很多時間和很多工作。對我來說,這聽起來很複雜。”不幸的是,在這裏我們使用了第三個很酷的技巧,稱為樂觀並發。樂觀並發的思想是當你在表上執行一個操作時,你隻是樂觀地假設它將會工作。如果你有衝突,你就會看看這個衝突對你來說是否重要。如果沒有,你可以樂觀地再試一次。
在大多數情況下,實際上這些事務是不重疊的你可以自動進行補救。這裏給你一個具體的例子,假設我們有兩個用戶,這兩個用戶都流進同一個表。
因此,當它們都開始流時,它們會從讀取當時的表版本開始。它們都在版本0中讀取。他們讀入表格的大綱。它們確保附加的數據有正確的格式。然後他們會寫出一些數據文件來記錄這個批處理中要記錄的流的內容。他們記錄了從桌子上讀到的和寫的東西。
現在,他們都在努力做出承諾。在這種情況下,用戶1贏了,用戶2輸了。但用戶2要做的是檢查是否有任何變化。因為他們讀到的關於表的唯一內容是模式,而模式沒有改變,他們被允許自動再次嚐試。作為開發者,這些都是隱藏的。這一切都在被窩裏自動發生。所以,他們都會試著做出承諾,而且都能成功。
現在,我們的最後一個技巧是表可以有大量的元數據。那些嚐試過將數百萬個分區放入Hive metastore的人可能對這個問題很熟悉。一旦您獲得了這些數據大小,元數據本身實際上可能是導致係統崩潰的東西。
因此,我們有一個技巧,實際上我們已經有了一個分布式處理係統能夠處理大量的數據,我們隻使用Spark。因此,我們獲取事務日誌及其操作集,我們用Spark讀取它,我們實際上可以將它編碼為parquet中的檢查點。檢查點基本上是某個版本的表的整個狀態。
因此,在讀取事務日誌時,不必讀取整個事務日誌,隻需從檢查點開始,然後再從檢查點開始進行後續更改。然後這個本身就可以用Spark處理。
因此,當你麵對一個有數百萬個文件的龐大表時,你會問,昨天添加了多少記錄,我們要做的是運行兩個不同的Spark作業。第一個組是元數據,告訴我們哪些文件與昨天有關。它會返回文件列表然後你會運行另一個Spark作業來處理它們並進行計數。
通過分兩個階段進行,我們可以大大減少需要處理的數據量。我們將隻查看與查詢相關的文件。我們會用Spark來過濾。
所以,在我們開始提問之前,我想先談談路線圖。就像我說的,雖然這個項目已經有幾年了,但它最近才開源。今年剩下的時間裏,我們有一個非常令人興奮的路線圖。基本上,我們的目標是讓開源的Delta Lake項目與Databricks內部可用的API完全兼容。因此,本季度剩餘時間的路線圖基本上是開源我們擁有的許多很酷的功能。
因此,我們實際上在幾周前發布了0.2.0版本,它增加了對從S3讀取以及從Azure Blob Store和Azure Delta Lake讀取的支持。然後這個月,我們計劃發布一個0.3.0版本,它將添加用於更新、刪除、合並和真空的Scala api。Python api也將緊隨其後。在這個季度剩下的時間裏,我們有一些計劃。我們希望添加完全的DDL支持。這就是創建表和修改表。
我們還想讓你能夠在Hive metastore中存儲Delta表,我認為這對於不同組織中的數據發現非常重要。我們想從更新、刪除和合並之前獲取這些DML命令,並將它們實際掛鉤到Spark SQL解析器中。因此,您也可以使用標準SQL來執行這些操作。
接下來,讓我們知道你想要什麼。所以,如果你有興趣做更多,我建議你去看看我們的網站Delta.io。它對項目有一個高層次的概述。這裏有一個快速入門指南,告訴你如何開始。它也有到GitHub的鏈接,在那裏你可以看到我們的進展,看到我們的路線圖,並就你認為項目應該走向的方向提交你自己的問題。
所以,我強烈建議你們這樣做。說到這裏,我想我們將進入提問環節。讓我把它們拉出來,看看我們得到了什麼。
常見問題
Michael時常要:
Delta Lake是否增加了性能開銷?
我想分解一下。所以,首先,三角洲湖被設計成一個高通量係統。所以,每一個單獨的操作,都有一些開銷。因此,你基本上因為不僅僅是寫入文件,我們需要寫入文件,同時也要寫入事務日誌。因此,這將為您的Spark工作增加幾秒鍾的時間。
現在,重要的是我們把Delta設計成大規模並行和非常高的吞吐量。因此,您的Spark工作將增加幾秒鍾的時間。但這主要與Spark作業的大小無關。Delta Lake最擅長的是吸收數以萬億計的數據記錄或千兆字節的數據。數據不擅長的是插入個人記錄。如果每個Spark作業運行一條記錄,將會有很多開銷。
因此,這裏的技巧是您希望在Spark最有意義的地方使用Delta,這些地方是分布在許多機器上的相對較大的作業。在這些情況下,開銷可以忽略不計。
既然它具有ACID屬性,那麼我的係統是否也具有高可用性呢?
所以,Delta,再次強調,它是專門設計來利用雲的優勢利用這些好的特性。
所以,對我來說,雲有一些很好的特性。一是雲是非常可擴展的。您可以將大量數據放入S3中,它可以任意處理這些數據。它通常是高度可用的。因此,無論您身在何處,都可以從S3讀取數據。如果你真的很在乎,甚至還有像複製這樣的東西,你可以把數據複製到多個區域。Delta很好地解決了這個問題。所以,從Delta表中讀取應該是高度可用的,因為它實際上就是底層存儲係統的可用性。
現在,熟悉CAP定理的人可能會說,等一下,對於權利,當我們考慮一致性,可用性和分區容忍時,Delta選擇了一致性。如果你無法與中心協調器對話,這取決於你是否在S3上,那可能是你自己運行的服務,在Azure上,他們采取了一致性方法,我們在那裏使用原子操作,係統會暫停。
但這裏的好處是,由於采用了樂觀並發機製,這並不一定意味著您會丟失已經運行了幾個小時的整個作業。這隻是意味著你必須等待,直到你能夠與該服務交談。所以,我想說的是,就閱讀的高度可用性而言,就權利而言,我們選擇了一致性,但總的來說,這實際上還是很有效的。
下一件事是你要保存所有級別的數據。好吧,我想澄清一下銅,銀,金背後的概念。不是每個人都保存原始數據,不是每個人都保存所有數據。你可能有保留要求,比如說,你隻允許保留兩年的數據。
所以,真的,我認為應該由你來決定哪些數據是有意義的。我想說的唯一一件事是,我認為Delta Lakes的好處,以及Delta如何應用於它們,是你有權保留原始數據,盡可能多地保留。
因此,沒有技術限製可以讓你保留所有的數據。因此,我工作過的許多組織實際上保留了法律允許他們保留很長一段時間的所有東西,隻有在他們不得不處理的時候,你才會刪除它們。
你用什麼來寫這個邏輯?我們能用Scala寫邏輯嗎?
Delta Lake可以插入Apache Spark的所有現有API,這意味著您可以使用其中任何一個API。所以,如果你是一個Scala程序員,你可以使用Scala。如果你是一個Java程序員,這也可以。我們在Python中也有綁定。如果你是一名分析師,你根本不想編程,我們也支持純SQL。
我們的想法是底層引擎是用Scala寫的,Delta也是用Scala寫的。但是你的邏輯可以用任何你喜歡的語言來寫。這是另一種情況,我認為你需要合適的工具來做合適的工作。
就我個人而言,我用Scala做了很多事情,但當我需要製作圖形時,我切換到Python。但是,Delta仍然給了我過濾大量數據的能力,將其縮小到適合熊貓的東西,然後我用它做一些圖形。
Presto是Delta Lake的一部分還是Spark的一部分?
這實際上是現在發展得非常快的東西。這個問題有幾個不同的答案。那麼,我來告訴你們兩個我們現在在哪裏,要去哪裏。
現在,在Databricks中有一個我們正在開源的特性,它允許你有Delta的寫入器,寫入這些被稱為清單文件的東西,允許你以一種一致的方式從Presto或Athena知道的任何其他基於Presto的係統中查詢Delta表。
然而,我們正在與Presto背後的公司之一Starburst深入合作,為Presto構建一個本地連接器。我們也從Hive社區中獲得了積極的興趣。因此,對於構建連接器有很多興趣。所以,今天,Delta的核心構建在Spark中,但我認為開源和開放標準真正強大的地方在於,任何人都可以與它集成。作為這個項目,我們致力於發展這個生態係統,並與任何人合作。
所以,如果你是其中一個項目的提交者,請加入我們的郵件列表,加入我們的Slack頻道,查看並讓我們知道我們如何幫助你構建這些額外的連接器。
我們可以在社區版的Databricks中對Delta Lake進行實驗嗎?
是的,你可以。Delta Lake在社區版中可用。來看看。所有東西都應該在那裏。讓我們知道你的想法。
可以用Hive創建Delta表嗎?
是的。基本上和Presto的答案是一樣的。社區中有積極的興趣來建立支持。現在還沒有,但這絕對是我們想要開發的東西。
三角洲湖如何處理從原始到黃金的緩慢變化的維度?
在www.eheci.com網站上有一篇博客。如果你穀歌,慢慢改變維度,它會告訴你所有的細節。但我認為,正確的答案是,使用合並運算符加上冪函數或Spark,實際上很容易構建所有不同類型的緩慢變化的維度。
Delta在Spark之上添加的神奇之處就是這些事務。在沒有事務的情況下修改表是非常危險的,而Delta使這成為可能。因此,它支持這種類型的用例。
我們通常處理Azure。我們想知道Delta Lake在Azure事件中心而不是Kafka上運行時是否有任何不同的行為?
我會更籠統地回答這個問題。所以,我想我談到了Delta的一個強大之處,就是它與Spark的整合。其中一個重要原因是,我認為Spark是大數據生態係統中的“瘦腰”。世界上幾乎每個大數據係統都有Spark連接器。
所以,如果Spark能讀取數據,它就能與Delta Lake合作。因此,Event Hub有一個本地連接器,可以插入Spark數據源,也有一個與Spark Kafka一起工作的Kafka API。所以,你可以很容易地從Event Hub讀取數據,並使用Event Hub而不是Kafka來完成我今天談到的所有事情。實際上,這適用於Spark可以讀取的任何係統。
總的來說,再回答一下Azure, Delta完全支持Azure,包括ADLS,我們最近剛剛改進了對ADLS Gen2的支持。你們可以下載。它也是Azure Databricks開箱即用的一部分。
用於DML命令(比如update)的Scala API究竟是什麼?
答案是,它看起來像Spark SQL嗎。Spark SQL,你傳入一個字符串來進行更新。答案是,我們會同時支持這兩種方式。所以,如果你真的去GitHub存儲庫,我相信這段代碼已經被合並了。你可以看到Scala API。如果沒有,就會有一種設計,在票上討論添加更新的細節。
但這裏的想法是,會有一個Scala函數,叫做update,你可以通過編程方式使用,而不需要做字符串插值。還有一種SQL方法,你可以創建一個SQL字符串,然後傳入。這就像你使用你最熟悉的語言,它已經是你工具箱的一部分,Delta應該自動處理它。
Delta Lake與HDFS一起工作嗎?
是的,它完全與HDFS一起工作。HDFS有我們需要的所有原語,所以你不需要任何額外的細節。我在這裏說的是HDFS支持原子重命名,如果目標已經存在就會失敗。
所以,隻要你運行的是一個足夠新版本的HDFS,它應該會自動工作。如果你查看Delta文檔中的入門指南。Io,它有我們支持的所有不同的存儲係統,以及你需要做什麼來設置的詳細信息。
是更新、刪除單行還是記錄級別?
這個問題有兩個答案。因此,Delta確實允許您進行細粒度的個別行更新。因此,您不必在分區級別上執行更新或刪除操作。如果您在分區級別上執行它們,那麼它們就很重要。例如,如果您確實喜歡在分區級別上進行刪除,那麼它們的效率會顯著提高,因為我們可以直接刪除元數據。實際上我們不需要手動重寫。
但如果它們不在分區級別,如果你在做細粒度的單行更新或刪除,我們要做的是找到相關的parquet文件,重寫它們,提交添加和刪除使操作發生。然後,這是完成它的事務。所以,它確實支持它,但它涉及到重寫單個文件。
所以,我在這裏要說的是,如果Delta確實不是為OLTP係統設計的,那麼如果你有很多單獨的行更新,你就不應該使用它。但我們確實支持這種細粒度的用例。
你知道Delta Lake的Scala api什麼時候可以用嗎?
這個問題有幾個答案。因此,Delta Lake的讀寫、流處理和批處理工作已經在Scala中實現了。如果您具體談論的是更新、刪除和合並,我相信大部分代碼已經放入存儲庫中。如果你下載並自己構建,它就在那裏。我們希望能在7月發布。所以,希望本月會有包含額外Scala api的下一個版本。
讓我們來看看。是的。下一個問題是關於數據質量的。除了時間戳之外,我們還能有其他字段用於驗證嗎?是的。所以,我們之前討論的期望隻是一般的SQL表達式。因此,您可以在SQL中編碼的任何期望都是允許的。
所以,在那個例子中,這是一個非常簡單的與特定日期的比較操作。但它可以是任何你想要的。它甚至可以是一個檢查數據質量的UDF。這裏重要的是我們允許你把它們作為你的數據流的屬性而不是你需要記住自己去做的手動驗證。這就強製了所有使用這個係統的人。
Delta Lake是否支持從數據幀而不是臨時表進行合並?
是的。因此,一旦Scala和Python api可用,就可以傳入數據幀。今天,在Databricks內部,唯一可用的是SQL DML。在這種情況下,您確實需要將其注冊為臨時表。但就像我說的,請繼續關注本月底。我們將發布Scala api,然後你就可以自己傳遞數據幀了。
這個問題我已經見過幾次了,所以我再回答一次。我們同時支持ADLs Gen1和Gen2,盡管Gen2會更快,因為我們有一些額外的優化。
在檢查點示例中,內部計算Delta Lake檢查點的Spark作業是否需要手寫?
因此,當您使用流來讀取或寫入一個Delta表或兩者同時進行時,如果您隻是在兩個不同的Delta表之間使用它,檢查點是由結構化流處理的。
因此,您不需要做任何額外的工作來構造檢查點。這是引擎內置的。結構化流在Spark中的工作方式是每個源和所有東西,有一個契約允許我們自動執行檢查點。因此,源需要能夠說,我正在處理從這裏到這裏的數據。這些關於它們在流中的位置的概念,我們稱之為偏移量,它們需要是可序列化的。我們隻是把它們存儲在檢查點。我們基本上使用檢查點作為一個提前日誌。
第10批是這個數據。然後我們嚐試處理第10批。然後寫入同步。這裏的保證是同步必須是有效的。因此,它必須隻接受批次號10一次。如果我們因為一次失敗而嚐試寫兩次,它一定會拒絕並跳過它。通過將所有這些約束放在一起,您實際上隻得到一次自動檢查點處理,而不需要做任何額外的工作。
為什麼不使用多語言持久性並使用RDBMS來存儲酸性事務呢?
我們真的試過了。事實上,Delta的一個早期版本使用MySQL。這裏的問題是MySQL是一台單機。因此,僅僅獲取一個大表的文件列表實際上就會成為瓶頸。然而,當您以Spark本身可以本地處理的形式存儲此元數據時,您可以利用Spark來進行處理。
因此,沒有什麼可以阻止您在存儲係統之上實現Delta事務協議。事實上,現在在GitHub存儲庫上有一個相當長的對話,關於如何構建Delta的基礎DB版本。是的,這當然是可能的。但在我們最初的可伸縮性測試中,我們發現Spark是最快的方法,至少在我們測試的係統中是這樣的,這就是為什麼我們決定這樣做。
這是否意味著我們不需要數據幀,而是可以在Delta Lake上進行所有的轉換?
我會說沒有。好吧,我認為您隻能更新、刪除和合並,而不能使用每個SQL可以使用的任何實際數據幀代碼。但實際上,我認為這是適合工作的合適工具。
Delta Lake確實與Spark數據幀進行了深度集成。就我個人而言,我覺得這是一個非常強大的轉換工具。它就像SQL++,因為你有所有這些關係概念,但嵌入在一個完整的編程語言中。我認為這是一種非常有效的方式來編寫數據管道。
Delta Lake如何管理Spark的新版本?
Delta Lake需要Spark 2.4.3,這是一個最近發布的版本。這是因為在Spark的早期版本中,實際上存在一些錯誤,這些錯誤阻止了數據源正確地插入其中。但總的來說,我們正在致力於Spark兼容性。這實際上是我們這個季度的核心項目之一,就是確保Delta的所有東西都能插入Spark的良好的公共穩定api,這樣我們就可以在未來使用多個版本。
Delta Lake支持ORC嗎?
同樣,在GitHub上有關於添加支持的討論。所以,我鼓勵你們去看看,如果這對你們來說很重要,就這個問題投票。
這個問題有兩個答案。一個是Delta Lake事務協議。我認為它實際上是在事務日誌中,並且實際上支持指定所存儲數據的格式。因此,它實際上可以用於任何不同的文件格式,txt, JSON, CSV,這些都已經內置到協議中了。今天,當你創建一個Delta表時,我們不公開它作為一個選擇,我們隻做拚花。
原因很簡單。我隻是覺得調諧旋鈕越少越好。但是對於像RFC這樣的東西,如果有一個很好的理由讓你的組織可以轉換,我認為支持是非常非常容易的,這是我們在社區中正在討論的事情。所以,請到GitHub,找到這個問題,並填寫它。
Databricks附帶的Delta Lake與開源版本有什麼不同?
這是我經常被問到的問題。我認為思考這個問題的方式是,我想談談開源背後的哲學。我認為api應該是開放的。
因此,任何可以在Databricks中正確運行的程序都應該可以在開源環境中正常運行。現在,這並不完全正確,因為Delta Lake的開源版本隻有兩個月的曆史。因此,我們所做的就是努力開源現有的所有不同的api。所以,更新,刪除,合並曆史記錄,所有這些你可以在Databricks中做的事情也將在開源版本中可用。
管理三角洲湖是我們提供的版本。這樣更容易建立。它將與Databricks的所有其他部分集成。我們進行緩存。我們有一個更快的Spark版本。這樣運行起來就快多了。但就功能而言,我們的目標是在這裏實現完全的功能對等,因為我們致力於使這個開源項目取得成功。我認為開放api是做到這一點的正確方式。
到此為止,我想我們就到此為止。非常感謝你們今天的到來。