訂閱穀歌Pub / Sub

磚連接器提供了一個內置的訂閱穀歌在磚Pub / Sub運行時13.1及以上。這個連接器提供了記錄的用戶隻有一次處理語義。

請注意

Pub / Sub可能發布重複的記錄,可能到達用戶的訂單記錄。你應該寫磚代碼來處理的重複和無序的記錄。

語法的例子

下麵的代碼示例演示了配置的基本語法結構流讀取Pub / Sub:

瓦爾authOptions:地圖(字符串,字符串]=地圖(“clientId”- >clientId,“clientEmail”- >clientEmail,“privateKey”- >privateKey,“privateKeyId”- >privateKeyId)查詢=火花readStream格式(“pubsub”)/ /如果沒有,我們將創建一個Pubsub訂閱id選項(“subscriptionId”,“mysub”)/ /需要選項(“topicId”,“mytopic”)/ /需要選項(“projectId”,“。”)/ /需要選項(authOptions)/ /需要這些選項或服務帳戶負載()

更多的配置選項,見為Pub / Sub流讀取配置選項

配置訪問Pub / Sub

磚時建議使用秘密提供授權選項。以下選項需要授權連接:

  • clientEmail

  • clientId

  • privateKey

  • privateKeyId

下表描述了角色配置所需的憑證:

角色

必需的或可選的

如何使用它

角色/ pubsub.viewer角色/查看器

要求

檢查是否存在訂閱,訂閱

角色/ pubsub.subscriber

要求

獲取的數據訂閱

角色/ pubsub.editor角色/編輯器

可選

允許創建訂閱如果不存在,也可以使用deleteSubscriptionOnStreamStop刪除訂閱流終止

發布/訂閱模式

流的模式匹配從Pub / Sub獲取記錄,如下表所述:

類型

消息id

StringType

有效載荷

ArrayType [ByteType]

屬性

StringType

publishTimestampInMillis

LongType

為Pub / Sub流讀取配置選項

下表描述了支持發布/訂閱的選項。所有選項配置作為結構的一部分流閱讀使用.option (“< optionName >”,“<用optionValue >”)語法。

請注意

一些Pub / Sub配置選項使用的概念獲取而不是micro-batches。這反映了內部實現細節和選擇工作類似於推論其他結構化流連接器,除了記錄獲取,然後處理。

選項

默認值

描述

numFetchPartitions

初始化設置的執行人流

並行火花任務獲取記錄的數量從一個訂閱。

deleteSubscriptionOnStreamStop

如果真正的,訂閱傳遞到流流工作結束的時候被刪除。

maxBytesPerTrigger

沒有一個

批量大小的軟限製在每個處理micro-batch觸發。

maxRecordsPerFetch

1000年

在處理之前記錄的數量來獲取每個任務記錄。

maxFetchPeriod

10秒

每個任務的時間獲取之前處理記錄。磚建議使用默認值。

增量為Pub / Sub批處理語義

您可以使用Trigger.AvailableNow消費記錄可用Pub / Sub一批增量來源。

磚記錄時間戳的,當你開始閱讀的Trigger.AvailableNow設置。記錄由批處理包括所有之前獲取數據和任何新出版的記錄一個時間戳不到記錄流開始時間戳。

看到配置增量的批處理

監測流指標

結構化流進展量度報告獲取的記錄數量和準備過程,記錄的大小獲取並準備過程中,流開始以來和副本的數量。下麵是一個例子,這些指標:

“指標”:{“numDuplicatesSinceStreamStart”:“1”,“numRecordsReadyToProcess”:“1”,“sizeOfRecordsReadyToProcess”:“8”}

限製

投機執行(spark.speculation)不支持發布/訂閱。