我使用腳本疾控中心合並引發流。我希望通過在selectExpr列值通過一個參數作為每個表的列的名稱將會改變。當我通過列和通過一個字符串變量結構體字段,我得到錯誤= = >不匹配輸入”、“期待
下麵是我在參數化的代碼段。
var filteredMicroBatchDF = microBatchOutputDF .selectExpr (“col1”、“col2”,“結構(抵消KAFKA_TS) otherCols”) .groupBy (“col1”、“col2”) .agg (max (“otherCols”)。as(最新)).selectExpr (“col1”、“col2”,“最新*”)。
引用腳本仿真:
https://docs.www.eheci.com/_static/notebooks/merge-in-cdc.html
我一直喜歡下麵通過列名在一個變量中,然後閱讀selectExpr從這些變量:
val keyCols = " col1”、“col2”
KAFKA_TS val structCols = "結構(抵消)otherCols”
var filteredMicroBatchDF = microBatchOutputDF。selectExpr (keyCols structCols) .groupBy (keyCols) .agg (max (“otherCols”)。as(最新)).selectExpr (keyCols,最新“。*”)
當我運行腳本我錯誤
org.apache.spark.sql.streaming.StreamingQueryException:
不匹配的輸入”、“期待< < EOF > >
嗨@shyamspr,
是的,我試過這樣的工作但我所希望的方式是通過內部的列名Seq通過閱讀從一個部件或一個參數文件,當我得到錯誤。
我上麵的帖子更新stackoverflow與代碼我嚐試和錯誤。將不勝感激如果你可以看一看,並建議如果你有任何的想法來解決這個問題。
謝謝你!