指定初始狀態mapGroupsWithState在結構化流

您可以指定一個用戶定義的初始狀態為結構化流狀態處理使用flatMapGroupsWithStatemapGroupsWithState。這可以讓你避免再處理數據時開始有狀態流沒有一個有效的檢查點。

defmapGroupsWithState(年代:編碼器,U:編碼器)(timeoutConf:GroupStateTimeout,initialState:KeyValueGroupedDataset(K,年代))(函數:(K,迭代器(V),GroupState(年代])= >U):數據集(U]defflatMapGroupsWithState(年代:編碼器,U:編碼器)(outputMode:OutputMode,timeoutConf:GroupStateTimeout,initialState:KeyValueGroupedDataset(K,年代))(函數:(K,迭代器(V),GroupState(年代])= >迭代器(U])

指定一個初始狀態的示例用例flatMapGroupsWithState接線員:

瓦爾fruitCountFunc=(關鍵:字符串,:迭代器(字符串),狀態:GroupState(RunningCount])= >{瓦爾=狀態getOption地圖(_)。getOrElse(0 l)+valList大小狀態更新(RunningCount())迭代器((關鍵,toString))}瓦爾fruitCountInitialDS:數據集((字符串,RunningCount)]=Seq((“蘋果”,RunningCount(1)),(“橙色”,RunningCount(2)),(“芒果”,RunningCount(5)),)。托托()瓦爾fruitCountInitial=initialStategroupByKey(x= >x_1)。mapValues(__2)fruitStreamgroupByKey(x= >x)flatMapGroupsWithState(更新,GroupStateTimeoutNoTimeout,fruitCountInitial)(fruitCountFunc)

指定一個初始狀態的示例用例mapGroupsWithState接線員:

瓦爾fruitCountFunc=(關鍵:字符串,:迭代器(字符串),狀態:GroupState(RunningCount])= >{瓦爾=狀態getOption地圖(_)。getOrElse(0 l)+valList大小狀態更新(RunningCount())(關鍵,toString)}瓦爾fruitCountInitialDS:數據集((字符串,RunningCount)]=Seq((“蘋果”,RunningCount(1)),(“橙色”,RunningCount(2)),(“芒果”,RunningCount(5)),)。托托()瓦爾fruitCountInitial=initialStategroupByKey(x= >x_1)。mapValues(__2)fruitStreamgroupByKey(x= >x)mapGroupsWithState(GroupStateTimeoutNoTimeout,fruitCountInitial)(fruitCountFunc)