read_kafka
表值函數
適用於:磚的SQL磚運行時13.1及以後
讀取數據從一個Apache卡夫卡集群並返回數據以表格形式。
可以讀取數據從一個或多個卡夫卡的話題。它同時支持批量查詢和流攝入。
參數
option_key
:的名稱選項配置。你必須使用引號(“)選項包含點(。
)。option_value
:一個常數表達式設置選項。接受文字和標量函數。
返回
記錄從一個Apache卡夫卡集群使用以下模式:
關鍵二進製
:卡夫卡記錄的關鍵。價值二進製不零
:卡夫卡的值記錄。主題字符串不零
:卡夫卡主題的名稱記錄被讀取。分區INT不零
:卡夫卡分區讀取記錄的ID。抵消長整型數字不零
:抵消卡夫卡的記錄的數量TopicPartition
。時間戳時間戳不零
:記錄時間戳值。的timestampType
列定義了這個時間戳對應。timestampType整數不零
:時間戳中指定的類型時間戳
列。頭數組< STRUCT <關鍵:字符串,值:二元> >
:頭值作為記錄的一部分提供(如果啟用)。
例子
——一個批處理查詢閱讀從一個主題。>選擇價值:從read_kafka字符串值(bootstrapServers = > kafka_server: 9092,訂閱= > '事件')限製10;——一個更高級的查詢與卡夫卡的安全憑據。> SELECT * FROM read_kafka (bootstrapServers = > kafka_server: 9092,訂閱= > '事件',startingOffsets = >“最早”、“kafka.security。' = > ' SASL_SSL協議”、“kafka.sasl。' = > '平原機製”、“kafka.sasl.jaas。配置' = > ' kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule需要用戶名= " {USER_NAME}”密碼={密碼}“;”);——流攝入從卡夫卡JSON解析。> catalog.schema創建或刷新流表。raw_events選擇價值::字符串:事件——提取字段“事件”to_timestamp(價值::字符串:ts) ts -提取字段“t”,從流read_kafka時間戳(bootstrapServers = > kafka_server: 9092,訂閱= > '事件');
選項
你可以找到詳細的選項列表Apache火花文檔。
需要選擇
為連接到您的卡夫卡集群提供下麵的選項。
選項 |
---|
bootstrapServers 類型: 一個以逗號分隔的主機/端口對指向卡夫卡集群。 默認值:無 |
提供以下選項中隻有一個配置,將數據從卡夫卡主題。
選項 |
---|
分配 類型: 一個JSON字符串,其中包含的具體topic-partitions消費。例如,對於 默認值:無 |
訂閱 類型: 卡夫卡主題閱讀的逗號分隔列表。 默認值:無 |
subscribePattern 類型: 一個正則表達式匹配主題訂閱。 默認值:無 |
雜項選項
read_kafka
可用於批量查詢以及在線查詢。下麵的選項指定它們適用於哪種類型的查詢。
選項 |
---|
endingOffsets 類型: 補償閱讀,直到一批查詢 默認值: |
endingOffsetsByTimestamp 類型: 一個JSON字符串指定一個時間戳為每個TopicPartition讀,直到結束。需要提供的時間戳作為長時間戳值以毫秒為單位 默認值:無 |
endingTimestamp 類型: 一個字符串值的時間戳以毫秒為單位 默認值:無 |
includeHeaders 類型: 是否包括卡夫卡標題行。 默認值: |
類型: 任何卡夫卡的消費者可以通過特定的選項 注意:你不應該用這個函數設置以下選項: 默認值:無 |
maxOffsetsPerTrigger 類型: 速度限製的最大數量補償或行處理每觸發間隔。指定的偏移量的總數將在TopicPartitions比例分割。 默認值:無 |
startingOffsets 類型: 查詢時的起點開始, 注意:對於批處理查詢,最新(隱式或通過使用1以JSON)是不允許的。對於流媒體查詢,這隻適用於當啟動一個新的查詢。重啟流查詢將繼續從查詢中定義的補償檢查站。新發現的分區在查詢將從最早開始。 默認值: |
startingOffsetsByTimestamp 類型: 一個JSON字符串指定為每個TopicPartition開始時間戳。需要提供的時間戳作為長時間戳值以毫秒為單位 注意:對於流媒體查詢,這隻適用於當啟動一個新的查詢。重啟流查詢將繼續從查詢中定義的補償檢查站。新發現的分區在查詢將從最早開始。 默認值:無 |
startingOffsetsByTimestampStrategy 類型: 抵消這一策略時使用指定的開始時間戳(全球或每個分區)和返回的抵消卡夫卡不匹配。可用的策略是:
默認值: |
startingTimestamp 類型: 一個字符串值的時間戳以毫秒為單位 注意:對於流媒體查詢,這隻適用於當啟動一個新的查詢。重啟流查詢將繼續從查詢中定義的補償檢查站。新發現的分區在查詢將從最早開始。 默認值:無 |
請注意
返回的偏移量為每個分區是最早的抵消的時間戳大於或等於給定的時間戳在相應的分區。在不同行為選擇之間如果卡夫卡不返回匹配的抵消,檢查每個選項的描述。
火花隻是通過時間戳信息KafkaConsumer.offsetsForTimes
,原因不解釋或價值。為更多的細節KafkaConsumer.offsetsForTimes
,請參考文檔)。此外,時間戳的意義在這裏可以根據卡夫卡不同配置(log.message.timestamp.type
)。有關詳細信息,請參見Apache卡夫卡文檔。