取消
顯示的結果
而不是尋找
你的意思是:

如何通過列名selectExpr通過一個或多個字符串參數使用scala火花?

SwapanSwapandee
新的貢獻者二世

我使用腳本疾控中心合並引發流。我希望通過在selectExpr列值通過一個參數作為每個表的列的名稱將會改變。當我通過列和通過一個字符串變量結構體字段,我得到錯誤= = >不匹配輸入”、“期待

下麵是我在參數化的代碼段。

var filteredMicroBatchDF = microBatchOutputDF .selectExpr (“col1”、“col2”,“結構(抵消KAFKA_TS) otherCols”) .groupBy (“col1”、“col2”) .agg (max (“otherCols”)。as(最新)).selectExpr (“col1”、“col2”,“最新*”)。

引用腳本仿真:

https://docs.www.eheci.com/_static/notebooks/merge-in-cdc.html

我一直喜歡下麵通過列名在一個變量中,然後閱讀selectExpr從這些變量:

val keyCols = " col1”、“col2”

KAFKA_TS val structCols = "結構(抵消)otherCols”

var filteredMicroBatchDF = microBatchOutputDF。selectExpr (keyCols structCols) .groupBy (keyCols) .agg (max (“otherCols”)。as(最新)).selectExpr (keyCols,最新“。*”)

當我運行腳本我錯誤

org.apache.spark.sql.streaming.StreamingQueryException:
不匹配的輸入”、“期待< < EOF > >

2回答2

shyam_9
價值貢獻
價值貢獻

嗨@Swapan Swapandeep Marwaha,

你能通過他們作為Seq在下麵代碼中,

keyCols = Seq (“col1”、“col2”), structCols = Seq(“結構(偏移量,KAFKA_TS)作為otherCols”)

SwapanSwapandee
新的貢獻者二世

嗨@shyamspr,

是的,我試過這樣的工作但我所希望的方式是通過內部的列名Seq通過閱讀從一個部件或一個參數文件,當我得到錯誤。

https://stackoverflow.com/questions/58576398/how-to-pass-column-names-in-selectexpr-through-one-or-m..。

我上麵的帖子更新stackoverflow與代碼我嚐試和錯誤。將不勝感激如果你可以看一看,並建議如果你有任何的想法來解決這個問題。

謝謝你!

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map