Getting Started with Delta Lake

Making Apache Spark Better with Delta Lake

Michael Armbrust, Principal Software Engineer at Databricks
Michael Armbrust is a committer and PMC member of Apache Spark and the original creator of Spark SQL. He currently leads the team at Databricks that designs and builds Structured Streaming and Databricks Delta. He received his PhD from UC Berkeley in 2013, advised by Michael Franklin, David Patterson, and Armando Fox. His thesis focused on building systems that allow developers to rapidly build scalable interactive applications, and specifically defined the notion of scale independence. His interests broadly include distributed systems, large-scale structured storage, and query optimization.

Series Details

This session is part of the "Getting Started with Delta Lake" series with Denny Lee and the Delta Lake team.

Session Abstract

Join Michael Armbrust, head of the Delta Lake engineering team, to learn how his team built ACID transactions and other data reliability technologies from the data warehouse world on top of Apache Spark and brought them to cloud data lakes.

Apache Spark is the dominant processing framework for big data. Delta Lake adds reliability to Spark so your analytics and machine learning initiatives have ready access to quality, reliable data. This webinar covers how to use Delta Lake to enhance data reliability in Spark environments.

Topic areas include:

  • The role of Apache Spark in big data processing
  • The use of data lakes as an important part of the data architecture
  • Data lake reliability challenges
  • How Delta Lake helps provide reliable data for Spark processing
  • Specific improvements that Delta Lake adds
  • The ease of adopting Delta Lake to power your data lake

What you will need:
Sign up for Community Edition here and get the webinar presentation materials and sample notebooks.

Video Transcript

- [Denny] Hello everyone. Welcome to today's webinar, Making Apache Spark Better with Delta Lake.

Before we begin today's presentation, we'd like to go over a few housekeeping items to make sure you have the best experience. Please note that your audio connection will be muted for everyone's listening comfort. If you have any concerns or questions, please raise them in the questions panel or the chat. We encourage you to use this time to ask as many questions as possible and clear up any doubts about today's topic. Our main presenter today, Michael Armbrust, is the original creator of Spark SQL and Structured Streaming and one of the main creators of Delta Lake. He's a principal engineer at Databricks, so without further delay, take it away, Michael. - Thank you, Denny. I'm really excited to be here today to talk about how to make Apache Spark better by using Delta Lake. Before I get into that, though, I want to start by talking about the concept of a data lake, why so many people are excited about it, and why there are so many challenges when they try to set these things up.

The Promise of the Data Lake

First of all, what is a data lake, and what does it mean to me? The promise of the data lake is basically this: organizations have a lot of data. It might be carefully curated customer data in an OLTP system. It might be raw clickstream coming from web servers, or unstructured data coming from a bunch of sensors. And the promise of the data lake is that you can dump all of that data into it. That's actually really powerful when you compare it with a traditional database, because with a traditional database you have to start by coming up with a schema and doing a lot of cleaning. That's often called schema-on-write, and what the data lake lets you do is skip that process and just start collecting everything, because sometimes you don't know why data is valuable until much later, and if you haven't stored it, you've already lost it. The data lake is just a bunch of files in a file system. It could be S3 or HDFS or Azure Blob Storage; you dump everything in there and then come back and look at it later. The idea is that once you've collected it all, you can actually get real insights out of it. You can do data science and machine learning. You can build powerful tools for your business, like recommendation engines or fraud detection algorithms. You can even do crazy things like cure cancer using genomics and DNA sequencing. However, I’ve seen this story many, many times, and typically what happens is unfortunately the data at the beginning is garbage. And so the data that you store in your data lake is garbage, and as a result, you get garbage out from these kind of more advanced processes that you try to do at the end. And why does that happen? Why is it so difficult to get quality and reliability out of these data lakes? And what does this kinda typical project look like? So I wanna walk you through a story that I’ve seen kind of happen over and over again at many organizations, when they sit down and try to extract insights from their data.

The Evolution of a Cutting-Edge Data Lake

It typically looks like this, and this is considered state of the art today, but before Delta Lake. A really common pattern is you have a stream of events. They're coming into a system like Apache Kafka, and your mission is to do two things. You need to do streaming analytics, so you can know in real time what's happening in your business. And you also want to do AI and reporting, where you can look over a longer period of time, do longitudinal analysis, actually look at history and trends, and make predictions about the future. So how are we going to do this? Step one, I sit down at my computer, and I know Spark has really good APIs for reading from Apache Kafka. You can use DataFrames, Datasets, and Spark SQL to do processing, aggregations, time windows, and all kinds of things, and come up with your streaming analytics. So we start with that, and pretty soon it works pretty well, but that brings us to the first challenge, which is historical queries.

Challenge #1: Historical Queries?

Kafka is great for doing real-time analytics, but it can only store a day or a week of data. You don't want to be storing years and years of data in Kafka. So we have to solve that problem as well. Real-time is great for what's happening at this moment, but it's not so good for looking at historical trends. So I read a bunch of blog posts, and a really common pattern here is something called the Lambda architecture, which as far as I can tell basically means you just do everything twice. You have one real-time thing that's doing an approximation and telling you exactly what's happening at this moment, and you have another pipeline that's maybe a little more curated. It runs a little bit slower, but it archives all of that data into your data lake. That's step one. If we want to solve this historical query problem, we also set up the Lambda architecture on top of Apache Spark, and once I have all of the data in the data lake, the idea is that now I can also run Spark SQL queries on top of it, and now you can do AI and reporting. It's a little bit of extra work and some extra coordination, but fortunately Spark has a unified API for batch and streaming. And so it’s possible to do it and we get it set up, but that brings us to challenge number two.
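To illustrate that unified API point, here is a minimal PySpark sketch: the same DataFrame operators run as a streaming query against Kafka and as a batch query over the archived files. The broker address, topic name, and archive path are assumptions for illustration, not details from the talk.

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    # Streaming half: real-time counts straight from Kafka.
    events = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")   # assumed broker
        .option("subscribe", "events")                     # assumed topic
        .load())

    live_counts = events.groupBy(F.window("timestamp", "1 minute")).count()
    live_counts.writeStream.outputMode("complete").format("console").start()

    # Batch half of the Lambda architecture: the same operators over the archived files.
    archive = spark.read.parquet("/data/events_archive")   # assumed archive location
    daily_counts = archive.groupBy(F.window("timestamp", "1 day")).count()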

Challenge #2: Messy Data?

Like I said before, data in the real world is usually messy. Some team upstream changed the schema without telling you, and now you have problems. A pattern I've seen here is that you need to add validations. So you write extra Spark SQL programs that check whether your assumptions about the data are correct, and if they're wrong, it sends an email so you can go correct it. Now, of course, because we've built the Lambda architecture, we have to do the validations in two different places, but that's something we can do. We can use Spark to do it. So now we've set up validations to handle the messy data. Unfortunately, now we hit challenge number three, which is mistakes and failures. Those validations are great, but sometimes you forget, or there's a bug in your code, or, even harder, your code crashes in the middle because you're running on EC2 and your Spot instances died or whatever, and now you have to worry about, "How do I clean that up?" The real problem with using these kinds of distributed systems and kinda distributed file systems is if a job crashes in the middle, it leaves garbage results out there that need to be cleaned up. And so you’re kind of forced to do all of the reasoning about correctness yourself. The system isn’t giving you a lot of help here. And so a pretty common pattern is people, rather than working on an entire table at a time, because if something goes wrong, we need to recompute the entire table. They’ll instead break it up into partitions. So you have a different folder. Each folder stores a day or an hour or a week, whatever kind of granularity makes sense for your use case. And you can build a lot of, kinda scripting around it, so that it’s easy for me to do recomputation. So if one of those partitions gets corrupted for any reason, whether it was a mistake in my code or just a job failure, I can just delete that entire directory and reprocess that data for that one partition from scratch. And so kind of by building this partitioning and reprocessing engine, now I can handle these mistakes and failures. There was a little bit of extra code to write, but now I can kind of sleep safe and sound knowing that this is gonna work.

Challenge #4: Updates?

And that brings us to challenge number four, updates. It's really hard to do point updates. It's very easy to add data, but changing data in this data lake, and doing it correctly, is very difficult. You may need to do it for GDPR reasons. You may need to do retention. You may need to do anonymization or other things, or you may just have errors in the data. So now you have to write a whole other class of Spark jobs that do updates and merges. This is very difficult, and typically, because it's so difficult, what I see people do instead of individual updates, which would be very cheap, is that whenever they need to do something, whenever they get their set of DSRs for the month, they copy the entire table, dropping anybody who has asked to be forgotten under GDPR. They can do that, but it's another Spark job to run, and it's very expensive. And there’s kind of a subtlety here that makes it extra difficult, which is if you modify a table while somebody is reading it, generating a report, they’re gonna see inconsistent results and that report will be wrong. So you’ll need to be very careful to schedule this to avoid any conflicts, when you’re performing those modifications. But these are all problems that people can solve. You do this at night and you run your reports during the day or something. And so now we’ve got a mechanism for doing updates. However, the problem here is this has become really complicated. And what that means is you’re wasting a lot of time and money solving systems problems rather than doing what you really want to be doing, which is extracting value from your data. And the way I look at this is these are all distractions of the data lake that prevents you from actually accomplishing your job at hand.

Data Lake Distractions

To summarize my thinking, the biggest problem here is that there's no atomicity. When you run a distributed computation and the job fails in the middle, you still have some partial results out there. It's not all or nothing. Atomicity means that when a job runs, it either completes fully and correctly, or, if anything goes wrong, it rolls back completely and nothing happens. That way you no longer leave data in a corrupt state that forces you to tediously build these tools for manual recovery. Another key problem is that there's no quality enforcement. In every job, it's up to you to manually check the incoming data against your assumptions. The system gives you no help, nothing like the invariants of a traditional database where you can say, "No, this column is required," or "This must be a schema of this type." All of that is left to the programmer to handle. And finally, there's no control over consistency or isolation. And this means you can really only do one write operation to any data lake table at a time, and it makes it very difficult to mix streaming and batch to do operations while people are reading from it. And these are all things that you kind of, you would expect from your data storage system. You would want to be able to do these things, and people should always be able to see a consistent snapshot automatically.

So now let's take a step back and look at what this process looks like with Delta Lake.

Data Lake Challenges

The idea behind Delta Lake is that we take this relatively complicated architecture, where a lot of the correctness and other concerns were on you, manually writing Spark programs, and we turn it into one where you think only about data flow: you bring in all the data from your organization, let it flow, and continuously improve its quality until it's ready for consumption.

The hallmarks of this architecture are, first of all, that Delta Lake brings full ACID transactions to Apache Spark. That means every Spark job that runs now either completes the entire job or none of it. People who are reading and writing at the same time are guaranteed to see consistent snapshots. When something is written out, it's definitely written out and it won't be lost. Those are the hallmarks of ACID. That lets you focus on your actual data flow rather than thinking about all of these extra systems problems and solving these known problems over and over again. Another key aspect of Delta Lake is that it's based on open standards and it's open source. It's a full Apache license, no silly Commons Clause or anything like that. You can use it completely for free for whatever application you want. Personally, that's really important to me if I'm going to be storing petabytes of data, right? Data has a lot of gravity. There’s a lot of inertia when you collect a lot of data and I wouldn’t want to put it in some black box where it’s very difficult for me to extract it. And this means that you can store that mass amount of data without worrying about lock-in. So both is it open source, but it’s also based on open standards. So I’ll talk about this in more detail later in the talk, but underneath the covers, Delta is actually storing your data in parquet. So you can read it with other engines and there’s kind of a growing community around Delta Lake building this native support in there. But worst case scenario, if you decide you want to leave from Delta Lake all you need to do is delete the transaction log and it just becomes a normal parquet table. And then finally, Delta Lake is deeply powered by Apache Spark. And so what this means is if you’ve got existing Spark jobs, whether they’re streaming or batch, you can easily convert those to getting all kinds of benefits of Delta without having to rewrite those programs from scratch. And I’m gonna talk exactly about what that looks like later in the talk. But now I want to take this picture and simplify it a little to talk about some of the other hallmarks I see of the Delta Lake architecture, and where I’ve seen people be very successful. So first of all, I wanna kind of zone in on this idea of data quality levels. These are not fundamental things of Delta Lake. I think these are things that people use a variety of systems, but I’ve seen people very successful with this pattern, alongside the features of Delta.

A Delta Lake

These are just general categories of data quality, and the idea here is that as you bring data into the data lake, rather than trying to make it perfect all at once, you incrementally improve its quality until it's ready for consumption. I'll talk about why I think that's actually a really powerful pattern that can make you more productive. Starting at the very beginning is your bronze level data. This is the dumping ground for raw data. It's still raw, and I actually think that's a good thing, because the core idea here is that if you capture everything without doing a lot of munging or parsing, there's no way to have bugs in your munging and parsing code. You keep everything from the very beginning, and you can typically keep a year or more of retention here. I'll talk about why I think that's really important, but it means you can collect everything. You don't have to spend a lot of time up front deciding which data is going to be valuable and which isn't. You can figure that out as you do your analysis. From bronze, we move on to the silver level data. This is data that is not yet ready for consumption. It’s not a report that you’re gonna give to your CEO, but I’ve already done some cleanup. I filtered out one particular event type. I’ve parsed some JSON and given it a better schema or maybe I’ve joined and augmented different data sets. I kinda got all the information I want in one place. And you might ask, if this data isn’t ready for consumption, why am I creating a table, taking the time to materialize it? And there’s actually a couple of different reasons for that. One is oftentimes these intermediate results are useful to multiple people in your organizations. And so by creating these silver level tables where you’ve taken your domain knowledge and cleaned the data up, you’re allowing them to benefit from that kind of automatically without having to do that work themselves. But a more interesting and kind of more subtle point here is it also can really help with debugging. When there’s a bug in my final report, being able to query those intermediate results is very powerful ’cause I can actually see what data produced those bad results and see where in the pipeline it made sense. And this is a good reason to have multiple hops in your pipeline. And then finally, we move on to kind of the gold class of data. This is clean data. It’s ready for consumption at business-level aggregates, and actually talk about kind of how things are running and how things are working, and this is almost ready for a report. And here you start using a variety of different engines. So like I said, Delta Lake already works very well with Spark, and there’s also a lot of interest in adding support for Presto and others, and so you can do your kind of streaming analytics and AI and reporting on it as well.
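To make one hop of this pattern concrete, here is a minimal PySpark sketch of a continuously running stream from a bronze Delta table to a silver one. The paths, the eventType filter, and the rawJson column are assumptions for illustration, not tables from the talk.

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    # Continuously refine bronze -> silver: keep one event type and parse the raw JSON payload.
    bronze = spark.readStream.format("delta").load("/delta/events_bronze")      # hypothetical path

    silver = (bronze
        .filter(F.col("eventType") == "click")                                            # assumed column
        .withColumn("parsed", F.from_json("rawJson", "ts TIMESTAMP, userId STRING"))      # assumed column and schema
        .select("parsed.ts", "parsed.userId"))

    (silver.writeStream
        .format("delta")
        .option("checkpointLocation", "/delta/_checkpoints/events_silver")      # hypothetical path
        .outputMode("append")
        .start("/delta/events_silver"))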

Now I want to talk about how people move data through the Delta Lake, through these different quality levels. One pattern I see over and over again is that streaming is actually a really powerful concept here. Before I dive into streaming, I want to correct some misconceptions I often hear. When people hear streaming, they usually think of one thing: they think it has to be super fast. It has to be really complicated, because you want it to be really fast. Spark actually does support that mode if that's your application. There's continuous processing, where you continuously pull the server for new data, kind of holding onto those cores, and it supports millisecond latency, but that's actually not the only application where streaming makes sense. Streaming to me is really about incremental computation. It's about a query that I want to run continuously as new data arrives. So rather than thinking of it as a bunch of discrete jobs and putting all of the management of those discrete jobs on me or on some workflow engine, streaming takes that away. You write a query once. You say, “I want to read from the bronze table, I’m gonna do these operations, I'm gonna write to the silver table,” and you just run it continuously. And you don’t have to think about the kind of complicated bits of what data is new, what data has already been processed. How do I process that data and commit it downstream transactionally? How do I checkpoint my state, so that if the job crashes and restarts, I don’t lose my place in the stream? Structured streaming takes care of all of these concerns for you. And so, rather than being more complicated, I think it can actually simplify your data architecture. And streaming in Apache Spark actually has this really nice kind of cost-latency tradeoff that you can tune. So at the far end, you could use continuous processing mode. You can kind of hold onto those cores for streaming persistently, and you can get millisecond latency. In the middle zone, you can use micro-batch. And the nice thing about micro-batch is now you can have many streams on the cluster and they’re time-multiplexing those cores. So you run a really quick job and then you give up that core and then someone else comes in and runs it. And with this, you can get seconds to minutes latency. This is kind of a sweet spot for many people, ’cause it’s very hard to tell if one of your reports is up to date within the last minute, but you do care if it’s up to date within the last hour. And then finally, there’s also this thing called trigger once mode in Structured Streaming. So if you have a job where data only arrives once a day or once a week or once a month, it doesn’t make any sense to have that cluster up and running all the time, especially if you’re running in the cloud where you can give it up and stop paying for it. And Structured Streaming actually has a feature for this use case as well. And it’s called trigger once where basically rather than run the job continuously, anytime new data arrives, you boot it up. You say trigger once. It reads any new data that has arrived, processes it, commits a downstream transaction and shuts down. And so this can give you the benefits of streaming, kind of the ease of coordination, without any of the costs that are traditionally associated with an always running cluster. Now, of course, streams are not the only way to move data through a Delta Lake. Batch jobs are very important as well. Like I mentioned before, you may have GDPR or kind of these corrections that you need to make. You may have changed data capture coming from some other system where you’ve got a set of updates coming from your operational store, and you just want to reflect that within your Delta Lake and for this, we have UPSERTS. And of course, we also support just standard insert, delete, and those kinds of commands as well. And so the really nice thing about Delta Lake is it supports both of these paradigms, and you can use the right tool for the right job. And so, you can kind of seamlessly mix streaming and batch without worrying about correctness or coordination.
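For the trigger once mode described above, the only change from an always-on stream is the trigger setting. A minimal sketch with hypothetical paths:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Scheduled job (e.g. nightly): process whatever arrived since the last run, then stop.
    (spark.readStream
        .format("delta")
        .load("/delta/events_bronze")                                      # hypothetical source table
        .writeStream
        .format("delta")
        .option("checkpointLocation", "/delta/_checkpoints/nightly_copy")  # remembers where we left off
        .trigger(once=True)                                                # read new data once, commit, shut down
        .start("/delta/events_copy"))                                      # hypothetical target table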

The last pattern I want to talk about is the idea of recomputation. When you have this early table that keeps all of your raw results, when you have long retention, years of raw data, and when you use streaming between the different nodes of your Delta Lake data graph, it's really easy to do recomputation. You might want to recompute because there was a bug in your code, or because there's something new you've decided you want to extract. The really nice thing here is that, because of how streaming works, this is very simple. To give you an idea of how Structured Streaming works in Apache Spark, we basically have this model that a streaming query should always return the same result as a batch query over the same amount of data. What that means is, when you start a new stream against a Delta table, it begins by taking a snapshot of that table at the moment the stream starts. You do this backfill operation where you process all of the data in the snapshot, breaking it up into nice little chunks, checkpointing your state along the way, committing it downstream. When you get to the end of the snapshot, we switch to tailing the transaction log and only processing new data that has arrived since the query started. What this means is you get the same result as if you had run the query at the very end, but with significantly less work than running the query over and over again from scratch. So if you want to do recomputation under this model, all you need to do is clear out the downstream table, create a new checkpoint, and start it over. And it will automatically process from the beginning of time and catch up to where we are today.
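Operationally, that recomputation recipe might look roughly like the sketch below (paths hypothetical, local-filesystem removal shown for simplicity); the restarted stream snapshots the bronze table, backfills it, and then keeps up with new data.

    import shutil
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # 1. Clear out the derived (silver) table and the stream's old checkpoint.
    #    On S3/ADLS you would delete the corresponding prefixes instead.
    shutil.rmtree("/delta/events_silver", ignore_errors=True)
    shutil.rmtree("/delta/_checkpoints/events_silver", ignore_errors=True)

    # 2. Start the same streaming query again with the fresh checkpoint.
    (spark.readStream.format("delta").load("/delta/events_bronze")
        .writeStream
        .format("delta")
        .option("checkpointLocation", "/delta/_checkpoints/events_silver")
        .outputMode("append")
        .start("/delta/events_silver"))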

This is actually a really powerful pattern for correcting mistakes and for other things. Now that we've covered the high level, I want to talk about some specific use cases where Delta Lake has been instrumental in reducing cost and simplifying the management of running Apache Spark on these data lakes, and I want to talk a little bit about the history of Delta Lake.

Used by Thousands of Organizations Worldwide

Delta Lake is actually about two years old. We've had it inside of Databricks for two years. It was a proprietary solution, and some of our biggest customers were using it. I'll talk specifically about Comcast, but also Riot Games, Jam City, and NVIDIA, big names you know. They've been using it for years. About two months ago, at Spark Summit, we decided to open source it so that everybody, even people running on prem or elsewhere, could get the power of Delta Lake. I want to talk about one particular use case that I think is pretty cool. This is Comcast. Their problem is they have set-top boxes all over the world, and in order to understand how people interact with their programming, they need to sessionize this information. You watch this TV show, you change the channel, you go over here, you go back to another TV show. And with this they can create better content by understanding how people consume it. And as you can imagine, Comcast has many subscribers, so there’s petabytes of data. And before Delta Lake, they were running this on top of Apache Spark. And the problem was the Spark job to do this sessionization was so big that the Spark job, the Spark scheduler would just tip over. And so, rather than run one job, what they actually had to do was they had to take this one job, partition it by user ID. So they kind of take the user ID, they hash it, they mod it by, I think, by 10. So they break it into kind of 10 different jobs, and then they run each of those jobs independently. And that means that there’s 10x, the overhead, in terms of coordination. You need to make sure those are all running. You need to pay for all of those instances. You need to handle failures and 10 times as many jobs, and that’s pretty complicated. And the really cool story about switching this to Delta was they were able to switch a bunch of these kinds of manual processes to streaming. And they were able to dramatically reduce their costs by bringing this down into one job, running on 1/10 of the hardware. So they’re now computing the same thing, but with 10x less overhead and 10x less cost. And so that’s a pretty kind of powerful thing here that what Delta’s scalable metadata can really bring to Apache Spark. And I’m gonna talk later in the talk exactly how that all works.

But before I do that, I want to show you how easy it is to get started with Delta Lake if you're already using Apache Spark.

Getting Started with Delta using Spark APIs

So getting started is trivial. It's published on Spark Packages. All you need to do to install Delta Lake on your Spark cluster is use Spark Packages. If you're using PySpark, you can do dash dash packages and then Delta. If you're using the Spark shell, same thing. If you're building a Java or Scala jar and you want to depend on Delta, all you need to do is add a Maven dependency, and then changing your code is just as simple. If you're using the DataFrame reader and writer in Spark SQL, all you need to do is change the data source from parquet or JSON or CSV or whatever you're using today to delta, and everything else should work the same. The only difference is that now everything is scalable and transactional, which, as we saw before, is really powerful.
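As a hedged sketch of what that looks like in practice (the package version below is an assumption; check Spark Packages for the current coordinates, and the paths are hypothetical):

    # Install: launch Spark with the Delta package, for example:
    #   pyspark --packages io.delta:delta-core_2.11:0.2.0
    #   spark-shell --packages io.delta:delta-core_2.11:0.2.0
    # For Java/Scala builds, add the same coordinates as a Maven dependency.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Use: change the data source name from parquet/json/csv to delta.
    raw = spark.read.json("/data/raw/events")          # hypothetical input
    raw.write.format("delta").save("/delta/events")    # previously .format("parquet")

    events = spark.read.format("delta").load("/delta/events")
    print(events.count())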

Data Quality

Most of what I've talked about so far has been the systems problems of correctness. If my job crashes, I don't want it to corrupt the table. If two people write to the table at the same time, I want them both to see consistent snapshots. But data quality is actually more than that. You can write code that runs correctly, and there can still be a bug in your code that gives you the wrong answer. That's why we're expanding the notion of data quality so you can talk declaratively about quality constraints. This is work coming in the next quarter or so, but the idea is that we allow you, in a single place, to specify the layout and constraints of your Delta Lake. First you can specify some important things like where the data is stored. You can choose to turn on strict schema checking. Delta Lake has two different modes here, and I often see people use both of them as they move through their data quality journey. In the earlier tables you'd use schema inference, where you might just read a bunch of JSON and put it into the Delta Lake exactly as it is. We have nice tools here where we will automatically perform safe schema migrations. So if you’re writing data into Delta Lake, you can flip on the merge schema flag, and it will just automatically add new columns that appear in the data to the table, so that you can just capture everything without spending a bunch of time writing DDL. We, of course, also support kinda standard strict schema checking where you say, create table with the schema, reject any data that doesn’t match that schema, and you can use alter table to change the schema of a table. And often I see this use kind of down the road in kind of the gold level tables where you really want strict enforcement of what’s going in there. And then finally, you can register tables in the Hive Metastore. That support is coming soon, and also put human readable descriptions, so people coming to this table can see things, like this data comes from this source and it’s parsed in this way, and it’s owned by this team. These kind of extra human information that you can use to understand what data will get you the answers you want. And then finally, the feature that I’m most excited about is this notion of expectations. An expectation allows you to take your notion of data quality and actually encode it into the system. So you can say things like, for example, here, I said, I expect that this table is going to have a valid timestamp. And I can say what it means to be a valid timestamp for me and from my organization. So, I expected that the timestamp is there and I expect that it happened after 2012 because my organization started in 2012, and so if you see data from, say, 1970 due to a date parsing error, we know that’s incorrect and we want to reject it. So this is very similar to those of you who are familiar with a traditional database. This sounds a lot like a variant where you could say not null or other things on a table, but there’s kind of a subtle difference here. I think if you, so the idea of invariants are, you can say things about tables, and if one of those invariants is violated, the transaction will be aborted, will automatically fail. And I think the problem with big data, why invariants alone are not enough is if you stop processing every single time you see something unexpected, especially in those earlier bronze tables, you’re never going to process anything. And that can really hurt your agility. And so the cool thing about expectations is we actually have a notion of tuneable severity. So we do support this kind of fail stop, which you might want to use on a table that your finance department is consuming ’cause you don’t want them to ever see anything that is incorrect. But we also have these kinds of weaker things where you can just monitor how many records are valid and how many are failing to parse and alert at some threshold. Or even more powerful, we have this notion of data quarantining where you can say any record that doesn’t meet my expectations, don’t fail the pipeline, but also don’t let it go through. Just quarantine it over here in another table, so I can come and look at it later and decide what I need to do to kind of remediate that situation.
So this allows you to continue processing, but without kind of corrupting downstream results with this invalid record. So like I said, this is a feature that we’re actively working on now. Stay tuned to GitHub for more work on it. But I think this kind of fundamentally changes the way that you think about data quality with Apache Spark and with your data lake.
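The merge schema flag mentioned above is an existing Delta write option. A minimal sketch (the table path is hypothetical, and the extra "country" column stands in for whatever new field shows up in your data):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # A batch whose schema has an extra "country" column that the table does not have yet.
    new_data = spark.createDataFrame([(1, "click", "US")], ["id", "eventType", "country"])

    # With mergeSchema, Delta adds the new column to the table instead of rejecting the write.
    (new_data.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .save("/delta/events_silver"))   # hypothetical table path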

Now that I've talked about what it is and why you should care, I want to dive into how Delta actually works. Because it sounds almost too good to be true that we can bring full ACID transactions to a distributed system like Apache Spark and still maintain good performance.

Delta on Disk

First, let's look at what a Delta table looks like when it's stored on disk. For those of you who already have a data lake, this should look pretty familiar. It's just a directory stored in your file system, S3, HDFS, Azure Blob Storage, ADLS. It's just a directory with a bunch of parquet files in it. The one extra bit that's really important is that we also store this transaction log. Inside the transaction log there are different table versions. I'll talk about those table versions in a moment, but we also still store the data in partition directories. However, that's actually mostly for debugging. There are also modes of Delta where we can work with the storage system directly in the most optimal way. For example, on S3, they recommend that if you're going to be writing a lot of data regularly, rather than creating date partitions, which create hotspots of temporal locality, you instead randomly hash partition, and because of the power of Delta's metadata, we can do that as well. And then finally, standard data files, which are just normal encoded parquet that can be read by any system out there.
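As a rough picture of that layout (the names below are illustrative, not from a real table):

    /delta/my_table/                        <- the table is just a directory
        _delta_log/
            00000000000000000000.json       <- commit for table version 0
            00000000000000000001.json       <- commit for table version 1
        date=2019-01-01/
            part-00000-....snappy.parquet   <- ordinary parquet data files
        date=2019-01-02/
            part-00001-....snappy.parquet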

Table = Result of a Set of Actions

So what exactly goes into these table versions? How do we reason about what the current state of a table is? Each version of the table has a set of actions that apply to the table and change it in some way. The current state of the table, at this moment, is the result of summing up all of those actions. So what kinds of actions am I talking about? For one example, we can change the metadata. We can say, this is the name of the table, this is the schema of the table. You can add a column to the table or something like that. You can set the partitioning of the table. So one action you can take is change the metadata. The other actions are add a file and remove a file. So we write out a parquet file, and then to actually make it visible in the table, it needs to also be added to the transaction log. And I’ll talk about why that kind of extra level of indirection is a really powerful trick in a moment. And another kind of detail here is when we add files into Delta, we can keep a lot of optional statistics about them. So in some versions we can actually keep the min and max value for every column, which we can use to do data skipping or quickly compute aggregate values over the table. And then finally you can also remove data from the table by removing the file. And again, this is kind of a lazy operation. This level of indirection is really powerful. When we remove a file from the table, we don’t necessarily delete that data immediately, allowing us to do other cool things like time travel. And so the result here of taking all these things is you end up with the current metadata, a list of files, and then also some details, like a list of transactions that have committed, the protocol version for that.
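To give a feel for these actions, each commit file in _delta_log is newline-delimited JSON, one action per line. The example below is illustrative and abridged, not a byte-exact copy of the protocol:

    {"metaData": {"id": "<table-id>", "schemaString": "<json schema>", "partitionColumns": ["date"], "format": {"provider": "parquet"}}}
    {"add": {"path": "date=2019-01-01/part-00000.snappy.parquet", "size": 12345, "dataChange": true, "stats": "{\"numRecords\": 100}"}}
    {"remove": {"path": "date=2018-12-31/part-00007.snappy.parquet", "dataChange": true}}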

Implementing Atomicity

So how does this get us to ACID, to those nice properties of transactional databases? One detail here is that when we create these table versions, we store them as ordered atomic units called commits. I talked about this before. We create version zero of the table by creating this file, 0.json. The idea here is that when Delta constructs that file on the file system, we use the underlying atomic primitives. On S3, to guarantee atomicity, all you need to do is upload to the system. The way they do it is you say at the beginning of the upload, I expect to upload this many bytes, and S3 won't accept the write unless you actually successfully upload that many bytes. So you're guaranteed to get either the whole file or none of it. On other systems like Azure or HDFS, what we’ll do is we’ll create a temporary file with the whole contents and then we’ll do an atomic rename, so that the entire file is created or not. So then you can kind of have successive versions. So version one, we added these two files or sorry, in version zero, we added these two files. In version one, we removed them and put in a third. So for example, you could be doing compaction here where you atomically take those two files and compact them into one larger file.

Ensuring Serializability

Now, another important detail is that we want each commit to be atomic, but we also want serializability. We want everyone to agree on the order of changes to this table, so we can correctly do things like merge into for change data capture and other things that require that property. To agree on those changes when there are multiple writers, we need a property called mutual exclusion. If two people try to create the same version of a Delta table, only one of them can succeed. To make that a little clearer, user one can write version zero of the table, and user two can write version one, but if they both try to write version two of the table, one of them can succeed, and the other one has to get an error message saying, sorry, your transaction didn't go through.

Solving Conflicts Optimistically

Now you might say, wait a second, if it just fails whenever two people do something at once, it sounds like I'm going to waste a lot of time and a lot of work. That sounds way too complicated for me. Fortunately, this is where we use a third cool trick called optimistic concurrency. The idea of optimistic concurrency is that when you perform an operation on a table, you just optimistically assume it's going to work. If you get a conflict, you check whether that conflict actually matters to you. If it doesn't, you're allowed to optimistically try again. In most cases it turns out those transactions don't actually overlap, and you can remediate automatically. To give you a concrete example, say we have two users, and both of those users are streaming into the same table. So when they both begin their streaming write, they start by reading the version of the table at that moment. They both read in version zero. They read in the schema of the table. So they make sure that the data that they’re appending has the correct format. And then they write some data files out for the contents of the stream that are gonna be recorded in this batch. And they record what was read and what was written from the table. Now they both try to commit, and in this case, user one wins the race and user two loses. But what user two will do is they’ll check to see if anything has changed. And because the only thing they read about the schema, of the table with the schema and the schema has not changed, they’re allowed to automatically try again. And this is all kind of hidden from you as the developer. This all happens automatically under the covers. So they’ll both try to commit, and they’ll both succeed.
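Here is a purely conceptual Python sketch of that retry loop, not Delta's actual implementation: the mutual-exclusion primitive is modeled as a put-if-absent on the next version number, and the only read dependency tracked is the schema.

    log = {}   # stands in for the _delta_log directory: version number -> committed actions

    def put_if_absent(version, actions):
        """The mutual-exclusion primitive: only the first writer of a version succeeds."""
        if version in log:
            return False
        log[version] = actions
        return True

    def commit(read_version, actions, only_read_schema=True, max_retries=10):
        attempt = read_version + 1
        for _ in range(max_retries):
            if put_if_absent(attempt, actions):
                return attempt                    # we won the race for this version
            winner = log[attempt]                 # someone else committed it first
            # Conflict check: if all we read was the schema and the winner did not
            # change the schema, our work is still valid and we can retry on top.
            if only_read_schema and "metaData" not in winner:
                attempt += 1
                continue
            raise RuntimeError("conflicting concurrent update, re-run the job")
        raise RuntimeError("too many retries")

    # Two streaming writers that both read version 0 and then race to commit:
    print(commit(0, {"add": ["file-a.parquet"]}))   # -> 1
    print(commit(0, {"add": ["file-b.parquet"]}))   # -> 2 (retried automatically)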

Handling Massive Metadata

Now, our last trick is for tables that have massive amounts of metadata. Those of you who have tried to put millions of partitions into the Hive Metastore are probably familiar with this problem. Once you get to those data sizes, the metadata itself can actually bring the system down. So we have a trick for that: we already have a distributed processing system capable of handling massive amounts of data. We just use Spark. We take the transaction log with its set of actions, we read it with Spark, and we can encode it as a checkpoint in parquet. A checkpoint is basically the entire state of the table at some version. So when you're reading the transaction log, rather than having to read the whole log, you can just start from a checkpoint plus any subsequent changes that happened after it. And then this itself can be processed with Spark. So when you come to a massive table that has millions of files, and you ask the question like, “How many records were added yesterday?” What we’ll do is we’ll run two different Spark jobs. The first one queries the metadata and says, “Which files are relevant to yesterday?” And it’ll get back that list of files, and then you’ll run another Spark job that actually processes them and does the count. And by doing this in two phases, we can drastically reduce the amount of data that needs to be processed. We’ll only look at the files that are relevant to the query, and we’ll use Spark to do that filtering.
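Because the log is plain JSON and the checkpoints are parquet, you can even peek at the metadata with Spark yourself. A hedged sketch with a hypothetical table path:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Each commit file holds one JSON action per line, so Spark can read the log directly.
    log = spark.read.json("/delta/my_table/_delta_log/*.json")          # hypothetical table
    log.printSchema()                                                   # columns such as add, remove, metaData, commitInfo
    log.select("add.path", "add.size").where("add is not null").show(truncate=False)

    # Periodic checkpoints of the full table state are ordinary parquet files, e.g.
    #   /delta/my_table/_delta_log/00000000000000000010.checkpoint.parquet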

Roadmap

Before we wrap up and move into questions, I want to talk about the roadmap. Like I said before, while this project has been around for a couple of years, it was only open sourced recently, so we have a really exciting roadmap for the rest of the year. Basically, our goal is for the open source Delta Lake project to be fully API compatible with what's available inside of Databricks, so our roadmap for the rest of the quarter is essentially open sourcing a lot of the cool features we have. We actually released version 0.2.0 a couple of weeks ago, which added support for reading from S3 and also from Azure Blob Store and Azure Data Lake. And then this month we're planning a 0.3.0 release, which will add Scala APIs for UPDATE, DELETE, MERGE, and VACUUM, with Python APIs to follow. For the rest of the quarter we're planning a couple of things. We want to add full DDL support, that is CREATE TABLE and ALTER TABLE. We also want to give you the ability to store Delta tables in the Hive Metastore, which I think is really important for data discovery across organizations. And we want to take those DML commands from before, UPDATE, DELETE, and MERGE, and actually hook them into the Spark SQL parser, so you can use standard SQL to do those operations as well. And then moving forward kind of, let us know what you want. So if you’re interested in doing more, I recommend you to check out our website at delta.io, and it has kind of a high level overview of the project. There’s a quick start guide on how you can get started, and it also has links to GitHub where you can watch the progress and see what our roadmap is, and submit your own issues on where you think the project should be going. So I definitely encourage you to do that, but with that, I think we’ll move over to questions. So let me just pull those up and see what we got.

Okay. The first question is, will the materials and recording be available afterwards? For that one, I'm hoping Denny can tell us. Denny, are you there? - [Denny] No problem. Yes, as a quick call-out, for everybody who registered for this webinar, we will send out both the slides and the recording. That usually takes about 12 to 24 hours, so you should get that email later today or early tomorrow.

- Awesome, thank you very much. Yes, all of that should be available, and you can go back and watch it later. The video will be on YouTube as well. So stay tuned for more on Delta Lake. Moving on to the other questions. The first one is, does Delta Lake add performance overhead? That's a really interesting question, and I want to break it down a little bit. First of all, Delta Lake is designed to be a high-throughput system, so each individual operation has a little bit of overhead. Basically, rather than just writing out the files, we need to write out the files and also write out the transaction log. So that adds a couple of seconds to your Spark job. Now, the important thing here is we designed Delta to be massively parallel and very high throughput. So you get a couple of seconds added to your Spark job, but that is mostly independent of the size of your Spark job. So what Delta Lake is really, really good at is ingesting trillions of records of data or petabytes of data or gigabytes of data. What Delta is not good at is inserting individual records. If you run one Spark job, one record per Spark job, there’ll be a lot of overhead. So kind of the trick here is you want to use Delta in the places where Spark makes the most sense, which are relatively large jobs spread out across lots of machines. And in those cases, the overhead is negligible.

The next question is, since it has ACID properties, will my system also be highly available? That's a really good question, and I want to unpack it a little. Delta is specifically designed to take advantage of the cloud, to take advantage of these nice properties. To me, the cloud has a couple of nice properties. One is that the cloud is very stable. You can put huge amounts of data into S3 and it handles it arbitrarily well. It's generally highly available. You can always read data from S3, no matter where you are. And if you really care, there are even things like replication, where you can replicate data to multiple regions, and Delta plays very nicely with that. So reading from a Delta table should be highly available, because it's really just the availability of the underlying storage system. Now, those of you who are familiar with the CAP theorem might be saying, “But wait a second.” So for writes, when we think about consistency, availability, and partition tolerance, Delta chooses consistency. So we will, if you cannot talk to kind of the central coordinator, depending on whether you’re on S3, that might be kind of your own service that you’re running on Azure. They’ve taken kind of the consistency approach (indistinct) we use an atomic operation there. The system will pause. But the nice thing here is because of that kind of optimistic concurrency mechanism, that doesn’t necessarily mean you lose that whole job that you might’ve been running for hours. It just means you’ll have to wait until you’re able to talk to that service. So I would say in terms of reads, very highly available, in terms of writes, we choose consistency, but in general, that actually still works out pretty well.

The next one is, do you keep all levels of data? Okay, let me clarify the concept behind bronze, silver, and gold. Not everyone keeps the raw data. Not everyone keeps all of the data. You may have retention requirements that say you're only allowed to keep two years of data. So I think it's up to you to decide which data makes sense to keep. The only thing I would say is that the nice thing about data lakes, and the way Delta applies to them, is that you have the power to keep the raw data, and keep as much of it as you want. So there's no technical limitation preventing you from keeping all of your data, and as a result, a lot of the organizations I've worked with actually do keep everything they're legally allowed to keep for a very long time. They only remove it when they have to.

The next question is, what do you use to write this logic? Can we write the logic in Scala? Delta Lake plugs into all of the existing APIs of Apache Spark, which means you can use any of them. So if you're a Scala programmer, you can use Scala. If you're a Java programmer, that works too. We also have bindings in Python, and if you're an analyst who doesn't want to program at all, we support pure SQL as well. The idea is that the underlying engine is written in Scala, and Delta is written in Scala, but your logic can be written in whatever language you're comfortable with. This is another case of using the right tool for the right job. So personally, I do a lot of my work in Scala, but when I need to make graphs, I switch to Python and use that. Delta still gives me the ability to filter through massive amounts of data, narrow it down to something that fits in Pandas, and then I do some graphing with it.
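A small sketch of that filter-then-plot workflow (the table path and column names are hypothetical, and plotting assumes matplotlib is installed):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Let Spark + Delta do the heavy filtering/aggregation, then hand a small result to Pandas.
    summary = (spark.read.format("delta").load("/delta/events")   # hypothetical table
        .where("date >= '2019-01-01'")                            # assumed column
        .groupBy("date")
        .count()
        .toPandas())

    summary.plot(x="date", y="count")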

The next question is, is Presto part of Delta Lake, or only Spark? That's a good question, and it's actually something that's evolving really quickly right now. There are a couple of different answers. Let me tell you both where we are and where we're going. Right now, there's a feature in Databricks that we're working on open sourcing that allows Delta writers to write out these things called manifest files, which let you query a Delta table in a consistent way from Presto or Athena or any other Presto-based system. On top of that, we're working closely with Starburst, one of the companies behind Presto, to build a native connector for Presto. We've also had active interest from the Hive community and the Scalding community, so there are a bunch of people interested in building connectors. So today, the core of Delta is built in Spark, but I think the really powerful thing about open source and open standards is that anyone can integrate with it, and with this project we're committed to growing that ecosystem and working with anybody. So if you’re a committer on one of those projects, please join our mailing list, join our Slack channel, check it out, and let us know how we can help you build these additional connectors.

Next question, can we experiment with Delta Lake on Databricks Community Edition? Yes, you can. Delta Lake is available in Community Edition, so check it out. Everything should be there. Let us know what you think.

The next question is, can you query Delta tables with Hive? Basically the same answer as for Presto. There's interest in the community in building that support. It doesn't exist yet today, but it's definitely something we'd like to see built. Next question, how does Delta Lake handle slowly changing dimensions from raw to gold?

That's a good question, and there's actually a blog post that walks you through doing slowly changing dimensions with Delta. It covers all the details, but I think the real answer is that the MERGE operator plus the power of Spark makes it actually pretty easy to build all of the different types of slowly changing dimensions. The magic that Delta adds on top of Spark is those transactions. Modifying a table without transactions would be very dangerous, and Delta makes it possible, so in that sense it enables that use case.

The next one is, we usually work with Azure. We'd like to know whether Delta Lake behaves any differently running on Azure Event Hubs instead of Kafka. Let me answer that a little more generally. I talked about how one of the powers of Delta is its integration with Spark, and a big reason for that is that I think of Spark as the narrow waist of the big data ecosystem. Almost every big data system in the world has a Spark connector. If Spark can read from it, it works with Delta Lake. Event Hubs has a native connector that plugs in through Spark data sources, and it also has a Kafka API that works with Spark's Kafka support. So you can easily read from Event Hubs and do everything I talked about today using Event Hubs instead of Kafka. And that really applies to any system that Spark can read from.

And in general, to answer the Azure question, Delta fully supports Azure, including ADLS. We recently improved support for ADLS Gen 2. So you can download it, and it's also part of Azure Databricks, out of the box.

The next question is, what exactly does the Scala API for DML commands like update look like? Does it look like Spark SQL, where you pass a string to do the update? The answer is, we support both. If you actually go to the GitHub repository, I believe that code has already been merged, so you can see the Scala API, and if not, there's a design doc that talks about the details of adding update. There's a Scala function called update that you can use programmatically without doing string interpolation, and there's also a SQL way of doing it, where you construct a SQL string and pass it in. So the idea is you use whatever language you're most familiar with, whatever is already part of your toolkit, and Delta should just handle it.
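For reference, here is roughly what the two routes look like using the Python API that shipped in later releases (the Scala API is analogous); treat the exact signatures, the table path, and the column names as assumptions.

    from delta.tables import DeltaTable
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    events = DeltaTable.forPath(spark, "/delta/events")       # hypothetical table

    # Programmatic route: no string interpolation of values into SQL.
    events.update(
        condition="eventType = 'clck'",                       # which rows to touch
        set={"eventType": "'click'"})                         # new value, as a SQL expression

    # SQL-string route, once the DML is wired into the SQL parser:
    spark.sql("UPDATE delta.`/delta/events` SET eventType = 'click' WHERE eventType = 'clck'")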

The next question is, does Delta Lake work with HDFS? Yes, it fully works with HDFS. HDFS has all of the primitives we need, so you don't need anything extra; specifically, HDFS supports an atomic rename that fails if the target already exists. So as long as you're running a new enough version of HDFS, and it's not even that new, it should just work. If you look at the getting started guide in the Delta docs at delta.io, it lists all of the different storage systems we support and the details of what you need to do to set them up.

Next question, are updates and deletes at the individual row or record level? There are two answers to that. Yes, Delta allows you to do fine-grained, individual row updates. So you don't have to do your updates or deletes at the partition level. If you do them at the partition level, they're significantly more efficient: for example, a delete at the partition level is much cheaper because we can just drop the metadata; we don't actually have to rewrite anything. But if they're not at the partition level, if you're doing fine-grained single-row updates or deletes, what we do is find the relevant parquet files, rewrite them, commit the adds and removes to make that operation happen, and then that's a transaction that completes the operation. So it does support it, but it involves rewriting individual files. What I'll say here is that Delta is clearly not designed to be an OLTP system. You should not use it if you have lots of individual row updates, but we do support that fine granularity use case.

Do you know when the Scala APIs will be available for Delta Lake? There are a couple of answers to that question. Delta Lake reads and writes, streaming and batch, already work in Scala. That's available today. If you're talking specifically about update, delete, and merge, I believe most of that code has already been put into the repository. If you download it and build it yourself, it's there. We're hoping to do the release in July, so hopefully this month, the next release will include those extra Scala APIs.

Let's take a look.

The next question is about data quality. Can we have fields other than a timestamp for validation? Yes, the expectations we talked about earlier are just general SQL expressions. So any expectation that you can encode in SQL is allowed. In that example it was a very simple comparison against a specific date, but it can be anything you want. It could even be a UDF that checks data quality. The important thing here is that we allow you to make these properties of your data flow, rather than manual validations that you need to remember to do yourself. So it's enforced for everybody who uses the system.

Does Delta Lake support merging from a DataFrame instead of a temp table? Yes, once the Scala and Python APIs are available, you'll be able to pass in a DataFrame. Today, inside of Databricks, the only thing available is the SQL DML, in which case you need to register it as a temp table. But like I said, stay tuned for the end of this month. We'll have a release with the Scala APIs, and then you'll be able to pass the DataFrame in yourself.

I've seen this question a couple of times, so I'll answer it again. We support both ADLS Gen 1 and Gen 2, although Gen 2 will be faster because we have some extra optimizations there.

The next one is on the checkpointing example: is the Spark job that computes the Delta Lake checkpoint internal, or does it need to be handwritten? That's a good question. When you use streaming to read from or write to a Delta table, if you're just going between two different Delta tables, the checkpointing is handled by Structured Streaming. So you don't need to do any extra work to construct the checkpoint. It's built into the engine. The way Structured Streaming works in Spark is that every source and every sink has a contract that allows us to do the checkpointing automatically. The source needs to be able to say, I'm processing the data from here to here, and those notions of where you are in the stream, we call them offsets, need to be serializable. We just store them in the checkpoint. We basically use the checkpoint as a write-ahead log. So we say, batch number 10 is going to be this data. Then we attempt to process batch number 10, then we write it to the sync, and the guarantee here is the sync must be idempotent. So it must only accept batch number 10 once, and if we try to write it twice due to a failure, it must reject that and kind of just skip over it. And by putting all of these kind of constraints together, you actually get exactly once processing with automatic checkpointing without you needing to do any extra work.

Good question. Why not use polyglot persistence and use an RDBMS to store the ACID transactions? That's a really good question, and we actually tried that. In fact, an early version of Delta used MySQL, and the problem there is that MySQL is a single machine, so just getting the list of files for a large table could actually become the bottleneck. Whereas when you store this metadata in a form that Spark itself can natively process, you can leverage Spark to do that processing. So there's nothing stopping you from implementing the Delta transaction protocol on top of another storage system. In fact, there's a pretty long conversation going on right now on the GitHub repository, a back and forth discussing what it would take to build a FoundationDB version of Delta. That's certainly possible, but in our initial scalability tests we found Spark was the fastest way, at least of the systems we tested, and that's why we decided to do it that way.

Another question, does that mean we don't need DataFrames and can do all of our transformations on Delta Lake instead? I would say no. I think you could get by using only update, delete, and merge without any actual DataFrame code, using pure SQL, but really, I think it's about the right tool for the right job. Delta Lake integrates deeply with Spark DataFrames, and personally, I find that a really powerful tool for doing transformations. It's kind of SQL plus plus, because you have all of those relational concepts, but embedded in a full programming language. I think that's actually a very productive way to write your data pipelines.

How does Delta Lake handle new versions of Spark? Delta Lake requires Spark 2.4.3, which is a pretty recent release. That's because there were bugs in earlier versions of Spark that prevented data sources from plugging in correctly. In general, we're committed to Spark compatibility. That's actually one of our core projects this quarter, making sure that everything in Delta plugs into nice public, stable APIs in Spark, so that we can work with multiple versions going forward.

One more question, does Delta Lake support ORC? That's a really good question that I get asked a lot. Again, there's a discussion on GitHub about adding support, and if that's important to you, I encourage you to go find that issue and vote on it. There are two answers to this question. One is the Delta Lake transaction protocol. What's actually in the transaction log does support specifying the format of the data that's stored. So it could be used with any file format, txt, JSON, CSV. That's already part of the protocol. Today, we don't expose that as an option. When you’re creating a Delta table, we only do parquet. And the reason for that is pretty simple. I just think less tuning knobs is generally better, but for something like ORC, if there’s a good reason why your organization can switch, I think that support would be really, really easy to add and that’s something that we’re discussing in the community. So please go over to GitHub, find that issue, and fill it in. And then I’m going to take one final question since we’re getting close to time. And the question here is, what is the difference between the Delta Lake that’s included with Databricks versus the open source version? And that’s a question I get a lot. And I think, the way to think about this is I’d like to kind of talk about what my philosophy is behind open source. And that is that I think APIs in general need to be open. So any program you can run correctly inside of Databricks should also work in open source. Now that’s not entirely true today because Delta Lake is only, the open source version of Delta Lake is only two months old. And so what we’re doing is we are working hard to open source all of the different APIs that exist. So update, delete, merge, history, all of those kinds of things that you can do inside of Databricks will also be available in the open source version. Managed Delta Lake is the version that we provide. It’s gonna be easier to set up. It’s gonna integrate with all of the other pieces of Databricks. So we do caching, we have a kind of significantly faster version of Spark, and so that runs much faster, but in terms of capabilities, our goal is for there to be kind of complete feature parity here ’cause we’re kinda committed to making this open source project successful. I think open APIs is the correct way to do that. So with that, I think we’ll end it. Thank you very much for joining me today. And please check out the website, join the mailing list…

Advanced: Diving into Delta Lake

Take a deeper look at the internals of Delta Lake, a popular open source technology that enables ACID transactions, time travel, schema enforcement, and more on top of your data lake.

Watch Now
