read_kafka表值函數

適用於:檢查標記是的磚的SQL檢查標記是的磚運行時13.1及以後

讀取數據從一個Apache卡夫卡集群並返回數據以表格形式。

可以讀取數據從一個或多個卡夫卡的話題。它同時支持批量查詢和流攝入。

請注意

流隻能用於三角洲生活表。

語法

read_kafka([option_key= >option_value](,])

參數

  • option_key:的名稱選項配置。你必須使用引號(“)選項包含點()。

  • option_value:一個常數表達式設置選項。接受文字和標量函數。

返回

記錄從一個Apache卡夫卡集群使用以下模式:

  • 關鍵二進製:卡夫卡記錄的關鍵。

  • 價值二進製:卡夫卡的值記錄。

  • 主題字符串:卡夫卡主題的名稱記錄被讀取。

  • 分區INT:卡夫卡分區讀取記錄的ID。

  • 抵消長整型數字:抵消卡夫卡的記錄的數量TopicPartition

  • 時間戳時間戳:記錄時間戳值。的timestampType列定義了這個時間戳對應。

  • timestampType整數:時間戳中指定的類型時間戳列。

  • 數組< STRUCT <關鍵:字符串,值:二元> >:頭值作為記錄的一部分提供(如果啟用)。

例子

——一個批處理查詢閱讀從一個主題。>選擇價值:從read_kafka字符串值(bootstrapServers = > kafka_server: 9092,訂閱= > '事件')限製10;——一個更高級的查詢與卡夫卡的安全憑據。> SELECT * FROM read_kafka (bootstrapServers = > kafka_server: 9092,訂閱= > '事件',startingOffsets = >“最早”、“kafka.security。' = > ' SASL_SSL協議”、“kafka.sasl。' = > '平原機製”、“kafka.sasl.jaas。配置' = > ' kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule需要用戶名= " {USER_NAME}”密碼={密碼}“;”);——流攝入從卡夫卡JSON解析。> catalog.schema創建或刷新流表。raw_events選擇價值::字符串:事件——提取字段“事件”to_timestamp(價值::字符串:ts) ts -提取字段“t”,從流read_kafka時間戳(bootstrapServers = > kafka_server: 9092,訂閱= > '事件');

選項

你可以找到詳細的選項列表Apache火花文檔

需要選擇

為連接到您的卡夫卡集群提供下麵的選項。

選項

bootstrapServers

類型:字符串

一個以逗號分隔的主機/端口對指向卡夫卡集群。

默認值:無

提供以下選項中隻有一個配置,將數據從卡夫卡主題。

選項

分配

類型:字符串

一個JSON字符串,其中包含的具體topic-partitions消費。例如,對於“{”局部藥”:[0,1],“topicB”: (2、4)}”,局部藥是0號和1號分區將消耗。

默認值:無

訂閱

類型:字符串

卡夫卡主題閱讀的逗號分隔列表。

默認值:無

subscribePattern

類型:字符串

一個正則表達式匹配主題訂閱。

默認值:無

雜項選項

read_kafka可用於批量查詢以及在線查詢。下麵的選項指定它們適用於哪種類型的查詢。

選項

endingOffsets

類型:字符串查詢類型:批處理

補償閱讀,直到一批查詢“最新”指定最新的記錄,或一個JSON字符串指定為每個TopicPartition終結抵消。在JSON,1作為一個偏移量可以用來參考最新。2(早期)作為一個偏移量是不允許的。

默認值:“最新”

endingOffsetsByTimestamp

類型:字符串查詢類型:批處理

一個JSON字符串指定一個時間戳為每個TopicPartition讀,直到結束。需要提供的時間戳作為長時間戳值以毫秒為單位1970-01-01就是UTC例如,1686444353000。看到請注意下麵行為的細節和時間戳。endingOffsetsByTimestamp優先於endingOffsets

默認值:無

endingTimestamp

類型:字符串查詢類型:批處理

一個字符串值的時間戳以毫秒為單位1970-01-01就是UTC例如,“1686444353000”。如果卡夫卡不返回匹配的偏移,偏移量將被設置為最新。看到請注意下麵行為的細節和時間戳。注意:endingTimestamp優先於endingOffsetsByTimestampendingOffsets

默認值:無

includeHeaders

類型:布爾查詢類型:流媒體和批處理

是否包括卡夫卡標題行。

默認值:

卡夫卡。< consumer_option >

類型:字符串查詢類型:流媒體和批處理

任何卡夫卡的消費者可以通過特定的選項卡夫卡。前綴。這些選項時需要引號包圍,否則你會得到一個解析器錯誤。你可以找到在卡夫卡的選項文檔

注意:你不應該用這個函數設置以下選項:key.deserializer,value.deserializer,bootstrap.servers,group.id

默認值:無

maxOffsetsPerTrigger

類型:查詢類型:流媒體

速度限製的最大數量補償或行處理每觸發間隔。指定的偏移量的總數將在TopicPartitions比例分割。

默認值:無

startingOffsets

類型:字符串查詢類型:流媒體和批處理

查詢時的起點開始,“最早”從最早的偏移量,“最新”也就是從最新的補償,或為每個TopicPartition JSON字符串指定的起始偏移量。在JSON,2作為一個抵消最早可以用來參考,1最新的。

注意:對於批處理查詢,最新(隱式或通過使用1以JSON)是不允許的。對於流媒體查詢,這隻適用於當啟動一個新的查詢。重啟流查詢將繼續從查詢中定義的補償檢查站。新發現的分區在查詢將從最早開始。

默認值:“最新”流,“最早”對批處理

startingOffsetsByTimestamp

類型:字符串查詢類型:流媒體和批處理

一個JSON字符串指定為每個TopicPartition開始時間戳。需要提供的時間戳作為長時間戳值以毫秒為單位1970-01-01就是UTC例如,1686444353000。看到請注意下麵行為的細節和時間戳。如果卡夫卡不返回匹配的抵消,這種行為將遵循的價值選擇startingOffsetsByTimestampStrategystartingOffsetsByTimestamp優先於startingOffsets

注意:對於流媒體查詢,這隻適用於當啟動一個新的查詢。重啟流查詢將繼續從查詢中定義的補償檢查站。新發現的分區在查詢將從最早開始。

默認值:無

startingOffsetsByTimestampStrategy

類型:字符串查詢類型:流媒體和批處理

抵消這一策略時使用指定的開始時間戳(全球或每個分區)和返回的抵消卡夫卡不匹配。可用的策略是:

  • “錯誤”:失敗的查詢

    • “最新”:分配的最新抵消這些分區火花可以閱讀從這些分區後micro-batches更新記錄。

默認值:“錯誤”

startingTimestamp

類型:字符串查詢類型:流媒體和批處理

一個字符串值的時間戳以毫秒為單位1970-01-01就是UTC例如,“1686444353000”。看到請注意下麵行為的細節和時間戳。如果卡夫卡不返回匹配的抵消,這種行為將遵循的價值選擇startingOffsetsByTimestampStrategystartingTimestamp優先於startingOffsetsByTimestampstartingOffsets

注意:對於流媒體查詢,這隻適用於當啟動一個新的查詢。重啟流查詢將繼續從查詢中定義的補償檢查站。新發現的分區在查詢將從最早開始。

默認值:無

請注意

返回的偏移量為每個分區是最早的抵消的時間戳大於或等於給定的時間戳在相應的分區。在不同行為選擇之間如果卡夫卡不返回匹配的抵消,檢查每個選項的描述。

火花隻是通過時間戳信息KafkaConsumer.offsetsForTimes,原因不解釋或價值。為更多的細節KafkaConsumer.offsetsForTimes,請參考文檔)。此外,時間戳的意義在這裏可以根據卡夫卡不同配置(log.message.timestamp.type)。有關詳細信息,請參見Apache卡夫卡文檔