第一個結構化流負載運行

本文提供了代碼示例所需的基本概念和解釋數據磚上運行第一個結構化流查詢。您可以使用結構化流進行實時和增量處理工作負載。

結構化流是幾種技術之一,功率流表在三角洲住表。磚建議使用三角洲生活表所有新ETL、攝取和結構化流工作負載。看到δ生活是什麼表?

請注意

而三角洲生活表提供了一個稍微修改語法聲明流表、配置流讀取和轉換的一般語法適用於所有用例流磚。三角洲生活表也簡化了流媒體管理狀態信息,元數據,和大量的配置。

讀取數據流

您可以使用結構化的不斷攝取來自支持數據源的數據流。一些最常見的數據源中使用磚結構的流負載包括以下:

  • 在雲對象存儲數據文件

  • 消息總線和隊列

  • 三角洲湖

磚建議使用自動加載程序從雲對象存儲流媒體攝入。自動加載程序支持大多數文件格式支持結構化流。看到自動加載器是什麼?

每個數據源提供了許多選項來指定如何加載批次的數據。在讀者的配置,主要選項可能需要設置分為以下幾類:

  • 選項指定數據源或格式(例如,文件類型、分隔符和模式)。

  • 選項配置訪問源係統(例如,端口設置和憑證)。

  • 選項指定從哪裏開始在流(例如,卡夫卡補償或閱讀所有現有文件)。

  • 選項控製多少數據在每一批處理(例如,馬克斯補償、文件或字節每批)。

使用自動加載器讀取流數據對象存儲

下麵的例子演示了與自動加載程序加載JSON數據,它使用cloudFiles表示格式和選項。的schemaLocation選項允許模式推理和演化。粘貼以下代碼在磚的筆記本電池和運行單元創建一個流DataFrame命名raw_df:

file_path=“/ databricks-datasets / structured-streaming /事件”checkpoint_path=“/ tmp / ss-tutorial / _checkpoint”raw_df=(火花readStream格式(“cloudFiles”)選項(“cloudFiles.format”,“json”)選項(“cloudFiles.schemaLocation”,checkpoint_path)負載(file_path))

像其他磚上讀取操作,配置一個流讀取並不實際負荷數據。你必須觸發一個動作開始前的數據流。

請注意

調用顯示()在流媒體DataFrame開始流工作。對於大多數結構化流用例,觸發的動作應該寫入數據流水槽。看到為生產準備的結構化流代碼

執行流轉換

結構化流支持大多數轉換可用在磚和火花SQL。你甚至可以負載MLflow模型作為udf和做出流預測轉換。

以下代碼示例完成一個簡單的轉換來豐富攝取JSON數據附加信息使用火花SQL函數:

pyspark.sql.functions進口input_file_name,current_timestamptransformed_df=(raw_df選擇(“*”,input_file_name()別名(“source_file”),current_timestamp()別名(“processing_time”)))

由此產生的transformed_df包含查詢指令加載和轉換每個記錄到達數據源。

請注意

結構化流將數據源視為無限或無限的數據集。因此,一些結構化流轉換不支持工作負載,因為他們需要排序無限條目的數量。

大多數聚合和許多連接需要管理狀態與水印信息,窗戶,和輸出模式。看到水印應用於控製數據處理的閾值

寫數據接收器

一個數據接收器流寫操作的目標。常用沉在磚流負載包括以下:

  • 三角洲湖

  • 消息總線和隊列

  • 鍵-值數據庫

與數據源,大多數數據彙提供許多選項來控製數據寫入到目標係統。在作家的配置,主要選項可能需要設置分為以下幾類:

  • 默認輸出模式(附加)。

  • (要求每一個檢查點位置作家)。

  • 觸發間隔;看到配置結構化流觸發間隔

  • 選項指定數據接收器或格式(例如,文件類型、分隔符和模式)。

  • 選項配置訪問目標係統(例如,端口設置和憑證)。

執行增量批寫三角洲湖

下麵的例子寫到三角洲湖使用一個指定的文件路徑和檢查站。

重要的

總是確保你指定一個獨一無二的檢查點位置為每個流作家配置。檢查站提供獨特的身份為你流,跟蹤所有記錄和狀態信息與流查詢處理。

availableNow設置觸發指示結構化流處理所有先前未處理記錄從源數據集,然後關閉,所以您可以安全地執行下麵的代碼,而不用擔心離開流運行:

target_path=“/ tmp / ss-tutorial /”checkpoint_path=“/ tmp / ss-tutorial / _checkpoint”transformed_dfwriteStream觸發(availableNow=真正的)選項(“checkpointLocation”,checkpoint_path)選項(“路徑”,target_path)開始()

在這個例子中,沒有新記錄到我們的數據來源,所以重複執行這段代碼不攝取新記錄。

警告

結構化流執行可以阻止汽車終止關閉計算資源。為了避免意想不到的成本,一定要終止流查詢。

為生產準備的結構化流代碼

磚建議使用三角洲生活表對於大多數結構化流工作負載。下麵的建議提供了一個起點準備結構化流生產工作負載:

  • 刪除不必要的代碼從返回結果的筆記本,等顯示

  • 不要運行結構化流互動集群工作負載;總是安排流工作。

  • 幫助流自動恢複工作,與無限重試配置工作。

  • 不使用自動伸縮與結構化流工作負載。

更多的建議,看看生產注意事項結構化流

讀取數據從三角洲湖、轉換和寫入三角洲湖

三角洲湖擁有廣泛的支持與結構化流作為一個源和一個水槽。看到表流讀取和寫入

下麵的例子展示了示例語法增量加載所有新記錄從三角洲表,與另一個三角洲表的快照加入他們,和寫一個增量表:

(火花readStream(“< table-name1 >”)加入(火花(“< table-name2 >”),=“< id >”,如何=“左”)writeStream觸發(availableNow=真正的)選項(“checkpointLocation”,“< checkpoint-path >”)toTable(“< table-name3 >”))

你必須有適當的權限配置為讀取源表和寫入目標表和指定檢查點位置。填寫所有參數用尖括號(< >)使用相關的數據源和下沉值。

請注意

三角洲生活表提供了一個完全聲明性語法創建三角洲湖管道和自動管理屬性觸發器和檢查點。看到δ生活是什麼表?

讀取數據從卡夫卡、轉換和寫信給卡夫卡

Apache卡夫卡和其他消息傳遞總線提供了一些最低的延遲可用於大型數據集。您可以使用磚應用轉換的數據從卡夫卡攝取,然後回到卡夫卡寫入數據。

請注意

寫數據到雲對象存儲增加了額外的延遲開銷。如果您希望存儲數據從一個消息總線三角洲湖但要求流負載最低的延遲,磚建議配置單獨的流媒體工作向lakehouse攝取數據和應用實時轉換為下遊消息總線下沉。

下麵的代碼示例演示了一個簡單的模式從卡夫卡通過加入豐富數據增量表中的數據,然後寫回到卡夫卡:

(火花readStream格式(“卡夫卡”)選項(“kafka.bootstrap.servers”,“<服務器:ip >”)選項(“訂閱”,“<主題>”)選項(“startingOffsets”,“最新”)負載()加入(火花(“<表名稱>”),=“< id >”,如何=“左”)writeStream格式(“卡夫卡”)選項(“kafka.bootstrap.servers”,“<服務器:ip >”)選項(“主題”,“<主題>”)選項(“checkpointLocation”,“< checkpoint-path >”)開始())

你必須有適當的權限配置為訪問卡夫卡服務。填寫所有參數用尖括號(< >)使用相關的數據源和下沉值。看到與Apache卡夫卡和磚流處理